This is an automated email from the ASF dual-hosted git repository.

wanglijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 29e452eedfd2f66b32a55f0cda9543eb01260085 Author: Weihua Hu <[email protected]> AuthorDate: Thu Jun 29 14:44:00 2023 +0800 [FLINK-32201][runtime] Abstract shuffle descriptor serialize related logic to ShuffleDescriptorSerializer. --- .../deployment/CachedShuffleDescriptors.java | 9 +--- .../TaskDeploymentDescriptorFactory.java | 60 ++++++++++++++-------- .../deployment/CachedShuffleDescriptorsTest.java | 25 +++++---- 3 files changed, 56 insertions(+), 38 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java index 3a2733d2eaf..e50cf727f53 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java @@ -24,7 +24,6 @@ import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; import org.apache.flink.runtime.shuffle.ShuffleDescriptor; -import org.apache.flink.util.function.FunctionWithException; import java.io.IOException; import java.util.ArrayDeque; @@ -76,15 +75,11 @@ public class CachedShuffleDescriptors { } public void serializeShuffleDescriptors( - FunctionWithException< - ShuffleDescriptorAndIndex[], - MaybeOffloaded<ShuffleDescriptorAndIndex[]>, - IOException> - shuffleDescriptorSerializer) + TaskDeploymentDescriptorFactory.ShuffleDescriptorSerializer shuffleDescriptorSerializer) throws IOException { if (!toBeSerialized.isEmpty()) { MaybeOffloaded<ShuffleDescriptorAndIndex[]> serializedShuffleDescriptor = - shuffleDescriptorSerializer.apply( + shuffleDescriptorSerializer.serializeAndTryOffloadShuffleDescriptor( toBeSerialized.toArray(new ShuffleDescriptorAndIndex[0])); toBeSerialized.clear(); 
serializedShuffleDescriptors.add(serializedShuffleDescriptor); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java index 385a8e1ed69..2b0b7ecaeca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java @@ -58,6 +58,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; /** @@ -68,8 +69,8 @@ public class TaskDeploymentDescriptorFactory { private final MaybeOffloaded<JobInformation> serializedJobInformation; private final JobID jobID; private final PartitionLocationConstraint partitionDeploymentConstraint; - private final BlobWriter blobWriter; private final boolean nonFinishedHybridPartitionShouldBeUnknown; + private final ShuffleDescriptorSerializer shuffleDescriptorSerializer; public TaskDeploymentDescriptorFactory( Either<SerializedValue<JobInformation>, PermanentBlobKey> jobInformationOrBlobKey, @@ -80,8 +81,9 @@ public class TaskDeploymentDescriptorFactory { this.serializedJobInformation = getSerializedJobInformation(jobInformationOrBlobKey); this.jobID = jobID; this.partitionDeploymentConstraint = partitionDeploymentConstraint; - this.blobWriter = blobWriter; this.nonFinishedHybridPartitionShouldBeUnknown = nonFinishedHybridPartitionShouldBeUnknown; + this.shuffleDescriptorSerializer = + new DefaultShuffleDescriptorSerializer(jobID, blobWriter); } public MaybeOffloaded<JobInformation> getSerializedJobInformation() { @@ -192,8 +194,7 @@ public class TaskDeploymentDescriptorFactory { computeConsumedPartitionShuffleDescriptors( consumedPartitionGroup, internalExecutionGraphAccessor)); } - 
cachedShuffleDescriptors.serializeShuffleDescriptors( - this::serializeAndTryOffloadShuffleDescriptor); + cachedShuffleDescriptors.serializeShuffleDescriptors(shuffleDescriptorSerializer); return cachedShuffleDescriptors.getAllSerializedShuffleDescriptors(); } @@ -220,23 +221,6 @@ public class TaskDeploymentDescriptorFactory { return shuffleDescriptors; } - private MaybeOffloaded<ShuffleDescriptorAndIndex[]> serializeAndTryOffloadShuffleDescriptor( - ShuffleDescriptorAndIndex[] shuffleDescriptors) throws IOException { - - final CompressedSerializedValue<ShuffleDescriptorAndIndex[]> compressedSerializedValue = - CompressedSerializedValue.fromObject(shuffleDescriptors); - - final Either<SerializedValue<ShuffleDescriptorAndIndex[]>, PermanentBlobKey> - serializedValueOrBlobKey = - BlobWriter.tryOffload(compressedSerializedValue, jobID, blobWriter); - - if (serializedValueOrBlobKey.isLeft()) { - return new TaskDeploymentDescriptor.NonOffloaded<>(serializedValueOrBlobKey.left()); - } else { - return new TaskDeploymentDescriptor.Offloaded<>(serializedValueOrBlobKey.right()); - } - } - private static Map<IntermediateDataSetID, ShuffleDescriptorAndIndex[]> getClusterPartitionShuffleDescriptors(ExecutionVertex executionVertex) { final InternalExecutionGraphAccessor internalExecutionGraphAccessor = @@ -441,4 +425,38 @@ public class TaskDeploymentDescriptorFactory { return index; } } + + /** Serialize shuffle descriptors. 
*/ + interface ShuffleDescriptorSerializer { + MaybeOffloaded<ShuffleDescriptorAndIndex[]> serializeAndTryOffloadShuffleDescriptor( + ShuffleDescriptorAndIndex[] shuffleDescriptors) throws IOException; + } + + private static class DefaultShuffleDescriptorSerializer implements ShuffleDescriptorSerializer { + private final JobID jobID; + private final BlobWriter blobWriter; + + public DefaultShuffleDescriptorSerializer(JobID jobID, BlobWriter blobWriter) { + this.jobID = checkNotNull(jobID); + this.blobWriter = checkNotNull(blobWriter); + } + + @Override + public MaybeOffloaded<ShuffleDescriptorAndIndex[]> serializeAndTryOffloadShuffleDescriptor( + ShuffleDescriptorAndIndex[] shuffleDescriptors) throws IOException { + + final CompressedSerializedValue<ShuffleDescriptorAndIndex[]> compressedSerializedValue = + CompressedSerializedValue.fromObject(shuffleDescriptors); + + final Either<SerializedValue<ShuffleDescriptorAndIndex[]>, PermanentBlobKey> + serializedValueOrBlobKey = + BlobWriter.tryOffload(compressedSerializedValue, jobID, blobWriter); + + if (serializedValueOrBlobKey.isLeft()) { + return new TaskDeploymentDescriptor.NonOffloaded<>(serializedValueOrBlobKey.left()); + } else { + return new TaskDeploymentDescriptor.Offloaded<>(serializedValueOrBlobKey.right()); + } + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.java index 8845deb8bc4..f2007d4d930 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.java @@ -85,7 +85,7 @@ class CachedShuffleDescriptorsTest { assertThat(cachedShuffleDescriptors.getAllSerializedShuffleDescriptors()).isEmpty(); cachedShuffleDescriptors.serializeShuffleDescriptors( - 
CachedShuffleDescriptorsTest::nonOffloadedShuffleDescriptor); + new TestingShuffleDescriptorSerializer()); assertThat(cachedShuffleDescriptors.getAllSerializedShuffleDescriptors()).hasSize(1); MaybeOffloaded<ShuffleDescriptorAndIndex[]> maybeOffloadedShuffleDescriptor = cachedShuffleDescriptors.getAllSerializedShuffleDescriptors().get(0); @@ -122,13 +122,13 @@ class CachedShuffleDescriptorsTest { new CachedShuffleDescriptors( consumedPartitionGroup1, createSingleShuffleDescriptorAndIndex(shuffleDescriptor, 0)); - cachedShuffleDescriptors.serializeShuffleDescriptors( - CachedShuffleDescriptorsTest::nonOffloadedShuffleDescriptor); + TestingShuffleDescriptorSerializer testingShuffleDescriptorSerializer = + new TestingShuffleDescriptorSerializer(); + cachedShuffleDescriptors.serializeShuffleDescriptors(testingShuffleDescriptorSerializer); cachedShuffleDescriptors.markPartitionFinished(intermediateResultPartition1); cachedShuffleDescriptors.markPartitionFinished(intermediateResultPartition2); - cachedShuffleDescriptors.serializeShuffleDescriptors( - CachedShuffleDescriptorsTest::nonOffloadedShuffleDescriptor); + cachedShuffleDescriptors.serializeShuffleDescriptors(testingShuffleDescriptorSerializer); assertThat(cachedShuffleDescriptors.getAllSerializedShuffleDescriptors()).hasSize(2); MaybeOffloaded<ShuffleDescriptorAndIndex[]> maybeOffloaded = @@ -149,11 +149,6 @@ class CachedShuffleDescriptorsTest { Arrays.asList(0, 1)); } - private static MaybeOffloaded<ShuffleDescriptorAndIndex[]> nonOffloadedShuffleDescriptor( - ShuffleDescriptorAndIndex[] toBeSerialized) throws IOException { - return new NonOffloaded<>(CompressedSerializedValue.fromObject(toBeSerialized)); - } - private void assertNonOffloadedShuffleDescriptorAndIndexEquals( MaybeOffloaded<ShuffleDescriptorAndIndex[]> maybeOffloaded, List<ShuffleDescriptor> expectedDescriptors, @@ -214,4 +209,14 @@ class CachedShuffleDescriptorsTest { scheduler.startScheduling(); return scheduler.getExecutionGraph(); } + + private 
static class TestingShuffleDescriptorSerializer + implements TaskDeploymentDescriptorFactory.ShuffleDescriptorSerializer { + + @Override + public MaybeOffloaded<ShuffleDescriptorAndIndex[]> serializeAndTryOffloadShuffleDescriptor( + ShuffleDescriptorAndIndex[] shuffleDescriptors) throws IOException { + return new NonOffloaded<>(CompressedSerializedValue.fromObject(shuffleDescriptors)); + } + } }
