wanglijie95 commented on code in PR #22674:
URL: https://github.com/apache/flink/pull/22674#discussion_r1246353660
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -428,33 +434,66 @@ public int getIndex() {
/** Serialize shuffle descriptors. */
interface ShuffleDescriptorSerializer {
MaybeOffloaded<ShuffleDescriptorAndIndex[]>
serializeAndTryOffloadShuffleDescriptor(
- ShuffleDescriptorAndIndex[] shuffleDescriptors) throws
IOException;
+ ShuffleDescriptorAndIndex[] shuffleDescriptors, int
numConsumer) throws IOException;
Review Comment:
We should add Javadoc for this method, especially explaining the meaning of its parameters.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -428,33 +434,66 @@ public int getIndex() {
/** Serialize shuffle descriptors. */
interface ShuffleDescriptorSerializer {
MaybeOffloaded<ShuffleDescriptorAndIndex[]>
serializeAndTryOffloadShuffleDescriptor(
- ShuffleDescriptorAndIndex[] shuffleDescriptors) throws
IOException;
+ ShuffleDescriptorAndIndex[] shuffleDescriptors, int
numConsumer) throws IOException;
}
private static class DefaultShuffleDescriptorSerializer implements
ShuffleDescriptorSerializer {
private final JobID jobID;
private final BlobWriter blobWriter;
+ private final int offloadShuffleDescriptorsThreshold;
- public DefaultShuffleDescriptorSerializer(JobID jobID, BlobWriter
blobWriter) {
+ public DefaultShuffleDescriptorSerializer(
+ JobID jobID, BlobWriter blobWriter, int
offloadShuffleDescriptorsThreshold) {
this.jobID = jobID;
this.blobWriter = blobWriter;
+ this.offloadShuffleDescriptorsThreshold =
offloadShuffleDescriptorsThreshold;
}
public MaybeOffloaded<ShuffleDescriptorAndIndex[]>
serializeAndTryOffloadShuffleDescriptor(
- ShuffleDescriptorAndIndex[] shuffleDescriptors) throws
IOException {
+ ShuffleDescriptorAndIndex[] shuffleDescriptors, int
numConsumer)
+ throws IOException {
final CompressedSerializedValue<ShuffleDescriptorAndIndex[]>
compressedSerializedValue =
CompressedSerializedValue.fromObject(shuffleDescriptors);
final Either<SerializedValue<ShuffleDescriptorAndIndex[]>,
PermanentBlobKey>
serializedValueOrBlobKey =
- BlobWriter.tryOffload(compressedSerializedValue,
jobID, blobWriter);
+ shouldOffload(shuffleDescriptors, numConsumer)
+ ? BlobWriter.offloadWithException(
+ compressedSerializedValue, jobID,
blobWriter)
+ : Either.Left(compressedSerializedValue);
if (serializedValueOrBlobKey.isLeft()) {
return new
TaskDeploymentDescriptor.NonOffloaded<>(serializedValueOrBlobKey.left());
} else {
return new
TaskDeploymentDescriptor.Offloaded<>(serializedValueOrBlobKey.right());
}
}
+
+ /**
+ * Determine whether shuffle descriptors should be offloaded by blob
server.
+ *
+ * @param shuffleDescriptorsToSerialize shuffle descriptors to
serialize
+ * @param numConsumers how many consumers this serialized shuffle
descriptor should be sent
+ * @return whether shuffle descriptors should be offloaded by blob
server
+ */
+ private boolean shouldOffload(
+ ShuffleDescriptorAndIndex[] shuffleDescriptorsToSerialize, int
numConsumers) {
+ return shuffleDescriptorsToSerialize.length * numConsumers
+ >= offloadShuffleDescriptorsThreshold;
+ }
}
+
+ @Experimental
+ @Documentation.ExcludeFromDocumentation(
Review Comment:
I think the `@Documentation.ExcludeFromDocumentation` annotation is not needed here; we can
replace it with a code comment. Also, we generally put config options at the top of a
class.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -428,33 +434,66 @@ public int getIndex() {
/** Serialize shuffle descriptors. */
interface ShuffleDescriptorSerializer {
MaybeOffloaded<ShuffleDescriptorAndIndex[]>
serializeAndTryOffloadShuffleDescriptor(
- ShuffleDescriptorAndIndex[] shuffleDescriptors) throws
IOException;
+ ShuffleDescriptorAndIndex[] shuffleDescriptors, int
numConsumer) throws IOException;
}
private static class DefaultShuffleDescriptorSerializer implements
ShuffleDescriptorSerializer {
private final JobID jobID;
private final BlobWriter blobWriter;
+ private final int offloadShuffleDescriptorsThreshold;
- public DefaultShuffleDescriptorSerializer(JobID jobID, BlobWriter
blobWriter) {
+ public DefaultShuffleDescriptorSerializer(
+ JobID jobID, BlobWriter blobWriter, int
offloadShuffleDescriptorsThreshold) {
this.jobID = jobID;
this.blobWriter = blobWriter;
Review Comment:
Please add `checkNotNull` checks for `jobID` and `blobWriter`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -428,33 +434,66 @@ public int getIndex() {
/** Serialize shuffle descriptors. */
interface ShuffleDescriptorSerializer {
MaybeOffloaded<ShuffleDescriptorAndIndex[]>
serializeAndTryOffloadShuffleDescriptor(
- ShuffleDescriptorAndIndex[] shuffleDescriptors) throws
IOException;
+ ShuffleDescriptorAndIndex[] shuffleDescriptors, int
numConsumer) throws IOException;
}
private static class DefaultShuffleDescriptorSerializer implements
ShuffleDescriptorSerializer {
private final JobID jobID;
private final BlobWriter blobWriter;
+ private final int offloadShuffleDescriptorsThreshold;
- public DefaultShuffleDescriptorSerializer(JobID jobID, BlobWriter
blobWriter) {
+ public DefaultShuffleDescriptorSerializer(
+ JobID jobID, BlobWriter blobWriter, int
offloadShuffleDescriptorsThreshold) {
this.jobID = jobID;
this.blobWriter = blobWriter;
+ this.offloadShuffleDescriptorsThreshold =
offloadShuffleDescriptorsThreshold;
}
public MaybeOffloaded<ShuffleDescriptorAndIndex[]>
serializeAndTryOffloadShuffleDescriptor(
- ShuffleDescriptorAndIndex[] shuffleDescriptors) throws
IOException {
+ ShuffleDescriptorAndIndex[] shuffleDescriptors, int
numConsumer)
+ throws IOException {
final CompressedSerializedValue<ShuffleDescriptorAndIndex[]>
compressedSerializedValue =
CompressedSerializedValue.fromObject(shuffleDescriptors);
final Either<SerializedValue<ShuffleDescriptorAndIndex[]>,
PermanentBlobKey>
serializedValueOrBlobKey =
- BlobWriter.tryOffload(compressedSerializedValue,
jobID, blobWriter);
+ shouldOffload(shuffleDescriptors, numConsumer)
+ ? BlobWriter.offloadWithException(
+ compressedSerializedValue, jobID,
blobWriter)
+ : Either.Left(compressedSerializedValue);
if (serializedValueOrBlobKey.isLeft()) {
return new
TaskDeploymentDescriptor.NonOffloaded<>(serializedValueOrBlobKey.left());
} else {
return new
TaskDeploymentDescriptor.Offloaded<>(serializedValueOrBlobKey.right());
}
}
+
+ /**
+ * Determine whether shuffle descriptors should be offloaded by blob
server.
+ *
+ * @param shuffleDescriptorsToSerialize shuffle descriptors to
serialize
+ * @param numConsumers how many consumers this serialized shuffle
descriptor should be sent
+ * @return whether shuffle descriptors should be offloaded by blob
server
+ */
+ private boolean shouldOffload(
+ ShuffleDescriptorAndIndex[] shuffleDescriptorsToSerialize, int
numConsumers) {
+ return shuffleDescriptorsToSerialize.length * numConsumers
+ >= offloadShuffleDescriptorsThreshold;
+ }
}
+
+ @Experimental
+ @Documentation.ExcludeFromDocumentation(
+ "this is an expert option, that we do not want to expose in the
documentation")
+ public static final ConfigOption<Integer>
OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD =
+ key("jobmanager.deployment.offload-shuffle-descriptors-threshold")
+ .intType()
+ .defaultValue(4_000 * 4_000)
+ .withDescription(
+ "The threshold to force enable offload shuffle
descriptors via blob server."
+ + " This is a fixed value since it is
difficult for users to configure."
+ + " This default value means JobManager
need to serialized and transport"
Review Comment:
The sentence `This is a fixed value since it is difficult for users to configure` is
outdated (the threshold is now a configurable option) and should be removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]