This is an automated email from the ASF dual-hosted git repository.

wanglijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 29e452eedfd2f66b32a55f0cda9543eb01260085
Author: Weihua Hu <[email protected]>
AuthorDate: Thu Jun 29 14:44:00 2023 +0800

    [FLINK-32201][runtime] Abstract shuffle descriptor serialize related logic to ShuffleDescriptorSerializer.
---
 .../deployment/CachedShuffleDescriptors.java       |  9 +---
 .../TaskDeploymentDescriptorFactory.java           | 60 ++++++++++++++--------
 .../deployment/CachedShuffleDescriptorsTest.java   | 25 +++++----
 3 files changed, 56 insertions(+), 38 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java
index 3a2733d2eaf..e50cf727f53 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java
@@ -24,7 +24,6 @@ import 
org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
-import org.apache.flink.util.function.FunctionWithException;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -76,15 +75,11 @@ public class CachedShuffleDescriptors {
     }
 
     public void serializeShuffleDescriptors(
-            FunctionWithException<
-                            ShuffleDescriptorAndIndex[],
-                            MaybeOffloaded<ShuffleDescriptorAndIndex[]>,
-                            IOException>
-                    shuffleDescriptorSerializer)
+            TaskDeploymentDescriptorFactory.ShuffleDescriptorSerializer 
shuffleDescriptorSerializer)
             throws IOException {
         if (!toBeSerialized.isEmpty()) {
             MaybeOffloaded<ShuffleDescriptorAndIndex[]> 
serializedShuffleDescriptor =
-                    shuffleDescriptorSerializer.apply(
+                    
shuffleDescriptorSerializer.serializeAndTryOffloadShuffleDescriptor(
                             toBeSerialized.toArray(new 
ShuffleDescriptorAndIndex[0]));
             toBeSerialized.clear();
             serializedShuffleDescriptors.add(serializedShuffleDescriptor);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
index 385a8e1ed69..2b0b7ecaeca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
@@ -58,6 +58,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
@@ -68,8 +69,8 @@ public class TaskDeploymentDescriptorFactory {
     private final MaybeOffloaded<JobInformation> serializedJobInformation;
     private final JobID jobID;
     private final PartitionLocationConstraint partitionDeploymentConstraint;
-    private final BlobWriter blobWriter;
     private final boolean nonFinishedHybridPartitionShouldBeUnknown;
+    private final ShuffleDescriptorSerializer shuffleDescriptorSerializer;
 
     public TaskDeploymentDescriptorFactory(
             Either<SerializedValue<JobInformation>, PermanentBlobKey> 
jobInformationOrBlobKey,
@@ -80,8 +81,9 @@ public class TaskDeploymentDescriptorFactory {
         this.serializedJobInformation = 
getSerializedJobInformation(jobInformationOrBlobKey);
         this.jobID = jobID;
         this.partitionDeploymentConstraint = partitionDeploymentConstraint;
-        this.blobWriter = blobWriter;
         this.nonFinishedHybridPartitionShouldBeUnknown = 
nonFinishedHybridPartitionShouldBeUnknown;
+        this.shuffleDescriptorSerializer =
+                new DefaultShuffleDescriptorSerializer(jobID, blobWriter);
     }
 
     public MaybeOffloaded<JobInformation> getSerializedJobInformation() {
@@ -192,8 +194,7 @@ public class TaskDeploymentDescriptorFactory {
                             computeConsumedPartitionShuffleDescriptors(
                                     consumedPartitionGroup, 
internalExecutionGraphAccessor));
         }
-        cachedShuffleDescriptors.serializeShuffleDescriptors(
-                this::serializeAndTryOffloadShuffleDescriptor);
+        
cachedShuffleDescriptors.serializeShuffleDescriptors(shuffleDescriptorSerializer);
 
         return cachedShuffleDescriptors.getAllSerializedShuffleDescriptors();
     }
@@ -220,23 +221,6 @@ public class TaskDeploymentDescriptorFactory {
         return shuffleDescriptors;
     }
 
-    private MaybeOffloaded<ShuffleDescriptorAndIndex[]> 
serializeAndTryOffloadShuffleDescriptor(
-            ShuffleDescriptorAndIndex[] shuffleDescriptors) throws IOException 
{
-
-        final CompressedSerializedValue<ShuffleDescriptorAndIndex[]> 
compressedSerializedValue =
-                CompressedSerializedValue.fromObject(shuffleDescriptors);
-
-        final Either<SerializedValue<ShuffleDescriptorAndIndex[]>, 
PermanentBlobKey>
-                serializedValueOrBlobKey =
-                        BlobWriter.tryOffload(compressedSerializedValue, 
jobID, blobWriter);
-
-        if (serializedValueOrBlobKey.isLeft()) {
-            return new 
TaskDeploymentDescriptor.NonOffloaded<>(serializedValueOrBlobKey.left());
-        } else {
-            return new 
TaskDeploymentDescriptor.Offloaded<>(serializedValueOrBlobKey.right());
-        }
-    }
-
     private static Map<IntermediateDataSetID, ShuffleDescriptorAndIndex[]>
             getClusterPartitionShuffleDescriptors(ExecutionVertex 
executionVertex) {
         final InternalExecutionGraphAccessor internalExecutionGraphAccessor =
@@ -441,4 +425,38 @@ public class TaskDeploymentDescriptorFactory {
             return index;
         }
     }
+
+    /** Serialize shuffle descriptors. */
+    interface ShuffleDescriptorSerializer {
+        MaybeOffloaded<ShuffleDescriptorAndIndex[]> 
serializeAndTryOffloadShuffleDescriptor(
+                ShuffleDescriptorAndIndex[] shuffleDescriptors) throws 
IOException;
+    }
+
+    private static class DefaultShuffleDescriptorSerializer implements 
ShuffleDescriptorSerializer {
+        private final JobID jobID;
+        private final BlobWriter blobWriter;
+
+        public DefaultShuffleDescriptorSerializer(JobID jobID, BlobWriter 
blobWriter) {
+            this.jobID = checkNotNull(jobID);
+            this.blobWriter = checkNotNull(blobWriter);
+        }
+
+        @Override
+        public MaybeOffloaded<ShuffleDescriptorAndIndex[]> 
serializeAndTryOffloadShuffleDescriptor(
+                ShuffleDescriptorAndIndex[] shuffleDescriptors) throws 
IOException {
+
+            final CompressedSerializedValue<ShuffleDescriptorAndIndex[]> 
compressedSerializedValue =
+                    CompressedSerializedValue.fromObject(shuffleDescriptors);
+
+            final Either<SerializedValue<ShuffleDescriptorAndIndex[]>, 
PermanentBlobKey>
+                    serializedValueOrBlobKey =
+                            BlobWriter.tryOffload(compressedSerializedValue, 
jobID, blobWriter);
+
+            if (serializedValueOrBlobKey.isLeft()) {
+                return new 
TaskDeploymentDescriptor.NonOffloaded<>(serializedValueOrBlobKey.left());
+            } else {
+                return new 
TaskDeploymentDescriptor.Offloaded<>(serializedValueOrBlobKey.right());
+            }
+        }
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.java
index 8845deb8bc4..f2007d4d930 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.java
@@ -85,7 +85,7 @@ class CachedShuffleDescriptorsTest {
 
         
assertThat(cachedShuffleDescriptors.getAllSerializedShuffleDescriptors()).isEmpty();
         cachedShuffleDescriptors.serializeShuffleDescriptors(
-                CachedShuffleDescriptorsTest::nonOffloadedShuffleDescriptor);
+                new TestingShuffleDescriptorSerializer());
         
assertThat(cachedShuffleDescriptors.getAllSerializedShuffleDescriptors()).hasSize(1);
         MaybeOffloaded<ShuffleDescriptorAndIndex[]> 
maybeOffloadedShuffleDescriptor =
                 
cachedShuffleDescriptors.getAllSerializedShuffleDescriptors().get(0);
@@ -122,13 +122,13 @@ class CachedShuffleDescriptorsTest {
                 new CachedShuffleDescriptors(
                         consumedPartitionGroup1,
                         
createSingleShuffleDescriptorAndIndex(shuffleDescriptor, 0));
-        cachedShuffleDescriptors.serializeShuffleDescriptors(
-                CachedShuffleDescriptorsTest::nonOffloadedShuffleDescriptor);
+        TestingShuffleDescriptorSerializer testingShuffleDescriptorSerializer =
+                new TestingShuffleDescriptorSerializer();
+        
cachedShuffleDescriptors.serializeShuffleDescriptors(testingShuffleDescriptorSerializer);
 
         
cachedShuffleDescriptors.markPartitionFinished(intermediateResultPartition1);
         
cachedShuffleDescriptors.markPartitionFinished(intermediateResultPartition2);
-        cachedShuffleDescriptors.serializeShuffleDescriptors(
-                CachedShuffleDescriptorsTest::nonOffloadedShuffleDescriptor);
+        
cachedShuffleDescriptors.serializeShuffleDescriptors(testingShuffleDescriptorSerializer);
         
assertThat(cachedShuffleDescriptors.getAllSerializedShuffleDescriptors()).hasSize(2);
 
         MaybeOffloaded<ShuffleDescriptorAndIndex[]> maybeOffloaded =
@@ -149,11 +149,6 @@ class CachedShuffleDescriptorsTest {
                 Arrays.asList(0, 1));
     }
 
-    private static MaybeOffloaded<ShuffleDescriptorAndIndex[]> 
nonOffloadedShuffleDescriptor(
-            ShuffleDescriptorAndIndex[] toBeSerialized) throws IOException {
-        return new 
NonOffloaded<>(CompressedSerializedValue.fromObject(toBeSerialized));
-    }
-
     private void assertNonOffloadedShuffleDescriptorAndIndexEquals(
             MaybeOffloaded<ShuffleDescriptorAndIndex[]> maybeOffloaded,
             List<ShuffleDescriptor> expectedDescriptors,
@@ -214,4 +209,14 @@ class CachedShuffleDescriptorsTest {
         scheduler.startScheduling();
         return scheduler.getExecutionGraph();
     }
+
+    private static class TestingShuffleDescriptorSerializer
+            implements 
TaskDeploymentDescriptorFactory.ShuffleDescriptorSerializer {
+
+        @Override
+        public MaybeOffloaded<ShuffleDescriptorAndIndex[]> 
serializeAndTryOffloadShuffleDescriptor(
+                ShuffleDescriptorAndIndex[] shuffleDescriptors) throws 
IOException {
+            return new 
NonOffloaded<>(CompressedSerializedValue.fromObject(shuffleDescriptors));
+        }
+    }
 }

Reply via email to