This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2c6521a0a62ba302cd821f1fecb12191e6e913e7
Author: Zhu Zhu <[email protected]>
AuthorDate: Wed Dec 23 16:11:36 2020 +0800

    [FLINK-20439][runtime] Rework scheduleOrUpdateConsumers in ExecutionGraph 
and its sub-components
    
    notifyPartitionDataAvailable() is introduced to replace 
scheduleOrUpdateConsumers() in the RPC code path.
    Execution#scheduleOrUpdateConsumers is renamed to #updatePartitionConsumers 
to be aligned with what it does.
---
 .../flink/runtime/executiongraph/Execution.java    |  21 +--
 .../runtime/executiongraph/ExecutionGraph.java     |  23 +--
 .../runtime/executiongraph/ExecutionVertex.java    |  31 +---
 .../flink/runtime/scheduler/SchedulerBase.java     |   7 +-
 .../ExecutionGraphVariousFailuesTest.java          |   8 +-
 .../utils/SimpleAckingTaskManagerGateway.java      |   9 ++
 .../scheduler/ScheduleOrUpdateConsumersTest.java   | 174 --------------------
 .../scheduler/UpdatePartitionConsumersTest.java    | 178 +++++++++++++++++++++
 8 files changed, 211 insertions(+), 240 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 7792794..98d428c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -70,7 +70,6 @@ import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -671,17 +670,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                return releaseFuture;
        }
 
