[ 
https://issues.apache.org/jira/browse/FLINK-22676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jin Xing updated FLINK-22676:
-----------------------------
    Description: 
In current Flink, a data partition is bound to the ResourceID of its producing
TM in Execution#startTrackingPartitions, and the partition tracker stops
tracking the corresponding partitions when that TM disconnects
(JobMaster#disconnectTaskManager). In other words, the lifecycle of shuffle
data is bound to the computing resource (TM). This works fine for the TM
internal shuffle service, but not for a remote shuffle service. Because the
shuffle data is stored remotely, the lifecycle of a completed partition can be
decoupled from the TM: a TM can be released once no computing task is running
on it, and subsequent shuffle read requests can be directed to the remote
shuffle cluster. In addition, when a TM is lost, its completed data partitions
on the remote shuffle cluster would not need to be reproduced.
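To make the problem concrete, here is a minimal Java sketch of the TM-bound tracking described above. It is not Flink's JobMasterPartitionTracker: the class TmBoundTracker, its methods, and the plain String IDs (standing in for ResourceID and ResultPartitionID) are hypothetical simplifications.

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical TM-bound tracker: partitions are indexed only by the producing TM. */
class TmBoundTracker {
    // producing TM ID -> IDs of the partitions it produced
    private final Map<String, Set<String>> partitionsByTm = new HashMap<>();

    void startTracking(String tmId, String partitionId) {
        partitionsByTm.computeIfAbsent(tmId, k -> new HashSet<>()).add(partitionId);
    }

    /** On TM disconnect, every partition produced by that TM is dropped, wherever its data lives. */
    Set<String> onTaskManagerDisconnected(String tmId) {
        Set<String> dropped = partitionsByTm.remove(tmId);
        return dropped == null ? Set.of() : dropped;
    }

    boolean isTracked(String partitionId) {
        return partitionsByTm.values().stream().anyMatch(s -> s.contains(partitionId));
    }
}
{code}

For example, if tm-1 produced partition p1 whose data already sits on a remote shuffle cluster, onTaskManagerDisconnected("tm-1") still stops tracking p1, so consumers can no longer be pointed at the remote copy and p1 may have to be recomputed.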

 

The root cause is that JobMasterPartitionTracker conflates a partition's
locationID (where the partition's data is stored) with its tmID (the TM that
produced the partition). With TM internal shuffle, a partition's locationID is
the same as its tmID, but with remote shuffle it is not. As an independent
component, JobMasterPartitionTracker should distinguish the locationID and tmID
of a partition so that it can manage the partition's lifecycle correctly.
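The distinction can be captured by keeping both IDs on each tracked partition. The record below is a hypothetical illustration only; TrackedPartition, storedOnProducingTm and the String IDs are assumptions standing in for Flink's ResultPartitionID and ResourceID, not existing Flink code.

{code:java}
/**
 * Hypothetical record of a tracked partition that keeps the two IDs apart:
 * tmId is the TaskManager that produced the partition, locationId is where
 * the partition's data is actually stored.
 */
record TrackedPartition(String partitionId, String tmId, String locationId) {

    /** True for TM internal shuffle, where the data stays on the producing TM. */
    boolean storedOnProducingTm() {
        return tmId.equals(locationId);
    }
}
{code}

With TM internal shuffle both IDs are equal, e.g. new TrackedPartition("p1", "tm-1", "tm-1"); with remote shuffle they differ, e.g. new TrackedPartition("p2", "tm-1", "shuffle-worker-7"), so losing tm-1 by itself says nothing about whether p2's data is still available.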

We propose that JobMasterPartitionTracker manage and index partitions by both
locationID and tmID. Registration and unregistration would then work as
follows:

*A. Partition Registration*
 # Execution#registerProducedPartitions registers the partition with the
ShuffleMaster and gets back a ShuffleDescriptor. Currently,
ShuffleDescriptor#storesLocalResourcesOn returns the location of the producing
TM only if the partition occupies local resources there.
 We propose renaming this method to something more appropriate and having it
always return the locationID of the partition, for example:

{code:java}
ResourceID getLocationID();
{code}
 # Execution#registerProducedPartitions then registers the partition with
JobMasterPartitionTracker, passing the tmID (the ResourceID of the TaskManager,
taken from its TaskManagerLocation) and the locationID (obtained in step 1).
JobMasterPartitionTracker indexes the partition by both tmID and locationID.
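Steps 1 and 2 can be sketched as follows, assuming the renamed descriptor method always yields a location. LocatedShuffleDescriptor, InternalDescriptor, RemoteDescriptor and DualIndexTracker are hypothetical names, String IDs replace ResourceID, and the getLocationID() signature itself is still only a proposal.

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Step 1 (hypothetical): a descriptor that always reports where its data lives. */
interface LocatedShuffleDescriptor {
    String partitionId();
    String locationId();
}

/** Internal shuffle: the data stays on the producing TM, so the location is that TM. */
record InternalDescriptor(String partitionId, String producerTmId) implements LocatedShuffleDescriptor {
    public String locationId() { return producerTmId; }
}

/** Remote shuffle: the data lives on a node of the remote shuffle cluster. */
record RemoteDescriptor(String partitionId, String shuffleWorkerId) implements LocatedShuffleDescriptor {
    public String locationId() { return shuffleWorkerId; }
}

/** Hypothetical tracker that indexes each partition by both tmId and locationId. */
class DualIndexTracker {
    private final Map<String, Set<String>> byTmId = new HashMap<>();
    private final Map<String, Set<String>> byLocationId = new HashMap<>();

    /** Step 2: register with the producing TM's ID and the location from the descriptor. */
    void startTracking(String tmId, LocatedShuffleDescriptor descriptor) {
        byTmId.computeIfAbsent(tmId, k -> new HashSet<>()).add(descriptor.partitionId());
        byLocationId.computeIfAbsent(descriptor.locationId(), k -> new HashSet<>())
                .add(descriptor.partitionId());
    }

    Set<String> partitionsProducedBy(String tmId) {
        return Set.copyOf(byTmId.getOrDefault(tmId, Set.of()));
    }

    Set<String> partitionsLocatedAt(String locationId) {
        return Set.copyOf(byLocationId.getOrDefault(locationId, Set.of()));
    }
}
{code}

Registering an internal partition p1 and a remote partition p2 from the same TM tm-1 makes partitionsProducedBy("tm-1") return both, while partitionsLocatedAt("tm-1") returns only p1.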

*B. Calls from JobMaster and ShuffleMaster*

JobMasterPartitionTracker listens for calls from both the JobMaster and the
ShuffleMaster.
 # When JobMasterPartitionTracker learns from JobMaster#disconnectTaskManager
that a TM has disconnected, it checks whether the disconnected tmID equals the
locationID of any tracked partition. If so, it stops tracking those
partitions; partitions produced by that TM but stored elsewhere remain tracked.
 # When JobMasterPartitionTracker learns from the ShuffleMaster that a data
location has been lost, it unregisters the partitions stored there by
locationID.
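A minimal sketch of the two notification paths, under the same simplifications (the class EventDrivenTracker and its methods are hypothetical, and String IDs stand in for ResourceID). Both paths select partitions by locationID; for brevity the sketch scans all entries instead of using the locationID index from section A.

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical tracker reacting to the two kinds of notifications in section B. */
class EventDrivenTracker {
    record Placement(String tmId, String locationId) {}

    // partitionId -> (producing TM, data location)
    private final Map<String, Placement> partitions = new HashMap<>();

    void startTracking(String partitionId, String tmId, String locationId) {
        partitions.put(partitionId, new Placement(tmId, locationId));
    }

    /** B.1: a disconnected TM only affects partitions whose data is stored on that TM. */
    Set<String> onTaskManagerDisconnected(String tmId) {
        return stopTrackingWhereLocationIs(tmId);
    }

    /** B.2: the shuffle master reports a lost data location; drop partitions stored there. */
    Set<String> onLocationLost(String locationId) {
        return stopTrackingWhereLocationIs(locationId);
    }

    boolean isTracked(String partitionId) {
        return partitions.containsKey(partitionId);
    }

    private Set<String> stopTrackingWhereLocationIs(String locationId) {
        Set<String> stopped = new HashSet<>();
        partitions.entrySet().removeIf(e -> {
            boolean match = e.getValue().locationId().equals(locationId);
            if (match) {
                stopped.add(e.getKey());
            }
            return match;
        });
        return stopped;
    }
}
{code}

If tm-1 produced p1 (stored on tm-1) and p2 (stored on worker-7), onTaskManagerDisconnected("tm-1") stops tracking only p1; p2 stays tracked until the ShuffleMaster reports worker-7 as lost.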

*C. Partition Unregistration*
When unregistering a partition, JobMasterPartitionTracker first removes the
partition's index entries for its tmID and locationID, and then releases the
partition according to the shuffle service type:
 # If the locationID equals the tmID, the partition is hosted by the TM
internal shuffle service, and JobMasterPartitionTracker invokes the
TaskExecutorGateway to release it.
 # If the locationID differs from the tmID, the partition is hosted by an
external shuffle service, and JobMasterPartitionTracker invokes the
ShuffleMaster to release it.
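The unregistration order and release routing can be sketched like this. ReleaseTarget and RecordingTarget are hypothetical stand-ins for TaskExecutorGateway and ShuffleMaster that simply record which partitions they were asked to release; they do not reflect the real Flink interfaces, and ReleasingTracker is likewise an illustrative name.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical release target; stands in for TaskExecutorGateway or ShuffleMaster. */
interface ReleaseTarget {
    void release(String partitionId);
}

/** Records release calls so the routing decision is observable. */
class RecordingTarget implements ReleaseTarget {
    final List<String> released = new ArrayList<>();

    public void release(String partitionId) {
        released.add(partitionId);
    }
}

/** Hypothetical tracker implementing the unregistration order of section C. */
class ReleasingTracker {
    record Placement(String tmId, String locationId) {}

    private final Map<String, Placement> partitions = new HashMap<>();
    private final Map<String, Set<String>> byTmId = new HashMap<>();
    private final Map<String, Set<String>> byLocationId = new HashMap<>();
    private final ReleaseTarget taskExecutor;   // stand-in for TaskExecutorGateway
    private final ReleaseTarget shuffleMaster;  // stand-in for ShuffleMaster

    ReleasingTracker(ReleaseTarget taskExecutor, ReleaseTarget shuffleMaster) {
        this.taskExecutor = taskExecutor;
        this.shuffleMaster = shuffleMaster;
    }

    void startTracking(String partitionId, String tmId, String locationId) {
        partitions.put(partitionId, new Placement(tmId, locationId));
        byTmId.computeIfAbsent(tmId, k -> new HashSet<>()).add(partitionId);
        byLocationId.computeIfAbsent(locationId, k -> new HashSet<>()).add(partitionId);
    }

    void unregisterAndRelease(String partitionId) {
        Placement p = partitions.remove(partitionId);
        if (p == null) {
            return; // not tracked (any more), nothing to release
        }
        // 1. Remove the index entries for both tmId and locationId first.
        removeFromIndex(byTmId, p.tmId(), partitionId);
        removeFromIndex(byLocationId, p.locationId(), partitionId);
        // 2. Route the release by shuffle service type.
        if (p.locationId().equals(p.tmId())) {
            taskExecutor.release(partitionId);  // TM internal shuffle
        } else {
            shuffleMaster.release(partitionId); // external (remote) shuffle
        }
    }

    boolean hasIndexEntryFor(String id) {
        return byTmId.containsKey(id) || byLocationId.containsKey(id);
    }

    private static void removeFromIndex(Map<String, Set<String>> index, String key, String partitionId) {
        Set<String> ids = index.get(key);
        if (ids != null) {
            ids.remove(partitionId);
            if (ids.isEmpty()) {
                index.remove(key);
            }
        }
    }
}
{code}

Because the partition and its index entries are removed before the release is issued, a repeated unregister call for the same partition finds nothing and returns early instead of releasing twice.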



> The partition tracker should support remote shuffle properly
> ------------------------------------------------------------
>
>                 Key: FLINK-22676
>                 URL: https://issues.apache.org/jira/browse/FLINK-22676
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Network
>            Reporter: Jin Xing
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
