[
https://issues.apache.org/jira/browse/FLINK-22676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17356327#comment-17356327
]
Till Rohrmann commented on FLINK-22676:
---------------------------------------
Yes, this sounds like a good idea.
> The partition tracker should support remote shuffle properly
> ------------------------------------------------------------
>
> Key: FLINK-22676
> URL: https://issues.apache.org/jira/browse/FLINK-22676
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination, Runtime / Network
> Reporter: Jin Xing
> Priority: Major
>
> In current Flink, a data partition is bound to the ResourceID of the TM in
> Execution#startTrackingPartitions, and the partition tracker stops tracking
> the corresponding partitions when a TM disconnects
> (JobMaster#disconnectTaskManager), i.e. the lifecycle of shuffle data is
> bound to the computing resource (TM). This works fine for the internal
> shuffle service, but not for a remote shuffle service. Because the shuffle
> data is stored remotely, the lifecycle of a completed partition can be
> decoupled from the TM: a TM can safely be released when no computing task is
> running on it, and further shuffle read requests can be directed to the
> remote shuffle cluster. In addition, when a TM is lost, its completed data
> partitions on the remote shuffle cluster do not need to be reproduced.
>
> This issue arises because Flink's JobMasterPartitionTracker mixes up a
> partition's locationID (where the partition is located) and tmID (which TM
> produced the partition). With the TM-internal shuffle, a partition's
> locationID is the same as its tmID, but with remote shuffle it is not.
> JobMasterPartitionTracker, as an independent component, should be able to
> differentiate between the locationID and the tmID of a partition so that it
> can handle the partition's lifecycle properly.
> We propose that JobMasterPartitionTracker manage and index partitions by
> both locationID and tmID. Registration and unregistration would work as
> follows:
> *A. Partition Registration*
> # Execution#registerProducedPartitions registers the partition with the
> ShuffleMaster and gets a ShuffleDescriptor. Currently,
> ShuffleDescriptor#storesLocalResourcesOn only returns the location of the
> producing TM if the partition occupies local resources there.
> We propose renaming this method and having it always return the locationID
> of the partition. It might look like this:
> {code:java}
> ResourceID getLocationID(); {code}
> # Execution#registerProducedPartitions then registers the partition with
> JobMasterPartitionTracker, passing the tmID (ResourceID of the TaskManager
> from TaskManagerLocation) and the locationID (acquired in step 1).
> JobMasterPartitionTracker indexes the partition by both tmID and
> locationID.
> *B. Invocations from JM and ShuffleMaster*
> JobMasterPartitionTracker listens for invocations from both the JM and the
> ShuffleMaster.
> # When JobMasterPartitionTracker is notified by
> JobMaster#disconnectTaskManager that a TM has disconnected, it checks
> whether the disconnected tmID equals the locationID of any partition. If
> so, it stops tracking the corresponding partitions.
> # When JobMasterPartitionTracker is notified by the ShuffleMaster that a
> data location has been lost, it unregisters the corresponding partitions by
> locationID.
> *C. Partition Unregistration*
> When unregistering a partition, JobMasterPartitionTracker first removes the
> corresponding index entries for tmID and locationID, and then releases the
> partition according to the shuffle service type:
> # If the locationID equals the tmID, the partition is hosted by the
> TM-internal shuffle service, and JobMasterPartitionTracker invokes
> TaskExecutorGateway for the release.
> # If the locationID does not equal the tmID, the partition is hosted by an
> external shuffle service, and JobMasterPartitionTracker invokes
> ShuffleMaster for the release.
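
For illustration only, the dual indexing and release dispatch described in steps A to C of the quoted description could be sketched roughly as below. This is not Flink's JobMasterPartitionTracker: the class name PartitionTrackerSketch, its methods, the plain String IDs (standing in for ResourceID and the partition ID), and the releaseLog list (standing in for the real TaskExecutorGateway and ShuffleMaster calls) are all hypothetical. The sketch also assumes that a TM disconnect or a lost location only drops tracking, since the data is already gone, and that an explicit release is requested separately.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustrative sketch only; this is not Flink's JobMasterPartitionTracker. */
class PartitionTrackerSketch {

    /** A tracked partition: the TM that produced it and the place its data lives. */
    static final class Tracked {
        final String partitionId;
        final String tmId;
        final String locationId;

        Tracked(String partitionId, String tmId, String locationId) {
            this.partitionId = partitionId;
            this.tmId = tmId;
            this.locationId = locationId;
        }
    }

    private final Map<String, Tracked> byPartition = new HashMap<>();
    private final Map<String, Set<String>> byTm = new HashMap<>();
    private final Map<String, Set<String>> byLocation = new HashMap<>();

    /** Hypothetical stand-in for the real release calls: "<target>:<partitionId>". */
    final List<String> releaseLog = new ArrayList<>();

    /** A. Registration: index the partition by both tmID and locationID. */
    void startTracking(String partitionId, String tmId, String locationId) {
        byPartition.put(partitionId, new Tracked(partitionId, tmId, locationId));
        byTm.computeIfAbsent(tmId, k -> new HashSet<>()).add(partitionId);
        byLocation.computeIfAbsent(locationId, k -> new HashSet<>()).add(partitionId);
    }

    /** B.1: a TM disconnected; drop only partitions whose data lived on that TM. */
    void onTaskManagerDisconnected(String tmId) {
        stopTrackingAt(tmId);
    }

    /** B.2: the shuffle master reported a lost data location. */
    void onLocationLost(String locationId) {
        stopTrackingAt(locationId);
    }

    /** C. Unregistration: remove both index entries first, then release by shuffle type. */
    void stopTrackingAndRelease(String partitionId) {
        Tracked t = removeFromIndexes(partitionId);
        if (t == null) {
            return; // not tracked, nothing to release
        }
        String target = t.locationId.equals(t.tmId) ? "TaskExecutorGateway" : "ShuffleMaster";
        releaseLog.add(target + ":" + partitionId);
    }

    boolean isTracked(String partitionId) {
        return byPartition.containsKey(partitionId);
    }

    private void stopTrackingAt(String locationId) {
        Set<String> ids = byLocation.getOrDefault(locationId, Collections.emptySet());
        // Iterate over a copy because removeFromIndexes mutates the index.
        for (String id : new ArrayList<>(ids)) {
            removeFromIndexes(id);
        }
    }

    private Tracked removeFromIndexes(String partitionId) {
        Tracked t = byPartition.remove(partitionId);
        if (t != null) {
            removeFromIndex(byTm, t.tmId, partitionId);
            removeFromIndex(byLocation, t.locationId, partitionId);
        }
        return t;
    }

    private static void removeFromIndex(Map<String, Set<String>> index, String key, String id) {
        Set<String> ids = index.get(key);
        if (ids != null) {
            ids.remove(id);
            if (ids.isEmpty()) {
                index.remove(key);
            }
        }
    }
}
```

With this layout, a partition registered with tmID "tm-1" and locationID "remote-1" stays tracked after onTaskManagerDisconnected("tm-1"), while one registered with locationID "tm-1" is dropped, which is the behavior the description asks for.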