zhuzhurk commented on a change in pull request #16314:
URL: https://github.com/apache/flink/pull/16314#discussion_r661226308



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
##########
@@ -59,7 +71,10 @@
     @Nonnegative private final int consumedSubpartitionIndex;
 
     /** An input channel for each consumed subpartition. */
-    private final ShuffleDescriptor[] inputChannels;
+    private ShuffleDescriptor[] inputChannels;

Review comment:
       should be `transient`

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
##########
@@ -59,7 +71,10 @@
     @Nonnegative private final int consumedSubpartitionIndex;
 
     /** An input channel for each consumed subpartition. */
-    private final ShuffleDescriptor[] inputChannels;
+    private ShuffleDescriptor[] inputChannels;
+
+    /** Serialized value of shuffle descriptors. */
+    private MaybeOffloaded<ShuffleDescriptor[]> serializedInputChannels;
 
     public InputGateDeploymentDescriptor(

Review comment:
       This constructor should now be marked as `VisibleForTesting`. And it 
should be changed to invoke the production constructor, i.e. 
   ```
   this(
     consumedResultId, 
     consumedPartitionType, 
     consumedSubpartitionIndex, 
     new NonOffloaded(new SerializedValue(inputChannels)));
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
##########
@@ -171,6 +180,21 @@ boolean areAllPartitionsFinished() {
         return numberOfRunningProducers.get() == 0;
     }
 
+    public MaybeOffloaded<ShuffleDescriptor[]> getCachedShuffleDescriptors(
+            ConsumedPartitionGroup consumedPartitionGroup) {
+        return shuffleDescriptorCache.get(consumedPartitionGroup);
+    }
+
+    public void cacheShuffleDescriptors(
+            ConsumedPartitionGroup consumedPartitionGroup,
+            MaybeOffloaded<ShuffleDescriptor[]> shuffleDescriptors) {
+        this.shuffleDescriptorCache.put(consumedPartitionGroup, 
shuffleDescriptors);
+    }
+
+    public void notifyPartitionStatusChanged() {
+        this.shuffleDescriptorCache.clear();

Review comment:
       The offloaded data should be removed once not needed. Otherwise there 
can be lots of outdated blob files.
   However, I'm a bit concerned that it can happen that the blob is deleted 
when a task still needs it. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to