This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0d099b79fddc5e254884e44f2167c625744079a4
Author: Zhu Zhu <[email protected]>
AuthorDate: Tue Jul 6 11:34:53 2021 +0800

    [FLINK-22677][runtime] DefaultScheduler supports async registration of produced partitions
    
    This closes #16383.
---
 .../flink/runtime/executiongraph/Execution.java    |  50 +++----
 .../flink/runtime/scheduler/DefaultScheduler.java  |  91 +++++++++----
 .../runtime/scheduler/DefaultSchedulerFactory.java |   3 +-
 .../flink/runtime/shuffle/ShuffleMaster.java       |   3 -
 .../runtime/executiongraph/ExecutionTest.java      |  56 --------
 .../runtime/scheduler/DefaultSchedulerTest.java    | 145 ++++++++++++++++++++-
 .../runtime/scheduler/SchedulerTestingUtils.java   |   3 +-
 .../TestExecutionVertexOperationsDecorator.java    |  14 ++
 .../runtime/shuffle/TestingShuffleMaster.java      | 144 ++++++++++++++++++++
 9 files changed, 388 insertions(+), 121 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 3264c0d..b047229 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -48,7 +48,6 @@ import 
org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
 import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
-import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ProducerDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
@@ -404,7 +403,7 @@ public class Execution
     //  Actions
     // 
--------------------------------------------------------------------------------------------
 
-    public CompletableFuture<Execution> registerProducedPartitions(
+    public CompletableFuture<Void> registerProducedPartitions(
             TaskManagerLocation location, boolean 
notifyPartitionDataAvailable) {
 
         assertRunningInJobMasterMainThread();
@@ -415,34 +414,27 @@ public class Execution
                 
vertex.getExecutionGraphAccessor().getJobMasterMainThreadExecutor(),
                 producedPartitionsCache -> {
                     producedPartitions = producedPartitionsCache;
-                    startTrackingPartitions(
-                            location.getResourceID(), 
producedPartitionsCache.values());
-                    return this;
+
+                    if (getState() == SCHEDULED) {
+                        startTrackingPartitions(
+                                location.getResourceID(), 
producedPartitionsCache.values());
+                    } else {
+                        LOG.info(
+                                "Discarding late registered partitions for {} 
task {}.",
+                                getState(),
+                                attemptId);
+                        for (ResultPartitionDeploymentDescriptor desc :
+                                producedPartitionsCache.values()) {
+                            getVertex()
+                                    .getExecutionGraphAccessor()
+                                    .getShuffleMaster()
+                                    
.releasePartitionExternally(desc.getShuffleDescriptor());
+                        }
+                    }
+                    return null;
                 });
     }
 
-    /**
-     * Register producedPartitions to {@link ShuffleMaster}
-     *
-     * <p>HACK: Please notice that this method simulates asynchronous 
registration in a synchronous
-     * way by making sure the returned {@link CompletableFuture} from {@link
-     * ShuffleMaster#registerPartitionWithProducer} is completed immediately.
-     *
-     * <p>{@link Execution#producedPartitions} are registered through an 
asynchronous interface
-     * {@link ShuffleMaster#registerPartitionWithProducer} to {@link 
ShuffleMaster}, however they
-     * are not always accessed through callbacks. So, it is possible that 
{@link
-     * Execution#producedPartitions} have not been available yet when accessed 
(in {@link
-     * Execution#deploy} for example).
-     *
-     * <p>Since the only implementation of {@link ShuffleMaster} is {@link 
NettyShuffleMaster},
-     * which indeed registers producedPartition in a synchronous way, this 
method enforces
-     * synchronous registration under an asynchronous interface for now.
-     *
-     * <p>TODO: If asynchronous registration is needed in the future, use 
callbacks to access {@link
-     * Execution#producedPartitions}.
-     *
-     * @return completed future of partition deployment descriptors.
-     */
     @VisibleForTesting
     static CompletableFuture<
                     Map<IntermediateResultPartitionID, 
ResultPartitionDeploymentDescriptor>>
@@ -470,10 +462,6 @@ public class Execution
                             .getShuffleMaster()
                             
.registerPartitionWithProducer(partitionDescriptor, producerDescriptor);
 
