This is an automated email from the ASF dual-hosted git repository. wanglijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4fe3560015cd9cc076afad470228a9565d557935 Author: Weihua Hu <[email protected]> AuthorDate: Wed May 24 18:06:06 2023 +0800 [FLINK-32201][runtime]Automatically determine if the shuffle descriptor needs to be offloaded by the blob server based on the number of ShuffleDescriptor edges. This closes #22674 --- .../org/apache/flink/runtime/blob/BlobWriter.java | 24 ++++-- .../deployment/CachedShuffleDescriptors.java | 6 +- .../TaskDeploymentDescriptorFactory.java | 60 +++++++++++-- .../DefaultExecutionGraphBuilder.java | 7 +- .../deployment/CachedShuffleDescriptorsTest.java | 3 +- .../TaskDeploymentDescriptorFactoryTest.java | 37 +++++--- .../BlockingResultPartitionReleaseTest.java | 4 +- .../RemoveCachedShuffleDescriptorTest.java | 98 ++++++++++++++++++---- .../runtime/scheduler/SchedulerTestingUtils.java | 13 ++- 9 files changed, 201 insertions(+), 51 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java index 4b284a2d1fe..2d5292b42cb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java @@ -102,16 +102,22 @@ public interface BlobWriter { if (serializedValue.getByteArray().length < blobWriter.getMinOffloadingSize()) { return Either.Left(serializedValue); } else { - try { - final PermanentBlobKey permanentBlobKey = - blobWriter.putPermanent(jobId, serializedValue.getByteArray()); - - return Either.Right(permanentBlobKey); - } catch (IOException e) { - LOG.warn("Failed to offload value for job {} to BLOB store.", jobId, e); + return offloadWithException(serializedValue, jobId, blobWriter); + } + } - return Either.Left(serializedValue); - } + static <T> Either<SerializedValue<T>, PermanentBlobKey> offloadWithException( + SerializedValue<T> serializedValue, JobID jobId, BlobWriter blobWriter) { + Preconditions.checkNotNull(serializedValue); + Preconditions.checkNotNull(jobId); + Preconditions.checkNotNull(blobWriter); + try { + final PermanentBlobKey permanentBlobKey = + blobWriter.putPermanent(jobId, serializedValue.getByteArray()); + return Either.Right(permanentBlobKey); + } catch (IOException e) { + LOG.warn("Failed to offload value for job {} to BLOB store.", jobId, e); + return Either.Left(serializedValue); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java index e50cf727f53..029b5b9383f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java @@ -53,10 +53,14 @@ public class CachedShuffleDescriptors { /** Stores the mapping of resultPartitionId to index subscripts in consumed partition group. */ private final Map<IntermediateResultPartitionID, Integer> resultPartitionIdToIndex; + /** The number of consumers for {@link ConsumedPartitionGroup}. */ + private final int numConsumers; + public CachedShuffleDescriptors( ConsumedPartitionGroup consumedPartitionGroup, ShuffleDescriptorAndIndex[] shuffleDescriptors) { this.resultPartitionIdToIndex = new HashMap<>(); + this.numConsumers = consumedPartitionGroup.getNumConsumers(); int index = 0; for (IntermediateResultPartitionID resultPartitionID : consumedPartitionGroup) { resultPartitionIdToIndex.put(resultPartitionID, index++); @@ -80,7 +84,7 @@ public class CachedShuffleDescriptors { if (!toBeSerialized.isEmpty()) { MaybeOffloaded<ShuffleDescriptorAndIndex[]> serializedShuffleDescriptor = shuffleDescriptorSerializer.serializeAndTryOffloadShuffleDescriptor( - toBeSerialized.toArray(new ShuffleDescriptorAndIndex[0])); + toBeSerialized.toArray(new ShuffleDescriptorAndIndex[0]), numConsumers); toBeSerialized.clear(); serializedShuffleDescriptors.add(serializedShuffleDescriptor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java index 2b0b7ecaeca..4f1b8ea0546 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java @@ -18,8 +18,10 @@ package org.apache.flink.runtime.deployment; +import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigOption; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.blob.PermanentBlobKey; import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; @@ -58,6 +60,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import static org.apache.flink.configuration.ConfigOptions.key; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -66,6 +69,21 @@ import static org.apache.flink.util.Preconditions.checkState; * org.apache.flink.runtime.taskmanager.Task} from {@link Execution}. */ public class TaskDeploymentDescriptorFactory { + /** + * This is an expert option, that we do not want to expose in the documentation. The default + * value is good enough for almost all cases + */ + @Experimental + public static final ConfigOption<Integer> OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD = + key("jobmanager.task-deployment.offload-shuffle-descriptors-to-blob-server.threshold-num") + .intType() + .defaultValue(2048 * 2048) + .withDescription( + "Threshold for offloading shuffle descriptors to blob server. Once the number of shuffle descriptors" + + " exceeds this value, we will offload the shuffle descriptors to blob server." + + " This default value means JobManager need to serialize and transport" + + " 2048 shuffle descriptors (almost 32KB) to 2048 consumers (64MB in total)"); + private final MaybeOffloaded<JobInformation> serializedJobInformation; private final JobID jobID; private final PartitionLocationConstraint partitionDeploymentConstraint; @@ -77,13 +95,15 @@ public class TaskDeploymentDescriptorFactory { JobID jobID, PartitionLocationConstraint partitionDeploymentConstraint, BlobWriter blobWriter, - boolean nonFinishedHybridPartitionShouldBeUnknown) { + boolean nonFinishedHybridPartitionShouldBeUnknown, + int offloadShuffleDescriptorsThreshold) { this.serializedJobInformation = getSerializedJobInformation(jobInformationOrBlobKey); this.jobID = jobID; this.partitionDeploymentConstraint = partitionDeploymentConstraint; this.nonFinishedHybridPartitionShouldBeUnknown = nonFinishedHybridPartitionShouldBeUnknown; this.shuffleDescriptorSerializer = - new DefaultShuffleDescriptorSerializer(jobID, blobWriter); + new DefaultShuffleDescriptorSerializer( + jobID, blobWriter, offloadShuffleDescriptorsThreshold); } public MaybeOffloaded<JobInformation> getSerializedJobInformation() { @@ -428,29 +448,44 @@ public class TaskDeploymentDescriptorFactory { /** Serialize shuffle descriptors. */ interface ShuffleDescriptorSerializer { + /** + * Serialize and try offload shuffle descriptors. + * + * @param shuffleDescriptors to serialize + * @param numConsumer consumers number of these shuffle descriptors, it means how many times + * serialized shuffle descriptor should be sent + * @return offloaded or non-offloaded serialized shuffle descriptors + */ MaybeOffloaded<ShuffleDescriptorAndIndex[]> serializeAndTryOffloadShuffleDescriptor( - ShuffleDescriptorAndIndex[] shuffleDescriptors) throws IOException; + ShuffleDescriptorAndIndex[] shuffleDescriptors, int numConsumer) throws IOException; } private static class DefaultShuffleDescriptorSerializer implements ShuffleDescriptorSerializer { private final JobID jobID; private final BlobWriter blobWriter; + private final int offloadShuffleDescriptorsThreshold; - public DefaultShuffleDescriptorSerializer(JobID jobID, BlobWriter blobWriter) { + public DefaultShuffleDescriptorSerializer( + JobID jobID, BlobWriter blobWriter, int offloadShuffleDescriptorsThreshold) { this.jobID = checkNotNull(jobID); this.blobWriter = checkNotNull(blobWriter); + this.offloadShuffleDescriptorsThreshold = offloadShuffleDescriptorsThreshold; } @Override public MaybeOffloaded<ShuffleDescriptorAndIndex[]> serializeAndTryOffloadShuffleDescriptor( - ShuffleDescriptorAndIndex[] shuffleDescriptors) throws IOException { + ShuffleDescriptorAndIndex[] shuffleDescriptors, int numConsumer) + throws IOException { final CompressedSerializedValue<ShuffleDescriptorAndIndex[]> compressedSerializedValue = CompressedSerializedValue.fromObject(shuffleDescriptors); final Either<SerializedValue<ShuffleDescriptorAndIndex[]>, PermanentBlobKey> serializedValueOrBlobKey = - BlobWriter.tryOffload(compressedSerializedValue, jobID, blobWriter); + shouldOffload(shuffleDescriptors, numConsumer) + ? BlobWriter.offloadWithException( + compressedSerializedValue, jobID, blobWriter) + : Either.Left(compressedSerializedValue); if (serializedValueOrBlobKey.isLeft()) { return new TaskDeploymentDescriptor.NonOffloaded<>(serializedValueOrBlobKey.left()); @@ -458,5 +493,18 @@ public class TaskDeploymentDescriptorFactory { return new TaskDeploymentDescriptor.Offloaded<>(serializedValueOrBlobKey.right()); } } + + /** + * Determine whether shuffle descriptors should be offloaded to blob server. + * + * @param shuffleDescriptorsToSerialize shuffle descriptors to serialize + * @param numConsumers how many consumers this serialized shuffle descriptor should be sent + * @return whether shuffle descriptors should be offloaded to blob server + */ + private boolean shouldOffload( + ShuffleDescriptorAndIndex[] shuffleDescriptorsToSerialize, int numConsumers) { + return shuffleDescriptorsToSerialize.length * numConsumers + >= offloadShuffleDescriptorsThreshold; + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java index e01cda80dfd..5f9aa919e76 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java @@ -118,6 +118,10 @@ public class DefaultExecutionGraphBuilder { PartitionGroupReleaseStrategyFactoryLoader.loadPartitionGroupReleaseStrategyFactory( jobManagerConfig); + final int offloadShuffleDescriptorsThreshold = + jobManagerConfig.get( + TaskDeploymentDescriptorFactory.OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD); + final TaskDeploymentDescriptorFactory taskDeploymentDescriptorFactory; try { taskDeploymentDescriptorFactory = @@ -126,7 +130,8 @@ public class DefaultExecutionGraphBuilder { jobId, partitionLocationConstraint, blobWriter, - nonFinishedHybridPartitionShouldBeUnknown); + nonFinishedHybridPartitionShouldBeUnknown, + offloadShuffleDescriptorsThreshold); } catch (IOException e) { throw new JobException("Could not create the TaskDeploymentDescriptorFactory.", e); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.java index f2007d4d930..6ad6270a8cd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.java @@ -215,7 +215,8 @@ class CachedShuffleDescriptorsTest { @Override public MaybeOffloaded<ShuffleDescriptorAndIndex[]> serializeAndTryOffloadShuffleDescriptor( - ShuffleDescriptorAndIndex[] shuffleDescriptors) throws IOException { + ShuffleDescriptorAndIndex[] shuffleDescriptors, int numConsumer) + throws IOException { return new NonOffloaded<>(CompressedSerializedValue.fromObject(shuffleDescriptors)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java index fd3e7bc813e..a1409f35d66 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.deployment; import org.apache.flink.api.common.JobID; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.blob.TestingBlobWriter; @@ -74,19 +75,27 @@ class TaskDeploymentDescriptorFactoryTest { @Test void testCacheShuffleDescriptorAsNonOffloaded() throws Exception { - testCacheShuffleDescriptor(new TestingBlobWriter(Integer.MAX_VALUE)); + final Configuration jobMasterConfig = new Configuration(); + jobMasterConfig.set( + TaskDeploymentDescriptorFactory.OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD, + Integer.MAX_VALUE); + testCacheShuffleDescriptor(jobMasterConfig); } @Test void testCacheShuffleDescriptorAsOffloaded() throws Exception { - testCacheShuffleDescriptor(new TestingBlobWriter(0)); + final Configuration jobMasterConfig = new Configuration(); + jobMasterConfig.set( + TaskDeploymentDescriptorFactory.OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD, 0); + testCacheShuffleDescriptor(jobMasterConfig); } - private void testCacheShuffleDescriptor(TestingBlobWriter blobWriter) throws Exception { + private void testCacheShuffleDescriptor(Configuration jobMasterConfig) throws Exception { final JobID jobId = new JobID(); + final TestingBlobWriter blobWriter = new TestingBlobWriter(); final Tuple2<ExecutionJobVertex, ExecutionJobVertex> executionJobVertices = - setupExecutionGraphAndGetVertices(jobId, blobWriter); + setupExecutionGraphAndGetVertices(jobId, blobWriter, jobMasterConfig); final ExecutionVertex ev21 = executionJobVertices.f1.getTaskVertices()[0]; createTaskDeploymentDescriptor(ev21); @@ -153,9 +162,12 @@ class TaskDeploymentDescriptorFactoryTest { final TestingBlobWriter blobWriter = new TestingBlobWriter(0); final JobID jobId = new JobID(); + final Configuration jobMasterConfig = new Configuration(); + jobMasterConfig.set( + TaskDeploymentDescriptorFactory.OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD, 0); final Tuple2<ExecutionJobVertex, ExecutionJobVertex> executionJobVertices = - setupExecutionGraphAndGetVertices(jobId, blobWriter); + setupExecutionGraphAndGetVertices(jobId, blobWriter, jobMasterConfig); final ExecutionVertex ev21 = executionJobVertices.f1.getTaskVertices()[0]; final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(ev21); @@ -166,19 +178,21 @@ class TaskDeploymentDescriptorFactoryTest { } private Tuple2<ExecutionJobVertex, ExecutionJobVertex> setupExecutionGraphAndGetVertices( - JobID jobId, BlobWriter blobWriter) throws Exception { + JobID jobId, BlobWriter blobWriter, Configuration jobMasterConfig) throws Exception { return setupExecutionGraphAndGetVertices( jobId, blobWriter, ResultPartitionType.BLOCKING, - ResultPartitionType::isBlockingOrBlockingPersistentResultPartition); + ResultPartitionType::isBlockingOrBlockingPersistentResultPartition, + jobMasterConfig); } private Tuple2<ExecutionJobVertex, ExecutionJobVertex> setupExecutionGraphAndGetVertices( JobID jobId, BlobWriter blobWriter, ResultPartitionType resultPartitionType, - MarkPartitionFinishedStrategy markPartitionFinishedStrategy) + MarkPartitionFinishedStrategy markPartitionFinishedStrategy, + Configuration jobMasterConfig) throws Exception { final JobVertex v1 = createJobVertex("v1", PARALLELISM); final JobVertex v2 = createJobVertex("v2", PARALLELISM); @@ -188,7 +202,8 @@ class TaskDeploymentDescriptorFactoryTest { final List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2)); final ExecutionGraph executionGraph = - createExecutionGraph(jobId, ordered, blobWriter, markPartitionFinishedStrategy); + createExecutionGraph( + jobId, ordered, blobWriter, markPartitionFinishedStrategy, jobMasterConfig); return Tuple2.of( executionGraph.getJobVertex(v1.getID()), executionGraph.getJobVertex(v2.getID())); @@ -229,7 +244,8 @@ class TaskDeploymentDescriptorFactoryTest { final JobID jobId, final List<JobVertex> jobVertices, final BlobWriter blobWriter, - final MarkPartitionFinishedStrategy markPartitionFinishedStrategy) + final MarkPartitionFinishedStrategy markPartitionFinishedStrategy, + final Configuration jobMasterConfig) throws JobException, JobExecutionException { final JobGraph jobGraph = @@ -239,6 +255,7 @@ class TaskDeploymentDescriptorFactoryTest { .build(); return TestingDefaultExecutionGraphBuilder.newBuilder() + .setJobMasterConfig(jobMasterConfig) .setJobGraph(jobGraph) .setBlobWriter(blobWriter) .setMarkPartitionFinishedStrategy(markPartitionFinishedStrategy) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/BlockingResultPartitionReleaseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/BlockingResultPartitionReleaseTest.java index af1c543c509..3ef2e049b73 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/BlockingResultPartitionReleaseTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/BlockingResultPartitionReleaseTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.TestingBlobWriter; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; @@ -106,7 +107,8 @@ class BlockingResultPartitionReleaseTest { mainThreadExecutor, ioExecutor, partitionTracker, - EXECUTOR_RESOURCE.getExecutor()); + EXECUTOR_RESOURCE.getExecutor(), + new Configuration()); ExecutionGraph executionGraph = scheduler.getExecutionGraph(); assertThat(partitionTracker.releasedPartitions).isEmpty(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RemoveCachedShuffleDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RemoveCachedShuffleDescriptorTest.java index 07c863ab99b..2b956b8fafe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RemoveCachedShuffleDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RemoveCachedShuffleDescriptorTest.java @@ -19,12 +19,14 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.blob.TestingBlobWriter; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService; import org.apache.flink.runtime.deployment.CachedShuffleDescriptors; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker; import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; @@ -92,7 +94,8 @@ class RemoveCachedShuffleDescriptorTest { @Test void testRemoveNonOffloadedCacheForAllToAllEdgeAfterFinished() throws Exception { // Here we expect no offloaded BLOB. - testRemoveCacheForAllToAllEdgeAfterFinished(new TestingBlobWriter(Integer.MAX_VALUE), 0, 0); + testRemoveCacheForAllToAllEdgeAfterFinished( + new TestingBlobWriter(Integer.MAX_VALUE), Integer.MAX_VALUE, 0, 0); } @Test @@ -102,18 +105,32 @@ class RemoveCachedShuffleDescriptorTest { // edge (1). // When the downstream tasks are finished, the cache for ShuffleDescriptors should be // removed. - testRemoveCacheForAllToAllEdgeAfterFinished(new TestingBlobWriter(0), 4, 3); + testRemoveCacheForAllToAllEdgeAfterFinished(new TestingBlobWriter(0), 0, 4, 3); } private void testRemoveCacheForAllToAllEdgeAfterFinished( - TestingBlobWriter blobWriter, int expectedBefore, int expectedAfter) throws Exception { + TestingBlobWriter blobWriter, + int offloadShuffleDescriptorsThreshold, + int expectedBefore, + int expectedAfter) + throws Exception { final JobID jobId = new JobID(); final JobVertex v1 = ExecutionGraphTestUtils.createNoOpVertex("v1", PARALLELISM); final JobVertex v2 = ExecutionGraphTestUtils.createNoOpVertex("v2", PARALLELISM); + final Configuration jobMasterConfiguration = new Configuration(); + jobMasterConfiguration.set( + TaskDeploymentDescriptorFactory.OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD, + offloadShuffleDescriptorsThreshold); final SchedulerBase scheduler = - createSchedulerAndDeploy(jobId, v1, v2, DistributionPattern.ALL_TO_ALL, blobWriter); + createSchedulerAndDeploy( + jobId, + v1, + v2, + DistributionPattern.ALL_TO_ALL, + blobWriter, + jobMasterConfiguration); final ExecutionGraph executionGraph = scheduler.getExecutionGraph(); executionInMainThread( @@ -145,7 +162,8 @@ class RemoveCachedShuffleDescriptorTest { @Test void testRemoveNonOffloadedCacheForAllToAllEdgeAfterFailover() throws Exception { - testRemoveCacheForAllToAllEdgeAfterFailover(new TestingBlobWriter(Integer.MAX_VALUE), 0, 0); + testRemoveCacheForAllToAllEdgeAfterFailover( + new TestingBlobWriter(Integer.MAX_VALUE), Integer.MAX_VALUE, 0, 0); } @Test @@ -155,18 +173,32 @@ class RemoveCachedShuffleDescriptorTest { // edge (1). // When the failover occurs for upstream tasks, the cache for ShuffleDescriptors should be // removed. - testRemoveCacheForAllToAllEdgeAfterFailover(new TestingBlobWriter(0), 4, 3); + testRemoveCacheForAllToAllEdgeAfterFailover(new TestingBlobWriter(0), 0, 4, 3); } private void testRemoveCacheForAllToAllEdgeAfterFailover( - TestingBlobWriter blobWriter, int expectedBefore, int expectedAfter) throws Exception { + TestingBlobWriter blobWriter, + int offloadShuffleDescriptorsThreshold, + int expectedBefore, + int expectedAfter) + throws Exception { final JobID jobId = new JobID(); final JobVertex v1 = ExecutionGraphTestUtils.createNoOpVertex("v1", PARALLELISM); final JobVertex v2 = ExecutionGraphTestUtils.createNoOpVertex("v2", PARALLELISM); + final Configuration jobMasterConfiguration = new Configuration(); + jobMasterConfiguration.set( + TaskDeploymentDescriptorFactory.OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD, + offloadShuffleDescriptorsThreshold); final SchedulerBase scheduler = - createSchedulerAndDeploy(jobId, v1, v2, DistributionPattern.ALL_TO_ALL, blobWriter); + createSchedulerAndDeploy( + jobId, + v1, + v2, + DistributionPattern.ALL_TO_ALL, + blobWriter, + jobMasterConfiguration); final ExecutionGraph executionGraph = scheduler.getExecutionGraph(); executionInMainThread( @@ -196,7 +228,7 @@ class RemoveCachedShuffleDescriptorTest { @Test void testRemoveNonOffloadedCacheForPointwiseEdgeAfterFinished() throws Exception { testRemoveCacheForPointwiseEdgeAfterFinished( - new TestingBlobWriter(Integer.MAX_VALUE), 0, 0); + new TestingBlobWriter(Integer.MAX_VALUE), Integer.MAX_VALUE, 0, 0); } @Test @@ -206,18 +238,32 @@ class RemoveCachedShuffleDescriptorTest { // edges (4). // When the downstream tasks are finished, the cache for ShuffleDescriptors should be // removed. - testRemoveCacheForPointwiseEdgeAfterFinished(new TestingBlobWriter(0), 7, 6); + testRemoveCacheForPointwiseEdgeAfterFinished(new TestingBlobWriter(0), 0, 7, 6); } private void testRemoveCacheForPointwiseEdgeAfterFinished( - TestingBlobWriter blobWriter, int expectedBefore, int expectedAfter) throws Exception { + TestingBlobWriter blobWriter, + int offloadShuffleDescriptorsThreshold, + int expectedBefore, + int expectedAfter) + throws Exception { final JobID jobId = new JobID(); final JobVertex v1 = ExecutionGraphTestUtils.createNoOpVertex("v1", PARALLELISM); final JobVertex v2 = ExecutionGraphTestUtils.createNoOpVertex("v2", PARALLELISM); + final Configuration jobMasterConfiguration = new Configuration(); + jobMasterConfiguration.set( + TaskDeploymentDescriptorFactory.OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD, + offloadShuffleDescriptorsThreshold); final SchedulerBase scheduler = - createSchedulerAndDeploy(jobId, v1, v2, DistributionPattern.POINTWISE, blobWriter); + createSchedulerAndDeploy( + jobId, + v1, + v2, + DistributionPattern.POINTWISE, + blobWriter, + jobMasterConfiguration); final ExecutionGraph executionGraph = scheduler.getExecutionGraph(); executionInMainThread( @@ -264,7 +310,7 @@ class RemoveCachedShuffleDescriptorTest { @Test void testRemoveNonOffloadedCacheForPointwiseEdgeAfterFailover() throws Exception { testRemoveCacheForPointwiseEdgeAfterFailover( - new TestingBlobWriter(Integer.MAX_VALUE), 0, 0); + new TestingBlobWriter(Integer.MAX_VALUE), Integer.MAX_VALUE, 0, 0); } @Test @@ -274,18 +320,32 @@ class RemoveCachedShuffleDescriptorTest { // edges (4). // When the failover occurs for upstream tasks, the cache for ShuffleDescriptors should be // removed. - testRemoveCacheForPointwiseEdgeAfterFailover(new TestingBlobWriter(0), 7, 6); + testRemoveCacheForPointwiseEdgeAfterFailover(new TestingBlobWriter(0), 0, 7, 6); } private void testRemoveCacheForPointwiseEdgeAfterFailover( - TestingBlobWriter blobWriter, int expectedBefore, int expectedAfter) throws Exception { + TestingBlobWriter blobWriter, + int offloadShuffleDescriptorsThreshold, + int expectedBefore, + int expectedAfter) + throws Exception { final JobID jobId = new JobID(); final JobVertex v1 = ExecutionGraphTestUtils.createNoOpVertex("v1", PARALLELISM); final JobVertex v2 = ExecutionGraphTestUtils.createNoOpVertex("v2", PARALLELISM); + final Configuration jobMasterConfiguration = new Configuration(); + jobMasterConfiguration.set( + TaskDeploymentDescriptorFactory.OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD, + offloadShuffleDescriptorsThreshold); final SchedulerBase scheduler = - createSchedulerAndDeploy(jobId, v1, v2, DistributionPattern.POINTWISE, blobWriter); + createSchedulerAndDeploy( + jobId, + v1, + v2, + DistributionPattern.POINTWISE, + blobWriter, + jobMasterConfiguration); final ExecutionGraph executionGraph = scheduler.getExecutionGraph(); executionInMainThread( @@ -328,7 +388,8 @@ class RemoveCachedShuffleDescriptorTest { JobVertex v1, JobVertex v2, DistributionPattern distributionPattern, - BlobWriter blobWriter) + BlobWriter blobWriter, + Configuration jobMasterConfiguration) throws Exception { return SchedulerTestingUtils.createSchedulerAndDeploy( false, @@ -340,7 +401,8 @@ class RemoveCachedShuffleDescriptorTest { mainThreadExecutor, ioExecutor, NoOpJobMasterPartitionTracker.INSTANCE, - EXECUTOR_RESOURCE.getExecutor()); + EXECUTOR_RESOURCE.getExecutor(), + jobMasterConfiguration); } private void triggerGlobalFailoverAndComplete( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java index c6497b9e96b..74d8c3e9e9b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.scheduler; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; @@ -332,7 +333,8 @@ public class SchedulerTestingUtils { ComponentMainThreadExecutor mainThreadExecutor, ScheduledExecutorService ioExecutor, JobMasterPartitionTracker partitionTracker, - ScheduledExecutorService scheduledExecutor) + ScheduledExecutorService scheduledExecutor, + Configuration jobMasterConfiguration) throws Exception { final List<JobVertex> vertices = new ArrayList<>(Collections.singletonList(producer)); IntermediateDataSetID dataSetId = new IntermediateDataSetID(); @@ -351,7 +353,8 @@ public class SchedulerTestingUtils { mainThreadExecutor, ioExecutor, partitionTracker, - scheduledExecutor); + scheduledExecutor, + jobMasterConfiguration); final ExecutionGraph executionGraph = scheduler.getExecutionGraph(); final TestingLogicalSlotBuilder slotBuilder = new TestingLogicalSlotBuilder(); @@ -402,7 +405,8 @@ public class SchedulerTestingUtils { ComponentMainThreadExecutor mainThreadExecutor, ScheduledExecutorService ioExecutor, JobMasterPartitionTracker partitionTracker, - ScheduledExecutorService scheduledExecutor) + ScheduledExecutorService scheduledExecutor, + Configuration jobMasterConfiguration) throws Exception { final JobGraph jobGraph = JobGraphBuilder.newBatchJobGraphBuilder() @@ -415,7 +419,8 @@ public class SchedulerTestingUtils { .setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0)) .setBlobWriter(blobWriter) .setIoExecutor(ioExecutor) - .setPartitionTracker(partitionTracker); + .setPartitionTracker(partitionTracker) + .setJobMasterConfiguration(jobMasterConfiguration); return isAdaptive ? builder.buildAdaptiveBatchJobScheduler() : builder.build(); }
