zhuzhurk commented on a change in pull request #16465:
URL: https://github.com/apache/flink/pull/16465#discussion_r678868130



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/JobShuffleContext.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Job level shuffle context which can offer some job information like job ID 
and through it, the
+ * shuffle plugin can stop tracking the lost result partitions.

Review comment:
       maybe `can stop tracking` -> `can notify the job to stop tracking`

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -837,6 +841,18 @@ public void notifyNotEnoughResourcesAvailable(
         }
     }
 
+    @Override
+    public CompletableFuture<?> 
stopTrackingPartitions(Collection<ResultPartitionID> partitionIDS) {
+        CompletableFuture<?> future = new CompletableFuture<>();
+        try {
+            partitionTracker.stopTrackingAndReleasePartitions(partitionIDS);

Review comment:
       I think `JobMaster#stopTrackingPartitions()` is invoked because the 
partitions are already identified as unavailable.
   However, `stopTrackingAndReleasePartitions()` will also issue a request to the shuffle 
master to release these partitions.
   I think what we really need is a 
`partitionTracker.stopTrackingPartitions(...)`. However, we still need to take 
care of the case where an external partition occupies TM local resources, which 
is a bit weird, though.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleMasterTest.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.io.network.NettyShuffleServiceFactory;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.flink.api.common.restartstrategy.RestartStrategies.fixedDelayRestart;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link ShuffleMaster}. */
+public class ShuffleMasterTest {
+
+    public static final String STOP_TRACKING_PARTITION_KEY = 
"stop_tracking_partition_key";
+
+    @Test
+    public void testShuffleMasterLifeCycle() throws Exception {
+        try (MiniCluster cluster = new 
MiniCluster(createClusterConfiguration(false))) {
+            cluster.start();
+            cluster.executeJobBlocking(createJobGraph());
+        }
+        assertTrue(TestShuffleMaster.currentInstance.get().closed.get());
+
+        String[] expectedPartitionEvents =
+                new String[] {
+                    "registerPartitionWithProducer",
+                    "registerPartitionWithProducer",
+                    "releasePartitionExternally",
+                    "releasePartitionExternally",
+                };
+        assertArrayEquals(expectedPartitionEvents, 
TestShuffleMaster.partitionEvents.toArray());
+        TestShuffleMaster.partitionEvents.clear();
+    }
+
+    @Test
+    public void testStopTrackingPartition() throws Exception {
+        try (MiniCluster cluster = new 
MiniCluster(createClusterConfiguration(true))) {
+            cluster.start();
+            cluster.executeJobBlocking(createJobGraph());
+        }
+        assertTrue(TestShuffleMaster.currentInstance.get().closed.get());
+
+        String[] expectedPartitionEvents =
+                new String[] {
+                    "registerPartitionWithProducer",

Review comment:
       It's better to introduce some constants for these strings.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleMasterTest.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.io.network.NettyShuffleServiceFactory;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.flink.api.common.restartstrategy.RestartStrategies.fixedDelayRestart;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link ShuffleMaster}. */
+public class ShuffleMasterTest {
+
+    public static final String STOP_TRACKING_PARTITION_KEY = 
"stop_tracking_partition_key";
+
+    @Test
+    public void testShuffleMasterLifeCycle() throws Exception {
+        try (MiniCluster cluster = new 
MiniCluster(createClusterConfiguration(false))) {
+            cluster.start();
+            cluster.executeJobBlocking(createJobGraph());
+        }
+        assertTrue(TestShuffleMaster.currentInstance.get().closed.get());
+
+        String[] expectedPartitionEvents =
+                new String[] {
+                    "registerPartitionWithProducer",
+                    "registerPartitionWithProducer",
+                    "releasePartitionExternally",
+                    "releasePartitionExternally",
+                };
+        assertArrayEquals(expectedPartitionEvents, 
TestShuffleMaster.partitionEvents.toArray());
+        TestShuffleMaster.partitionEvents.clear();
+    }
+
+    @Test
+    public void testStopTrackingPartition() throws Exception {
+        try (MiniCluster cluster = new 
MiniCluster(createClusterConfiguration(true))) {
+            cluster.start();
+            cluster.executeJobBlocking(createJobGraph());
+        }
+        assertTrue(TestShuffleMaster.currentInstance.get().closed.get());
+
+        String[] expectedPartitionEvents =
+                new String[] {
+                    "registerPartitionWithProducer",
+                    "registerPartitionWithProducer",
+                    "releasePartitionExternally",
+                    "registerPartitionWithProducer",
+                    "registerPartitionWithProducer",
+                    "releasePartitionExternally",
+                    "releasePartitionExternally",
+                };
+        assertArrayEquals(expectedPartitionEvents, 
TestShuffleMaster.partitionEvents.toArray());
+        TestShuffleMaster.partitionEvents.clear();
+    }
+
+    private MiniClusterConfiguration createClusterConfiguration(boolean 
stopTrackingPartition) {
+        Configuration configuration = new Configuration();
+        configuration.setString(
+                ShuffleServiceOptions.SHUFFLE_SERVICE_FACTORY_CLASS,
+                TestShuffleServiceFactory.class.getName());
+        configuration.setString(RestOptions.BIND_PORT, "0");
+        configuration.setBoolean(STOP_TRACKING_PARTITION_KEY, 
stopTrackingPartition);
+        return new MiniClusterConfiguration.Builder()
+                .setNumTaskManagers(1)
+                .setNumSlotsPerTaskManager(1)
+                .setConfiguration(configuration)
+                .build();
+    }
+
+    private JobGraph createJobGraph() throws Exception {
+        JobVertex source = new JobVertex("source");
+        source.setParallelism(2);
+        source.setInvokableClass(NoOpInvokable.class);
+
+        JobVertex sink = new JobVertex("sink");
+        sink.setParallelism(2);
+        sink.setInvokableClass(NoOpInvokable.class);
+
+        sink.connectNewDataSetAsInput(
+                source, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.BLOCKING);
+
+        JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(source, sink);
+        ExecutionConfig config = new ExecutionConfig();
+        config.setRestartStrategy(fixedDelayRestart(2, Time.seconds(2)));
+        jobGraph.setExecutionConfig(config);
+        return jobGraph;
+    }
+
+    /** An {@link TestShuffleServiceFactory} implementation for testing. */
+    public static class TestShuffleServiceFactory extends 
NettyShuffleServiceFactory {
+        @Override
+        public NettyShuffleMaster createShuffleMaster(ShuffleMasterContext 
shuffleMasterContext) {
+            return new 
TestShuffleMaster(shuffleMasterContext.getConfiguration());
+        }
+    }
+
+    /** An {@link ShuffleMaster} implementation for testing. */
+    public static class TestShuffleMaster extends NettyShuffleMaster {
+
+        private static final AtomicReference<TestShuffleMaster> 
currentInstance =
+                new AtomicReference<>();
+
+        private static final BlockingQueue<String> partitionEvents = new 
LinkedBlockingQueue<>();
+
+        private final AtomicBoolean started = new AtomicBoolean();
+
+        private final AtomicBoolean closed = new AtomicBoolean();
+
+        private final BlockingQueue<ResultPartitionID> partitions = new 
LinkedBlockingQueue<>();
+
+        private final AtomicReference<JobShuffleContext> jobContext = new 
AtomicReference<>();
+
+        private final boolean stopTrackingPartition;
+
+        public TestShuffleMaster(Configuration conf) {
+            super(conf);
+            this.stopTrackingPartition = 
conf.getBoolean(STOP_TRACKING_PARTITION_KEY, false);
+            currentInstance.set(this);
+        }
+
+        @Override
+        public void start() throws Exception {
+            assertFalse(started.get());
+            assertFalse(closed.get());
+            started.set(true);
+            super.start();
+        }
+
+        @Override
+        public void close() throws Exception {
+            assertShuffleMasterAlive();
+            closed.set(true);
+            super.close();
+        }
+
+        @Override
+        public void registerJob(JobShuffleContext context) {
+            assertShuffleMasterAlive();
+            assertTrue(jobContext.compareAndSet(null, context));
+            super.registerJob(context);
+        }
+
+        @Override
+        public void unregisterJob(JobID jobID) {
+            assertJobRegistered();
+            jobContext.set(null);
+            super.unregisterJob(jobID);
+        }
+
+        @Override
+        public CompletableFuture<NettyShuffleDescriptor> 
registerPartitionWithProducer(
+                JobID jobID,
+                PartitionDescriptor partitionDescriptor,
+                ProducerDescriptor producerDescriptor) {
+            assertJobRegistered();
+            partitionEvents.add("registerPartitionWithProducer");
+
+            CompletableFuture<NettyShuffleDescriptor> future = new 
CompletableFuture<>();
+            try {
+                NettyShuffleDescriptor shuffleDescriptor =
+                        super.registerPartitionWithProducer(
+                                        jobID, partitionDescriptor, 
producerDescriptor)
+                                .get();
+                if (partitions.size() == 1 && stopTrackingPartition) {

Review comment:
       It would be better to add some explanation of what we want to achieve here. I 
feel it is not easy to understand at first glance.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
##########
@@ -31,7 +31,39 @@
  * @param <T> partition shuffle descriptor used for producer/consumer 
deployment and their data
  *     exchange.
  */
-public interface ShuffleMaster<T extends ShuffleDescriptor> {
+public interface ShuffleMaster<T extends ShuffleDescriptor> extends 
AutoCloseable {
+
+    /**
+     * Starts this shuffle master as a service. One can do some initialization 
here, for example
+     * getting access and connecting to the external system.
+     */
+    default void start() throws Exception {}
+
+    /**
+     * Closes this shuffle master service which should release all resources. 
A shuffle master will
+     * only be closed when the cluster is shut down.
+     */
+    @Override
+    default void close() throws Exception {}
+
+    /**
+     * Registers the target job together with the corresponding {@link 
JobShuffleContext} to this
+     * shuffle master. Through the shuffle context, one can obtain some basic 
information like job
+     * ID, job configuration and all result partitions produced. Besides, by 
stopping tracking the
+     * lost result partition, one can remove and reproduce them.

Review comment:
       >>> Besides, by stopping tracking the lost result partition, one can 
remove and reproduce them.
   
   "It enables ShuffleMaster to notify JobMaster about lost result partitions, 
so that JobMaster can identify and reproduce unavailable partitions earlier."
   
   I feel that stating it like this might be easier to understand. WDYT?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleMasterTest.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.io.network.NettyShuffleServiceFactory;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.flink.api.common.restartstrategy.RestartStrategies.fixedDelayRestart;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link ShuffleMaster}. */
+public class ShuffleMasterTest {
+
+    public static final String STOP_TRACKING_PARTITION_KEY = 
"stop_tracking_partition_key";
+
+    @Test
+    public void testShuffleMasterLifeCycle() throws Exception {
+        try (MiniCluster cluster = new 
MiniCluster(createClusterConfiguration(false))) {
+            cluster.start();
+            cluster.executeJobBlocking(createJobGraph());
+        }
+        assertTrue(TestShuffleMaster.currentInstance.get().closed.get());
+
+        String[] expectedPartitionEvents =
+                new String[] {
+                    "registerPartitionWithProducer",
+                    "registerPartitionWithProducer",
+                    "releasePartitionExternally",
+                    "releasePartitionExternally",
+                };
+        assertArrayEquals(expectedPartitionEvents, 
TestShuffleMaster.partitionEvents.toArray());
+        TestShuffleMaster.partitionEvents.clear();
+    }
+
+    @Test
+    public void testStopTrackingPartition() throws Exception {
+        try (MiniCluster cluster = new 
MiniCluster(createClusterConfiguration(true))) {
+            cluster.start();
+            cluster.executeJobBlocking(createJobGraph());
+        }
+        assertTrue(TestShuffleMaster.currentInstance.get().closed.get());
+
+        String[] expectedPartitionEvents =
+                new String[] {
+                    "registerPartitionWithProducer",
+                    "registerPartitionWithProducer",
+                    "releasePartitionExternally",
+                    "registerPartitionWithProducer",
+                    "registerPartitionWithProducer",
+                    "releasePartitionExternally",
+                    "releasePartitionExternally",
+                };
+        assertArrayEquals(expectedPartitionEvents, 
TestShuffleMaster.partitionEvents.toArray());
+        TestShuffleMaster.partitionEvents.clear();
+    }
+
+    private MiniClusterConfiguration createClusterConfiguration(boolean 
stopTrackingPartition) {
+        Configuration configuration = new Configuration();
+        configuration.setString(
+                ShuffleServiceOptions.SHUFFLE_SERVICE_FACTORY_CLASS,
+                TestShuffleServiceFactory.class.getName());
+        configuration.setString(RestOptions.BIND_PORT, "0");
+        configuration.setBoolean(STOP_TRACKING_PARTITION_KEY, 
stopTrackingPartition);
+        return new MiniClusterConfiguration.Builder()
+                .setNumTaskManagers(1)
+                .setNumSlotsPerTaskManager(1)
+                .setConfiguration(configuration)
+                .build();
+    }
+
+    private JobGraph createJobGraph() throws Exception {
+        JobVertex source = new JobVertex("source");
+        source.setParallelism(2);
+        source.setInvokableClass(NoOpInvokable.class);
+
+        JobVertex sink = new JobVertex("sink");
+        sink.setParallelism(2);
+        sink.setInvokableClass(NoOpInvokable.class);
+
+        sink.connectNewDataSetAsInput(
+                source, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.BLOCKING);
+
+        JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(source, sink);
+        ExecutionConfig config = new ExecutionConfig();
+        config.setRestartStrategy(fixedDelayRestart(2, Time.seconds(2)));
+        jobGraph.setExecutionConfig(config);
+        return jobGraph;
+    }
+
+    /** An {@link TestShuffleServiceFactory} implementation for testing. */
+    public static class TestShuffleServiceFactory extends 
NettyShuffleServiceFactory {
+        @Override
+        public NettyShuffleMaster createShuffleMaster(ShuffleMasterContext 
shuffleMasterContext) {
+            return new 
TestShuffleMaster(shuffleMasterContext.getConfiguration());
+        }
+    }
+
+    /** An {@link ShuffleMaster} implementation for testing. */
+    public static class TestShuffleMaster extends NettyShuffleMaster {

Review comment:
       can be private

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleMasterTest.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.io.network.NettyShuffleServiceFactory;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.flink.api.common.restartstrategy.RestartStrategies.fixedDelayRestart;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link ShuffleMaster}. */
+public class ShuffleMasterTest {

Review comment:
       should extend `TestLogger`

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleMasterTest.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.io.network.NettyShuffleServiceFactory;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.flink.api.common.restartstrategy.RestartStrategies.fixedDelayRestart;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link ShuffleMaster}. */
+public class ShuffleMasterTest {
+
+    public static final String STOP_TRACKING_PARTITION_KEY = 
"stop_tracking_partition_key";
+
+    @Test
+    public void testShuffleMasterLifeCycle() throws Exception {
+        try (MiniCluster cluster = new 
MiniCluster(createClusterConfiguration(false))) {
+            cluster.start();
+            cluster.executeJobBlocking(createJobGraph());
+        }
+        assertTrue(TestShuffleMaster.currentInstance.get().closed.get());
+
+        String[] expectedPartitionEvents =
+                new String[] {
+                    "registerPartitionWithProducer",
+                    "registerPartitionWithProducer",
+                    "releasePartitionExternally",
+                    "releasePartitionExternally",
+                };
+        assertArrayEquals(expectedPartitionEvents, 
TestShuffleMaster.partitionEvents.toArray());
+        TestShuffleMaster.partitionEvents.clear();

Review comment:
       It would be better to clear it in a `@Before` method so that no other test case 
forgets to do it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to