wanglijie95 commented on code in PR #22674:
URL: https://github.com/apache/flink/pull/22674#discussion_r1247502214
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -441,4 +440,66 @@ public int getIndex() {
return index;
}
}
+
+ /** Serialize shuffle descriptors. */
+ interface ShuffleDescriptorSerializer {
+ /**
+ * Serialize and try offload shuffle descriptors.
+ *
+ * @param shuffleDescriptors to serialize
+ * @param numConsumer consumers number of these shuffle descriptors,
it means how many times
+ * serialized shuffle descriptor should be sent
+ * @return offloaded or non-offloaded serialized shuffle descriptors
+ */
+ MaybeOffloaded<ShuffleDescriptorAndIndex[]>
serializeAndTryOffloadShuffleDescriptor(
+ ShuffleDescriptorAndIndex[] shuffleDescriptors, int
numConsumer) throws IOException;
+ }
+
+ private static class DefaultShuffleDescriptorSerializer implements
ShuffleDescriptorSerializer {
+ private final JobID jobID;
+ private final BlobWriter blobWriter;
+ private final int offloadShuffleDescriptorsThreshold;
+
+ public DefaultShuffleDescriptorSerializer(
+ JobID jobID, BlobWriter blobWriter, int
offloadShuffleDescriptorsThreshold) {
+ this.jobID = checkNotNull(jobID);
+ this.blobWriter = checkNotNull(blobWriter);
+ this.offloadShuffleDescriptorsThreshold =
offloadShuffleDescriptorsThreshold;
+ }
+
+ @Override
+ public MaybeOffloaded<ShuffleDescriptorAndIndex[]>
serializeAndTryOffloadShuffleDescriptor(
+ ShuffleDescriptorAndIndex[] shuffleDescriptors, int
numConsumer)
+ throws IOException {
+
+ final CompressedSerializedValue<ShuffleDescriptorAndIndex[]>
compressedSerializedValue =
+ CompressedSerializedValue.fromObject(shuffleDescriptors);
+
+ final Either<SerializedValue<ShuffleDescriptorAndIndex[]>,
PermanentBlobKey>
+ serializedValueOrBlobKey =
+ shouldOffload(shuffleDescriptors, numConsumer)
+ ? BlobWriter.offloadWithException(
+ compressedSerializedValue, jobID,
blobWriter)
+ : Either.Left(compressedSerializedValue);
+
+ if (serializedValueOrBlobKey.isLeft()) {
+ return new
TaskDeploymentDescriptor.NonOffloaded<>(serializedValueOrBlobKey.left());
+ } else {
+ return new
TaskDeploymentDescriptor.Offloaded<>(serializedValueOrBlobKey.right());
+ }
+ }
+
+ /**
+ * Determine whether shuffle descriptors should be offloaded by blob
server.
Review Comment:
"by blob server" -> "to blob server"
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -441,4 +440,66 @@ public int getIndex() {
return index;
}
}
+
+ /** Serialize shuffle descriptors. */
+ interface ShuffleDescriptorSerializer {
+ /**
+ * Serialize and try offload shuffle descriptors.
+ *
+ * @param shuffleDescriptors to serialize
+ * @param numConsumer consumers number of these shuffle descriptors,
it means how many times
+ * serialized shuffle descriptor should be sent
+ * @return offloaded or non-offloaded serialized shuffle descriptors
+ */
+ MaybeOffloaded<ShuffleDescriptorAndIndex[]>
serializeAndTryOffloadShuffleDescriptor(
+ ShuffleDescriptorAndIndex[] shuffleDescriptors, int
numConsumer) throws IOException;
+ }
+
+ private static class DefaultShuffleDescriptorSerializer implements
ShuffleDescriptorSerializer {
+ private final JobID jobID;
+ private final BlobWriter blobWriter;
+ private final int offloadShuffleDescriptorsThreshold;
+
+ public DefaultShuffleDescriptorSerializer(
+ JobID jobID, BlobWriter blobWriter, int
offloadShuffleDescriptorsThreshold) {
+ this.jobID = checkNotNull(jobID);
+ this.blobWriter = checkNotNull(blobWriter);
+ this.offloadShuffleDescriptorsThreshold =
offloadShuffleDescriptorsThreshold;
+ }
+
+ @Override
+ public MaybeOffloaded<ShuffleDescriptorAndIndex[]>
serializeAndTryOffloadShuffleDescriptor(
+ ShuffleDescriptorAndIndex[] shuffleDescriptors, int
numConsumer)
+ throws IOException {
+
+ final CompressedSerializedValue<ShuffleDescriptorAndIndex[]>
compressedSerializedValue =
+ CompressedSerializedValue.fromObject(shuffleDescriptors);
+
+ final Either<SerializedValue<ShuffleDescriptorAndIndex[]>,
PermanentBlobKey>
+ serializedValueOrBlobKey =
+ shouldOffload(shuffleDescriptors, numConsumer)
+ ? BlobWriter.offloadWithException(
+ compressedSerializedValue, jobID,
blobWriter)
+ : Either.Left(compressedSerializedValue);
+
+ if (serializedValueOrBlobKey.isLeft()) {
+ return new
TaskDeploymentDescriptor.NonOffloaded<>(serializedValueOrBlobKey.left());
+ } else {
+ return new
TaskDeploymentDescriptor.Offloaded<>(serializedValueOrBlobKey.right());
+ }
+ }
+
+ /**
+ * Determine whether shuffle descriptors should be offloaded by blob
server.
+ *
+ * @param shuffleDescriptorsToSerialize shuffle descriptors to
serialize
+ * @param numConsumers how many consumers this serialized shuffle
descriptor should be sent
+ * @return whether shuffle descriptors should be offloaded by blob
server
Review Comment:
"by blob server" -> "to blob server"
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -58,30 +60,45 @@
import java.util.Map;
import java.util.Optional;
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
/**
* Factory of {@link TaskDeploymentDescriptor} to deploy {@link
* org.apache.flink.runtime.taskmanager.Task} from {@link Execution}.
*/
public class TaskDeploymentDescriptorFactory {
+ @Experimental
+ public static final ConfigOption<Integer>
OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD =
+ key("jobmanager.deployment.offload-shuffle-descriptors-threshold")
+ .intType()
+ .defaultValue(4_000 * 4_000)
+ .withDescription(
+ "The threshold to force enable offload shuffle
descriptors via blob server."
Review Comment:
Maybe change the first sentence to "Threshold for offloading shuffle
descriptors to blob server. Once the number of shuffle descriptors exceeds this
value, we will offload the shuffle descriptors to blob server."
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -58,30 +60,45 @@
import java.util.Map;
import java.util.Optional;
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
/**
* Factory of {@link TaskDeploymentDescriptor} to deploy {@link
* org.apache.flink.runtime.taskmanager.Task} from {@link Execution}.
*/
public class TaskDeploymentDescriptorFactory {
+ @Experimental
+ public static final ConfigOption<Integer>
OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD =
Review Comment:
Maybe add a doc comment for this option that explains it. For example: "This
is an expert option that we do not want to expose in the documentation. The
default value is good enough for almost all cases."
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -58,30 +60,45 @@
import java.util.Map;
import java.util.Optional;
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
/**
* Factory of {@link TaskDeploymentDescriptor} to deploy {@link
* org.apache.flink.runtime.taskmanager.Task} from {@link Execution}.
*/
public class TaskDeploymentDescriptorFactory {
+ @Experimental
+ public static final ConfigOption<Integer>
OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD =
+ key("jobmanager.deployment.offload-shuffle-descriptors-threshold")
+ .intType()
+ .defaultValue(4_000 * 4_000)
+ .withDescription(
+ "The threshold to force enable offload shuffle
descriptors via blob server."
+ + " This default value means JobManager
need to serialized and transport"
+ + " 4000 shuffle descriptors(almost 100KB)
to 4000 consumer(400MB in total)");
Review Comment:
```suggestion
+ " 4000 shuffle descriptors(almost
100KB) to 4000 consumers (400MB in total)");
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -58,30 +60,45 @@
import java.util.Map;
import java.util.Optional;
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
/**
* Factory of {@link TaskDeploymentDescriptor} to deploy {@link
* org.apache.flink.runtime.taskmanager.Task} from {@link Execution}.
*/
public class TaskDeploymentDescriptorFactory {
+ @Experimental
+ public static final ConfigOption<Integer>
OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD =
+ key("jobmanager.deployment.offload-shuffle-descriptors-threshold")
Review Comment:
Maybe change the key to
`jobmanager.task-deployment.offload-shuffle-descriptors-to-blob-server.threshold-num`
?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -58,30 +60,45 @@
import java.util.Map;
import java.util.Optional;
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
/**
* Factory of {@link TaskDeploymentDescriptor} to deploy {@link
* org.apache.flink.runtime.taskmanager.Task} from {@link Execution}.
*/
public class TaskDeploymentDescriptorFactory {
+ @Experimental
+ public static final ConfigOption<Integer>
OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD =
+ key("jobmanager.deployment.offload-shuffle-descriptors-threshold")
+ .intType()
+ .defaultValue(4_000 * 4_000)
+ .withDescription(
+ "The threshold to force enable offload shuffle
descriptors via blob server."
+ + " This default value means JobManager
need to serialized and transport"
Review Comment:
```suggestion
+ " This default value means JobManager
need to serialize and transport"
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]