azagrebin commented on a change in pull request #8789: [FLINK-12890] Add 
partition lifecycle related Shuffle API
URL: https://github.com/apache/flink/pull/8789#discussion_r296215145
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
 ##########
 @@ -41,4 +44,17 @@
        CompletableFuture<T> registerPartitionWithProducer(
                PartitionDescriptor partitionDescriptor,
                ProducerDescriptor producerDescriptor);
+
+       /**
+        * Release any external resources occupied by the given partition.
+        *
+        * <p>This call triggers release of any resources which are occupied by the given partition in the external systems
+        * outside of the producer executor. This is mostly relevant for the batch jobs and blocking result partitions
+        * for which {@link ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} returns {@code false}.
+        * The producer local resources are managed by {@link ShuffleDescriptor#hasLocalResources()} and
+        * {@link ShuffleEnvironment#releasePartitions(Collection)}.
+        *
+        * @param shuffleDescriptor shuffle descriptor of the result partition to release externally.
+        */
+       void releasePartitionExternally(T shuffleDescriptor);
 
 Review comment:
   The current idea is as follows (both cases apply only when `RPD.releaseOnConsumption` is false):
   
   - The local release is the JM -> TM RPC (only if `SD.storesLocalResourcesOn` applies): it cleans up the `JobAwareShuffleEnvironment` wrapper and calls `ShuffleEnvironment#releasePartitionsLocally`.
   
   - The external release is the call from `ShuffleMaster` to the external system once consumption is done, e.g. to an auxiliary service on a YARN node. It does not go through `ShuffleEnvironment`, because with an external shuffle service (for which `SD.storesLocalResourcesOn` does not apply) the TM's YARN container may already have been released once production, but not consumption, is done. The Netty implementation does not need it (its implementation is empty), but it has to be called for an external shuffle service that works without JM/TM RPC (there is no TM/`ShuffleEnvironment` connection).
   
   So in both cases each method is called for a specific semantic purpose.
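
   As a rough illustration of the two paths above, here is a minimal sketch of the JM-side dispatch. All types and names in it (`ReleaseSketch`, `Descriptor`, `TaskExecutorGateway`, `ShuffleMasterStub`, `trace`) are hypothetical stand-ins, not the actual Flink interfaces, and it assumes the external release is always invoked and is simply a no-op for the Netty implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch; none of these types are the real Flink interfaces. */
final class ReleaseSketch {

    /** Stand-in for a shuffle descriptor (SD). */
    static final class Descriptor {
        final String partitionId;
        /** TM holding local resources, or empty for an external shuffle service. */
        final Optional<String> storesLocalResourcesOn;

        Descriptor(String partitionId, Optional<String> storesLocalResourcesOn) {
            this.partitionId = partitionId;
            this.storesLocalResourcesOn = storesLocalResourcesOn;
        }
    }

    /** Stand-in for the JM -> TM RPC that ends in the local release. */
    interface TaskExecutorGateway {
        void releasePartitionsLocally(String taskManagerId, String partitionId);
    }

    /** Stand-in for the external release offered by the shuffle master. */
    interface ShuffleMasterStub {
        void releasePartitionExternally(Descriptor descriptor);
    }

    /** JM-side dispatch once consumption of the partition is done. */
    static void release(
            Descriptor sd,
            boolean releasedOnConsumption,
            TaskExecutorGateway tm,
            ShuffleMasterStub master) {
        if (releasedOnConsumption) {
            return; // the shuffle service releases on its own, no JM/TM action
        }
        // Local path: only if the descriptor points at a TM with local resources.
        sd.storesLocalResourcesOn.ifPresent(
                tmId -> tm.releasePartitionsLocally(tmId, sd.partitionId));
        // External path: assumed to be called always (empty for Netty).
        master.releasePartitionExternally(sd);
    }

    /** Runs the dispatch against recording stubs and returns the calls made. */
    static List<String> trace(Descriptor sd, boolean releasedOnConsumption) {
        List<String> calls = new ArrayList<>();
        release(
                sd,
                releasedOnConsumption,
                (tmId, pid) -> calls.add("local:" + tmId + ":" + pid),
                d -> calls.add("external:" + d.partitionId));
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(trace(new Descriptor("p1", Optional.of("tm-1")), false));
        System.out.println(trace(new Descriptor("p2", Optional.empty()), false));
        System.out.println(trace(new Descriptor("p3", Optional.of("tm-1")), true));
    }
}
```

   In this sketch a TM-local partition triggers both calls, a partition held only by an external service triggers only the external call, and a partition released on consumption triggers neither.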
   
   We need the semantic difference between these methods because we already assume that there are both TM-local and external systems, and the JM/TM treat the two cases differently. It would be nice to have only one call, e.g. `ShuffleMaster.releasePartitions`, and let it decide what to call/release, but then it would have to implement the RPC internally, and we need the JM/TM RPC anyway to manage the `JobAwareShuffleEnvironment` wrapper. Declaring the intent in the method names should actually simplify the shuffle implementations.
   
   `releaseOnConsumption` currently means that the shuffle service internally monitors the end of consumption (as the Netty implementation always did) and takes care of the release automatically, without any action required from the JM/TM. True, in the future we could change this so that the release is managed only by the JM/TM directly, but I think we should keep it for safety until the fine-grained release is stable.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
