Thesharing commented on code in PR #22674:
URL: https://github.com/apache/flink/pull/22674#discussion_r1211072892


##########
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java:
##########
@@ -95,11 +95,20 @@ static <T> Either<SerializedValue<T>, PermanentBlobKey> serializeAndTryOffload(
 
     static <T> Either<SerializedValue<T>, PermanentBlobKey> tryOffload(
             SerializedValue<T> serializedValue, JobID jobId, BlobWriter blobWriter) {
+        return tryOffload(serializedValue, jobId, blobWriter, false);

Review Comment:
   The bare `false` argument is confusing here: it is not obvious at the call site whether it means we should offload or not.
   
   Maybe we could restructure the function like this:
   
   ```java
   static <T> Either<SerializedValue<T>, PermanentBlobKey> tryOffload(
           SerializedValue<T> serializedValue, JobID jobId, BlobWriter blobWriter) {
       if (serializedValue.getByteArray().length < blobWriter.getMinOffloadingSize()) {
           return Either.Left(serializedValue);
       } else {
           return offloadWithException(serializedValue, jobId, blobWriter);
       }
   }
   
   static <T> Either<SerializedValue<T>, PermanentBlobKey> offloadWithException(
           SerializedValue<T> serializedValue, JobID jobId, BlobWriter blobWriter) {
       Preconditions.checkNotNull(serializedValue);
       Preconditions.checkNotNull(jobId);
       Preconditions.checkNotNull(blobWriter);
       try {
           final PermanentBlobKey permanentBlobKey =
                   blobWriter.putPermanent(jobId, serializedValue.getByteArray());
           return Either.Right(permanentBlobKey);
       } catch (IOException e) {
           LOG.warn("Failed to offload value for job {} to BLOB store.", jobId, e);
           return Either.Left(serializedValue);
       }
   }
   ```
   
   Then `TaskDeploymentDescriptorFactory` decides whether to offload based on the consumers' parallelism and, if so, calls `offloadWithException`. This way `BlobWriter` doesn't need to know the `shouldOffload` logic of task deployment.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java:
##########
@@ -54,10 +56,18 @@ public class CachedShuffleDescriptors {
     /** Stores the mapping of resultPartitionId to index subscripts in consumed partition group. */
     private final Map<IntermediateResultPartitionID, Integer> resultPartitionIdToIndex;
 
-    public CachedShuffleDescriptors(
+    /** The number of consumers for {@link ConsumedPartitionGroup}. */
+    private final int numConsumers;
+
+    private final int offloadShuffleDescriptorsThreshold;

Review Comment:
   Since `offloadShuffleDescriptorsThreshold` has the same value in every instance, could we keep it in `TaskDeploymentDescriptorFactory` instead? We could also move the `shouldOffload` function to `TaskDeploymentDescriptorFactory`. Then we just need to pass the necessary parameters to `shouldOffload` in the `serializeAndTryOffloadShuffleDescriptor` function.
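   
   A rough sketch of what I have in mind. The class name `OffloadDecisionSketch`, the method signature, and the `>=` comparison are assumptions for illustration only, not existing Flink code:
   
   ```java
   /** Hypothetical helper illustrating the proposal; not part of Flink. */
   final class OffloadDecisionSketch {
   
       /** One threshold shared by all cached descriptors (assumed to come from configuration). */
       private final int offloadShuffleDescriptorsThreshold;
   
       OffloadDecisionSketch(int offloadShuffleDescriptorsThreshold) {
           this.offloadShuffleDescriptorsThreshold = offloadShuffleDescriptorsThreshold;
       }
   
       /** Decides purely from the parameters the caller passes in; no per-cache state is needed. */
       boolean shouldOffload(int numShuffleDescriptors, int numConsumers) {
           // Widen to long so that very large products cannot overflow int.
           // The ">=" comparison is an assumption; the PR may compare differently.
           return (long) numShuffleDescriptors * numConsumers >= offloadShuffleDescriptorsThreshold;
       }
   }
   ```
   
   With something like this, `CachedShuffleDescriptors` would no longer need to carry the threshold as a field.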



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java:
##########
@@ -104,4 +119,30 @@ public void markPartitionFinished(IntermediateResultPartition resultPartition) {
                         checkNotNull(
                                 resultPartitionIdToIndex.get(resultPartition.getPartitionId()))));
     }
+
+    private boolean shouldOffload(ShuffleDescriptorAndIndex[] shuffleDescriptorsToSerialize) {
+        return shuffleDescriptorsToSerialize.length * numConsumers

Review Comment:
   I'm wondering why we use `shuffleDescriptorsToSerialize.length` here. It could lead to inconsistent behavior in a case like this:
   
   For a 10,000 * 10,000 vertex in hybrid shuffle:
   1. At first `shuffleDescriptorsToSerialize` contains only half of the ShuffleDescriptors, so it wouldn't be offloaded.
   2. Afterwards `shuffleDescriptorsToSerialize` contains all the ShuffleDescriptors, so it would be offloaded.
   
   Could we just use `numConsumers` here?
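   
   To make the flip concrete, here is a small standalone sketch. The class name, both threshold values, and the `>=` comparison are made up for the example; only the 10,000 * 10,000 scenario comes from the case above:
   
   ```java
   /** Illustration only: thresholds and names are hypothetical, not Flink code. */
   final class OffloadFlipExample {
   
       // Hypothetical threshold on (descriptors * consumers).
       static final long PRODUCT_THRESHOLD = 75_000_000L;
   
       // Hypothetical threshold on the number of consumers alone.
       static final int CONSUMER_THRESHOLD = 5_000;
   
       /** Form used in the PR: depends on how many descriptors are serialized right now. */
       static boolean byLengthTimesConsumers(int numDescriptorsToSerialize, int numConsumers) {
           return (long) numDescriptorsToSerialize * numConsumers >= PRODUCT_THRESHOLD;
       }
   
       /** Suggested form: depends only on the number of consumers, which does not change. */
       static boolean byConsumersOnly(int numConsumers) {
           return numConsumers >= CONSUMER_THRESHOLD;
       }
   
       public static void main(String[] args) {
           int numConsumers = 10_000;
           // Only half of the 10,000 descriptors are available: 50,000,000 < threshold.
           System.out.println(byLengthTimesConsumers(5_000, numConsumers)); // false
           // All 10,000 descriptors are available: 100,000,000 >= threshold.
           System.out.println(byLengthTimesConsumers(10_000, numConsumers)); // true
           // The consumer-only check gives the same answer in both situations.
           System.out.println(byConsumersOnly(numConsumers)); // true
       }
   }
   ```
   
   The same consumed partition group gets two different decisions with the first form, while the second form stays stable across deployments.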



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java:
##########
@@ -143,7 +147,8 @@ public static DefaultExecutionGraph buildGraph(
                             executionJobVertexFactory,
                             jobGraph.getJobStatusHooks(),
                             markPartitionFinishedStrategy,
-                            nonFinishedHybridPartitionShouldBeUnknown);
+                            nonFinishedHybridPartitionShouldBeUnknown,

Review Comment:
   Hey, @reswqa. Since `nonFinishedHybridPartitionShouldBeUnknown` and `offloadShuffleDescriptorsThreshold` are both related to task deployment, could we merge them into a configuration class, a factory, or something similar? It could make the initialization of `ExecutionGraph` and other related components more readable. What's your opinion?
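   
   For example, something along these lines. The class name `TaskDeploymentOptionsSketch` and its accessors are hypothetical, just to show the shape:
   
   ```java
   /** Hypothetical options holder; the name and accessors are illustrative, not Flink API. */
   final class TaskDeploymentOptionsSketch {
   
       private final boolean nonFinishedHybridPartitionShouldBeUnknown;
       private final int offloadShuffleDescriptorsThreshold;
   
       TaskDeploymentOptionsSketch(
               boolean nonFinishedHybridPartitionShouldBeUnknown,
               int offloadShuffleDescriptorsThreshold) {
           this.nonFinishedHybridPartitionShouldBeUnknown =
                   nonFinishedHybridPartitionShouldBeUnknown;
           this.offloadShuffleDescriptorsThreshold = offloadShuffleDescriptorsThreshold;
       }
   
       boolean isNonFinishedHybridPartitionShouldBeUnknown() {
           return nonFinishedHybridPartitionShouldBeUnknown;
       }
   
       int getOffloadShuffleDescriptorsThreshold() {
           return offloadShuffleDescriptorsThreshold;
       }
   }
   ```
   
   `buildGraph` would then pass one object instead of two separate trailing arguments, and further deployment options could be added without touching every constructor along the way.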



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -216,14 +221,16 @@ private ShuffleDescriptorAndIndex[] computeConsumedPartitionShuffleDescriptors(
     }
 
     private MaybeOffloaded<ShuffleDescriptorAndIndex[]> serializeAndTryOffloadShuffleDescriptor(
-            ShuffleDescriptorAndIndex[] shuffleDescriptors) throws IOException {
+            ShuffleDescriptorAndIndex[] shuffleDescriptors, boolean shouldOffload)
+            throws IOException {
 
         final CompressedSerializedValue<ShuffleDescriptorAndIndex[]> compressedSerializedValue =
                 CompressedSerializedValue.fromObject(shuffleDescriptors);
 
         final Either<SerializedValue<ShuffleDescriptorAndIndex[]>, PermanentBlobKey>
                 serializedValueOrBlobKey =
-                        BlobWriter.tryOffload(compressedSerializedValue, jobID, blobWriter);
+                        BlobWriter.tryOffload(

Review Comment:
   Since `BlobWriter` doesn't know the logic of task deployment, here we could write:
   
   ```java
   final Either<SerializedValue<ShuffleDescriptorAndIndex[]>, PermanentBlobKey>
           serializedValueOrBlobKey =
                   shouldOffload(numConsumers)
                           ? BlobWriter.offloadWithException(
                                   compressedSerializedValue, jobID, blobWriter)
                           : Either.Left(compressedSerializedValue);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
