[ https://issues.apache.org/jira/browse/FLINK-22676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhu Zhu updated FLINK-22676:
----------------------------
    Affects Version/s: 1.4

> The partition tracker should support remote shuffle properly
> ------------------------------------------------------------
>
>                 Key: FLINK-22676
>                 URL: https://issues.apache.org/jira/browse/FLINK-22676
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination, Runtime / Network
>    Affects Versions: 1.4
>            Reporter: Jin Xing
>            Assignee: Jin Xing
>            Priority: Major
>              Labels: pull-request-available
>
> In current Flink, a data partition is bound to the ResourceID of its TM in 
> Execution#startTrackingPartitions, and the partition tracker stops tracking 
> the corresponding partitions when a TM disconnects 
> (JobMaster#disconnectTaskManager). In other words, the lifecycle of shuffle 
> data is bound to the computing resource (TM). This works fine for the 
> internal shuffle service, but not for a remote shuffle service. Because the 
> shuffle data is stored remotely, the lifecycle of a completed partition can 
> be decoupled from the TM. The TM can safely be released once no computing 
> task runs on it, and subsequent shuffle read requests can be directed to the 
> remote shuffle cluster. In addition, when a TM is lost, its completed data 
> partitions on the remote shuffle cluster do not need to be reproduced.
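The following is a hypothetical, heavily simplified model of the current behavior described above. It is not Flink's actual JobMasterPartitionTracker code. Partitions are indexed only by the producing TM. Plain Strings stand in for ResourceID and ResultPartitionID, and all class and method names are illustrative assumptions. When the TM disconnects, the remotely stored partition is dropped along with the local one.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the current behavior: partitions are
// indexed only by the ResourceID of the producing TM. Strings stand in for
// Flink's ResourceID and ResultPartitionID.
class TmBoundPartitionTracker {
    private final Map<String, Set<String>> partitionsByTm = new HashMap<>();

    void startTracking(String tmId, String partitionId) {
        partitionsByTm.computeIfAbsent(tmId, k -> new HashSet<>()).add(partitionId);
    }

    // On TM disconnect, every partition the TM produced is dropped,
    // regardless of where its data is actually stored.
    Set<String> stopTrackingFor(String tmId) {
        Set<String> removed = partitionsByTm.remove(tmId);
        return removed == null ? new HashSet<>() : removed;
    }

    boolean isTracked(String partitionId) {
        return partitionsByTm.values().stream().anyMatch(ids -> ids.contains(partitionId));
    }

    public static void main(String[] args) {
        TmBoundPartitionTracker tracker = new TmBoundPartitionTracker();
        tracker.startTracking("tm-1", "p-local");  // data stored on tm-1 itself
        tracker.startTracking("tm-1", "p-remote"); // data stored on a remote shuffle cluster
        System.out.println(tracker.stopTrackingFor("tm-1").size()); // prints 2
        System.out.println(tracker.isTracked("p-remote"));          // prints false
    }
}
```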
>  
> The issue above arises because Flink's JobMasterPartitionTracker mixes up a 
> partition's locationID (where the partition is located) and its tmID (which 
> TM produced the partition). With TM internal shuffle, a partition's 
> locationID is the same as its tmID, but with remote shuffle it is not. 
> As an independent component, JobMasterPartitionTracker should be able to 
> distinguish a partition's locationID from its tmID and thus handle the 
> partition's lifecycle properly.
> We propose that JobMasterPartitionTracker manage and index partitions by 
> both locationID and tmID. Registration and unregistration would then work 
> as follows:
> *A. Partition Registration*
>  # Execution#registerProducedPartitions registers the partition with the 
> ShuffleMaster and gets a ShuffleDescriptor. Currently, 
> ShuffleDescriptor#storesLocalResourcesOn only returns the location of the 
> producing TM if the partition occupies local resources there.
>  We propose giving this method a more appropriate name and having it always 
> return the locationID of the partition, for example:
> {code:java}
> ResourceID getLocationID();
> {code}
>  # Execution#registerProducedPartitions then registers the partition with 
> JMPartitionTracker, passing the tmID (the ResourceID of the TaskManager from 
> TaskManagerLocation) and the locationID (acquired in step 1). 
> JobMasterPartitionTracker will index the partition by both tmID and 
> locationID.
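Step A could be sketched roughly as follows, as a hypothetical simplification rather than the actual implementation. Each partition is indexed under both its tmID and its locationID. The locationID is assumed to come from the proposed ShuffleDescriptor#getLocationID(). Strings stand in for ResourceID and ResultPartitionID, and the class and method names are illustrative.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of step A: each partition is indexed by both its tmID
// (producing TaskManager) and its locationID (where the data lives, e.g. as
// returned by the proposed ShuffleDescriptor#getLocationID()).
// Strings stand in for Flink's ResourceID and ResultPartitionID.
class DualIndexPartitionTracker {
    private final Map<String, Set<String>> byTmId = new HashMap<>();
    private final Map<String, Set<String>> byLocationId = new HashMap<>();

    void startTracking(String partitionId, String tmId, String locationId) {
        byTmId.computeIfAbsent(tmId, k -> new HashSet<>()).add(partitionId);
        byLocationId.computeIfAbsent(locationId, k -> new HashSet<>()).add(partitionId);
    }

    // Read-only view of the partitions produced by the given TM.
    Set<String> producedBy(String tmId) {
        return Collections.unmodifiableSet(byTmId.getOrDefault(tmId, Collections.emptySet()));
    }

    // Read-only view of the partitions stored at the given location.
    Set<String> locatedAt(String locationId) {
        return Collections.unmodifiableSet(
                byLocationId.getOrDefault(locationId, Collections.emptySet()));
    }

    public static void main(String[] args) {
        DualIndexPartitionTracker tracker = new DualIndexPartitionTracker();
        // TM internal shuffle: locationID == tmID.
        tracker.startTracking("p-1", "tm-1", "tm-1");
        // Remote shuffle: produced by tm-1, stored on a remote shuffle worker.
        tracker.startTracking("p-2", "tm-1", "shuffle-worker-A");
        System.out.println(tracker.producedBy("tm-1").size()); // prints 2
        System.out.println(tracker.locatedAt("tm-1"));         // prints [p-1]
    }
}
```

Keeping two independent indexes lets the TM-disconnect path and the lost-location path each use a single map lookup.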
> *B. Invocations from JM and ShuffleMaster*
>       JobMasterPartitionTracker listens for invocations from both the JM and 
> the ShuffleMaster.
>  # When JMPartitionTracker learns from JobMaster#disconnectTaskManager that a 
> TM has disconnected, it checks whether the disconnected tmID equals the 
> locationID of any partition. If so, it stops tracking those partitions.
>  # When JobMasterPartitionTracker learns from the ShuffleMaster that a data 
> location has been lost, it unregisters the corresponding partitions by 
> locationID.
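Both callbacks in step B reduce to a lookup by locationID. The sketch below is hypothetical and models only that index. A disconnecting TM stops tracking only the partitions whose locationID equals its tmID, so a partition it produced but stored remotely stays tracked. The callback names are illustrative assumptions and are not actual JobMaster or ShuffleMaster APIs.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of step B. Only the locationID index is modelled, since
// both callbacks look partitions up by location. Strings stand in for Flink types.
class LocationAwarePartitionTracker {
    private final Map<String, Set<String>> byLocationId = new HashMap<>();

    void startTracking(String partitionId, String locationId) {
        byLocationId.computeIfAbsent(locationId, k -> new HashSet<>()).add(partitionId);
    }

    // Called when the JobMaster reports a disconnected TM: stops tracking only
    // partitions whose locationID equals the disconnected tmID.
    Set<String> onTaskManagerDisconnected(String tmId) {
        return stopTrackingAt(tmId);
    }

    // Called when the ShuffleMaster reports that a data location was lost.
    Set<String> onShuffleLocationLost(String locationId) {
        return stopTrackingAt(locationId);
    }

    boolean isTracked(String partitionId) {
        return byLocationId.values().stream().anyMatch(ids -> ids.contains(partitionId));
    }

    private Set<String> stopTrackingAt(String locationId) {
        Set<String> removed = byLocationId.remove(locationId);
        return removed == null ? new HashSet<>() : removed;
    }

    public static void main(String[] args) {
        LocationAwarePartitionTracker tracker = new LocationAwarePartitionTracker();
        tracker.startTracking("p-local", "tm-1");              // internal shuffle on tm-1
        tracker.startTracking("p-remote", "shuffle-worker-A"); // produced by tm-1, stored remotely
        System.out.println(tracker.onTaskManagerDisconnected("tm-1"));         // prints [p-local]
        System.out.println(tracker.isTracked("p-remote"));                     // prints true
        System.out.println(tracker.onShuffleLocationLost("shuffle-worker-A")); // prints [p-remote]
    }
}
```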
> *C. Partition Unregistration*
> When unregistering a partition, JobMasterPartitionTracker first removes the 
> corresponding tmID and locationID index entries, and then releases the 
> partition according to the shuffle service type:
>  # If the locationID equals the tmID, the partition is hosted by the TM 
> internal shuffle service, and JMPartitionTracker invokes the 
> TaskExecutorGateway to release it.
>  # If the locationID differs from the tmID, the partition is hosted by an 
> external shuffle service, and JMPartitionTracker invokes the ShuffleMaster 
> to release it.
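Step C might look roughly like this sketch, under stated assumptions. The two Consumer callbacks are hypothetical stand-ins for releasing through the TaskExecutorGateway or the ShuffleMaster. They do not reflect the real method signatures of those Flink interfaces. Strings again stand in for ResourceID and ResultPartitionID.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch of step C: remove both index entries first, then release
// via the TM (internal shuffle) or the ShuffleMaster (remote shuffle).
class ReleasingPartitionTracker {
    private static final class Entry {
        final String tmId;
        final String locationId;

        Entry(String tmId, String locationId) {
            this.tmId = tmId;
            this.locationId = locationId;
        }
    }

    private final Map<String, Entry> partitions = new HashMap<>();
    private final Map<String, Set<String>> byTmId = new HashMap<>();
    private final Map<String, Set<String>> byLocationId = new HashMap<>();
    private final Consumer<String> releaseOnTaskExecutor;   // stand-in for TaskExecutorGateway
    private final Consumer<String> releaseViaShuffleMaster; // stand-in for ShuffleMaster

    ReleasingPartitionTracker(Consumer<String> releaseOnTaskExecutor,
                              Consumer<String> releaseViaShuffleMaster) {
        this.releaseOnTaskExecutor = releaseOnTaskExecutor;
        this.releaseViaShuffleMaster = releaseViaShuffleMaster;
    }

    void startTracking(String partitionId, String tmId, String locationId) {
        partitions.put(partitionId, new Entry(tmId, locationId));
        byTmId.computeIfAbsent(tmId, k -> new HashSet<>()).add(partitionId);
        byLocationId.computeIfAbsent(locationId, k -> new HashSet<>()).add(partitionId);
    }

    void unregister(String partitionId) {
        Entry entry = partitions.remove(partitionId);
        if (entry == null) {
            return; // not tracked, nothing to release
        }
        // 1. Remove the tmID and locationID index entries.
        removeFromIndex(byTmId, entry.tmId, partitionId);
        removeFromIndex(byLocationId, entry.locationId, partitionId);
        // 2. Release according to the shuffle service type.
        if (entry.locationId.equals(entry.tmId)) {
            releaseOnTaskExecutor.accept(partitionId);   // TM internal shuffle
        } else {
            releaseViaShuffleMaster.accept(partitionId); // remote shuffle
        }
    }

    boolean hasPartitionsProducedBy(String tmId) {
        return byTmId.containsKey(tmId);
    }

    private static void removeFromIndex(Map<String, Set<String>> index, String key, String partitionId) {
        Set<String> ids = index.get(key);
        if (ids != null) {
            ids.remove(partitionId);
            if (ids.isEmpty()) {
                index.remove(key); // drop empty buckets so lookups stay accurate
            }
        }
    }

    public static void main(String[] args) {
        List<String> viaTaskExecutor = new ArrayList<>();
        List<String> viaShuffleMaster = new ArrayList<>();
        ReleasingPartitionTracker tracker =
                new ReleasingPartitionTracker(viaTaskExecutor::add, viaShuffleMaster::add);
        tracker.startTracking("p-1", "tm-1", "tm-1");             // internal shuffle
        tracker.startTracking("p-2", "tm-1", "shuffle-worker-A"); // remote shuffle
        tracker.unregister("p-1");
        tracker.unregister("p-2");
        System.out.println(viaTaskExecutor);  // prints [p-1]
        System.out.println(viaShuffleMaster); // prints [p-2]
    }
}
```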



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
