This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 80df36b51af791f67126e07e182015ea6ea73fd2
Author: kevin.cyj <[email protected]>
AuthorDate: Mon Jul 5 09:42:18 2021 +0800

    [FLINK-22675][runtime] Add lifecycle methods to ShuffleMaster
    
    This patch adds some lifecycle methods to ShuffleMaster including start, 
close, registerJob and unregisterJob.
    
    This closes #16465.
---
 .../partition/JobMasterPartitionTracker.java       |  13 +-
 .../partition/JobMasterPartitionTrackerImpl.java   |  13 +-
 .../jobmaster/JobManagerSharedServices.java        |   7 +
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  19 ++
 .../flink/runtime/jobmaster/JobMasterGateway.java  |   8 +
 .../flink/runtime/shuffle/JobShuffleContext.java   |  42 ++++
 .../runtime/shuffle/JobShuffleContextImpl.java     |  52 +++++
 .../flink/runtime/shuffle/ShuffleMaster.java       |  34 ++-
 .../partition/NoOpJobMasterPartitionTracker.java   |   2 +-
 .../TestingJobMasterPartitionTracker.java          |   3 +-
 .../jobmaster/utils/TestingJobMasterGateway.java   |   6 +
 .../flink/runtime/shuffle/ShuffleMasterTest.java   | 251 +++++++++++++++++++++
 12 files changed, 442 insertions(+), 8 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java
index 967a446..cb7c2be 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java
@@ -39,7 +39,18 @@ public interface JobMasterPartitionTracker
             ResultPartitionDeploymentDescriptor 
