[ 
https://issues.apache.org/jira/browse/FLINK-22676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17355162#comment-17355162
 ] 

Jin Xing commented on FLINK-22676:
----------------------------------

In current Flink, JMPartitonTracker works independent from ShuffleMaster. But 
how a partition is tracked or managed really depends on the implementation of 
'shuffle service'. Maybe a crazy idea is to move the partition tracking logic 
under ShuffleMaster. In addition, when unregister a partition, 
JMPartitionTracker invokes TM gateway and release by ShuffleEnvironment if the 
partition locates on TM, otherwise JMPartitionTracker invokes ShuffleMaster to 
release the partition. I think, by such mechanism, TM internal shuffle seems to 
be an exception and not working under the framework of "Pluggable Shuffle 
Service". From my point of view, it will be a more unified design if all 
unregistration requests go by ShuffleMaster. I know what's described in this 
paragraph seems breaking change. It will be realistic to keep the components as 
they are and do refinements in JMPartitionTracker to support external shuffle 
properly.

> The partition tracker should support remote shuffle properly
> ------------------------------------------------------------
>
>                 Key: FLINK-22676
>                 URL: https://issues.apache.org/jira/browse/FLINK-22676
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination, Runtime / Network
>            Reporter: Jin Xing
>            Priority: Major
>
> In current Flink, data partition is bound with the ResourceID of TM in 
> Execution#startTrackingPartitions and partition tracker will stop tracking 
> corresponding partitions when a TM 
> disconnects(JobMaster#disconnectTaskManager), i.e. the lifecycle of shuffle 
> data is bound with computing resource (TM). It works fine for internal 
> shuffle service, but doesn't for remote shuffle service. Note that shuffle 
> data is accommodated on remote, the lifecycle of a completed partition is 
> capable to be decoupled with TM, i.e. TM is totally fine to be released when 
> no computing task on it and further shuffle reading requests could be 
> directed to remote shuffle cluster. In addition, when a TM is lost, its 
> completed data partitions on remote shuffle cluster could avoid reproducing.
>  
> The issue mentioned above is because Flink JobMasterPartitionTracker mixed up 
> partition's locationID (where the partition is located) and tmID (which TM 
> the partition is produced from). In TM internal shuffle, partition's 
> locationID is the same with tmID, but it is not in remote shuffle; 
> JobMasterPartitionTracker as an independent component should be able to 
> differentiate locationID and tmID of a partition, thus to handle the 
> lifecycle of a partition properly;
> We propose that JobMasterPartitionTracker manages and indexes partitions with 
> both locationID and tmID. The process of registration and unregistration will 
> be like below:
> *A. Partiiton Registration*
>  # Execution#registerProducedPartitions registers partition to ShuffleMaster 
> and get a ShuffleDescriptor. Current ShuffleDescriptor#storesLocalResourcesOn 
> only returns the location of the producing TM if the partition occupies local 
> resources there.
>  We proposes to change a proper name of this method and always return the 
> locationID of the partition. It might be as below:
> {code:java}
>  ResourceID getLocationID();  {code}
>  # Execution#registerProducePartitions then registers partition to 
> JMPartitionTracker with tmID (ResourceID of TaskManager from 
> TaskManagerLocation) and the locationID (acquired in step 1). 
> JobMasterPartitionTracker will indexes a partition with both tmID and 
> locationID;
> *B. Invokes from JM and ShuffleMaster*
>       JobMasterPartitionTracker listens invokes from both JM and 
> ShuffleMaster.
>  # When JMPartitionTracker hears from JobMaster#disconnectTaskManager that a 
> TM disconnects, it will check whether the disconnected tmID equals to a 
> certain locationID of a partition. If so, tracking of the corresponding 
> partition will be stopped.
>  # When JobMasterPartitionTracker hears from ShuffleMaster that a data 
> location gets lost, it will unregister corresponding partitions by locationID;
> *C. Partition Unregistration*
> When unregister a partition, JobMasterPartitionTracker removes the 
> corresponding indexes to tmID and locationID firstly, and then release the 
> partition by shuffle service types --
>  # If the locationID equals to the tmID, it indicates the partition is 
> accommodated by TM internal shuffle service, JMPartitionTracker will invokes 
> TaskExecutorGateway for the release;
>  # If the locationID doesn't equal to tmID, it indicates the partition is 
> accommodated by external shuffle service, JMPartitionTracker will invokes 
> ShuffleMaster for the release;



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to