wanglijie95 commented on code in PR #22674:
URL: https://github.com/apache/flink/pull/22674#discussion_r1246353660


##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -428,33 +434,66 @@ public int getIndex() {
     /** Serialize shuffle descriptors. */
     interface ShuffleDescriptorSerializer {
         MaybeOffloaded<ShuffleDescriptorAndIndex[]> serializeAndTryOffloadShuffleDescriptor(
-                ShuffleDescriptorAndIndex[] shuffleDescriptors) throws IOException;
+                ShuffleDescriptorAndIndex[] shuffleDescriptors, int numConsumer) throws IOException;

Review Comment:
   Please add Javadoc for this method, in particular describing what each
   parameter means.
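
   A possible shape for that Javadoc, as a minimal sketch. The interface below
   is a self-contained stand-in: the name `ShuffleDescriptorSerializerSketch`
   and the type parameters `D` and `R` are hypothetical placeholders for the
   real Flink types, and the `numConsumer` wording is inferred from the
   `shouldOffload` Javadoc in this PR, so it may need adjusting.

```java
import java.io.IOException;

/**
 * Stand-in for ShuffleDescriptorSerializer. D and R are hypothetical placeholders for
 * ShuffleDescriptorAndIndex and MaybeOffloaded<ShuffleDescriptorAndIndex[]>.
 */
interface ShuffleDescriptorSerializerSketch<D, R> {

    /**
     * Serializes the given shuffle descriptors and, if the implementation decides it is
     * warranted, offloads the serialized value to the blob server.
     *
     * @param shuffleDescriptors the shuffle descriptors to serialize
     * @param numConsumer the number of consumers the serialized shuffle descriptors are sent to
     * @return the serialized shuffle descriptors, either inline or as an offloaded blob reference
     * @throws IOException if serialization or offloading fails
     */
    R serializeAndTryOffloadShuffleDescriptor(D[] shuffleDescriptors, int numConsumer)
            throws IOException;
}
```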



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -428,33 +434,66 @@ public int getIndex() {
     /** Serialize shuffle descriptors. */
     interface ShuffleDescriptorSerializer {
         MaybeOffloaded<ShuffleDescriptorAndIndex[]> serializeAndTryOffloadShuffleDescriptor(
-                ShuffleDescriptorAndIndex[] shuffleDescriptors) throws IOException;
+                ShuffleDescriptorAndIndex[] shuffleDescriptors, int numConsumer) throws IOException;
     }
 
     private static class DefaultShuffleDescriptorSerializer implements ShuffleDescriptorSerializer {
         private final JobID jobID;
         private final BlobWriter blobWriter;
+        private final int offloadShuffleDescriptorsThreshold;
 
-        public DefaultShuffleDescriptorSerializer(JobID jobID, BlobWriter blobWriter) {
+        public DefaultShuffleDescriptorSerializer(
+                JobID jobID, BlobWriter blobWriter, int offloadShuffleDescriptorsThreshold) {
             this.jobID = jobID;
             this.blobWriter = blobWriter;
+            this.offloadShuffleDescriptorsThreshold = offloadShuffleDescriptorsThreshold;
         }
 
         public MaybeOffloaded<ShuffleDescriptorAndIndex[]> serializeAndTryOffloadShuffleDescriptor(
-                ShuffleDescriptorAndIndex[] shuffleDescriptors) throws IOException {
+                ShuffleDescriptorAndIndex[] shuffleDescriptors, int numConsumer)
+                throws IOException {
 
             final CompressedSerializedValue<ShuffleDescriptorAndIndex[]> compressedSerializedValue =
                     CompressedSerializedValue.fromObject(shuffleDescriptors);
 
             final Either<SerializedValue<ShuffleDescriptorAndIndex[]>, PermanentBlobKey>
                     serializedValueOrBlobKey =
-                            BlobWriter.tryOffload(compressedSerializedValue, jobID, blobWriter);
+                            shouldOffload(shuffleDescriptors, numConsumer)
+                                    ? BlobWriter.offloadWithException(
+                                            compressedSerializedValue, jobID, blobWriter)
+                                    : Either.Left(compressedSerializedValue);
 
             if (serializedValueOrBlobKey.isLeft()) {
                 return new TaskDeploymentDescriptor.NonOffloaded<>(serializedValueOrBlobKey.left());
             } else {
                 return new TaskDeploymentDescriptor.Offloaded<>(serializedValueOrBlobKey.right());
             }
         }
+
+        /**
+         * Determine whether shuffle descriptors should be offloaded by blob server.
+         *
+         * @param shuffleDescriptorsToSerialize shuffle descriptors to serialize
+         * @param numConsumers how many consumers this serialized shuffle descriptor should be sent
+         * @return whether shuffle descriptors should be offloaded by blob server
+         */
+        private boolean shouldOffload(
+                ShuffleDescriptorAndIndex[] shuffleDescriptorsToSerialize, int numConsumers) {
+            return shuffleDescriptorsToSerialize.length * numConsumers
+                    >= offloadShuffleDescriptorsThreshold;
+        }
     }
+
+    @Experimental
+    @Documentation.ExcludeFromDocumentation(

Review Comment:
   I don't think `@Documentation.ExcludeFromDocumentation` is needed here; a
   plain code comment is enough. Also, we generally declare options at the top
   of the class.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -428,33 +434,66 @@ public int getIndex() {
     /** Serialize shuffle descriptors. */
     interface ShuffleDescriptorSerializer {
         MaybeOffloaded<ShuffleDescriptorAndIndex[]> serializeAndTryOffloadShuffleDescriptor(
-                ShuffleDescriptorAndIndex[] shuffleDescriptors) throws IOException;
+                ShuffleDescriptorAndIndex[] shuffleDescriptors, int numConsumer) throws IOException;
     }
 
     private static class DefaultShuffleDescriptorSerializer implements ShuffleDescriptorSerializer {
         private final JobID jobID;
         private final BlobWriter blobWriter;
+        private final int offloadShuffleDescriptorsThreshold;
 
-        public DefaultShuffleDescriptorSerializer(JobID jobID, BlobWriter blobWriter) {
+        public DefaultShuffleDescriptorSerializer(
+                JobID jobID, BlobWriter blobWriter, int offloadShuffleDescriptorsThreshold) {
             this.jobID = jobID;
             this.blobWriter = blobWriter;

Review Comment:
   Please add `checkNotNull` for `jobID` and `blobWriter`.
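
   Roughly what I have in mind, as a self-contained sketch. It uses
   `java.util.Objects.requireNonNull` as a stand-in so that it compiles
   without Flink on the classpath; in the PR this would be Flink's
   `Preconditions.checkNotNull`. The class name `SerializerSketch` and the
   `Object` field types are hypothetical placeholders for the real
   `DefaultShuffleDescriptorSerializer`, `JobID` and `BlobWriter`.

```java
import java.util.Objects;

/** Hypothetical stand-in for DefaultShuffleDescriptorSerializer. */
final class SerializerSketch {
    private final Object jobID; // placeholder for JobID
    private final Object blobWriter; // placeholder for BlobWriter
    private final int offloadShuffleDescriptorsThreshold;

    SerializerSketch(Object jobID, Object blobWriter, int offloadShuffleDescriptorsThreshold) {
        // Fail fast at construction time instead of with a late NPE during deployment.
        this.jobID = Objects.requireNonNull(jobID, "jobID must not be null");
        this.blobWriter = Objects.requireNonNull(blobWriter, "blobWriter must not be null");
        this.offloadShuffleDescriptorsThreshold = offloadShuffleDescriptorsThreshold;
    }
}
```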



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -428,33 +434,66 @@ public int getIndex() {
     /** Serialize shuffle descriptors. */
     interface ShuffleDescriptorSerializer {
         MaybeOffloaded<ShuffleDescriptorAndIndex[]> serializeAndTryOffloadShuffleDescriptor(
-                ShuffleDescriptorAndIndex[] shuffleDescriptors) throws IOException;
+                ShuffleDescriptorAndIndex[] shuffleDescriptors, int numConsumer) throws IOException;
     }
 
     private static class DefaultShuffleDescriptorSerializer implements ShuffleDescriptorSerializer {
         private final JobID jobID;
         private final BlobWriter blobWriter;
+        private final int offloadShuffleDescriptorsThreshold;
 
-        public DefaultShuffleDescriptorSerializer(JobID jobID, BlobWriter blobWriter) {
+        public DefaultShuffleDescriptorSerializer(
+                JobID jobID, BlobWriter blobWriter, int offloadShuffleDescriptorsThreshold) {
             this.jobID = jobID;
             this.blobWriter = blobWriter;
+            this.offloadShuffleDescriptorsThreshold = offloadShuffleDescriptorsThreshold;
         }
 
         public MaybeOffloaded<ShuffleDescriptorAndIndex[]> serializeAndTryOffloadShuffleDescriptor(
-                ShuffleDescriptorAndIndex[] shuffleDescriptors) throws IOException {
+                ShuffleDescriptorAndIndex[] shuffleDescriptors, int numConsumer)
+                throws IOException {
 
             final CompressedSerializedValue<ShuffleDescriptorAndIndex[]> compressedSerializedValue =
                     CompressedSerializedValue.fromObject(shuffleDescriptors);
 
             final Either<SerializedValue<ShuffleDescriptorAndIndex[]>, PermanentBlobKey>
                     serializedValueOrBlobKey =
-                            BlobWriter.tryOffload(compressedSerializedValue, jobID, blobWriter);
+                            shouldOffload(shuffleDescriptors, numConsumer)
+                                    ? BlobWriter.offloadWithException(
+                                            compressedSerializedValue, jobID, blobWriter)
+                                    : Either.Left(compressedSerializedValue);
 
             if (serializedValueOrBlobKey.isLeft()) {
                 return new TaskDeploymentDescriptor.NonOffloaded<>(serializedValueOrBlobKey.left());
             } else {
                 return new TaskDeploymentDescriptor.Offloaded<>(serializedValueOrBlobKey.right());
             }
         }
+
+        /**
+         * Determine whether shuffle descriptors should be offloaded by blob server.
+         *
+         * @param shuffleDescriptorsToSerialize shuffle descriptors to serialize
+         * @param numConsumers how many consumers this serialized shuffle descriptor should be sent
+         * @return whether shuffle descriptors should be offloaded by blob server
+         */
+        private boolean shouldOffload(
+                ShuffleDescriptorAndIndex[] shuffleDescriptorsToSerialize, int numConsumers) {
+            return shuffleDescriptorsToSerialize.length * numConsumers
+                    >= offloadShuffleDescriptorsThreshold;
+        }
     }
+
+    @Experimental
+    @Documentation.ExcludeFromDocumentation(
+            "this is an expert option, that we do not want to expose in the documentation")
+    public static final ConfigOption<Integer> OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD =
+            key("jobmanager.deployment.offload-shuffle-descriptors-threshold")
+                    .intType()
+                    .defaultValue(4_000 * 4_000)
+                    .withDescription(
+                            "The threshold to force enable offload shuffle descriptors via blob server."
+                                    + " This is a fixed value since it is difficult for users to configure."
+                                    + " This default value means JobManager need to serialized and transport"

Review Comment:
   `This is a fixed value since it is difficult for users to configure` -> This
   comment is outdated now that the threshold is exposed as a config option.
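
   When rewording the description, it may help to spell out what the default
   means. A minimal sketch of the check from the diff above, assuming the
   default of `4_000 * 4_000`: the class name `OffloadThresholdSketch` and the
   plain `int` parameters are hypothetical, and the product is widened to
   `long` here, which is a choice of this sketch and not something the PR does.

```java
/** Hypothetical stand-alone version of the shouldOffload check in this PR. */
final class OffloadThresholdSketch {

    /** Mirrors the default in the diff: 4_000 * 4_000 = 16_000_000. */
    static final int DEFAULT_THRESHOLD = 4_000 * 4_000;

    /**
     * Returns true when (number of descriptors) x (number of consumers) reaches the threshold.
     * The multiplication is done in long so that large inputs cannot overflow int.
     */
    static boolean shouldOffload(int numDescriptors, int numConsumers, int threshold) {
        return (long) numDescriptors * numConsumers >= threshold;
    }
}
```

   With the default, 4000 descriptors sent to 4000 consumers are offloaded,
   while 3999 descriptors sent to 4000 consumers are not.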



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