resultPartitionDeploymentDescriptor);
 
     /** Releases the given partitions and stops the tracking of partitions that 
were released. */
-    void stopTrackingAndReleasePartitions(Collection<ResultPartitionID> 
resultPartitionIds);
+    default void stopTrackingAndReleasePartitions(
+            Collection<ResultPartitionID> resultPartitionIds) {
+        stopTrackingAndReleasePartitions(resultPartitionIds, true);
+    }
+
+    /**
+     * Releases the given partitions and stops the tracking of partitions that 
were released. The
+     * boolean flag indicates whether we need to notify the ShuffleMaster to 
release all external
+     * resources or not.
+     */
+    void stopTrackingAndReleasePartitions(
+            Collection<ResultPartitionID> resultPartitionIds, boolean 
releaseOnShuffleMaster);
 
     /**
      * Releases the job partitions and promotes the cluster partitions, and 
stops the tracking of
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java
index cbc5a95..f1ed2e7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java
@@ -99,10 +99,12 @@ public class JobMasterPartitionTrackerImpl
     }
 
     @Override
-    public void stopTrackingAndReleasePartitions(Collection<ResultPartitionID> 
resultPartitionIds) {
+    public void stopTrackingAndReleasePartitions(
+            Collection<ResultPartitionID> resultPartitionIds, boolean 
releaseOnShuffleMaster) {
         stopTrackingAndHandlePartitions(
                 resultPartitionIds,
-                (tmID, partitionDescs) -> internalReleasePartitions(tmID, 
partitionDescs));
+                (tmID, partitionDescs) ->
+                        internalReleasePartitions(tmID, partitionDescs, 
releaseOnShuffleMaster));
     }
 
     @Override
@@ -138,11 +140,14 @@ public class JobMasterPartitionTrackerImpl
 
     private void internalReleasePartitions(
             ResourceID potentialPartitionLocation,
-            Collection<ResultPartitionDeploymentDescriptor> 
partitionDeploymentDescriptors) {
+            Collection<ResultPartitionDeploymentDescriptor> 
partitionDeploymentDescriptors,
+            boolean releaseOnShuffleMaster) {
 
         internalReleasePartitionsOnTaskExecutor(
                 potentialPartitionLocation, partitionDeploymentDescriptors);
-        
internalReleasePartitionsOnShuffleMaster(partitionDeploymentDescriptors.stream());
+        if (releaseOnShuffleMaster) {
+            
internalReleasePartitionsOnShuffleMaster(partitionDeploymentDescriptors.stream());
+        }
     }
 
     private void internalReleaseOrPromotePartitions(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
index 1e0f0ae..1700722 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
@@ -102,6 +102,12 @@ public class JobManagerSharedServices {
             firstException = t;
         }
 
+        try {
+            shuffleMaster.close();
+        } catch (Throwable t) {
+            firstException = firstException == null ? t : firstException;
+        }
+
         libraryCacheManager.shutdown();
 
         if (firstException != null) {
@@ -151,6 +157,7 @@ public class JobManagerSharedServices {
         final ShuffleMaster<?> shuffleMaster =
                 ShuffleServiceLoader.loadShuffleServiceFactory(config)
                         .createShuffleMaster(shuffleMasterContext);
+        shuffleMaster.start();
 
         return new JobManagerSharedServices(
                 futureExecutor, libraryCacheManager, shuffleMaster, 
blobServer);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index dbb0e4a..4464bd9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -74,6 +74,8 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcServiceUtils;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.runtime.shuffle.JobShuffleContext;
+import org.apache.flink.runtime.shuffle.JobShuffleContextImpl;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -837,6 +839,19 @@ public class JobMaster extends 
PermanentlyFencedRpcEndpoint<JobMasterId>
         }
     }
 
+    @Override
+    public CompletableFuture<?> stopTrackingAndReleasePartitions(
+            Collection<ResultPartitionID> partitionIds) {
+        CompletableFuture<?> future = new CompletableFuture<>();
+        try {
+            partitionTracker.stopTrackingAndReleasePartitions(partitionIds, 
false);
+            future.complete(null);
+        } catch (Throwable throwable) {
+            future.completeExceptionally(throwable);
+        }
+        return future;
+    }
+
     // 
----------------------------------------------------------------------------------------------
     // Internal methods
     // 
----------------------------------------------------------------------------------------------
@@ -847,6 +862,9 @@ public class JobMaster extends 
PermanentlyFencedRpcEndpoint<JobMasterId>
     private void startJobExecution() throws Exception {
         validateRunsInMainThread();
 
+        JobShuffleContext context = new 
JobShuffleContextImpl(jobGraph.getJobID(), this);
+        shuffleMaster.registerJob(context);
+
         startJobMasterServices();
 
         log.info(
@@ -913,6 +931,7 @@ public class JobMaster extends 
PermanentlyFencedRpcEndpoint<JobMasterId>
         return FutureUtils.runAfterwards(
                 terminationFuture,
                 () -> {
+                    shuffleMaster.unregisterJob(jobGraph.getJobID());
                     disconnectTaskManagerResourceManagerConnections(cause);
                     stopJobMasterServices();
                 });
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 69fdc92..83ff6d0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -291,4 +291,12 @@ public interface JobMasterGateway
             OperatorID operatorId,
             SerializedValue<CoordinationRequest> serializedRequest,
             @RpcTimeout Time timeout);
+
+    /**
+     * Notifies the {@link 
org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker}
+     * to stop tracking the target result partitions and release the locally 
occupied resources on
+     * {@link org.apache.flink.runtime.taskexecutor.TaskExecutor}s if any.
+     */
+    CompletableFuture<?> stopTrackingAndReleasePartitions(
+            Collection<ResultPartitionID> partitionIds);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/JobShuffleContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/JobShuffleContext.java
new file mode 100644
index 0000000..3cfa996
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/JobShuffleContext.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Job level shuffle context which can offer some job information like job ID 
and through it, the
+ * shuffle plugin can notify the job to stop tracking the lost result partitions.
+ */
+public interface JobShuffleContext {
+
+    /** @return the corresponding {@link JobID}. */
+    JobID getJobId();
+
+    /**
+     * Notifies the job to stop tracking and release the target result 
partitions, which means these
+     * partitions will be removed and will be reproduced if used afterwards.
+     */
+    CompletableFuture<?> stopTrackingAndReleasePartitions(
+            Collection<ResultPartitionID> partitionIds);
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/JobShuffleContextImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/JobShuffleContextImpl.java
new file mode 100644
index 0000000..d056897
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/JobShuffleContextImpl.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The default implementation of {@link JobShuffleContext}. */
+public class JobShuffleContextImpl implements JobShuffleContext {
+
+    private final JobID jobId;
+
+    private final JobMasterGateway jobMasterGateway;
+
+    public JobShuffleContextImpl(JobID jobId, JobMasterGateway 
jobMasterGateway) {
+        this.jobId = checkNotNull(jobId);
+        this.jobMasterGateway = checkNotNull(jobMasterGateway);
+    }
+
+    @Override
+    public JobID getJobId() {
+        return jobId;
+    }
+
+    @Override
+    public CompletableFuture<?> stopTrackingAndReleasePartitions(
+            Collection<ResultPartitionID> partitionIds) {
+        return jobMasterGateway.stopTrackingAndReleasePartitions(partitionIds);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
index b0220e9..82aa453 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
@@ -31,7 +31,39 @@ import java.util.concurrent.CompletableFuture;
  * @param <T> partition shuffle descriptor used for producer/consumer 
deployment and their data
  *     exchange.
  */
-public interface ShuffleMaster<T extends ShuffleDescriptor> {
+public interface ShuffleMaster<T extends ShuffleDescriptor> extends 
AutoCloseable {
+
+    /**
+     * Starts this shuffle master as a service. One can do some initialization 
here, for example
+     * getting access and connecting to the external system.
+     */
+    default void start() throws Exception {}
+
+    /**
+     * Closes this shuffle master service which should release all resources. 
A shuffle master will
+     * only be closed when the cluster is shut down.
+     */
+    @Override
+    default void close() throws Exception {}
+
+    /**
+     * Registers the target job together with the corresponding {@link 
JobShuffleContext} to this
+     * shuffle master. Through the shuffle context, one can obtain some basic 
information like job
+     * ID, job configuration. It enables ShuffleMaster to notify JobMaster 
about lost result
+     * partitions, so that JobMaster can identify and reproduce unavailable 
partitions earlier.
+     *
+     * @param context the corresponding shuffle context of the target job.
+     */
+    default void registerJob(JobShuffleContext context) {}
+
+    /**
+     * Unregisters the target job from this shuffle master, which means the 
corresponding job has
+     * reached a global termination state and all the allocated resources 
except for the cluster
+     * partitions can be cleared.
+     *
+     * @param jobID ID of the target job to be unregistered.
+     */
+    default void unregisterJob(JobID jobID) {}
 
     /**
      * Asynchronously register a partition and its producer with the shuffle 
service.
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpJobMasterPartitionTracker.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpJobMasterPartitionTracker.java
index 4fa9176..8572f2e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpJobMasterPartitionTracker.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpJobMasterPartitionTracker.java
@@ -42,7 +42,7 @@ public enum NoOpJobMasterPartitionTracker implements 
JobMasterPartitionTracker {
 
     @Override
     public void stopTrackingAndReleasePartitions(
-            Collection<ResultPartitionID> resultPartitionIds) {}
+            Collection<ResultPartitionID> resultPartitionIds, boolean 
releaseOnShuffleMaster) {}
 
     @Override
     public void stopTrackingAndReleaseOrPromotePartitions(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingJobMasterPartitionTracker.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingJobMasterPartitionTracker.java
index 99ceaef..8aecfd8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingJobMasterPartitionTracker.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingJobMasterPartitionTracker.java
@@ -103,7 +103,8 @@ public class TestingJobMasterPartitionTracker implements 
JobMasterPartitionTrack
     }
 
     @Override
-    public void stopTrackingAndReleasePartitions(Collection<ResultPartitionID> 
resultPartitionIds) {
+    public void stopTrackingAndReleasePartitions(
+            Collection<ResultPartitionID> resultPartitionIds, boolean 
releaseOnShuffleMaster) {
         stopTrackingAndReleasePartitionsConsumer.accept(resultPartitionIds);
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index 020ea17..7373849 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -517,4 +517,10 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
             Time timeout) {
         return deliverCoordinationRequestFunction.apply(operatorId, 
serializedRequest);
     }
+
+    @Override
+    public CompletableFuture<?> stopTrackingAndReleasePartitions(
+            Collection<ResultPartitionID> partitionIds) {
+        return CompletableFuture.completedFuture(null);
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleMasterTest.java
new file mode 100644
index 0000000..615a290
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleMasterTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.io.network.NettyShuffleServiceFactory;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.flink.api.common.restartstrategy.RestartStrategies.fixedDelayRestart;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link ShuffleMaster}. */
+public class ShuffleMasterTest extends TestLogger {
+
+    private static final String STOP_TRACKING_PARTITION_KEY = 
"stop_tracking_partition_key";
+
+    private static final String PARTITION_REGISTRATION_EVENT = 
"registerPartitionWithProducer";
+
+    private static final String EXTERNAL_PARTITION_RELEASE_EVENT = 
"releasePartitionExternally";
+
+    @Before
+    public void before() {
+        TestShuffleMaster.partitionEvents.clear();
+    }
+
+    @Test
+    public void testShuffleMasterLifeCycle() throws Exception {
+        try (MiniCluster cluster = new 
MiniCluster(createClusterConfiguration(false))) {
+            cluster.start();
+            cluster.executeJobBlocking(createJobGraph());
+        }
+        assertTrue(TestShuffleMaster.currentInstance.get().closed.get());
+
+        String[] expectedPartitionEvents =
+                new String[] {
+                    PARTITION_REGISTRATION_EVENT,
+                    PARTITION_REGISTRATION_EVENT,
+                    EXTERNAL_PARTITION_RELEASE_EVENT,
+                    EXTERNAL_PARTITION_RELEASE_EVENT,
+                };
+        assertArrayEquals(expectedPartitionEvents, 
TestShuffleMaster.partitionEvents.toArray());
+    }
+
+    @Test
+    public void testStopTrackingPartition() throws Exception {
+        try (MiniCluster cluster = new 
MiniCluster(createClusterConfiguration(true))) {
+            cluster.start();
+            cluster.executeJobBlocking(createJobGraph());
+        }
+        assertTrue(TestShuffleMaster.currentInstance.get().closed.get());
+
+        String[] expectedPartitionEvents =
+                new String[] {
+                    PARTITION_REGISTRATION_EVENT,
+                    PARTITION_REGISTRATION_EVENT,
+                    PARTITION_REGISTRATION_EVENT,
+                    PARTITION_REGISTRATION_EVENT,
+                    EXTERNAL_PARTITION_RELEASE_EVENT,
+                    EXTERNAL_PARTITION_RELEASE_EVENT,
+                };
+        assertArrayEquals(expectedPartitionEvents, 
TestShuffleMaster.partitionEvents.toArray());
+    }
+
+    private MiniClusterConfiguration createClusterConfiguration(boolean 
stopTrackingPartition) {
+        Configuration configuration = new Configuration();
+        configuration.setString(
+                ShuffleServiceOptions.SHUFFLE_SERVICE_FACTORY_CLASS,
+                TestShuffleServiceFactory.class.getName());
+        configuration.setString(RestOptions.BIND_PORT, "0");
+        configuration.setBoolean(STOP_TRACKING_PARTITION_KEY, 
stopTrackingPartition);
+        return new MiniClusterConfiguration.Builder()
+                .setNumTaskManagers(1)
+                .setNumSlotsPerTaskManager(1)
+                .setConfiguration(configuration)
+                .build();
+    }
+
+    private JobGraph createJobGraph() throws Exception {
+        JobVertex source = new JobVertex("source");
+        source.setParallelism(2);
+        source.setInvokableClass(NoOpInvokable.class);
+
+        JobVertex sink = new JobVertex("sink");
+        sink.setParallelism(2);
+        sink.setInvokableClass(NoOpInvokable.class);
+
+        sink.connectNewDataSetAsInput(
+                source, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.BLOCKING);
+
+        JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(source, sink);
+        ExecutionConfig config = new ExecutionConfig();
+        config.setRestartStrategy(fixedDelayRestart(2, Time.seconds(2)));
+        jobGraph.setExecutionConfig(config);
+        return jobGraph;
+    }
+
+    /** A {@link NettyShuffleServiceFactory} implementation for testing. */
+    public static class TestShuffleServiceFactory extends 
NettyShuffleServiceFactory {
+        @Override
+        public NettyShuffleMaster createShuffleMaster(ShuffleMasterContext 
shuffleMasterContext) {
+            return new 
TestShuffleMaster(shuffleMasterContext.getConfiguration());
+        }
+    }
+
+    /** A {@link ShuffleMaster} implementation for testing. */
+    private static class TestShuffleMaster extends NettyShuffleMaster {
+
+        private static final AtomicReference<TestShuffleMaster> 
currentInstance =
+                new AtomicReference<>();
+
+        private static final BlockingQueue<String> partitionEvents = new 
LinkedBlockingQueue<>();
+
+        private final AtomicBoolean started = new AtomicBoolean();
+
+        private final AtomicBoolean closed = new AtomicBoolean();
+
+        private final BlockingQueue<ResultPartitionID> partitions = new 
LinkedBlockingQueue<>();
+
+        private final AtomicReference<JobShuffleContext> jobContext = new 
AtomicReference<>();
+
+        private final boolean stopTrackingPartition;
+
+        public TestShuffleMaster(Configuration conf) {
+            super(conf);
+            this.stopTrackingPartition = 
conf.getBoolean(STOP_TRACKING_PARTITION_KEY, false);
+            currentInstance.set(this);
+        }
+
+        @Override
+        public void start() throws Exception {
+            assertFalse(started.get());
+            assertFalse(closed.get());
+            started.set(true);
+            super.start();
+        }
+
+        @Override
+        public void close() throws Exception {
+            assertShuffleMasterAlive();
+            closed.set(true);
+            super.close();
+        }
+
+        @Override
+        public void registerJob(JobShuffleContext context) {
+            assertShuffleMasterAlive();
+            assertTrue(jobContext.compareAndSet(null, context));
+            super.registerJob(context);
+        }
+
+        @Override
+        public void unregisterJob(JobID jobID) {
+            assertJobRegistered();
+            jobContext.set(null);
+            super.unregisterJob(jobID);
+        }
+
+        @Override
+        public CompletableFuture<NettyShuffleDescriptor> 
registerPartitionWithProducer(
+                JobID jobID,
+                PartitionDescriptor partitionDescriptor,
+                ProducerDescriptor producerDescriptor) {
+            assertJobRegistered();
+            partitionEvents.add(PARTITION_REGISTRATION_EVENT);
+
+            CompletableFuture<NettyShuffleDescriptor> future = new 
CompletableFuture<>();
+            try {
+                NettyShuffleDescriptor shuffleDescriptor =
+                        super.registerPartitionWithProducer(
+                                        jobID, partitionDescriptor, 
producerDescriptor)
+                                .get();
+                // stop tracking the first registered partition when 
registering the second
+                // partition and trigger the failure of the second task; it is 
expected that
+                // the first partition will be reproduced
+                if (partitions.size() == 1 && stopTrackingPartition) {
+                    jobContext
+                            .get()
+                            .stopTrackingAndReleasePartitions(
+                                    
Collections.singletonList(partitions.peek()))
+                            .thenRun(() -> future.completeExceptionally(new 
Exception("Test")));
+                } else {
+                    future.complete(shuffleDescriptor);
+                }
+                partitions.add(shuffleDescriptor.getResultPartitionID());
+            } catch (Throwable throwable) {
+                future.completeExceptionally(throwable);
+            }
+            return future;
+        }
+
+        @Override
+        public void releasePartitionExternally(ShuffleDescriptor 
shuffleDescriptor) {
+            assertJobRegistered();
+            partitionEvents.add(EXTERNAL_PARTITION_RELEASE_EVENT);
+
+            super.releasePartitionExternally(shuffleDescriptor);
+        }
+
+        private void assertShuffleMasterAlive() {
+            assertFalse(closed.get());
+            assertTrue(started.get());
+        }
+
+        private void assertJobRegistered() {
+            assertShuffleMasterAlive();
+            assertNotNull(jobContext.get());
+        }
+    }
+}

Reply via email to