-       void scheduleOrUpdateConsumers(List<List<ExecutionEdge>> allConsumers) {
-               assertRunningInJobMasterMainThread();
-
-               final HashSet<ExecutionVertex> consumerDeduplicator = new 
HashSet<>();
-               scheduleOrUpdateConsumers(allConsumers, consumerDeduplicator);
-       }
-
-       private void scheduleOrUpdateConsumers(
-                       final List<List<ExecutionEdge>> allConsumers,
-                       final HashSet<ExecutionVertex> consumerDeduplicator) {
-
+       private void updatePartitionConsumers(final List<List<ExecutionEdge>> 
allConsumers) {
                if (allConsumers.size() == 0) {
                        return;
                }
@@ -888,7 +877,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
 
                                if (transitionState(current, FINISHED)) {
                                        try {
-                                               
finishPartitionsAndScheduleOrUpdateConsumers();
+                                               
finishPartitionsAndUpdateConsumers();
                                                
updateAccumulatorsAndMetrics(userAccumulators, metrics);
                                                releaseAssignedResource(null);
                                                
vertex.getExecutionGraph().deregisterExecution(this);
@@ -919,20 +908,18 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                }
        }
 
-       private void finishPartitionsAndScheduleOrUpdateConsumers() {
+       private void finishPartitionsAndUpdateConsumers() {
                final List<IntermediateResultPartition> newlyFinishedResults = 
getVertex().finishAllBlockingPartitions();
                if (newlyFinishedResults.isEmpty()) {
                        return;
                }
 
-               final HashSet<ExecutionVertex> consumerDeduplicator = new 
HashSet<>();
-
                for (IntermediateResultPartition finishedPartition : 
newlyFinishedResults) {
                        final IntermediateResultPartition[] 
allPartitionsOfNewlyFinishedResults =
                                        
finishedPartition.getIntermediateResult().getPartitions();
 
                        for (IntermediateResultPartition partition : 
allPartitionsOfNewlyFinishedResults) {
-                               
scheduleOrUpdateConsumers(partition.getConsumers(), consumerDeduplicator);
+                               
updatePartitionConsumers(partition.getConsumers());
                        }
                }
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index e1daefc..4e3b54a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1320,27 +1320,20 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
        }
 
        /**
-        * Schedule or updates consumers of the given result partition.
+        * Mark the data of a result partition to be available. Note that only 
PIPELINED partitions are accepted
+        * because it is for the case that a TM side PIPELINED result partition 
has data produced and notifies JM.
         *
-        * @param partitionId specifying the result partition whose consumer 
shall be scheduled or updated
-        * @throws ExecutionGraphException if the schedule or update consumers 
operation could not be executed
+        * @param partitionId specifying the result partition whose data have 
become available
         */
-       public void scheduleOrUpdateConsumers(ResultPartitionID partitionId) 
throws ExecutionGraphException {
-
+       public void notifyPartitionDataAvailable(ResultPartitionID partitionId) 
{
                assertRunningInJobMasterMainThread();
 
                final Execution execution = 
currentExecutions.get(partitionId.getProducerId());
 
-               if (execution == null) {
-                       throw new ExecutionGraphException("Cannot find 
execution for execution Id " +
-                               partitionId.getPartitionId() + '.');
-               }
-               else if (execution.getVertex() == null){
-                       throw new ExecutionGraphException("Execution with 
execution Id " +
-                               partitionId.getPartitionId() + " has no vertex 
assigned.");
-               } else {
-                       
execution.getVertex().scheduleOrUpdateConsumers(partitionId);
-               }
+               checkState(execution != null, "Cannot find execution for 
execution Id " +
+                       partitionId.getPartitionId() + ".");
+
+               execution.getVertex().notifyPartitionDataAvailable(partitionId);
        }
 
        public Map<ExecutionAttemptID, Execution> getRegisteredExecutions() {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 55727f4..c70da77 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -59,6 +59,8 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * The ExecutionVertex is a parallel subtask of the execution. It may be 
executed once, or several times, each of
@@ -705,34 +707,15 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
                currentExecution.markFailed(t);
        }
 
-       /**
-        * Schedules or updates the consumer tasks of the result partition with 
the given ID.
-        */
-       void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {
-
-               final Execution execution = currentExecution;
-
-               // Abort this request if there was a concurrent reset
-               if 
(!partitionId.getProducerId().equals(execution.getAttemptId())) {
-                       return;
-               }
+       void notifyPartitionDataAvailable(ResultPartitionID partitionId) {
+               
checkArgument(partitionId.getProducerId().equals(currentExecution.getAttemptId()));
 
                final IntermediateResultPartition partition = 
resultPartitions.get(partitionId.getPartitionId());
-
-               if (partition == null) {
-                       throw new IllegalStateException("Unknown partition " + 
partitionId + ".");
-               }
+               checkState(partition != null, "Unknown partition " + 
partitionId + ".");
+               checkState(partition.getResultType().isPipelined(), "partition 
data available notification is " +
+                       "only valid for pipelined partitions.");
 
                partition.markDataProduced();
-
-               if 
(partition.getIntermediateResult().getResultType().isPipelined()) {
-                       // Schedule or update receivers of this partition
-                       
execution.scheduleOrUpdateConsumers(partition.getConsumers());
-               }
-               else {
-                       throw new 
IllegalArgumentException("ScheduleOrUpdateConsumers msg is only valid for" +
-                                       "pipelined partitions.");
-               }
        }
 
        void cachePartitionInfo(PartitionInfo partitionInfo){
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 93da3f4..a5738cb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -47,7 +47,6 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
-import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -689,11 +688,7 @@ public abstract class SchedulerBase implements SchedulerNG 
{
        public final void scheduleOrUpdateConsumers(final ResultPartitionID 
partitionId) {
                mainThreadExecutor.assertRunningInMainThread();
 
-               try {
-                       executionGraph.scheduleOrUpdateConsumers(partitionId);
-               } catch (ExecutionGraphException e) {
-                       throw new RuntimeException(e);
-               }
+               executionGraph.notifyPartitionDataAvailable(partitionId);
 
                scheduleOrUpdateConsumersInternal(partitionId.getPartitionId());
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java
index b20c0ce..493415b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java
@@ -29,7 +29,7 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
-import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
@@ -60,10 +60,10 @@ public class ExecutionGraphVariousFailuesTest extends 
TestLogger {
 
                try {
                        scheduler.scheduleOrUpdateConsumers(resultPartitionId);
-                       fail("Expected ExecutionGraphException.");
-               } catch (RuntimeException e) {
+                       fail("Error expected.");
+               } catch (IllegalStateException e) {
                        // we've expected this exception to occur
-                       assertThat(e.getCause(), 
instanceOf(ExecutionGraphException.class));
+                       assertThat(e.getMessage(), containsString("Cannot find 
execution for execution Id"));
                }
 
                assertEquals(JobStatus.RUNNING, eg.getState());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index 59aee81..d23c5ae 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.TaskBackPressureResponse;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.function.TriConsumer;
 
 import java.util.Collection;
 import java.util.Set;
@@ -66,6 +67,9 @@ public class SimpleAckingTaskManagerGateway implements 
TaskManagerGateway {
                checkpointOptions,
                advanceToEndOfEventTime) -> { };
 
+       private TriConsumer<ExecutionAttemptID, Iterable<PartitionInfo>, Time> 
updatePartitionsConsumer =
+               (ignore1, ignore2, ignore3) -> { };
+
        public void setSubmitConsumer(Consumer<TaskDeploymentDescriptor> 
submitConsumer) {
                this.submitConsumer = submitConsumer;
        }
@@ -86,6 +90,10 @@ public class SimpleAckingTaskManagerGateway implements 
TaskManagerGateway {
                this.checkpointConsumer = checkpointConsumer;
        }
 
+       public void setUpdatePartitionsConsumer(TriConsumer<ExecutionAttemptID, 
Iterable<PartitionInfo>, Time> updatePartitionsConsumer) {
+               this.updatePartitionsConsumer = updatePartitionsConsumer;
+       }
+
        @Override
        public String getAddress() {
                return address;
@@ -113,6 +121,7 @@ public class SimpleAckingTaskManagerGateway implements 
TaskManagerGateway {
 
        @Override
        public CompletableFuture<Acknowledge> 
updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> 
partitionInfos, Time timeout) {
+               updatePartitionsConsumer.accept(executionAttemptID, 
partitionInfos, timeout);
                return CompletableFuture.completedFuture(Acknowledge.get());
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
deleted file mode 100644
index ece1c80..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.MiniClusterResource;
-import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
-
-import org.junit.ClassRule;
-import org.junit.Test;
-
-import java.util.List;
-
-import static 
org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest.SubtaskIndexReceiver.CONFIG_KEY;
-
-/**
- * Tests for the lazy scheduling/updating of consumers depending on the
- * producers result.
- */
-public class ScheduleOrUpdateConsumersTest extends TestLogger {
-
-       private static final int NUMBER_OF_TMS = 2;
-       private static final int NUMBER_OF_SLOTS_PER_TM = 2;
-       private static final int PARALLELISM = NUMBER_OF_TMS * 
NUMBER_OF_SLOTS_PER_TM;
-
-       @ClassRule
-       public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new 
MiniClusterResource(
-               new MiniClusterResourceConfiguration.Builder()
-                       .setConfiguration(getFlinkConfiguration())
-                       .setNumberTaskManagers(NUMBER_OF_TMS)
-                       .setNumberSlotsPerTaskManager(NUMBER_OF_SLOTS_PER_TM)
-                       .build());
-
-       private static Configuration getFlinkConfiguration() {
-               final Configuration config = new Configuration();
-               config.setString(AkkaOptions.ASK_TIMEOUT, 
TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
-
-               return config;
-       }
-
-       /**
-        * Tests notifications of multiple receivers when a task produces both 
a pipelined and blocking
-        * result.
-        *
-        * <pre>
-        *                             +----------+
-        *            +-- pipelined -> | Receiver |
-        * +--------+ |                +----------+
-        * | Sender |-|
-        * +--------+ |                +----------+
-        *            +-- blocking --> | Receiver |
-        *                             +----------+
-        * </pre>
-        *
-        * <p>The pipelined receiver gets deployed after the first buffer is 
available and the blocking
-        * one after all subtasks are finished.
-        */
-       @Test
-       public void testMixedPipelinedAndBlockingResults() throws Exception {
-               final JobVertex sender = new JobVertex("Sender");
-               
sender.setInvokableClass(BinaryRoundRobinSubtaskIndexSender.class);
-               
sender.getConfiguration().setInteger(BinaryRoundRobinSubtaskIndexSender.CONFIG_KEY,
 PARALLELISM);
-               sender.setParallelism(PARALLELISM);
-
-               final JobVertex pipelinedReceiver = new JobVertex("Pipelined 
Receiver");
-               
pipelinedReceiver.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class);
-               pipelinedReceiver.getConfiguration().setInteger(CONFIG_KEY, 
PARALLELISM);
-               pipelinedReceiver.setParallelism(PARALLELISM);
-
-               pipelinedReceiver.connectNewDataSetAsInput(
-                               sender,
-                               DistributionPattern.ALL_TO_ALL,
-                               ResultPartitionType.PIPELINED);
-
-               final JobVertex blockingReceiver = new JobVertex("Blocking 
Receiver");
-               
blockingReceiver.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class);
-               blockingReceiver.getConfiguration().setInteger(CONFIG_KEY, 
PARALLELISM);
-               blockingReceiver.setParallelism(PARALLELISM);
-
-               blockingReceiver.connectNewDataSetAsInput(sender,
-                               DistributionPattern.ALL_TO_ALL,
-                               ResultPartitionType.BLOCKING);
-
-               SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
-
-               sender.setSlotSharingGroup(slotSharingGroup);
-               pipelinedReceiver.setSlotSharingGroup(slotSharingGroup);
-               blockingReceiver.setSlotSharingGroup(slotSharingGroup);
-
-               final JobGraph jobGraph = new JobGraph(
-                               "Mixed pipelined and blocking result",
-                               sender,
-                               pipelinedReceiver,
-                               blockingReceiver);
-
-               
MINI_CLUSTER_RESOURCE.getMiniCluster().executeJobBlocking(jobGraph);
-       }
-
-       // 
---------------------------------------------------------------------------------------------
-
-       /**
-        * Invokable which writes a configurable number of events to a pipelined
-        * and blocking partition alternatingly.
-        */
-       public static class BinaryRoundRobinSubtaskIndexSender extends 
AbstractInvokable {
-
-               static final String CONFIG_KEY = "number-of-times-to-send";
-
-               public BinaryRoundRobinSubtaskIndexSender(Environment 
environment) {
-                       super(environment);
-               }
-
-               @Override
-               public void invoke() throws Exception {
-                       List<RecordWriter<IntValue>> writers = 
Lists.newArrayListWithCapacity(2);
-
-                       // The order of intermediate result creation in the job 
graph specifies which produced
-                       // result partition is pipelined/blocking.
-                       final RecordWriter<IntValue> pipelinedWriter = new 
RecordWriterBuilder<IntValue>().build(getEnvironment().getWriter(0));
-                       final RecordWriter<IntValue> blockingWriter = new 
RecordWriterBuilder<IntValue>().build(getEnvironment().getWriter(1));
-                       writers.add(pipelinedWriter);
-                       writers.add(blockingWriter);
-
-                       final int numberOfTimesToSend = 
getTaskConfiguration().getInteger(CONFIG_KEY, 0);
-
-                       final IntValue subtaskIndex = new IntValue(
-                                       
getEnvironment().getTaskInfo().getIndexOfThisSubtask());
-
-                       // Produce the first intermediate result and then the 
second in a serial fashion.
-                       for (RecordWriter<IntValue> writer : writers) {
-                               try {
-                                       for (int i = 0; i < 
numberOfTimesToSend; i++) {
-                                               writer.emit(subtaskIndex);
-                                       }
-                                       writer.flushAll();
-                               }
-                               finally {
-                                       writer.close();
-                               }
-                       }
-               }
-       }
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/UpdatePartitionConsumersTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/UpdatePartitionConsumersTest.java
new file mode 100644
index 0000000..13ed1c5
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/UpdatePartitionConsumersTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
+import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.util.IterableUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the updating of consumers depending on the
+ * producers result.
+ */
+public class UpdatePartitionConsumersTest extends TestLogger {
+
+       private static final long TIMEOUT = 5000L;
+
+       private JobGraph jobGraph;
+       private JobVertex v1;
+       private JobVertex v2;
+       private JobVertex v3;
+       private JobVertex v4;
+
+       @Before
+       public void setUp() {
+               buildJobGraphWithBlockingEdgeWithinRegion();
+       }
+
+       /**
+        * Build a graph which allows consumer vertex v4 to be deployed before 
its BLOCKING input v3 finishes.
+        *
+        * <pre>
+        *                         +----+
+        *        +-- pipelined -> | v2 | -- pipelined -+
+        * +----+ |                +----+               |    +----+
+        * | v1 |-|                                     | -> | v4 |
+        * +----+ |                +----+               |    +----+
+        *        +-- pipelined -> | v3 | -- blocking --+
+        *                         +----+
+        * </pre>
+        */
+       private void buildJobGraphWithBlockingEdgeWithinRegion() {
+               v1 = new JobVertex("v1");
+               v1.setInvokableClass(AbstractInvokable.class);
+               v1.setParallelism(1);
+
+               v2 = new JobVertex("v2");
+               v2.setInvokableClass(AbstractInvokable.class);
+               v2.setParallelism(1);
+
+               v3 = new JobVertex("v3");
+               v3.setInvokableClass(AbstractInvokable.class);
+               v3.setParallelism(1);
+
+               v4 = new JobVertex("v4");
+               v4.setInvokableClass(AbstractInvokable.class);
+               v4.setParallelism(1);
+
+               v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.PIPELINED);
+               v3.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.PIPELINED);
+               v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.PIPELINED);
+               v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.BLOCKING);
+
+               jobGraph = new JobGraph(v1, v2, v3, v4);
+       }
+
+       /**
+        * Test BLOCKING partition information are properly updated to 
consumers when its producer finishes.
+        */
+       @Test
+       public void testUpdatePartitionConsumers() throws Exception {
+               final SimpleAckingTaskManagerGateway taskManagerGateway = new 
SimpleAckingTaskManagerGateway();
+
+               final SchedulerBase scheduler = SchedulerTestingUtils
+                       .newSchedulerBuilder(jobGraph)
+                       .setExecutionSlotAllocatorFactory(new 
TestExecutionSlotAllocatorFactory(taskManagerGateway))
+                       .build();
+               
scheduler.initialize(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+               final ExecutionVertex ev1 = scheduler.getExecutionVertex(new 
ExecutionVertexID(v1.getID(), 0));
+               final ExecutionVertex ev2 = scheduler.getExecutionVertex(new 
ExecutionVertexID(v2.getID(), 0));
+               final ExecutionVertex ev3 = scheduler.getExecutionVertex(new 
ExecutionVertexID(v3.getID(), 0));
+               final ExecutionVertex ev4 = scheduler.getExecutionVertex(new 
ExecutionVertexID(v4.getID(), 0));
+
+               final CompletableFuture<TaskDeploymentDescriptor> ev4TddFuture 
= new CompletableFuture<>();
+               taskManagerGateway.setSubmitConsumer(tdd -> {
+                       if 
(tdd.getExecutionAttemptId().equals(ev4.getCurrentExecutionAttempt().getAttemptId()))
 {
+                               ev4TddFuture.complete(tdd);
+                       }
+               });
+
+               scheduler.startScheduling();
+
+               assertThat(ev1.getExecutionState(), 
is(ExecutionState.DEPLOYING));
+               assertThat(ev2.getExecutionState(), 
is(ExecutionState.DEPLOYING));
+               assertThat(ev3.getExecutionState(), 
is(ExecutionState.DEPLOYING));
+               assertThat(ev4.getExecutionState(), 
is(ExecutionState.DEPLOYING));
+
+               updateState(scheduler, ev1, ExecutionState.RUNNING);
+               updateState(scheduler, ev2, ExecutionState.RUNNING);
+               updateState(scheduler, ev3, ExecutionState.RUNNING);
+               updateState(scheduler, ev4, ExecutionState.RUNNING);
+
+               final InputGateDeploymentDescriptor ev4Igdd2 = 
ev4TddFuture.get(TIMEOUT, TimeUnit.MILLISECONDS).getInputGates().get(1);
+               assertThat(ev4Igdd2.getShuffleDescriptors()[0], 
instanceOf(UnknownShuffleDescriptor.class));
+
+               final CompletableFuture<Void> updatePartitionFuture = new 
CompletableFuture<>();
+               taskManagerGateway.setUpdatePartitionsConsumer((attemptId, 
partitionInfos, time) -> {
+                       assertThat(attemptId, 
equalTo(ev4.getCurrentExecutionAttempt().getAttemptId()));
+                       final List<PartitionInfo> partitionInfoList = 
IterableUtils.toStream(partitionInfos).collect(Collectors.toList());
+                       assertThat(partitionInfoList, hasSize(1));
+                       final PartitionInfo partitionInfo = 
partitionInfoList.get(0);
+                       assertThat(partitionInfo.getIntermediateDataSetID(), 
equalTo(v3.getProducedDataSets().get(0).getId()));
+                       assertThat(partitionInfo.getShuffleDescriptor(), 
instanceOf(NettyShuffleDescriptor.class));
+                       updatePartitionFuture.complete(null);
+               });
+
+               updateState(scheduler, ev1, ExecutionState.FINISHED);
+               updateState(scheduler, ev3, ExecutionState.FINISHED);
+
+               updatePartitionFuture.get(TIMEOUT, TimeUnit.MILLISECONDS);
+       }
+
+       private void updateState(SchedulerBase scheduler, ExecutionVertex 
vertex, ExecutionState state) {
+               scheduler.updateTaskExecutionState(
+                       new TaskExecutionState(
+                               jobGraph.getJobID(),
+                               
vertex.getCurrentExecutionAttempt().getAttemptId(),
+                               state));
+       }
+}

Reply via email to