wanglijie95 commented on code in PR #22674:
URL: https://github.com/apache/flink/pull/22674#discussion_r1247502214


##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -441,4 +440,66 @@ public int getIndex() {
             return index;
         }
     }
+
+    /** Serialize shuffle descriptors. */
+    interface ShuffleDescriptorSerializer {
+        /**
+         * Serialize and try offload shuffle descriptors.
+         *
+         * @param shuffleDescriptors to serialize
+         * @param numConsumer consumers number of these shuffle descriptors, 
it means how many times
+         *     serialized shuffle descriptor should be sent
+         * @return offloaded or non-offloaded serialized shuffle descriptors
+         */
+        MaybeOffloaded<ShuffleDescriptorAndIndex[]> 
serializeAndTryOffloadShuffleDescriptor(
+                ShuffleDescriptorAndIndex[] shuffleDescriptors, int 
numConsumer) throws IOException;
+    }
+
+    private static class DefaultShuffleDescriptorSerializer implements 
ShuffleDescriptorSerializer {
+        private final JobID jobID;
+        private final BlobWriter blobWriter;
+        private final int offloadShuffleDescriptorsThreshold;
+
+        public DefaultShuffleDescriptorSerializer(
+                JobID jobID, BlobWriter blobWriter, int 
offloadShuffleDescriptorsThreshold) {
+            this.jobID = checkNotNull(jobID);
+            this.blobWriter = checkNotNull(blobWriter);
+            this.offloadShuffleDescriptorsThreshold = 
offloadShuffleDescriptorsThreshold;
+        }
+
+        @Override
+        public MaybeOffloaded<ShuffleDescriptorAndIndex[]> 
serializeAndTryOffloadShuffleDescriptor(
+                ShuffleDescriptorAndIndex[] shuffleDescriptors, int 
numConsumer)
+                throws IOException {
+
+            final CompressedSerializedValue<ShuffleDescriptorAndIndex[]> 
compressedSerializedValue =
+                    CompressedSerializedValue.fromObject(shuffleDescriptors);
+
+            final Either<SerializedValue<ShuffleDescriptorAndIndex[]>, 
PermanentBlobKey>
+                    serializedValueOrBlobKey =
+                            shouldOffload(shuffleDescriptors, numConsumer)
+                                    ? BlobWriter.offloadWithException(
+                                            compressedSerializedValue, jobID, 
blobWriter)
+                                    : Either.Left(compressedSerializedValue);
+
+            if (serializedValueOrBlobKey.isLeft()) {
+                return new 
TaskDeploymentDescriptor.NonOffloaded<>(serializedValueOrBlobKey.left());
+            } else {
+                return new 
TaskDeploymentDescriptor.Offloaded<>(serializedValueOrBlobKey.right());
+            }
+        }
+
+        /**
+         * Determine whether shuffle descriptors should be offloaded by blob 
server.

Review Comment:
   "by blob server" -> "to blob server"



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -441,4 +440,66 @@ public int getIndex() {
             return index;
         }
     }
+
+    /** Serialize shuffle descriptors. */
+    interface ShuffleDescriptorSerializer {
+        /**
+         * Serialize and try offload shuffle descriptors.
+         *
+         * @param shuffleDescriptors to serialize
+         * @param numConsumer consumers number of these shuffle descriptors, 
it means how many times
+         *     serialized shuffle descriptor should be sent
+         * @return offloaded or non-offloaded serialized shuffle descriptors
+         */
+        MaybeOffloaded<ShuffleDescriptorAndIndex[]> 
serializeAndTryOffloadShuffleDescriptor(
+                ShuffleDescriptorAndIndex[] shuffleDescriptors, int 
numConsumer) throws IOException;
+    }
+
+    private static class DefaultShuffleDescriptorSerializer implements 
ShuffleDescriptorSerializer {
+        private final JobID jobID;
+        private final BlobWriter blobWriter;
+        private final int offloadShuffleDescriptorsThreshold;
+
+        public DefaultShuffleDescriptorSerializer(
+                JobID jobID, BlobWriter blobWriter, int 
offloadShuffleDescriptorsThreshold) {
+            this.jobID = checkNotNull(jobID);
+            this.blobWriter = checkNotNull(blobWriter);
+            this.offloadShuffleDescriptorsThreshold = 
offloadShuffleDescriptorsThreshold;
+        }
+
+        @Override
+        public MaybeOffloaded<ShuffleDescriptorAndIndex[]> 
serializeAndTryOffloadShuffleDescriptor(
+                ShuffleDescriptorAndIndex[] shuffleDescriptors, int 
numConsumer)
+                throws IOException {
+
+            final CompressedSerializedValue<ShuffleDescriptorAndIndex[]> 
compressedSerializedValue =
+                    CompressedSerializedValue.fromObject(shuffleDescriptors);
+
+            final Either<SerializedValue<ShuffleDescriptorAndIndex[]>, 
PermanentBlobKey>
+                    serializedValueOrBlobKey =
+                            shouldOffload(shuffleDescriptors, numConsumer)
+                                    ? BlobWriter.offloadWithException(
+                                            compressedSerializedValue, jobID, 
blobWriter)
+                                    : Either.Left(compressedSerializedValue);
+
+            if (serializedValueOrBlobKey.isLeft()) {
+                return new 
TaskDeploymentDescriptor.NonOffloaded<>(serializedValueOrBlobKey.left());
+            } else {
+                return new 
TaskDeploymentDescriptor.Offloaded<>(serializedValueOrBlobKey.right());
+            }
+        }
+
+        /**
+         * Determine whether shuffle descriptors should be offloaded by blob 
server.
+         *
+         * @param shuffleDescriptorsToSerialize shuffle descriptors to 
serialize
+         * @param numConsumers how many consumers this serialized shuffle 
descriptor should be sent
+         * @return whether shuffle descriptors should be offloaded by blob 
server

Review Comment:
   "by blob server" -> "to blob server"



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -58,30 +60,45 @@
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Factory of {@link TaskDeploymentDescriptor} to deploy {@link
  * org.apache.flink.runtime.taskmanager.Task} from {@link Execution}.
  */
 public class TaskDeploymentDescriptorFactory {
+    @Experimental
+    public static final ConfigOption<Integer> 
OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD =
+            key("jobmanager.deployment.offload-shuffle-descriptors-threshold")
+                    .intType()
+                    .defaultValue(4_000 * 4_000)
+                    .withDescription(
+                            "The threshold to force enable offload shuffle 
descriptors via blob server."

Review Comment:
   Maybe change the first sentence to "Threshold for offloading shuffle 
descriptors to blob server. Once the number of shuffle descriptors exceeds this 
value, we will offload the shuffle descriptors to blob server."



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -58,30 +60,45 @@
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Factory of {@link TaskDeploymentDescriptor} to deploy {@link
  * org.apache.flink.runtime.taskmanager.Task} from {@link Execution}.
  */
 public class TaskDeploymentDescriptorFactory {
+    @Experimental
+    public static final ConfigOption<Integer> 
OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD =

Review Comment:
   Maybe add a doc comment for this option and explain it. For example: "This is an 
expert option that we do not want to expose in the documentation. The default 
value is good enough for almost all cases."



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -58,30 +60,45 @@
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Factory of {@link TaskDeploymentDescriptor} to deploy {@link
  * org.apache.flink.runtime.taskmanager.Task} from {@link Execution}.
  */
 public class TaskDeploymentDescriptorFactory {
+    @Experimental
+    public static final ConfigOption<Integer> 
OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD =
+            key("jobmanager.deployment.offload-shuffle-descriptors-threshold")
+                    .intType()
+                    .defaultValue(4_000 * 4_000)
+                    .withDescription(
+                            "The threshold to force enable offload shuffle 
descriptors via blob server."
+                                    + " This default value means JobManager 
need to serialized and transport"
+                                    + " 4000 shuffle descriptors(almost 100KB) 
to 4000 consumer(400MB in total)");

Review Comment:
   ```suggestion
                                       + " 4000 shuffle descriptors(almost 
100KB) to 4000 consumers (400MB in total)");
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -58,30 +60,45 @@
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Factory of {@link TaskDeploymentDescriptor} to deploy {@link
  * org.apache.flink.runtime.taskmanager.Task} from {@link Execution}.
  */
 public class TaskDeploymentDescriptorFactory {
+    @Experimental
+    public static final ConfigOption<Integer> 
OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD =
+            key("jobmanager.deployment.offload-shuffle-descriptors-threshold")

Review Comment:
   Maybe change the key to
`jobmanager.task-deployment.offload-shuffle-descriptors-to-blob-server.threshold-num`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -58,30 +60,45 @@
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Factory of {@link TaskDeploymentDescriptor} to deploy {@link
  * org.apache.flink.runtime.taskmanager.Task} from {@link Execution}.
  */
 public class TaskDeploymentDescriptorFactory {
+    @Experimental
+    public static final ConfigOption<Integer> 
OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD =
+            key("jobmanager.deployment.offload-shuffle-descriptors-threshold")
+                    .intType()
+                    .defaultValue(4_000 * 4_000)
+                    .withDescription(
+                            "The threshold to force enable offload shuffle 
descriptors via blob server."
+                                    + " This default value means JobManager 
need to serialized and transport"

Review Comment:
   ```suggestion
                                       + " This default value means JobManager 
need to serialize and transport"
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to