This is an automated email from the ASF dual-hosted git repository.

wanglijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4fe3560015cd9cc076afad470228a9565d557935
Author: Weihua Hu <[email protected]>
AuthorDate: Wed May 24 18:06:06 2023 +0800

    [FLINK-32201][runtime] Automatically determine if the shuffle descriptor 
needs to be offloaded by the blob server based on the number of 
ShuffleDescriptor edges.
    
    This closes #22674
---
 .../org/apache/flink/runtime/blob/BlobWriter.java  | 24 ++++--
 .../deployment/CachedShuffleDescriptors.java       |  6 +-
 .../TaskDeploymentDescriptorFactory.java           | 60 +++++++++++--
 .../DefaultExecutionGraphBuilder.java              |  7 +-
 .../deployment/CachedShuffleDescriptorsTest.java   |  3 +-
 .../TaskDeploymentDescriptorFactoryTest.java       | 37 +++++---
 .../BlockingResultPartitionReleaseTest.java        |  4 +-
 .../RemoveCachedShuffleDescriptorTest.java         | 98 ++++++++++++++++++----
 .../runtime/scheduler/SchedulerTestingUtils.java   | 13 ++-
 9 files changed, 201 insertions(+), 51 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
index 4b284a2d1fe..2d5292b42cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
@@ -102,16 +102,22 @@ public interface BlobWriter {
         if (serializedValue.getByteArray().length < 
blobWriter.getMinOffloadingSize()) {
             return Either.Left(serializedValue);
         } else {
-            try {
-                final PermanentBlobKey permanentBlobKey =
-                        blobWriter.putPermanent(jobId, 
serializedValue.getByteArray());
-
-                return Either.Right(permanentBlobKey);
-            } catch (IOException e) {
-                LOG.warn("Failed to offload value for job {} to BLOB store.", 
jobId, e);
+            return offloadWithException(serializedValue, jobId, blobWriter);
+        }
+    }
 
-                return Either.Left(serializedValue);
-            }
+    static <T> Either<SerializedValue<T>, PermanentBlobKey> 
offloadWithException(
+            SerializedValue<T> serializedValue, JobID jobId, BlobWriter 
blobWriter) {
+        Preconditions.checkNotNull(serializedValue);
+        Preconditions.checkNotNull(jobId);
+        Preconditions.checkNotNull(blobWriter);
+        try {
+            final PermanentBlobKey permanentBlobKey =
+                    blobWriter.putPermanent(jobId, 
serializedValue.getByteArray());
+            return Either.Right(permanentBlobKey);
+        } catch (IOException e) {
+            LOG.warn("Failed to offload value for job {} to BLOB store.", 
jobId, e);
+            return Either.Left(serializedValue);
         }
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java
index e50cf727f53..029b5b9383f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java
@@ -53,10 +53,14 @@ public class CachedShuffleDescriptors {
     /** Stores the mapping of resultPartitionId to index subscripts in 
consumed partition group. */
     private final Map<IntermediateResultPartitionID, Integer> 
resultPartitionIdToIndex;
 
+    /** The number of consumers for {@link ConsumedPartitionGroup}. */
+    private final int numConsumers;
+
     public CachedShuffleDescriptors(
             ConsumedPartitionGroup consumedPartitionGroup,
             ShuffleDescriptorAndIndex[] shuffleDescriptors) {
         this.resultPartitionIdToIndex = new HashMap<>();
+        this.numConsumers = consumedPartitionGroup.getNumConsumers();
         int index = 0;
         for (IntermediateResultPartitionID resultPartitionID : 
consumedPartitionGroup) {
             resultPartitionIdToIndex.put(resultPartitionID, index++);
@@ -80,7 +84,7 @@ public class CachedShuffleDescriptors {
         if (!toBeSerialized.isEmpty()) {
             MaybeOffloaded<ShuffleDescriptorAndIndex[]> 
serializedShuffleDescriptor =
                     
shuffleDescriptorSerializer.serializeAndTryOffloadShuffleDescriptor(
-                            toBeSerialized.toArray(new 
ShuffleDescriptorAndIndex[0]));
+                            toBeSerialized.toArray(new 
ShuffleDescriptorAndIndex[0]), numConsumers);
             toBeSerialized.clear();
             serializedShuffleDescriptors.add(serializedShuffleDescriptor);
         }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
index 2b0b7ecaeca..4f1b8ea0546 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
@@ -18,8 +18,10 @@
 
 package org.apache.flink.runtime.deployment;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
@@ -58,6 +60,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.flink.configuration.ConfigOptions.key;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -66,6 +69,21 @@ import static org.apache.flink.util.Preconditions.checkState;
  * org.apache.flink.runtime.taskmanager.Task} from {@link Execution}.
  */
 public class TaskDeploymentDescriptorFactory {
+    /**
+     * This is an expert option, that we do not want to expose in the 
documentation. The default
+     * value is good enough for almost all cases
+     */
+    @Experimental
+    public static final ConfigOption<Integer> 
OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD =
+            
key("jobmanager.task-deployment.offload-shuffle-descriptors-to-blob-server.threshold-num")
+                    .intType()
+                    .defaultValue(2048 * 2048)
+                    .withDescription(
+                            "Threshold for offloading shuffle descriptors to 
blob server. Once the number of shuffle descriptors"
+                                    + " exceeds this value, we will offload 
the shuffle descriptors to blob server."
+                                    + " This default value means JobManager 
need to serialize and transport"
+                                    + " 2048 shuffle descriptors (almost 32KB) 
to 2048 consumers (64MB in total)");
+
     private final MaybeOffloaded<JobInformation> serializedJobInformation;
     private final JobID jobID;
     private final PartitionLocationConstraint partitionDeploymentConstraint;
@@ -77,13 +95,15 @@ public class TaskDeploymentDescriptorFactory {
             JobID jobID,
             PartitionLocationConstraint partitionDeploymentConstraint,
             BlobWriter blobWriter,
-            boolean nonFinishedHybridPartitionShouldBeUnknown) {
+            boolean nonFinishedHybridPartitionShouldBeUnknown,
+            int offloadShuffleDescriptorsThreshold) {
         this.serializedJobInformation = 
getSerializedJobInformation(jobInformationOrBlobKey);
         this.jobID = jobID;
         this.partitionDeploymentConstraint = partitionDeploymentConstraint;
         this.nonFinishedHybridPartitionShouldBeUnknown = 
nonFinishedHybridPartitionShouldBeUnknown;
         this.shuffleDescriptorSerializer =
-                new DefaultShuffleDescriptorSerializer(jobID, blobWriter);
+                new DefaultShuffleDescriptorSerializer(
+                        jobID, blobWriter, offloadShuffleDescriptorsThreshold);
     }
 
     public MaybeOffloaded<JobInformation> getSerializedJobInformation() {
@@ -428,29 +448,44 @@ public class TaskDeploymentDescriptorFactory {
 
     /** Serialize shuffle descriptors. */
     interface ShuffleDescriptorSerializer {
+        /**
+         * Serialize and try offload shuffle descriptors.
+         *
+         * @param shuffleDescriptors to serialize
+         * @param numConsumer consumers number of these shuffle descriptors, 
it means how many times
+         *     serialized shuffle descriptor should be sent
+         * @return offloaded or non-offloaded serialized shuffle descriptors
+         */
         MaybeOffloaded<ShuffleDescriptorAndIndex[]> 
serializeAndTryOffloadShuffleDescriptor(
-                ShuffleDescriptorAndIndex[] shuffleDescriptors) throws 
IOException;
+                ShuffleDescriptorAndIndex[] shuffleDescriptors, int 
numConsumer) throws IOException;
     }
 
     private static class DefaultShuffleDescriptorSerializer implements 
ShuffleDescriptorSerializer {
         private final JobID jobID;
         private final BlobWriter blobWriter;
+        private final int offloadShuffleDescriptorsThreshold;
 
-        public DefaultShuffleDescriptorSerializer(JobID jobID, BlobWriter 
blobWriter) {
+        public DefaultShuffleDescriptorSerializer(
+                JobID jobID, BlobWriter blobWriter, int 
offloadShuffleDescriptorsThreshold) {
             this.jobID = checkNotNull(jobID);
             this.blobWriter = checkNotNull(blobWriter);
+            this.offloadShuffleDescriptorsThreshold = 
offloadShuffleDescriptorsThreshold;
         }
 
         @Override
         public MaybeOffloaded<ShuffleDescriptorAndIndex[]> 
serializeAndTryOffloadShuffleDescriptor(
-                ShuffleDescriptorAndIndex[] shuffleDescriptors) throws 
IOException {
+                ShuffleDescriptorAndIndex[] shuffleDescriptors, int 
numConsumer)
+                throws IOException {
 
             final CompressedSerializedValue<ShuffleDescriptorAndIndex[]> 
compressedSerializedValue =
                     CompressedSerializedValue.fromObject(shuffleDescriptors);
 
             final Either<SerializedValue<ShuffleDescriptorAndIndex[]>, 
PermanentBlobKey>
                     serializedValueOrBlobKey =
-                            BlobWriter.tryOffload(compressedSerializedValue, 
jobID, blobWriter);
+                            shouldOffload(shuffleDescriptors, numConsumer)
+                                    ? BlobWriter.offloadWithException(
+                                            compressedSerializedValue, jobID, 
blobWriter)
+                                    : Either.Left(compressedSerializedValue);
 
             if (serializedValueOrBlobKey.isLeft()) {
                 return new 
TaskDeploymentDescriptor.NonOffloaded<>(serializedValueOrBlobKey.left());
@@ -458,5 +493,18 @@ public class TaskDeploymentDescriptorFactory {
                 return new 
TaskDeploymentDescriptor.Offloaded<>(serializedValueOrBlobKey.right());
             }
         }
+
+        /**
+         * Determine whether shuffle descriptors should be offloaded to blob 
server.
+         *
+         * @param shuffleDescriptorsToSerialize shuffle descriptors to 
serialize
+         * @param numConsumers how many consumers this serialized shuffle 
descriptor should be sent
+         * @return whether shuffle descriptors should be offloaded to blob 
server
+         */
+        private boolean shouldOffload(
+                ShuffleDescriptorAndIndex[] shuffleDescriptorsToSerialize, int 
numConsumers) {
+            return shuffleDescriptorsToSerialize.length * numConsumers
+                    >= offloadShuffleDescriptorsThreshold;
+        }
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
index e01cda80dfd..5f9aa919e76 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
@@ -118,6 +118,10 @@ public class DefaultExecutionGraphBuilder {
                 
PartitionGroupReleaseStrategyFactoryLoader.loadPartitionGroupReleaseStrategyFactory(
                         jobManagerConfig);
 
+        final int offloadShuffleDescriptorsThreshold =
+                jobManagerConfig.get(
+                        
TaskDeploymentDescriptorFactory.OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD);
+
         final TaskDeploymentDescriptorFactory taskDeploymentDescriptorFactory;
         try {
             taskDeploymentDescriptorFactory =
@@ -126,7 +130,8 @@ public class DefaultExecutionGraphBuilder {
                             jobId,
                             partitionLocationConstraint,
                             blobWriter,
-                            nonFinishedHybridPartitionShouldBeUnknown);
+                            nonFinishedHybridPartitionShouldBeUnknown,
+                            offloadShuffleDescriptorsThreshold);
         } catch (IOException e) {
             throw new JobException("Could not create the 
TaskDeploymentDescriptorFactory.", e);
         }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.java
index f2007d4d930..6ad6270a8cd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.java
@@ -215,7 +215,8 @@ class CachedShuffleDescriptorsTest {
 
         @Override
         public MaybeOffloaded<ShuffleDescriptorAndIndex[]> 
serializeAndTryOffloadShuffleDescriptor(
-                ShuffleDescriptorAndIndex[] shuffleDescriptors) throws 
IOException {
+                ShuffleDescriptorAndIndex[] shuffleDescriptors, int 
numConsumer)
+                throws IOException {
             return new 
NonOffloaded<>(CompressedSerializedValue.fromObject(shuffleDescriptors));
         }
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java
index fd3e7bc813e..a1409f35d66 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.deployment;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.TestingBlobWriter;
@@ -74,19 +75,27 @@ class TaskDeploymentDescriptorFactoryTest {
 
     @Test
     void testCacheShuffleDescriptorAsNonOffloaded() throws Exception {
-        testCacheShuffleDescriptor(new TestingBlobWriter(Integer.MAX_VALUE));
+        final Configuration jobMasterConfig = new Configuration();
+        jobMasterConfig.set(
+                
TaskDeploymentDescriptorFactory.OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD,
+                Integer.MAX_VALUE);
+        testCacheShuffleDescriptor(jobMasterConfig);
     }
 
     @Test
     void testCacheShuffleDescriptorAsOffloaded() throws Exception {
-        testCacheShuffleDescriptor(new TestingBlobWriter(0));
+        final Configuration jobMasterConfig = new Configuration();
+        jobMasterConfig.set(
+                
TaskDeploymentDescriptorFactory.OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD, 0);
+        testCacheShuffleDescriptor(jobMasterConfig);
     }
 
-    private void testCacheShuffleDescriptor(TestingBlobWriter blobWriter) 
throws Exception {
+    private void testCacheShuffleDescriptor(Configuration jobMasterConfig) 
throws Exception {
         final JobID jobId = new JobID();
+        final TestingBlobWriter blobWriter = new TestingBlobWriter();
 
         final Tuple2<ExecutionJobVertex, ExecutionJobVertex> 
executionJobVertices =
-                setupExecutionGraphAndGetVertices(jobId, blobWriter);
+                setupExecutionGraphAndGetVertices(jobId, blobWriter, 
jobMasterConfig);
 
         final ExecutionVertex ev21 = 
executionJobVertices.f1.getTaskVertices()[0];
         createTaskDeploymentDescriptor(ev21);
@@ -153,9 +162,12 @@ class TaskDeploymentDescriptorFactoryTest {
         final TestingBlobWriter blobWriter = new TestingBlobWriter(0);
 
         final JobID jobId = new JobID();
+        final Configuration jobMasterConfig = new Configuration();
+        jobMasterConfig.set(
+                
TaskDeploymentDescriptorFactory.OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD, 0);
 
         final Tuple2<ExecutionJobVertex, ExecutionJobVertex> 
executionJobVertices =
-                setupExecutionGraphAndGetVertices(jobId, blobWriter);
+                setupExecutionGraphAndGetVertices(jobId, blobWriter, 
jobMasterConfig);
 
         final ExecutionVertex ev21 = 
executionJobVertices.f1.getTaskVertices()[0];
         final TaskDeploymentDescriptor tdd = 
createTaskDeploymentDescriptor(ev21);
@@ -166,19 +178,21 @@ class TaskDeploymentDescriptorFactoryTest {
     }
 
     private Tuple2<ExecutionJobVertex, ExecutionJobVertex> 
setupExecutionGraphAndGetVertices(
-            JobID jobId, BlobWriter blobWriter) throws Exception {
+            JobID jobId, BlobWriter blobWriter, Configuration jobMasterConfig) 
throws Exception {
         return setupExecutionGraphAndGetVertices(
                 jobId,
                 blobWriter,
                 ResultPartitionType.BLOCKING,
-                
ResultPartitionType::isBlockingOrBlockingPersistentResultPartition);
+                
ResultPartitionType::isBlockingOrBlockingPersistentResultPartition,
+                jobMasterConfig);
     }
 
     private Tuple2<ExecutionJobVertex, ExecutionJobVertex> 
setupExecutionGraphAndGetVertices(
             JobID jobId,
             BlobWriter blobWriter,
             ResultPartitionType resultPartitionType,
-            MarkPartitionFinishedStrategy markPartitionFinishedStrategy)
+            MarkPartitionFinishedStrategy markPartitionFinishedStrategy,
+            Configuration jobMasterConfig)
             throws Exception {
         final JobVertex v1 = createJobVertex("v1", PARALLELISM);
         final JobVertex v2 = createJobVertex("v2", PARALLELISM);
@@ -188,7 +202,8 @@ class TaskDeploymentDescriptorFactoryTest {
         final List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2));
 
         final ExecutionGraph executionGraph =
-                createExecutionGraph(jobId, ordered, blobWriter, 
markPartitionFinishedStrategy);
+                createExecutionGraph(
+                        jobId, ordered, blobWriter, 
markPartitionFinishedStrategy, jobMasterConfig);
 
         return Tuple2.of(
                 executionGraph.getJobVertex(v1.getID()), 
executionGraph.getJobVertex(v2.getID()));
@@ -229,7 +244,8 @@ class TaskDeploymentDescriptorFactoryTest {
             final JobID jobId,
             final List<JobVertex> jobVertices,
             final BlobWriter blobWriter,
-            final MarkPartitionFinishedStrategy markPartitionFinishedStrategy)
+            final MarkPartitionFinishedStrategy markPartitionFinishedStrategy,
+            final Configuration jobMasterConfig)
             throws JobException, JobExecutionException {
 
         final JobGraph jobGraph =
@@ -239,6 +255,7 @@ class TaskDeploymentDescriptorFactoryTest {
                         .build();
 
         return TestingDefaultExecutionGraphBuilder.newBuilder()
+                .setJobMasterConfig(jobMasterConfig)
                 .setJobGraph(jobGraph)
                 .setBlobWriter(blobWriter)
                 
.setMarkPartitionFinishedStrategy(markPartitionFinishedStrategy)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/BlockingResultPartitionReleaseTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/BlockingResultPartitionReleaseTest.java
index af1c543c509..3ef2e049b73 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/BlockingResultPartitionReleaseTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/BlockingResultPartitionReleaseTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.TestingBlobWriter;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
@@ -106,7 +107,8 @@ class BlockingResultPartitionReleaseTest {
                         mainThreadExecutor,
                         ioExecutor,
                         partitionTracker,
-                        EXECUTOR_RESOURCE.getExecutor());
+                        EXECUTOR_RESOURCE.getExecutor(),
+                        new Configuration());
         ExecutionGraph executionGraph = scheduler.getExecutionGraph();
 
         assertThat(partitionTracker.releasedPartitions).isEmpty();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RemoveCachedShuffleDescriptorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RemoveCachedShuffleDescriptorTest.java
index 07c863ab99b..2b956b8fafe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RemoveCachedShuffleDescriptorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RemoveCachedShuffleDescriptorTest.java
@@ -19,12 +19,14 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.TestingBlobWriter;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
 import org.apache.flink.runtime.deployment.CachedShuffleDescriptors;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
 import org.apache.flink.runtime.execution.ExecutionState;
 import 
org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
 import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
@@ -92,7 +94,8 @@ class RemoveCachedShuffleDescriptorTest {
     @Test
     void testRemoveNonOffloadedCacheForAllToAllEdgeAfterFinished() throws 
Exception {
         // Here we expect no offloaded BLOB.
-        testRemoveCacheForAllToAllEdgeAfterFinished(new 
TestingBlobWriter(Integer.MAX_VALUE), 0, 0);
+        testRemoveCacheForAllToAllEdgeAfterFinished(
+                new TestingBlobWriter(Integer.MAX_VALUE), Integer.MAX_VALUE, 
0, 0);
     }
 
     @Test
@@ -102,18 +105,32 @@ class RemoveCachedShuffleDescriptorTest {
         // edge (1).
         // When the downstream tasks are finished, the cache for 
ShuffleDescriptors should be
         // removed.
-        testRemoveCacheForAllToAllEdgeAfterFinished(new TestingBlobWriter(0), 
4, 3);
+        testRemoveCacheForAllToAllEdgeAfterFinished(new TestingBlobWriter(0), 
0, 4, 3);
     }
 
     private void testRemoveCacheForAllToAllEdgeAfterFinished(
-            TestingBlobWriter blobWriter, int expectedBefore, int 
expectedAfter) throws Exception {
+            TestingBlobWriter blobWriter,
+            int offloadShuffleDescriptorsThreshold,
+            int expectedBefore,
+            int expectedAfter)
+            throws Exception {
         final JobID jobId = new JobID();
 
         final JobVertex v1 = ExecutionGraphTestUtils.createNoOpVertex("v1", 
PARALLELISM);
         final JobVertex v2 = ExecutionGraphTestUtils.createNoOpVertex("v2", 
PARALLELISM);
+        final Configuration jobMasterConfiguration = new Configuration();
+        jobMasterConfiguration.set(
+                
TaskDeploymentDescriptorFactory.OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD,
+                offloadShuffleDescriptorsThreshold);
 
         final SchedulerBase scheduler =
-                createSchedulerAndDeploy(jobId, v1, v2, 
DistributionPattern.ALL_TO_ALL, blobWriter);
+                createSchedulerAndDeploy(
+                        jobId,
+                        v1,
+                        v2,
+                        DistributionPattern.ALL_TO_ALL,
+                        blobWriter,
+                        jobMasterConfiguration);
         final ExecutionGraph executionGraph = scheduler.getExecutionGraph();
 
         executionInMainThread(
@@ -145,7 +162,8 @@ class RemoveCachedShuffleDescriptorTest {
 
     @Test
     void testRemoveNonOffloadedCacheForAllToAllEdgeAfterFailover() throws 
Exception {
-        testRemoveCacheForAllToAllEdgeAfterFailover(new 
TestingBlobWriter(Integer.MAX_VALUE), 0, 0);
+        testRemoveCacheForAllToAllEdgeAfterFailover(
+                new TestingBlobWriter(Integer.MAX_VALUE), Integer.MAX_VALUE, 
0, 0);
     }
 
     @Test
@@ -155,18 +173,32 @@ class RemoveCachedShuffleDescriptorTest {
         // edge (1).
         // When the failover occurs for upstream tasks, the cache for 
ShuffleDescriptors should be
         // removed.
-        testRemoveCacheForAllToAllEdgeAfterFailover(new TestingBlobWriter(0), 
4, 3);
+        testRemoveCacheForAllToAllEdgeAfterFailover(new TestingBlobWriter(0), 
0, 4, 3);
     }
 
     private void testRemoveCacheForAllToAllEdgeAfterFailover(
-            TestingBlobWriter blobWriter, int expectedBefore, int 
expectedAfter) throws Exception {
+            TestingBlobWriter blobWriter,
+            int offloadShuffleDescriptorsThreshold,
+            int expectedBefore,
+            int expectedAfter)
+            throws Exception {
         final JobID jobId = new JobID();
 
         final JobVertex v1 = ExecutionGraphTestUtils.createNoOpVertex("v1", 
PARALLELISM);
         final JobVertex v2 = ExecutionGraphTestUtils.createNoOpVertex("v2", 
PARALLELISM);
+        final Configuration jobMasterConfiguration = new Configuration();
+        jobMasterConfiguration.set(
+                
TaskDeploymentDescriptorFactory.OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD,
+                offloadShuffleDescriptorsThreshold);
 
         final SchedulerBase scheduler =
-                createSchedulerAndDeploy(jobId, v1, v2, 
DistributionPattern.ALL_TO_ALL, blobWriter);
+                createSchedulerAndDeploy(
+                        jobId,
+                        v1,
+                        v2,
+                        DistributionPattern.ALL_TO_ALL,
+                        blobWriter,
+                        jobMasterConfiguration);
         final ExecutionGraph executionGraph = scheduler.getExecutionGraph();
 
         executionInMainThread(
@@ -196,7 +228,7 @@ class RemoveCachedShuffleDescriptorTest {
     @Test
     void testRemoveNonOffloadedCacheForPointwiseEdgeAfterFinished() throws 
Exception {
         testRemoveCacheForPointwiseEdgeAfterFinished(
-                new TestingBlobWriter(Integer.MAX_VALUE), 0, 0);
+                new TestingBlobWriter(Integer.MAX_VALUE), Integer.MAX_VALUE, 
0, 0);
     }
 
     @Test
@@ -206,18 +238,32 @@ class RemoveCachedShuffleDescriptorTest {
         // edges (4).
         // When the downstream tasks are finished, the cache for 
ShuffleDescriptors should be
         // removed.
-        testRemoveCacheForPointwiseEdgeAfterFinished(new TestingBlobWriter(0), 
7, 6);
+        testRemoveCacheForPointwiseEdgeAfterFinished(new TestingBlobWriter(0), 
0, 7, 6);
     }
 
     private void testRemoveCacheForPointwiseEdgeAfterFinished(
-            TestingBlobWriter blobWriter, int expectedBefore, int 
expectedAfter) throws Exception {
+            TestingBlobWriter blobWriter,
+            int offloadShuffleDescriptorsThreshold,
+            int expectedBefore,
+            int expectedAfter)
+            throws Exception {
         final JobID jobId = new JobID();
 
         final JobVertex v1 = ExecutionGraphTestUtils.createNoOpVertex("v1", 
PARALLELISM);
         final JobVertex v2 = ExecutionGraphTestUtils.createNoOpVertex("v2", 
PARALLELISM);
+        final Configuration jobMasterConfiguration = new Configuration();
+        jobMasterConfiguration.set(
+                
TaskDeploymentDescriptorFactory.OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD,
+                offloadShuffleDescriptorsThreshold);
 
         final SchedulerBase scheduler =
-                createSchedulerAndDeploy(jobId, v1, v2, 
DistributionPattern.POINTWISE, blobWriter);
+                createSchedulerAndDeploy(
+                        jobId,
+                        v1,
+                        v2,
+                        DistributionPattern.POINTWISE,
+                        blobWriter,
+                        jobMasterConfiguration);
         final ExecutionGraph executionGraph = scheduler.getExecutionGraph();
 
         executionInMainThread(
@@ -264,7 +310,7 @@ class RemoveCachedShuffleDescriptorTest {
     @Test
     void testRemoveNonOffloadedCacheForPointwiseEdgeAfterFailover() throws 
Exception {
         testRemoveCacheForPointwiseEdgeAfterFailover(
-                new TestingBlobWriter(Integer.MAX_VALUE), 0, 0);
+                new TestingBlobWriter(Integer.MAX_VALUE), Integer.MAX_VALUE, 
0, 0);
     }
 
     @Test
@@ -274,18 +320,32 @@ class RemoveCachedShuffleDescriptorTest {
         // edges (4).
         // When the failover occurs for upstream tasks, the cache for 
ShuffleDescriptors should be
         // removed.
-        testRemoveCacheForPointwiseEdgeAfterFailover(new TestingBlobWriter(0), 
7, 6);
+        testRemoveCacheForPointwiseEdgeAfterFailover(new TestingBlobWriter(0), 
0, 7, 6);
     }
 
     private void testRemoveCacheForPointwiseEdgeAfterFailover(
-            TestingBlobWriter blobWriter, int expectedBefore, int 
expectedAfter) throws Exception {
+            TestingBlobWriter blobWriter,
+            int offloadShuffleDescriptorsThreshold,
+            int expectedBefore,
+            int expectedAfter)
+            throws Exception {
         final JobID jobId = new JobID();
 
         final JobVertex v1 = ExecutionGraphTestUtils.createNoOpVertex("v1", 
PARALLELISM);
         final JobVertex v2 = ExecutionGraphTestUtils.createNoOpVertex("v2", 
PARALLELISM);
+        final Configuration jobMasterConfiguration = new Configuration();
+        jobMasterConfiguration.set(
+                
TaskDeploymentDescriptorFactory.OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD,
+                offloadShuffleDescriptorsThreshold);
 
         final SchedulerBase scheduler =
-                createSchedulerAndDeploy(jobId, v1, v2, 
DistributionPattern.POINTWISE, blobWriter);
+                createSchedulerAndDeploy(
+                        jobId,
+                        v1,
+                        v2,
+                        DistributionPattern.POINTWISE,
+                        blobWriter,
+                        jobMasterConfiguration);
         final ExecutionGraph executionGraph = scheduler.getExecutionGraph();
 
         executionInMainThread(
@@ -328,7 +388,8 @@ class RemoveCachedShuffleDescriptorTest {
             JobVertex v1,
             JobVertex v2,
             DistributionPattern distributionPattern,
-            BlobWriter blobWriter)
+            BlobWriter blobWriter,
+            Configuration jobMasterConfiguration)
             throws Exception {
         return SchedulerTestingUtils.createSchedulerAndDeploy(
                 false,
@@ -340,7 +401,8 @@ class RemoveCachedShuffleDescriptorTest {
                 mainThreadExecutor,
                 ioExecutor,
                 NoOpJobMasterPartitionTracker.INSTANCE,
-                EXECUTOR_RESOURCE.getExecutor());
+                EXECUTOR_RESOURCE.getExecutor(),
+                jobMasterConfiguration);
     }
 
     private void triggerGlobalFailoverAndComplete(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index c6497b9e96b..74d8c3e9e9b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
@@ -332,7 +333,8 @@ public class SchedulerTestingUtils {
             ComponentMainThreadExecutor mainThreadExecutor,
             ScheduledExecutorService ioExecutor,
             JobMasterPartitionTracker partitionTracker,
-            ScheduledExecutorService scheduledExecutor)
+            ScheduledExecutorService scheduledExecutor,
+            Configuration jobMasterConfiguration)
             throws Exception {
         final List<JobVertex> vertices = new 
ArrayList<>(Collections.singletonList(producer));
         IntermediateDataSetID dataSetId = new IntermediateDataSetID();
@@ -351,7 +353,8 @@ public class SchedulerTestingUtils {
                         mainThreadExecutor,
                         ioExecutor,
                         partitionTracker,
-                        scheduledExecutor);
+                        scheduledExecutor,
+                        jobMasterConfiguration);
         final ExecutionGraph executionGraph = scheduler.getExecutionGraph();
         final TestingLogicalSlotBuilder slotBuilder = new 
TestingLogicalSlotBuilder();
 
@@ -402,7 +405,8 @@ public class SchedulerTestingUtils {
             ComponentMainThreadExecutor mainThreadExecutor,
             ScheduledExecutorService ioExecutor,
             JobMasterPartitionTracker partitionTracker,
-            ScheduledExecutorService scheduledExecutor)
+            ScheduledExecutorService scheduledExecutor,
+            Configuration jobMasterConfiguration)
             throws Exception {
         final JobGraph jobGraph =
                 JobGraphBuilder.newBatchJobGraphBuilder()
@@ -415,7 +419,8 @@ public class SchedulerTestingUtils {
                         .setRestartBackoffTimeStrategy(new 
TestRestartBackoffTimeStrategy(true, 0))
                         .setBlobWriter(blobWriter)
                         .setIoExecutor(ioExecutor)
-                        .setPartitionTracker(partitionTracker);
+                        .setPartitionTracker(partitionTracker)
+                        .setJobMasterConfiguration(jobMasterConfiguration);
         return isAdaptive ? builder.buildAdaptiveBatchJobScheduler() : 
builder.build();
     }
 


Reply via email to