Thesharing commented on code in PR #22674:
URL: https://github.com/apache/flink/pull/22674#discussion_r1211072892
##########
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java:
##########
@@ -95,11 +95,20 @@ static <T> Either<SerializedValue<T>, PermanentBlobKey> serializeAndTryOffload(
     static <T> Either<SerializedValue<T>, PermanentBlobKey> tryOffload(
             SerializedValue<T> serializedValue, JobID jobId, BlobWriter blobWriter) {
+        return tryOffload(serializedValue, jobId, blobWriter, false);
Review Comment:
The `false` value here is a bit confusing: does it mean we should offload, or that we shouldn't? I think we could restructure the function like this:
```java
static <T> Either<SerializedValue<T>, PermanentBlobKey> tryOffload(
        SerializedValue<T> serializedValue, JobID jobId, BlobWriter blobWriter) {
    if (serializedValue.getByteArray().length < blobWriter.getMinOffloadingSize()) {
        return Either.Left(serializedValue);
    } else {
        return offloadWithException(serializedValue, jobId, blobWriter);
    }
}

static <T> Either<SerializedValue<T>, PermanentBlobKey> offloadWithException(
        SerializedValue<T> serializedValue, JobID jobId, BlobWriter blobWriter) {
    Preconditions.checkNotNull(serializedValue);
    Preconditions.checkNotNull(jobId);
    Preconditions.checkNotNull(blobWriter);
    try {
        final PermanentBlobKey permanentBlobKey =
                blobWriter.putPermanent(jobId, serializedValue.getByteArray());
        return Either.Right(permanentBlobKey);
    } catch (IOException e) {
        LOG.warn("Failed to offload value for job {} to BLOB store.", jobId, e);
        return Either.Left(serializedValue);
    }
}
```
Then in `TaskDeploymentDescriptorFactory`, we decide whether we should offload according to the consumer parallelism and call `offloadWithException` directly. This makes sure `BlobWriter` doesn't need to know the `shouldOffload` logic of task deployment.
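To make the split concrete, here is a minimal, self-contained sketch of the size-vs-threshold check that would stay inside `tryOffload`; the class and method names below are illustrative only, not Flink's actual API:

```java
// Hypothetical sketch: the size check that tryOffload keeps, separated from
// the offloading itself. Class and method names are illustrative only.
public class OffloadDecision {

    /** Returns true when the serialized payload is at least the minimum offloading size. */
    public static boolean exceedsMinOffloadingSize(int serializedSizeBytes, int minOffloadingSize) {
        return serializedSizeBytes >= minOffloadingSize;
    }

    public static void main(String[] args) {
        // A small value stays inline; a large one is handed to offloadWithException.
        System.out.println(exceedsMinOffloadingSize(100, 1024));  // false
        System.out.println(exceedsMinOffloadingSize(4096, 1024)); // true
    }
}
```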
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java:
##########
@@ -54,10 +56,18 @@ public class CachedShuffleDescriptors {
     /** Stores the mapping of resultPartitionId to index subscripts in consumed partition group. */
     private final Map<IntermediateResultPartitionID, Integer> resultPartitionIdToIndex;

-    public CachedShuffleDescriptors(
+    /** The number of consumers for {@link ConsumedPartitionGroup}. */
+    private final int numConsumers;
+
+    private final int offloadShuffleDescriptorsThreshold;
Review Comment:
Since all the values of `offloadShuffleDescriptorsThreshold` are the same, could we put this value into `TaskDeploymentDescriptorFactory`? We could also move the `shouldOffload` function to `TaskDeploymentDescriptorFactory`; then we just need to pass all the necessary parameters to `shouldOffload` in the `serializeAndTryOffloadShuffleDescriptor` function.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java:
##########
@@ -104,4 +119,30 @@ public void markPartitionFinished(IntermediateResultPartition resultPartition) {
                         checkNotNull(
                                 resultPartitionIdToIndex.get(resultPartition.getPartitionId()))));
     }
+
+    private boolean shouldOffload(ShuffleDescriptorAndIndex[] shuffleDescriptorsToSerialize) {
+        return shuffleDescriptorsToSerialize.length * numConsumers
Review Comment:
I'm wondering why we use `shuffleDescriptorsToSerialize.length` here. It may be a bit confusing for this case, a vertex of 10,000 * 10,000 in hybrid shuffle:
1. When `shuffleDescriptorsToSerialize` only contains half of the ShuffleDescriptors, they wouldn't be offloaded.
2. Afterwards, when `shuffleDescriptorsToSerialize` contains all the ShuffleDescriptors, they would be offloaded.
Could we just use `numConsumers` here?
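To illustrate the concern with a runnable sketch (the threshold values and method names here are made up for illustration, not Flink's actual configuration):

```java
// Hypothetical comparison of the two offload metrics. With the product metric,
// the decision flips depending on how many descriptors happen to be serialized
// in the current round; with numConsumers alone it is stable across rounds.
public class OffloadMetric {

    /** Current approach: descriptors-to-serialize times consumer count. */
    public static boolean offloadByProduct(int descriptorsToSerialize, int numConsumers, long threshold) {
        return (long) descriptorsToSerialize * numConsumers > threshold;
    }

    /** Suggested approach: consumer count alone. */
    public static boolean offloadByConsumers(int numConsumers, int consumerThreshold) {
        return numConsumers > consumerThreshold;
    }

    public static void main(String[] args) {
        long threshold = 50_000_000L; // illustrative threshold
        int numConsumers = 10_000;    // the 10,000 x 10,000 hybrid-shuffle vertex
        // Round 1: only half of the ShuffleDescriptors are serialized -> not offloaded.
        System.out.println(offloadByProduct(5_000, numConsumers, threshold));  // false
        // Round 2: all ShuffleDescriptors are serialized -> offloaded.
        System.out.println(offloadByProduct(10_000, numConsumers, threshold)); // true
    }
}
```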
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java:
##########
@@ -143,7 +147,8 @@ public static DefaultExecutionGraph buildGraph(
                         executionJobVertexFactory,
                         jobGraph.getJobStatusHooks(),
                         markPartitionFinishedStrategy,
-                        nonFinishedHybridPartitionShouldBeUnknown);
+                        nonFinishedHybridPartitionShouldBeUnknown,
Review Comment:
Hey, @reswqa. I'm wondering since
`nonFinishedHybridPartitionShouldBeUnknown` and
`offloadShuffleDescriptorsThreshold` are both related to task deployment, could
we merge them into a configuration class or a factory or something else? It
could make the initialization of `ExecutionGraph` and other related components
more readable. What's your opinion?
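As a sketch of what such a class might look like (the class name and accessors are hypothetical, purely to illustrate the merge; not an actual Flink class):

```java
// Hypothetical configuration holder bundling the two task-deployment-related
// options, so buildGraph could take one argument instead of two.
public class TaskDeploymentOptions {
    private final boolean nonFinishedHybridPartitionShouldBeUnknown;
    private final int offloadShuffleDescriptorsThreshold;

    public TaskDeploymentOptions(
            boolean nonFinishedHybridPartitionShouldBeUnknown,
            int offloadShuffleDescriptorsThreshold) {
        this.nonFinishedHybridPartitionShouldBeUnknown = nonFinishedHybridPartitionShouldBeUnknown;
        this.offloadShuffleDescriptorsThreshold = offloadShuffleDescriptorsThreshold;
    }

    public boolean isNonFinishedHybridPartitionShouldBeUnknown() {
        return nonFinishedHybridPartitionShouldBeUnknown;
    }

    public int getOffloadShuffleDescriptorsThreshold() {
        return offloadShuffleDescriptorsThreshold;
    }
}
```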
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -216,14 +221,16 @@ private ShuffleDescriptorAndIndex[] computeConsumedPartitionShuffleDescriptors(
     }

     private MaybeOffloaded<ShuffleDescriptorAndIndex[]> serializeAndTryOffloadShuffleDescriptor(
-            ShuffleDescriptorAndIndex[] shuffleDescriptors) throws IOException {
+            ShuffleDescriptorAndIndex[] shuffleDescriptors, boolean shouldOffload)
+            throws IOException {
         final CompressedSerializedValue<ShuffleDescriptorAndIndex[]> compressedSerializedValue =
                 CompressedSerializedValue.fromObject(shuffleDescriptors);
         final Either<SerializedValue<ShuffleDescriptorAndIndex[]>, PermanentBlobKey>
                 serializedValueOrBlobKey =
-                        BlobWriter.tryOffload(compressedSerializedValue, jobID, blobWriter);
+                        BlobWriter.tryOffload(
Review Comment:
Since `BlobWriter` doesn't know the logic of task deployment, here we could write:
```java
final Either<SerializedValue<ShuffleDescriptorAndIndex[]>, PermanentBlobKey>
        serializedValueOrBlobKey =
                shouldOffload(numConsumers)
                        ? BlobWriter.offloadWithException(
                                compressedSerializedValue, jobID, blobWriter)
                        : Either.Left(compressedSerializedValue);
```
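For readers following along, the ternary control flow above can be sketched as a minimal runnable example; `Either` here is a toy stand-in for Flink's `org.apache.flink.types.Either`, and the "blob key" string simulates `blobWriter.putPermanent` (all names are illustrative):

```java
// Minimal sketch of the offload-or-inline decision made by the caller,
// with a toy Either and a simulated offload step.
public class OffloadFlow {

    public static final class Either<L, R> {
        private final L left;
        private final R right;
        private Either(L left, R right) { this.left = left; this.right = right; }
        public static <L, R> Either<L, R> left(L value) { return new Either<>(value, null); }
        public static <L, R> Either<L, R> right(R value) { return new Either<>(null, value); }
        public boolean isLeft() { return left != null; }
    }

    /** Offload when the caller decided so; otherwise keep the serialized value inline. */
    public static Either<byte[], String> offloadOrInline(byte[] serialized, boolean shouldOffload) {
        return shouldOffload
                ? Either.right("blob-key-" + serialized.length) // simulated putPermanent
                : Either.left(serialized);
    }
}
```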
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]