This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0d099b79fddc5e254884e44f2167c625744079a4 Author: Zhu Zhu <[email protected]> AuthorDate: Tue Jul 6 11:34:53 2021 +0800 [FLINK-22677][runtime] DefaultScheduler supports async registration of produced partitions This closes #16383. --- .../flink/runtime/executiongraph/Execution.java | 50 +++---- .../flink/runtime/scheduler/DefaultScheduler.java | 91 +++++++++---- .../runtime/scheduler/DefaultSchedulerFactory.java | 3 +- .../flink/runtime/shuffle/ShuffleMaster.java | 3 - .../runtime/executiongraph/ExecutionTest.java | 56 -------- .../runtime/scheduler/DefaultSchedulerTest.java | 145 ++++++++++++++++++++- .../runtime/scheduler/SchedulerTestingUtils.java | 3 +- .../TestExecutionVertexOperationsDecorator.java | 14 ++ .../runtime/shuffle/TestingShuffleMaster.java | 144 ++++++++++++++++++++ 9 files changed, 388 insertions(+), 121 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 3264c0d..b047229 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -48,7 +48,6 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.operators.coordination.TaskNotRunningException; import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; -import org.apache.flink.runtime.shuffle.NettyShuffleMaster; import org.apache.flink.runtime.shuffle.PartitionDescriptor; import org.apache.flink.runtime.shuffle.ProducerDescriptor; import org.apache.flink.runtime.shuffle.ShuffleDescriptor; @@ -404,7 +403,7 @@ public class Execution // Actions // -------------------------------------------------------------------------------------------- - public CompletableFuture<Execution> registerProducedPartitions( + public CompletableFuture<Void> registerProducedPartitions( TaskManagerLocation location, boolean notifyPartitionDataAvailable) { assertRunningInJobMasterMainThread(); @@ -415,34 +414,27 @@ public class Execution vertex.getExecutionGraphAccessor().getJobMasterMainThreadExecutor(), producedPartitionsCache -> { producedPartitions = producedPartitionsCache; - startTrackingPartitions( - location.getResourceID(), producedPartitionsCache.values()); - return this; + + if (getState() == SCHEDULED) { + startTrackingPartitions( + location.getResourceID(), producedPartitionsCache.values()); + } else { + LOG.info( + "Discarding late registered partitions for {} task {}.", + getState(), + attemptId); + for (ResultPartitionDeploymentDescriptor desc : + producedPartitionsCache.values()) { + getVertex() + .getExecutionGraphAccessor() + .getShuffleMaster() + .releasePartitionExternally(desc.getShuffleDescriptor()); + } + } + return null; }); } - /** - * Register producedPartitions to {@link ShuffleMaster} - * - * <p>HACK: Please notice that this method simulates asynchronous registration in a synchronous - * way by making sure the returned {@link CompletableFuture} from {@link - * ShuffleMaster#registerPartitionWithProducer} is completed immediately. - * - * <p>{@link Execution#producedPartitions} are registered through an asynchronous interface - * {@link ShuffleMaster#registerPartitionWithProducer} to {@link ShuffleMaster}, however they - * are not always accessed through callbacks. So, it is possible that {@link - * Execution#producedPartitions} have not been available yet when accessed (in {@link - * Execution#deploy} for example). - * - * <p>Since the only implementation of {@link ShuffleMaster} is {@link NettyShuffleMaster}, - * which indeed registers producedPartition in a synchronous way, this method enforces - * synchronous registration under an asynchronous interface for now. - * - * <p>TODO: If asynchronous registration is needed in the future, use callbacks to access {@link - * Execution#producedPartitions}. - * - * @return completed future of partition deployment descriptors. - */ @VisibleForTesting static CompletableFuture< Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor>> @@ -470,10 +462,6 @@ public class Execution .getShuffleMaster() .registerPartitionWithProducer(partitionDescriptor, producerDescriptor); - // temporary hack; the scheduler does not handle incomplete futures properly - Preconditions.checkState( - shuffleDescriptorFuture.isDone(), "ShuffleDescriptor future is incomplete."); - CompletableFuture<ResultPartitionDeploymentDescriptor> partitionRegistration = shuffleDescriptorFuture.thenApply( shuffleDescriptor -> diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java index 4ab7dbd..65281f4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.scheduler; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.clusterframework.types.AllocationID; @@ -100,6 +101,8 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio private final ShuffleMaster<?> shuffleMaster; + private final Time rpcTimeout; + DefaultScheduler( final Logger log, final JobGraph jobGraph, @@ -120,7 +123,8 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio final ComponentMainThreadExecutor mainThreadExecutor, final JobStatusListener jobStatusListener, final ExecutionGraphFactory executionGraphFactory, - final ShuffleMaster<?> shuffleMaster) + final ShuffleMaster<?> shuffleMaster, + final Time rpcTimeout) throws Exception { super( @@ -143,6 +147,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio this.userCodeLoader = checkNotNull(userCodeLoader); this.executionVertexOperations = checkNotNull(executionVertexOperations); this.shuffleMaster = checkNotNull(shuffleMaster); + this.rpcTimeout = checkNotNull(rpcTimeout); final FailoverStrategy failoverStrategy = failoverStrategyFactory.create( @@ -429,21 +434,33 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio private void waitForAllSlotsAndDeploy(final List<DeploymentHandle> deploymentHandles) { FutureUtils.assertNoException( - assignAllResources(deploymentHandles).handle(deployAll(deploymentHandles))); + assignAllResourcesAndRegisterProducedPartitions(deploymentHandles) + .handle(deployAll(deploymentHandles))); } - private CompletableFuture<Void> assignAllResources( + private CompletableFuture<Void> assignAllResourcesAndRegisterProducedPartitions( final List<DeploymentHandle> deploymentHandles) { - final List<CompletableFuture<Void>> slotAssignedFutures = new ArrayList<>(); + final List<CompletableFuture<Void>> resultFutures = new ArrayList<>(); for (DeploymentHandle deploymentHandle : deploymentHandles) { - final CompletableFuture<Void> slotAssigned = + final CompletableFuture<Void> resultFuture = deploymentHandle .getSlotExecutionVertexAssignment() .getLogicalSlotFuture() - .handle(assignResourceOrHandleError(deploymentHandle)); - slotAssignedFutures.add(slotAssigned); + .handle(assignResource(deploymentHandle)) + .thenCompose(registerProducedPartitions(deploymentHandle)) + .handle( + (ignore, throwable) -> { + if (throwable != null) { + handleTaskDeploymentFailure( + deploymentHandle.getExecutionVertexId(), + throwable); + } + return null; + }); + + resultFutures.add(resultFuture); } - return FutureUtils.waitForAll(slotAssignedFutures); + return FutureUtils.waitForAll(resultFutures); } private BiFunction<Void, Throwable, Void> deployAll( @@ -470,7 +487,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio } } - private BiFunction<LogicalSlot, Throwable, Void> assignResourceOrHandleError( + private BiFunction<LogicalSlot, Throwable, LogicalSlot> assignResource( final DeploymentHandle deploymentHandle) { final ExecutionVertexVersion requiredVertexVersion = deploymentHandle.getRequiredVertexVersion(); @@ -478,28 +495,56 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio return (logicalSlot, throwable) -> { if (executionVertexVersioner.isModified(requiredVertexVersion)) { - log.debug( - "Refusing to assign slot to execution vertex {} because this deployment was " - + "superseded by another deployment", - executionVertexId); - releaseSlotIfPresent(logicalSlot); + if (throwable == null) { + log.debug( + "Refusing to assign slot to execution vertex {} because this deployment was " + + "superseded by another deployment", + executionVertexId); + releaseSlotIfPresent(logicalSlot); + } return null; } - if (throwable == null) { + // throw exception only if the execution version is not outdated. + // this ensures that canceling a pending slot request does not fail + // a task which is about to cancel in #restartTasksWithDelay(...) + if (throwable != null) { + throw new CompletionException(maybeWrapWithNoResourceAvailableException(throwable)); + } + + final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId); + executionVertex.tryAssignResource(logicalSlot); + return logicalSlot; + }; + } + + private Function<LogicalSlot, CompletableFuture<Void>> registerProducedPartitions( + final DeploymentHandle deploymentHandle) { + final ExecutionVertexID executionVertexId = deploymentHandle.getExecutionVertexId(); + + return logicalSlot -> { + // a null logicalSlot means the slot assignment is skipped, in which case + // the produced partition registration process can be skipped as well + if (logicalSlot != null) { final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId); final boolean notifyPartitionDataAvailable = deploymentHandle.getDeploymentOption().notifyPartitionDataAvailable(); - executionVertex - .getCurrentExecutionAttempt() - .registerProducedPartitions( - logicalSlot.getTaskManagerLocation(), notifyPartitionDataAvailable); - executionVertex.tryAssignResource(logicalSlot); + + final CompletableFuture<Void> partitionRegistrationFuture = + executionVertex + .getCurrentExecutionAttempt() + .registerProducedPartitions( + logicalSlot.getTaskManagerLocation(), + notifyPartitionDataAvailable); + + return FutureUtils.orTimeout( + partitionRegistrationFuture, + rpcTimeout.toMilliseconds(), + TimeUnit.MILLISECONDS, + getMainThreadExecutor()); } else { - handleTaskDeploymentFailure( - executionVertexId, maybeWrapWithNoResourceAvailableException(throwable)); + return FutureUtils.completedVoidFuture(); } - return null; }; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java index 7fc7464..157844b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java @@ -134,7 +134,8 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory { mainThreadExecutor, jobStatusListener, executionGraphFactory, - shuffleMaster); + shuffleMaster, + rpcTimeout); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java index 6b59c3c..735669f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java @@ -35,9 +35,6 @@ public interface ShuffleMaster<T extends ShuffleDescriptor> { /** * Asynchronously register a partition and its producer with the shuffle service. * - * <p>IMPORTANT: the returned future must be completed due to limitations in the default - * scheduler. - * * <p>The returned shuffle descriptor is an internal handle which identifies the partition * internally within the shuffle service. The descriptor should provide enough information to * read from or write data to the partition. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java index c724087..d3de904 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java @@ -22,21 +22,14 @@ import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; -import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; import org.apache.flink.runtime.scheduler.TestingPhysicalSlot; import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider; -import org.apache.flink.runtime.shuffle.PartitionDescriptor; -import org.apache.flink.runtime.shuffle.ProducerDescriptor; -import org.apache.flink.runtime.shuffle.ShuffleDescriptor; -import org.apache.flink.runtime.shuffle.ShuffleMaster; -import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; @@ -48,8 +41,6 @@ import javax.annotation.Nonnull; import java.util.concurrent.CompletableFuture; -import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED; -import static org.apache.flink.runtime.jobgraph.DistributionPattern.POINTWISE; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -239,53 +230,6 @@ public class ExecutionTest extends TestLogger { }); } - /** - * Tests that incomplete futures returned by {@link ShuffleMaster#registerPartitionWithProducer} - * are rejected. - */ - @Test - public void testIncompletePartitionRegistrationFutureIsRejected() throws Exception { - final ShuffleMaster<ShuffleDescriptor> shuffleMaster = new TestingShuffleMaster(); - final JobVertex source = new JobVertex("source"); - final JobVertex target = new JobVertex("target"); - - source.setInvokableClass(AbstractInvokable.class); - target.setInvokableClass(AbstractInvokable.class); - target.connectNewDataSetAsInput(source, POINTWISE, PIPELINED); - - final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(source, target); - ExecutionGraph executionGraph = - TestingDefaultExecutionGraphBuilder.newBuilder() - .setJobGraph(jobGraph) - .setShuffleMaster(shuffleMaster) - .build(); - - final ExecutionVertex sourceVertex = - executionGraph.getAllVertices().get(source.getID()).getTaskVertices()[0]; - - boolean incompletePartitionRegistrationRejected = false; - try { - Execution.registerProducedPartitions( - sourceVertex, new LocalTaskManagerLocation(), new ExecutionAttemptID(), false); - } catch (IllegalStateException e) { - incompletePartitionRegistrationRejected = true; - } - - assertTrue(incompletePartitionRegistrationRejected); - } - - private static class TestingShuffleMaster implements ShuffleMaster<ShuffleDescriptor> { - - @Override - public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer( - PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) { - return new CompletableFuture<>(); - } - - @Override - public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {} - } - @Nonnull private JobVertex createNoOpJobVertex() { final JobVertex jobVertex = new JobVertex("Test vertex", new JobVertexID()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java index 65f72c6..6cdc8cf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.scheduler; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.WebOptions; import org.apache.flink.core.testutils.ScheduledTask; @@ -41,7 +42,9 @@ import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRe import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.executiongraph.utils.TestFailoverStrategyFactory; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; @@ -59,6 +62,7 @@ import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory; import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; import org.apache.flink.runtime.scheduler.strategy.TestSchedulingStrategy; +import org.apache.flink.runtime.shuffle.TestingShuffleMaster; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -103,6 +107,7 @@ import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.enableChe import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.getCheckpointCoordinator; import static org.apache.flink.util.ExceptionUtils.findThrowable; import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; @@ -114,7 +119,6 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -144,6 +148,12 @@ public class DefaultSchedulerTest extends TestLogger { private TestExecutionSlotAllocator testExecutionSlotAllocator; + private TestingShuffleMaster shuffleMaster; + + private TestingJobMasterPartitionTracker partitionTracker; + + private Time timeout; + @Before public void setUp() throws Exception { executor = Executors.newSingleThreadExecutor(); @@ -160,6 +170,11 @@ public class DefaultSchedulerTest extends TestLogger { executionSlotAllocatorFactory = new TestExecutionSlotAllocatorFactory(); testExecutionSlotAllocator = executionSlotAllocatorFactory.getTestExecutionSlotAllocator(); + + shuffleMaster = new TestingShuffleMaster(); + partitionTracker = new TestingJobMasterPartitionTracker(); + + timeout = Time.seconds(60); } @After @@ -1303,6 +1318,118 @@ public class DefaultSchedulerTest extends TestLogger { executionVertex1.getCurrentAssignedResourceLocation()))); } + @Test + public void testDeploymentWaitForProducedPartitionRegistration() { + shuffleMaster.setAutoCompleteRegistration(false); + + final List<ResultPartitionID> trackedPartitions = new ArrayList<>(); + partitionTracker.setStartTrackingPartitionsConsumer( + (resourceID, resultPartitionDeploymentDescriptor) -> + trackedPartitions.add( + resultPartitionDeploymentDescriptor + .getShuffleDescriptor() + .getResultPartitionID())); + + final JobGraph jobGraph = nonParallelSourceSinkJobGraph(); + + createSchedulerAndStartScheduling(jobGraph); + + assertThat(trackedPartitions, hasSize(0)); + assertThat(testExecutionVertexOperations.getDeployedVertices(), hasSize(0)); + + shuffleMaster.completeAllPendingRegistrations(); + assertThat(trackedPartitions, hasSize(1)); + assertThat(testExecutionVertexOperations.getDeployedVertices(), hasSize(2)); + } + + @Test + public void testFailedProducedPartitionRegistration() { + shuffleMaster.setAutoCompleteRegistration(false); + + final JobGraph jobGraph = nonParallelSourceSinkJobGraph(); + + createSchedulerAndStartScheduling(jobGraph); + + assertThat(testExecutionVertexOperations.getCanceledVertices(), hasSize(0)); + assertThat(testExecutionVertexOperations.getFailedVertices(), hasSize(0)); + + shuffleMaster.failAllPendingRegistrations(); + assertThat(testExecutionVertexOperations.getCanceledVertices(), hasSize(2)); + assertThat(testExecutionVertexOperations.getFailedVertices(), hasSize(1)); + } + + @Test + public void testDirectExceptionOnProducedPartitionRegistration() { + shuffleMaster.setThrowExceptionalOnRegistration(true); + + final JobGraph jobGraph = nonParallelSourceSinkJobGraph(); + + createSchedulerAndStartScheduling(jobGraph); + + assertThat(testExecutionVertexOperations.getCanceledVertices(), hasSize(2)); + assertThat(testExecutionVertexOperations.getFailedVertices(), hasSize(1)); + } + + @Test + public void testProducedPartitionRegistrationTimeout() throws Exception { + ScheduledExecutorService scheduledExecutorService = null; + try { + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + final ComponentMainThreadExecutor mainThreadExecutor = + ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor( + scheduledExecutorService); + + shuffleMaster.setAutoCompleteRegistration(false); + + final JobGraph jobGraph = nonParallelSourceSinkJobGraph(); + + timeout = Time.milliseconds(1); + createSchedulerAndStartScheduling(jobGraph, mainThreadExecutor); + + Thread.sleep(100); + + assertThat(testExecutionVertexOperations.getCanceledVertices(), hasSize(2)); + assertThat(testExecutionVertexOperations.getFailedVertices(), hasSize(1)); + } finally { + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdown(); + } + } + } + + @Test + public void testLateRegisteredPartitionsWillBeReleased() { + shuffleMaster.setAutoCompleteRegistration(false); + + final List<ResultPartitionID> trackedPartitions = new ArrayList<>(); + partitionTracker.setStartTrackingPartitionsConsumer( + (resourceID, resultPartitionDeploymentDescriptor) -> + trackedPartitions.add( + resultPartitionDeploymentDescriptor + .getShuffleDescriptor() + .getResultPartitionID())); + + final JobGraph jobGraph = nonParallelSourceSinkJobGraph(); + + final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + + final ArchivedExecutionVertex sourceExecutionVertex = + scheduler + .requestJob() + .getArchivedExecutionGraph() + .getAllExecutionVertices() + .iterator() + .next(); + final ExecutionAttemptID attemptId = + sourceExecutionVertex.getCurrentExecutionAttempt().getAttemptId(); + scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId)); + + // late registered partitions will not be tracked and will be released + shuffleMaster.completeAllPendingRegistrations(); + assertThat(trackedPartitions, hasSize(0)); + assertThat(shuffleMaster.getExternallyReleasedPartitions(), hasSize(1)); + } + private static TaskExecutionState createFailedTaskExecutionState( ExecutionAttemptID executionAttemptID) { return new TaskExecutionState( @@ -1389,16 +1516,19 @@ public class DefaultSchedulerTest extends TestLogger { } private DefaultScheduler createSchedulerAndStartScheduling(final JobGraph jobGraph) { + return createSchedulerAndStartScheduling( + jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread()); + } + + private DefaultScheduler createSchedulerAndStartScheduling( + final JobGraph jobGraph, final ComponentMainThreadExecutor mainThreadExecutor) { final SchedulingStrategyFactory schedulingStrategyFactory = new PipelinedRegionSchedulingStrategy.Factory(); try { final DefaultScheduler scheduler = - createScheduler( - jobGraph, - ComponentMainThreadExecutorServiceAdapter.forMainThread(), - schedulingStrategyFactory); - scheduler.startScheduling(); + createScheduler(jobGraph, mainThreadExecutor, schedulingStrategyFactory); + mainThreadExecutor.execute(scheduler::startScheduling); return scheduler; } catch (Exception e) { throw new RuntimeException(e); @@ -1450,6 +1580,9 @@ public class DefaultSchedulerTest extends TestLogger { .setExecutionVertexOperations(testExecutionVertexOperations) .setExecutionVertexVersioner(executionVertexVersioner) .setExecutionSlotAllocatorFactory(executionSlotAllocatorFactory) + .setShuffleMaster(shuffleMaster) + .setPartitionTracker(partitionTracker) + .setRpcTimeout(timeout) .build(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java index 48adac5..bf3a9f8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java @@ -567,7 +567,8 @@ public class SchedulerTestingUtils { mainThreadExecutor, jobStatusListener, executionGraphFactory, - shuffleMaster); + shuffleMaster, + rpcTimeout); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionVertexOperationsDecorator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionVertexOperationsDecorator.java index 64c431e..9e4a695 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionVertexOperationsDecorator.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionVertexOperationsDecorator.java @@ -40,6 +40,10 @@ public class TestExecutionVertexOperationsDecorator implements ExecutionVertexOp private final List<ExecutionVertexID> deployedVertices = new ArrayList<>(); + private final List<ExecutionVertexID> canceledVertices = new ArrayList<>(); + + private final List<ExecutionVertexID> failedVertices = new ArrayList<>(); + private boolean failDeploy; public TestExecutionVertexOperationsDecorator(final ExecutionVertexOperations delegate) { @@ -59,11 +63,13 @@ public class TestExecutionVertexOperationsDecorator implements ExecutionVertexOp @Override public CompletableFuture<?> cancel(final ExecutionVertex executionVertex) { + canceledVertices.add(executionVertex.getID()); return delegate.cancel(executionVertex); } @Override public void markFailed(ExecutionVertex executionVertex, Throwable cause) { + failedVertices.add(executionVertex.getID()); delegate.markFailed(executionVertex, cause); } @@ -78,4 +84,12 @@ public class TestExecutionVertexOperationsDecorator implements ExecutionVertexOp public List<ExecutionVertexID> getDeployedVertices() { return Collections.unmodifiableList(deployedVertices); } + + public List<ExecutionVertexID> getCanceledVertices() { + return Collections.unmodifiableList(canceledVertices); + } + + public List<ExecutionVertexID> getFailedVertices() { + return Collections.unmodifiableList(failedVertices); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/TestingShuffleMaster.java b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/TestingShuffleMaster.java new file mode 100644 index 0000000..c912953 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/TestingShuffleMaster.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.shuffle; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; + +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; + +import static org.apache.flink.util.Preconditions.checkState; + +/** A {@link ShuffleMaster} implementation for tests. */ +public class TestingShuffleMaster implements ShuffleMaster<ShuffleDescriptor> { + + boolean autoCompleteRegistration = true; + + boolean throwExceptionalOnRegistration = false; + + private final Queue<Tuple2<PartitionDescriptor, ProducerDescriptor>> + pendingPartitionRegistrations = new ArrayBlockingQueue<>(4); + + private final Queue<CompletableFuture<ShuffleDescriptor>> + pendingPartitionRegistrationResponses = new ArrayBlockingQueue<>(4); + + private final Queue<ShuffleDescriptor> externallyReleasedPartitions = + new ArrayBlockingQueue<>(4); + + @Override + public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer( + PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) { + if (throwExceptionalOnRegistration) { + throw new RuntimeException("Forced partition registration failure"); + } else if (autoCompleteRegistration) { + return CompletableFuture.completedFuture( + createShuffleDescriptor(partitionDescriptor, producerDescriptor)); + } else { + CompletableFuture<ShuffleDescriptor> response = new CompletableFuture<>(); + pendingPartitionRegistrations.add( + new Tuple2<>(partitionDescriptor, producerDescriptor)); + pendingPartitionRegistrationResponses.add(response); + return response; + } + } + + private ShuffleDescriptor createShuffleDescriptor( + PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) { + + ResultPartitionID resultPartitionId = + new ResultPartitionID( + partitionDescriptor.getPartitionId(), + producerDescriptor.getProducerExecutionId()); + return new TestingShuffleDescriptor( + resultPartitionId, producerDescriptor.getProducerLocation()); + } + + @Override + public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) { + externallyReleasedPartitions.add(shuffleDescriptor); + } + + public Queue<ShuffleDescriptor> getExternallyReleasedPartitions() { + return externallyReleasedPartitions; + } + + public void setAutoCompleteRegistration(boolean autoCompleteRegistration) { + this.autoCompleteRegistration = autoCompleteRegistration; + } + + public void setThrowExceptionalOnRegistration(boolean throwExceptionalOnRegistration) { + this.throwExceptionalOnRegistration = throwExceptionalOnRegistration; + } + + public void completeAllPendingRegistrations() { + processPendingRegistrations( + (response, tuple) -> + response.complete(createShuffleDescriptor(tuple.f0, tuple.f1))); + } + + public void failAllPendingRegistrations() { + processPendingRegistrations( + (response, ignore) -> + response.completeExceptionally( + new Exception("Forced partition registration failure"))); + } + + private void processPendingRegistrations( + BiConsumer< + CompletableFuture<ShuffleDescriptor>, + Tuple2<PartitionDescriptor, ProducerDescriptor>> + processor) { + + checkState( + pendingPartitionRegistrationResponses.size() + == pendingPartitionRegistrations.size()); + + Tuple2<PartitionDescriptor, ProducerDescriptor> tuple; + while ((tuple = pendingPartitionRegistrations.poll()) != null) { + processor.accept(pendingPartitionRegistrationResponses.poll(), tuple); + } + } + + private static class TestingShuffleDescriptor implements ShuffleDescriptor { + + private final ResultPartitionID resultPartitionId; + + private final ResourceID location; + + TestingShuffleDescriptor(ResultPartitionID resultPartitionId, ResourceID location) { + this.resultPartitionId = resultPartitionId; + this.location = location; + } + + @Override + public ResultPartitionID getResultPartitionID() { + return resultPartitionId; + } + + @Override + public Optional<ResourceID> storesLocalResourcesOn() { + return Optional.of(location); + } + } +}
