This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 80df36b51af791f67126e07e182015ea6ea73fd2 Author: kevin.cyj <[email protected]> AuthorDate: Mon Jul 5 09:42:18 2021 +0800 [FLINK-22675][runtime] Add lifecycle methods to ShuffleMaster This patch adds some lifecycle methods to ShuffleMaster including start, close, registerJob and unregisterJob. This closes #16465. --- .../partition/JobMasterPartitionTracker.java | 13 +- .../partition/JobMasterPartitionTrackerImpl.java | 13 +- .../jobmaster/JobManagerSharedServices.java | 7 + .../apache/flink/runtime/jobmaster/JobMaster.java | 19 ++ .../flink/runtime/jobmaster/JobMasterGateway.java | 8 + .../flink/runtime/shuffle/JobShuffleContext.java | 42 ++++ .../runtime/shuffle/JobShuffleContextImpl.java | 52 +++++ .../flink/runtime/shuffle/ShuffleMaster.java | 34 ++- .../partition/NoOpJobMasterPartitionTracker.java | 2 +- .../TestingJobMasterPartitionTracker.java | 3 +- .../jobmaster/utils/TestingJobMasterGateway.java | 6 + .../flink/runtime/shuffle/ShuffleMasterTest.java | 251 +++++++++++++++++++++ 12 files changed, 442 insertions(+), 8 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java index 967a446..cb7c2be 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java @@ -39,7 +39,18 @@ public interface JobMasterPartitionTracker ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor); /** Releases the given partitions and stop the tracking of partitions that were released. 
*/ - void stopTrackingAndReleasePartitions(Collection<ResultPartitionID> resultPartitionIds); + default void stopTrackingAndReleasePartitions( + Collection<ResultPartitionID> resultPartitionIds) { + stopTrackingAndReleasePartitions(resultPartitionIds, true); + } + + /** + * Releases the given partitions and stop the tracking of partitions that were released. The + * boolean flag indicates whether we need to notify the ShuffleMaster to release all external + * resources or not. + */ + void stopTrackingAndReleasePartitions( + Collection<ResultPartitionID> resultPartitionIds, boolean releaseOnShuffleMaster); /** * Releases the job partitions and promotes the cluster partitions, and stops the tracking of diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java index cbc5a95..f1ed2e7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java @@ -99,10 +99,12 @@ public class JobMasterPartitionTrackerImpl } @Override - public void stopTrackingAndReleasePartitions(Collection<ResultPartitionID> resultPartitionIds) { + public void stopTrackingAndReleasePartitions( + Collection<ResultPartitionID> resultPartitionIds, boolean releaseOnShuffleMaster) { stopTrackingAndHandlePartitions( resultPartitionIds, - (tmID, partitionDescs) -> internalReleasePartitions(tmID, partitionDescs)); + (tmID, partitionDescs) -> + internalReleasePartitions(tmID, partitionDescs, releaseOnShuffleMaster)); } @Override @@ -138,11 +140,14 @@ public class JobMasterPartitionTrackerImpl private void internalReleasePartitions( ResourceID potentialPartitionLocation, - Collection<ResultPartitionDeploymentDescriptor> partitionDeploymentDescriptors) { + 
Collection<ResultPartitionDeploymentDescriptor> partitionDeploymentDescriptors, + boolean releaseOnShuffleMaster) { internalReleasePartitionsOnTaskExecutor( potentialPartitionLocation, partitionDeploymentDescriptors); - internalReleasePartitionsOnShuffleMaster(partitionDeploymentDescriptors.stream()); + if (releaseOnShuffleMaster) { + internalReleasePartitionsOnShuffleMaster(partitionDeploymentDescriptors.stream()); + } } private void internalReleaseOrPromotePartitions( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java index 1e0f0ae..1700722 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java @@ -102,6 +102,12 @@ public class JobManagerSharedServices { firstException = t; } + try { + shuffleMaster.close(); + } catch (Throwable t) { + firstException = firstException == null ? 
t : firstException; + } + libraryCacheManager.shutdown(); if (firstException != null) { @@ -151,6 +157,7 @@ public class JobManagerSharedServices { final ShuffleMaster<?> shuffleMaster = ShuffleServiceLoader.loadShuffleServiceFactory(config) .createShuffleMaster(shuffleMasterContext); + shuffleMaster.start(); return new JobManagerSharedServices( futureExecutor, libraryCacheManager, shuffleMaster, blobServer); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index dbb0e4a..4464bd9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -74,6 +74,8 @@ import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcServiceUtils; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.runtime.scheduler.SchedulerNG; +import org.apache.flink.runtime.shuffle.JobShuffleContext; +import org.apache.flink.runtime.shuffle.JobShuffleContextImpl; import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.runtime.slots.ResourceRequirement; import org.apache.flink.runtime.state.KeyGroupRange; @@ -837,6 +839,19 @@ public class JobMaster extends PermanentlyFencedRpcEndpoint<JobMasterId> } } + @Override + public CompletableFuture<?> stopTrackingAndReleasePartitions( + Collection<ResultPartitionID> partitionIds) { + CompletableFuture<?> future = new CompletableFuture<>(); + try { + partitionTracker.stopTrackingAndReleasePartitions(partitionIds, false); + future.complete(null); + } catch (Throwable throwable) { + future.completeExceptionally(throwable); + } + return future; + } + // ---------------------------------------------------------------------------------------------- // Internal methods // 
---------------------------------------------------------------------------------------------- @@ -847,6 +862,9 @@ public class JobMaster extends PermanentlyFencedRpcEndpoint<JobMasterId> private void startJobExecution() throws Exception { validateRunsInMainThread(); + JobShuffleContext context = new JobShuffleContextImpl(jobGraph.getJobID(), this); + shuffleMaster.registerJob(context); + startJobMasterServices(); log.info( @@ -913,6 +931,7 @@ public class JobMaster extends PermanentlyFencedRpcEndpoint<JobMasterId> return FutureUtils.runAfterwards( terminationFuture, () -> { + shuffleMaster.unregisterJob(jobGraph.getJobID()); disconnectTaskManagerResourceManagerConnections(cause); stopJobMasterServices(); }); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 69fdc92..83ff6d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -291,4 +291,12 @@ public interface JobMasterGateway OperatorID operatorId, SerializedValue<CoordinationRequest> serializedRequest, @RpcTimeout Time timeout); + + /** + * Notifies the {@link org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker} + * to stop tracking the target result partitions and release the locally occupied resources on + * {@link org.apache.flink.runtime.taskexecutor.TaskExecutor}s if any. 
+ */ + CompletableFuture<?> stopTrackingAndReleasePartitions( + Collection<ResultPartitionID> partitionIds); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/JobShuffleContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/JobShuffleContext.java new file mode 100644 index 0000000..3cfa996 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/JobShuffleContext.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.shuffle; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; + +import java.util.Collection; +import java.util.concurrent.CompletableFuture; + +/** + * Job level shuffle context which can offer some job information like job ID and through it, the + * shuffle plugin notify the job to stop tracking the lost result partitions. + */ +public interface JobShuffleContext { + + /** @return the corresponding {@link JobID}. */ + JobID getJobId(); + + /** + * Notifies the job to stop tracking and release the target result partitions, which means these + * partitions will be removed and will be reproduced if used afterwards. 
+ */ + CompletableFuture<?> stopTrackingAndReleasePartitions( + Collection<ResultPartitionID> partitionIds); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/JobShuffleContextImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/JobShuffleContextImpl.java new file mode 100644 index 0000000..d056897 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/JobShuffleContextImpl.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.shuffle; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; + +import java.util.Collection; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** The default implementation of {@link JobShuffleContext}. 
*/ +public class JobShuffleContextImpl implements JobShuffleContext { + + private final JobID jobId; + + private final JobMasterGateway jobMasterGateway; + + public JobShuffleContextImpl(JobID jobId, JobMasterGateway jobMasterGateway) { + this.jobId = checkNotNull(jobId); + this.jobMasterGateway = checkNotNull(jobMasterGateway); + } + + @Override + public JobID getJobId() { + return jobId; + } + + @Override + public CompletableFuture<?> stopTrackingAndReleasePartitions( + Collection<ResultPartitionID> partitionIds) { + return jobMasterGateway.stopTrackingAndReleasePartitions(partitionIds); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java index b0220e9..82aa453 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java @@ -31,7 +31,39 @@ import java.util.concurrent.CompletableFuture; * @param <T> partition shuffle descriptor used for producer/consumer deployment and their data * exchange. */ -public interface ShuffleMaster<T extends ShuffleDescriptor> { +public interface ShuffleMaster<T extends ShuffleDescriptor> extends AutoCloseable { + + /** + * Starts this shuffle master as a service. One can do some initialization here, for example + * getting access and connecting to the external system. + */ + default void start() throws Exception {} + + /** + * Closes this shuffle master service which should release all resources. A shuffle master will + * only be closed when the cluster is shut down. + */ + @Override + default void close() throws Exception {} + + /** + * Registers the target job together with the corresponding {@link JobShuffleContext} to this + * shuffle master. Through the shuffle context, one can obtain some basic information like job + * ID, job configuration. 
It enables ShuffleMaster to notify JobMaster about lost result + * partitions, so that JobMaster can identify and reproduce unavailable partitions earlier. + * + * @param context the corresponding shuffle context of the target job. + */ + default void registerJob(JobShuffleContext context) {} + + /** + * Unregisters the target job from this shuffle master, which means the corresponding job has + * reached a global termination state and all the allocated resources except for the cluster + * partitions can be cleared. + * + * @param jobID ID of the target job to be unregistered. + */ + default void unregisterJob(JobID jobID) {} /** * Asynchronously register a partition and its producer with the shuffle service. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpJobMasterPartitionTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpJobMasterPartitionTracker.java index 4fa9176..8572f2e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpJobMasterPartitionTracker.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpJobMasterPartitionTracker.java @@ -42,7 +42,7 @@ public enum NoOpJobMasterPartitionTracker implements JobMasterPartitionTracker { @Override public void stopTrackingAndReleasePartitions( - Collection<ResultPartitionID> resultPartitionIds) {} + Collection<ResultPartitionID> resultPartitionIds, boolean releaseOnShuffleMaster) {} @Override public void stopTrackingAndReleaseOrPromotePartitions( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingJobMasterPartitionTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingJobMasterPartitionTracker.java index 99ceaef..8aecfd8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingJobMasterPartitionTracker.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingJobMasterPartitionTracker.java @@ -103,7 +103,8 @@ public class TestingJobMasterPartitionTracker implements JobMasterPartitionTrack } @Override - public void stopTrackingAndReleasePartitions(Collection<ResultPartitionID> resultPartitionIds) { + public void stopTrackingAndReleasePartitions( + Collection<ResultPartitionID> resultPartitionIds, boolean releaseOnShuffleMaster) { stopTrackingAndReleasePartitionsConsumer.accept(resultPartitionIds); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java index 020ea17..7373849 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java @@ -517,4 +517,10 @@ public class TestingJobMasterGateway implements JobMasterGateway { Time timeout) { return deliverCoordinationRequestFunction.apply(operatorId, serializedRequest); } + + @Override + public CompletableFuture<?> stopTrackingAndReleasePartitions( + Collection<ResultPartitionID> partitionIds) { + return CompletableFuture.completedFuture(null); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleMasterTest.java new file mode 100644 index 0000000..615a290 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleMasterTest.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.shuffle; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.io.network.NettyShuffleServiceFactory; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.flink.api.common.restartstrategy.RestartStrategies.fixedDelayRestart; 
+import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** Tests for {@link ShuffleMaster}. */ +public class ShuffleMasterTest extends TestLogger { + + private static final String STOP_TRACKING_PARTITION_KEY = "stop_tracking_partition_key"; + + private static final String PARTITION_REGISTRATION_EVENT = "registerPartitionWithProducer"; + + private static final String EXTERNAL_PARTITION_RELEASE_EVENT = "releasePartitionExternally"; + + @Before + public void before() { + TestShuffleMaster.partitionEvents.clear(); + } + + @Test + public void testShuffleMasterLifeCycle() throws Exception { + try (MiniCluster cluster = new MiniCluster(createClusterConfiguration(false))) { + cluster.start(); + cluster.executeJobBlocking(createJobGraph()); + } + assertTrue(TestShuffleMaster.currentInstance.get().closed.get()); + + String[] expectedPartitionEvents = + new String[] { + PARTITION_REGISTRATION_EVENT, + PARTITION_REGISTRATION_EVENT, + EXTERNAL_PARTITION_RELEASE_EVENT, + EXTERNAL_PARTITION_RELEASE_EVENT, + }; + assertArrayEquals(expectedPartitionEvents, TestShuffleMaster.partitionEvents.toArray()); + } + + @Test + public void testStopTrackingPartition() throws Exception { + try (MiniCluster cluster = new MiniCluster(createClusterConfiguration(true))) { + cluster.start(); + cluster.executeJobBlocking(createJobGraph()); + } + assertTrue(TestShuffleMaster.currentInstance.get().closed.get()); + + String[] expectedPartitionEvents = + new String[] { + PARTITION_REGISTRATION_EVENT, + PARTITION_REGISTRATION_EVENT, + PARTITION_REGISTRATION_EVENT, + PARTITION_REGISTRATION_EVENT, + EXTERNAL_PARTITION_RELEASE_EVENT, + EXTERNAL_PARTITION_RELEASE_EVENT, + }; + assertArrayEquals(expectedPartitionEvents, TestShuffleMaster.partitionEvents.toArray()); + } + + private MiniClusterConfiguration createClusterConfiguration(boolean stopTrackingPartition) { + 
Configuration configuration = new Configuration(); + configuration.setString( + ShuffleServiceOptions.SHUFFLE_SERVICE_FACTORY_CLASS, + TestShuffleServiceFactory.class.getName()); + configuration.setString(RestOptions.BIND_PORT, "0"); + configuration.setBoolean(STOP_TRACKING_PARTITION_KEY, stopTrackingPartition); + return new MiniClusterConfiguration.Builder() + .setNumTaskManagers(1) + .setNumSlotsPerTaskManager(1) + .setConfiguration(configuration) + .build(); + } + + private JobGraph createJobGraph() throws Exception { + JobVertex source = new JobVertex("source"); + source.setParallelism(2); + source.setInvokableClass(NoOpInvokable.class); + + JobVertex sink = new JobVertex("sink"); + sink.setParallelism(2); + sink.setInvokableClass(NoOpInvokable.class); + + sink.connectNewDataSetAsInput( + source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + + JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(source, sink); + ExecutionConfig config = new ExecutionConfig(); + config.setRestartStrategy(fixedDelayRestart(2, Time.seconds(2))); + jobGraph.setExecutionConfig(config); + return jobGraph; + } + + /** An {@link TestShuffleServiceFactory} implementation for testing. */ + public static class TestShuffleServiceFactory extends NettyShuffleServiceFactory { + @Override + public NettyShuffleMaster createShuffleMaster(ShuffleMasterContext shuffleMasterContext) { + return new TestShuffleMaster(shuffleMasterContext.getConfiguration()); + } + } + + /** An {@link ShuffleMaster} implementation for testing. 
*/ + private static class TestShuffleMaster extends NettyShuffleMaster { + + private static final AtomicReference<TestShuffleMaster> currentInstance = + new AtomicReference<>(); + + private static final BlockingQueue<String> partitionEvents = new LinkedBlockingQueue<>(); + + private final AtomicBoolean started = new AtomicBoolean(); + + private final AtomicBoolean closed = new AtomicBoolean(); + + private final BlockingQueue<ResultPartitionID> partitions = new LinkedBlockingQueue<>(); + + private final AtomicReference<JobShuffleContext> jobContext = new AtomicReference<>(); + + private final boolean stopTrackingPartition; + + public TestShuffleMaster(Configuration conf) { + super(conf); + this.stopTrackingPartition = conf.getBoolean(STOP_TRACKING_PARTITION_KEY, false); + currentInstance.set(this); + } + + @Override + public void start() throws Exception { + assertFalse(started.get()); + assertFalse(closed.get()); + started.set(true); + super.start(); + } + + @Override + public void close() throws Exception { + assertShuffleMasterAlive(); + closed.set(true); + super.close(); + } + + @Override + public void registerJob(JobShuffleContext context) { + assertShuffleMasterAlive(); + assertTrue(jobContext.compareAndSet(null, context)); + super.registerJob(context); + } + + @Override + public void unregisterJob(JobID jobID) { + assertJobRegistered(); + jobContext.set(null); + super.unregisterJob(jobID); + } + + @Override + public CompletableFuture<NettyShuffleDescriptor> registerPartitionWithProducer( + JobID jobID, + PartitionDescriptor partitionDescriptor, + ProducerDescriptor producerDescriptor) { + assertJobRegistered(); + partitionEvents.add(PARTITION_REGISTRATION_EVENT); + + CompletableFuture<NettyShuffleDescriptor> future = new CompletableFuture<>(); + try { + NettyShuffleDescriptor shuffleDescriptor = + super.registerPartitionWithProducer( + jobID, partitionDescriptor, producerDescriptor) + .get(); + // stop tracking the first registered partition when registering 
the second + // partition and trigger the failure of the second task, it is expected that + // the first partition will be reproduced + if (partitions.size() == 1 && stopTrackingPartition) { + jobContext + .get() + .stopTrackingAndReleasePartitions( + Collections.singletonList(partitions.peek())) + .thenRun(() -> future.completeExceptionally(new Exception("Test"))); + } else { + future.complete(shuffleDescriptor); + } + partitions.add(shuffleDescriptor.getResultPartitionID()); + } catch (Throwable throwable) { + future.completeExceptionally(throwable); + } + return future; + } + + @Override + public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) { + assertJobRegistered(); + partitionEvents.add(EXTERNAL_PARTITION_RELEASE_EVENT); + + super.releasePartitionExternally(shuffleDescriptor); + } + + private void assertShuffleMasterAlive() { + assertFalse(closed.get()); + assertTrue(started.get()); + } + + private void assertJobRegistered() { + assertShuffleMasterAlive(); + assertNotNull(jobContext.get()); + } + } +}