-            // temporary hack; the scheduler does not handle incomplete 
futures properly
-            Preconditions.checkState(
-                    shuffleDescriptorFuture.isDone(), "ShuffleDescriptor 
future is incomplete.");
-
             CompletableFuture<ResultPartitionDeploymentDescriptor> 
partitionRegistration =
                     shuffleDescriptorFuture.thenApply(
                             shuffleDescriptor ->
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index 4ab7dbd..65281f4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -20,6 +20,7 @@
 package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -100,6 +101,8 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
 
     private final ShuffleMaster<?> shuffleMaster;
 
+    private final Time rpcTimeout;
+
     DefaultScheduler(
             final Logger log,
             final JobGraph jobGraph,
@@ -120,7 +123,8 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
             final ComponentMainThreadExecutor mainThreadExecutor,
             final JobStatusListener jobStatusListener,
             final ExecutionGraphFactory executionGraphFactory,
-            final ShuffleMaster<?> shuffleMaster)
+            final ShuffleMaster<?> shuffleMaster,
+            final Time rpcTimeout)
             throws Exception {
 
         super(
@@ -143,6 +147,7 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
         this.userCodeLoader = checkNotNull(userCodeLoader);
         this.executionVertexOperations = 
checkNotNull(executionVertexOperations);
         this.shuffleMaster = checkNotNull(shuffleMaster);
+        this.rpcTimeout = checkNotNull(rpcTimeout);
 
         final FailoverStrategy failoverStrategy =
                 failoverStrategyFactory.create(
@@ -429,21 +434,33 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
 
     private void waitForAllSlotsAndDeploy(final List<DeploymentHandle> 
deploymentHandles) {
         FutureUtils.assertNoException(
-                
assignAllResources(deploymentHandles).handle(deployAll(deploymentHandles)));
+                
assignAllResourcesAndRegisterProducedPartitions(deploymentHandles)
+                        .handle(deployAll(deploymentHandles)));
     }
 
-    private CompletableFuture<Void> assignAllResources(
+    private CompletableFuture<Void> 
assignAllResourcesAndRegisterProducedPartitions(
             final List<DeploymentHandle> deploymentHandles) {
-        final List<CompletableFuture<Void>> slotAssignedFutures = new 
ArrayList<>();
+        final List<CompletableFuture<Void>> resultFutures = new ArrayList<>();
         for (DeploymentHandle deploymentHandle : deploymentHandles) {
-            final CompletableFuture<Void> slotAssigned =
+            final CompletableFuture<Void> resultFuture =
                     deploymentHandle
                             .getSlotExecutionVertexAssignment()
                             .getLogicalSlotFuture()
-                            
.handle(assignResourceOrHandleError(deploymentHandle));
-            slotAssignedFutures.add(slotAssigned);
+                            .handle(assignResource(deploymentHandle))
+                            
.thenCompose(registerProducedPartitions(deploymentHandle))
+                            .handle(
+                                    (ignore, throwable) -> {
+                                        if (throwable != null) {
+                                            handleTaskDeploymentFailure(
+                                                    
deploymentHandle.getExecutionVertexId(),
+                                                    throwable);
+                                        }
+                                        return null;
+                                    });
+
+            resultFutures.add(resultFuture);
         }
-        return FutureUtils.waitForAll(slotAssignedFutures);
+        return FutureUtils.waitForAll(resultFutures);
     }
 
     private BiFunction<Void, Throwable, Void> deployAll(
@@ -470,7 +487,7 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
         }
     }
 
-    private BiFunction<LogicalSlot, Throwable, Void> 
assignResourceOrHandleError(
+    private BiFunction<LogicalSlot, Throwable, LogicalSlot> assignResource(
             final DeploymentHandle deploymentHandle) {
         final ExecutionVertexVersion requiredVertexVersion =
                 deploymentHandle.getRequiredVertexVersion();
@@ -478,28 +495,56 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
 
         return (logicalSlot, throwable) -> {
             if (executionVertexVersioner.isModified(requiredVertexVersion)) {
-                log.debug(
-                        "Refusing to assign slot to execution vertex {} 
because this deployment was "
-                                + "superseded by another deployment",
-                        executionVertexId);
-                releaseSlotIfPresent(logicalSlot);
+                if (throwable == null) {
+                    log.debug(
+                            "Refusing to assign slot to execution vertex {} 
because this deployment was "
+                                    + "superseded by another deployment",
+                            executionVertexId);
+                    releaseSlotIfPresent(logicalSlot);
+                }
                 return null;
             }
 
-            if (throwable == null) {
+            // throw exception only if the execution version is not outdated.
+            // this ensures that canceling a pending slot request does not fail
+            // a task which is about to cancel in #restartTasksWithDelay(...)
+            if (throwable != null) {
+                throw new 
CompletionException(maybeWrapWithNoResourceAvailableException(throwable));
+            }
+
+            final ExecutionVertex executionVertex = 
getExecutionVertex(executionVertexId);
+            executionVertex.tryAssignResource(logicalSlot);
+            return logicalSlot;
+        };
+    }
+
+    private Function<LogicalSlot, CompletableFuture<Void>> 
registerProducedPartitions(
+            final DeploymentHandle deploymentHandle) {
+        final ExecutionVertexID executionVertexId = 
deploymentHandle.getExecutionVertexId();
+
+        return logicalSlot -> {
+            // a null logicalSlot means the slot assignment is skipped, in 
which case
+            // the produced partition registration process can be skipped as 
well
+            if (logicalSlot != null) {
                 final ExecutionVertex executionVertex = 
getExecutionVertex(executionVertexId);
                 final boolean notifyPartitionDataAvailable =
                         
deploymentHandle.getDeploymentOption().notifyPartitionDataAvailable();
-                executionVertex
-                        .getCurrentExecutionAttempt()
-                        .registerProducedPartitions(
-                                logicalSlot.getTaskManagerLocation(), 
notifyPartitionDataAvailable);
-                executionVertex.tryAssignResource(logicalSlot);
+
+                final CompletableFuture<Void> partitionRegistrationFuture =
+                        executionVertex
+                                .getCurrentExecutionAttempt()
+                                .registerProducedPartitions(
+                                        logicalSlot.getTaskManagerLocation(),
+                                        notifyPartitionDataAvailable);
+
+                return FutureUtils.orTimeout(
+                        partitionRegistrationFuture,
+                        rpcTimeout.toMilliseconds(),
+                        TimeUnit.MILLISECONDS,
+                        getMainThreadExecutor());
             } else {
-                handleTaskDeploymentFailure(
-                        executionVertexId, 
maybeWrapWithNoResourceAvailableException(throwable));
+                return FutureUtils.completedVoidFuture();
             }
-            return null;
         };
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index 7fc7464..157844b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -134,7 +134,8 @@ public class DefaultSchedulerFactory implements 
SchedulerNGFactory {
                 mainThreadExecutor,
                 jobStatusListener,
                 executionGraphFactory,
-                shuffleMaster);
+                shuffleMaster,
+                rpcTimeout);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
index 6b59c3c..735669f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
@@ -35,9 +35,6 @@ public interface ShuffleMaster<T extends ShuffleDescriptor> {
     /**
      * Asynchronously register a partition and its producer with the shuffle 
service.
      *
-     * <p>IMPORTANT: the returned future must be completed due to limitations 
in the default
-     * scheduler.
-     *
      * <p>The returned shuffle descriptor is an internal handle which 
identifies the partition
      * internally within the shuffle service. The descriptor should provide 
enough information to
      * read from or write data to the partition.
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index c724087..d3de904 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -22,21 +22,14 @@ import 
org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
 import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider;
-import org.apache.flink.runtime.shuffle.PartitionDescriptor;
-import org.apache.flink.runtime.shuffle.ProducerDescriptor;
-import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
-import org.apache.flink.runtime.shuffle.ShuffleMaster;
-import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
@@ -48,8 +41,6 @@ import javax.annotation.Nonnull;
 
 import java.util.concurrent.CompletableFuture;
 
-import static 
org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
-import static org.apache.flink.runtime.jobgraph.DistributionPattern.POINTWISE;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
@@ -239,53 +230,6 @@ public class ExecutionTest extends TestLogger {
                 });
     }
 
-    /**
-     * Tests that incomplete futures returned by {@link 
ShuffleMaster#registerPartitionWithProducer}
-     * are rejected.
-     */
-    @Test
-    public void testIncompletePartitionRegistrationFutureIsRejected() throws 
Exception {
-        final ShuffleMaster<ShuffleDescriptor> shuffleMaster = new 
TestingShuffleMaster();
-        final JobVertex source = new JobVertex("source");
-        final JobVertex target = new JobVertex("target");
-
-        source.setInvokableClass(AbstractInvokable.class);
-        target.setInvokableClass(AbstractInvokable.class);
-        target.connectNewDataSetAsInput(source, POINTWISE, PIPELINED);
-
-        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(source, 
target);
-        ExecutionGraph executionGraph =
-                TestingDefaultExecutionGraphBuilder.newBuilder()
-                        .setJobGraph(jobGraph)
-                        .setShuffleMaster(shuffleMaster)
-                        .build();
-
-        final ExecutionVertex sourceVertex =
-                
executionGraph.getAllVertices().get(source.getID()).getTaskVertices()[0];
-
-        boolean incompletePartitionRegistrationRejected = false;
-        try {
-            Execution.registerProducedPartitions(
-                    sourceVertex, new LocalTaskManagerLocation(), new 
ExecutionAttemptID(), false);
-        } catch (IllegalStateException e) {
-            incompletePartitionRegistrationRejected = true;
-        }
-
-        assertTrue(incompletePartitionRegistrationRejected);
-    }
-
-    private static class TestingShuffleMaster implements 
ShuffleMaster<ShuffleDescriptor> {
-
-        @Override
-        public CompletableFuture<ShuffleDescriptor> 
registerPartitionWithProducer(
-                PartitionDescriptor partitionDescriptor, ProducerDescriptor 
producerDescriptor) {
-            return new CompletableFuture<>();
-        }
-
-        @Override
-        public void releasePartitionExternally(ShuffleDescriptor 
shuffleDescriptor) {}
-    }
-
     @Nonnull
     private JobVertex createNoOpJobVertex() {
         final JobVertex jobVertex = new JobVertex("Test vertex", new 
JobVertexID());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 65f72c6..6cdc8cf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -20,6 +20,7 @@
 package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.testutils.ScheduledTask;
@@ -41,7 +42,9 @@ import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRe
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import 
org.apache.flink.runtime.executiongraph.utils.TestFailoverStrategyFactory;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import 
org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
@@ -59,6 +62,7 @@ import 
org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 import org.apache.flink.runtime.scheduler.strategy.TestSchedulingStrategy;
+import org.apache.flink.runtime.shuffle.TestingShuffleMaster;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -103,6 +107,7 @@ import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.enableChe
 import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.getCheckpointCoordinator;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
@@ -114,7 +119,6 @@ import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -144,6 +148,12 @@ public class DefaultSchedulerTest extends TestLogger {
 
     private TestExecutionSlotAllocator testExecutionSlotAllocator;
 
+    private TestingShuffleMaster shuffleMaster;
+
+    private TestingJobMasterPartitionTracker partitionTracker;
+
+    private Time timeout;
+
     @Before
     public void setUp() throws Exception {
         executor = Executors.newSingleThreadExecutor();
@@ -160,6 +170,11 @@ public class DefaultSchedulerTest extends TestLogger {
 
         executionSlotAllocatorFactory = new 
TestExecutionSlotAllocatorFactory();
         testExecutionSlotAllocator = 
executionSlotAllocatorFactory.getTestExecutionSlotAllocator();
+
+        shuffleMaster = new TestingShuffleMaster();
+        partitionTracker = new TestingJobMasterPartitionTracker();
+
+        timeout = Time.seconds(60);
     }
 
     @After
@@ -1303,6 +1318,118 @@ public class DefaultSchedulerTest extends TestLogger {
                                 
executionVertex1.getCurrentAssignedResourceLocation())));
     }
 
+    @Test
+    public void testDeploymentWaitForProducedPartitionRegistration() {
+        shuffleMaster.setAutoCompleteRegistration(false);
+
+        final List<ResultPartitionID> trackedPartitions = new ArrayList<>();
+        partitionTracker.setStartTrackingPartitionsConsumer(
+                (resourceID, resultPartitionDeploymentDescriptor) ->
+                        trackedPartitions.add(
+                                resultPartitionDeploymentDescriptor
+                                        .getShuffleDescriptor()
+                                        .getResultPartitionID()));
+
+        final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
+
+        createSchedulerAndStartScheduling(jobGraph);
+
+        assertThat(trackedPartitions, hasSize(0));
+        assertThat(testExecutionVertexOperations.getDeployedVertices(), 
hasSize(0));
+
+        shuffleMaster.completeAllPendingRegistrations();
+        assertThat(trackedPartitions, hasSize(1));
+        assertThat(testExecutionVertexOperations.getDeployedVertices(), 
hasSize(2));
+    }
+
+    @Test
+    public void testFailedProducedPartitionRegistration() {
+        shuffleMaster.setAutoCompleteRegistration(false);
+
+        final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
+
+        createSchedulerAndStartScheduling(jobGraph);
+
+        assertThat(testExecutionVertexOperations.getCanceledVertices(), 
hasSize(0));
+        assertThat(testExecutionVertexOperations.getFailedVertices(), 
hasSize(0));
+
+        shuffleMaster.failAllPendingRegistrations();
+        assertThat(testExecutionVertexOperations.getCanceledVertices(), 
hasSize(2));
+        assertThat(testExecutionVertexOperations.getFailedVertices(), 
hasSize(1));
+    }
+
+    @Test
+    public void testDirectExceptionOnProducedPartitionRegistration() {
+        shuffleMaster.setThrowExceptionalOnRegistration(true);
+
+        final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
+
+        createSchedulerAndStartScheduling(jobGraph);
+
+        assertThat(testExecutionVertexOperations.getCanceledVertices(), 
hasSize(2));
+        assertThat(testExecutionVertexOperations.getFailedVertices(), 
hasSize(1));
+    }
+
+    @Test
+    public void testProducedPartitionRegistrationTimeout() throws Exception {
+        ScheduledExecutorService scheduledExecutorService = null;
+        try {
+            scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor();
+            final ComponentMainThreadExecutor mainThreadExecutor =
+                    
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                            scheduledExecutorService);
+
+            shuffleMaster.setAutoCompleteRegistration(false);
+
+            final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
+
+            timeout = Time.milliseconds(1);
+            createSchedulerAndStartScheduling(jobGraph, mainThreadExecutor);
+
+            Thread.sleep(100);
+
+            assertThat(testExecutionVertexOperations.getCanceledVertices(), 
hasSize(2));
+            assertThat(testExecutionVertexOperations.getFailedVertices(), 
hasSize(1));
+        } finally {
+            if (scheduledExecutorService != null) {
+                scheduledExecutorService.shutdown();
+            }
+        }
+    }
+
+    @Test
+    public void testLateRegisteredPartitionsWillBeReleased() {
+        shuffleMaster.setAutoCompleteRegistration(false);
+
+        final List<ResultPartitionID> trackedPartitions = new ArrayList<>();
+        partitionTracker.setStartTrackingPartitionsConsumer(
+                (resourceID, resultPartitionDeploymentDescriptor) ->
+                        trackedPartitions.add(
+                                resultPartitionDeploymentDescriptor
+                                        .getShuffleDescriptor()
+                                        .getResultPartitionID()));
+
+        final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
+
+        final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+        final ArchivedExecutionVertex sourceExecutionVertex =
+                scheduler
+                        .requestJob()
+                        .getArchivedExecutionGraph()
+                        .getAllExecutionVertices()
+                        .iterator()
+                        .next();
+        final ExecutionAttemptID attemptId =
+                
sourceExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
+        
scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
+
+        // late registered partitions will not be tracked and will be released
+        shuffleMaster.completeAllPendingRegistrations();
+        assertThat(trackedPartitions, hasSize(0));
+        assertThat(shuffleMaster.getExternallyReleasedPartitions(), 
hasSize(1));
+    }
+
     private static TaskExecutionState createFailedTaskExecutionState(
             ExecutionAttemptID executionAttemptID) {
         return new TaskExecutionState(
@@ -1389,16 +1516,19 @@ public class DefaultSchedulerTest extends TestLogger {
     }
 
     private DefaultScheduler createSchedulerAndStartScheduling(final JobGraph 
jobGraph) {
+        return createSchedulerAndStartScheduling(
+                jobGraph, 
ComponentMainThreadExecutorServiceAdapter.forMainThread());
+    }
+
+    private DefaultScheduler createSchedulerAndStartScheduling(
+            final JobGraph jobGraph, final ComponentMainThreadExecutor 
mainThreadExecutor) {
         final SchedulingStrategyFactory schedulingStrategyFactory =
                 new PipelinedRegionSchedulingStrategy.Factory();
 
         try {
             final DefaultScheduler scheduler =
-                    createScheduler(
-                            jobGraph,
-                            
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
-                            schedulingStrategyFactory);
-            scheduler.startScheduling();
+                    createScheduler(jobGraph, mainThreadExecutor, 
schedulingStrategyFactory);
+            mainThreadExecutor.execute(scheduler::startScheduling);
             return scheduler;
         } catch (Exception e) {
             throw new RuntimeException(e);
@@ -1450,6 +1580,9 @@ public class DefaultSchedulerTest extends TestLogger {
                 .setExecutionVertexOperations(testExecutionVertexOperations)
                 .setExecutionVertexVersioner(executionVertexVersioner)
                 
.setExecutionSlotAllocatorFactory(executionSlotAllocatorFactory)
+                .setShuffleMaster(shuffleMaster)
+                .setPartitionTracker(partitionTracker)
+                .setRpcTimeout(timeout)
                 .build();
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index 48adac5..bf3a9f8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -567,7 +567,8 @@ public class SchedulerTestingUtils {
                     mainThreadExecutor,
                     jobStatusListener,
                     executionGraphFactory,
-                    shuffleMaster);
+                    shuffleMaster,
+                    rpcTimeout);
         }
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionVertexOperationsDecorator.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionVertexOperationsDecorator.java
index 64c431e..9e4a695 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionVertexOperationsDecorator.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionVertexOperationsDecorator.java
@@ -40,6 +40,10 @@ public class TestExecutionVertexOperationsDecorator implements ExecutionVertexOp
 
     private final List<ExecutionVertexID> deployedVertices = new ArrayList<>();
 
+    private final List<ExecutionVertexID> canceledVertices = new ArrayList<>();
+
+    private final List<ExecutionVertexID> failedVertices = new ArrayList<>();
+
     private boolean failDeploy;
 
     public TestExecutionVertexOperationsDecorator(final ExecutionVertexOperations delegate) {
@@ -59,11 +63,13 @@ public class TestExecutionVertexOperationsDecorator implements ExecutionVertexOp
 
     @Override
     public CompletableFuture<?> cancel(final ExecutionVertex executionVertex) {
+        canceledVertices.add(executionVertex.getID());
         return delegate.cancel(executionVertex);
     }
 
     @Override
     public void markFailed(ExecutionVertex executionVertex, Throwable cause) {
+        failedVertices.add(executionVertex.getID());
         delegate.markFailed(executionVertex, cause);
     }
 
@@ -78,4 +84,12 @@ public class TestExecutionVertexOperationsDecorator implements ExecutionVertexOp
     public List<ExecutionVertexID> getDeployedVertices() {
         return Collections.unmodifiableList(deployedVertices);
     }
+
+    public List<ExecutionVertexID> getCanceledVertices() {
+        return Collections.unmodifiableList(canceledVertices);
+    }
+
+    public List<ExecutionVertexID> getFailedVertices() {
+        return Collections.unmodifiableList(failedVertices);
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/TestingShuffleMaster.java b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/TestingShuffleMaster.java
new file mode 100644
index 0000000..c912953
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/TestingShuffleMaster.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BiConsumer;

import static org.apache.flink.util.Preconditions.checkState;
+
+/** A {@link ShuffleMaster} implementation for tests. */
+public class TestingShuffleMaster implements ShuffleMaster<ShuffleDescriptor> {
+
+    boolean autoCompleteRegistration = true;
+
+    boolean throwExceptionalOnRegistration = false;
+
+    private final Queue<Tuple2<PartitionDescriptor, ProducerDescriptor>>
+            pendingPartitionRegistrations = new ArrayBlockingQueue<>(4);
+
+    private final Queue<CompletableFuture<ShuffleDescriptor>>
+            pendingPartitionRegistrationResponses = new 
ArrayBlockingQueue<>(4);
+
+    private final Queue<ShuffleDescriptor> externallyReleasedPartitions =
+            new ArrayBlockingQueue<>(4);
+
+    @Override
+    public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(
+            PartitionDescriptor partitionDescriptor, ProducerDescriptor 
producerDescriptor) {
+        if (throwExceptionalOnRegistration) {
+            throw new RuntimeException("Forced partition registration 
failure");
+        } else if (autoCompleteRegistration) {
+            return CompletableFuture.completedFuture(
+                    createShuffleDescriptor(partitionDescriptor, 
producerDescriptor));
+        } else {
+            CompletableFuture<ShuffleDescriptor> response = new 
CompletableFuture<>();
+            pendingPartitionRegistrations.add(
+                    new Tuple2<>(partitionDescriptor, producerDescriptor));
+            pendingPartitionRegistrationResponses.add(response);
+            return response;
+        }
+    }
+
+    private ShuffleDescriptor createShuffleDescriptor(
+            PartitionDescriptor partitionDescriptor, ProducerDescriptor 
producerDescriptor) {
+
+        ResultPartitionID resultPartitionId =
+                new ResultPartitionID(
+                        partitionDescriptor.getPartitionId(),
+                        producerDescriptor.getProducerExecutionId());
+        return new TestingShuffleDescriptor(
+                resultPartitionId, producerDescriptor.getProducerLocation());
+    }
+
+    @Override
+    public void releasePartitionExternally(ShuffleDescriptor 
shuffleDescriptor) {
+        externallyReleasedPartitions.add(shuffleDescriptor);
+    }
+
+    public Queue<ShuffleDescriptor> getExternallyReleasedPartitions() {
+        return externallyReleasedPartitions;
+    }
+
+    public void setAutoCompleteRegistration(boolean autoCompleteRegistration) {
+        this.autoCompleteRegistration = autoCompleteRegistration;
+    }
+
+    public void setThrowExceptionalOnRegistration(boolean 
throwExceptionalOnRegistration) {
+        this.throwExceptionalOnRegistration = throwExceptionalOnRegistration;
+    }
+
+    public void completeAllPendingRegistrations() {
+        processPendingRegistrations(
+                (response, tuple) ->
+                        response.complete(createShuffleDescriptor(tuple.f0, 
tuple.f1)));
+    }
+
+    public void failAllPendingRegistrations() {
+        processPendingRegistrations(
+                (response, ignore) ->
+                        response.completeExceptionally(
+                                new Exception("Forced partition registration 
failure")));
+    }
+
+    private void processPendingRegistrations(
+            BiConsumer<
+                            CompletableFuture<ShuffleDescriptor>,
+                            Tuple2<PartitionDescriptor, ProducerDescriptor>>
+                    processor) {
+
+        checkState(
+                pendingPartitionRegistrationResponses.size()
+                        == pendingPartitionRegistrations.size());
+
+        Tuple2<PartitionDescriptor, ProducerDescriptor> tuple;
+        while ((tuple = pendingPartitionRegistrations.poll()) != null) {
+            processor.accept(pendingPartitionRegistrationResponses.poll(), 
tuple);
+        }
+    }
+
+    private static class TestingShuffleDescriptor implements ShuffleDescriptor 
{
+
+        private final ResultPartitionID resultPartitionId;
+
+        private final ResourceID location;
+
+        TestingShuffleDescriptor(ResultPartitionID resultPartitionId, 
ResourceID location) {
+            this.resultPartitionId = resultPartitionId;
+            this.location = location;
+        }
+
+        @Override
+        public ResultPartitionID getResultPartitionID() {
+            return resultPartitionId;
+        }
+
+        @Override
+        public Optional<ResourceID> storesLocalResourcesOn() {
+            return Optional.of(location);
+        }
+    }
+}

Reply via email to