[
https://issues.apache.org/jira/browse/FLINK-22676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17355540#comment-17355540
]
Jin Xing commented on FLINK-22676:
----------------------------------
Thanks Till for help on this ~
{quote}This is a valid point. I still would suggest making this improvement a
separate step after we have completed the groundwork to support basic remote
shuffle services.
{quote}
I understand your comments. It makes sense to make the improvement a separate
step; I will put it in another JIRA.
As a first step, we will focus on deciding whether to stop tracking a
partition when a TM disconnects. JMPartitionTracker will use
ShuffleDescriptor#storesLocalResourcesOn to decide whether a partition is
stored locally on the TM; if so, tracking will be stopped. I will create a PR
to demonstrate this.
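A minimal sketch of that first step, not the actual PR: the classes below are hypothetical, simplified stand-ins (plain String IDs, no Flink types), and only the idea of consulting storesLocalResourcesOn on TM disconnect is taken from the comment above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a shuffle descriptor; NOT Flink's ShuffleDescriptor.
final class SketchDescriptor {
    final String partitionId;
    // TM that holds the partition's local resources; empty for remote shuffle.
    private final Optional<String> localTmId;

    private SketchDescriptor(String partitionId, Optional<String> localTmId) {
        this.partitionId = partitionId;
        this.localTmId = localTmId;
    }

    static SketchDescriptor local(String partitionId, String tmId) {
        return new SketchDescriptor(partitionId, Optional.of(tmId));
    }

    static SketchDescriptor remote(String partitionId) {
        return new SketchDescriptor(partitionId, Optional.empty());
    }

    // Plays the role of storesLocalResourcesOn in this sketch.
    Optional<String> storesLocalResourcesOn() {
        return localTmId;
    }
}

// Hypothetical, simplified tracker; an assumption, not the real JMPartitionTracker.
final class SketchPartitionTracker {
    private final Map<String, SketchDescriptor> tracked = new HashMap<>();

    void startTracking(SketchDescriptor descriptor) {
        tracked.put(descriptor.partitionId, descriptor);
    }

    boolean isTracked(String partitionId) {
        return tracked.containsKey(partitionId);
    }

    // Stops tracking only the partitions stored locally on the disconnected TM
    // and returns their IDs; remotely stored partitions stay tracked.
    List<String> onTaskManagerDisconnected(String tmId) {
        List<String> stopped = new ArrayList<>();
        for (SketchDescriptor d : tracked.values()) {
            if (d.storesLocalResourcesOn().map(tmId::equals).orElse(false)) {
                stopped.add(d.partitionId);
            }
        }
        for (String id : stopped) {
            tracked.remove(id);
        }
        return stopped;
    }

    public static void main(String[] args) {
        SketchPartitionTracker tracker = new SketchPartitionTracker();
        tracker.startTracking(SketchDescriptor.local("p-local", "tm-1"));
        tracker.startTracking(SketchDescriptor.remote("p-remote"));
        System.out.println(tracker.onTaskManagerDisconnected("tm-1"));
    }
}
```

In this sketch a partition on a remote shuffle service reports no local TM, so it survives the disconnect of the TM that produced it.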
> The partition tracker should support remote shuffle properly
> ------------------------------------------------------------
>
> Key: FLINK-22676
> URL: https://issues.apache.org/jira/browse/FLINK-22676
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination, Runtime / Network
> Reporter: Jin Xing
> Priority: Major
>
> Currently in Flink, a data partition is bound to the ResourceID of the TM in
> Execution#startTrackingPartitions, and the partition tracker stops tracking
> the corresponding partitions when a TM disconnects
> (JobMaster#disconnectTaskManager), i.e. the lifecycle of shuffle data is
> bound to the computing resource (TM). This works fine for the internal
> shuffle service, but not for a remote shuffle service. When shuffle data is
> stored remotely, the lifecycle of a completed partition can be decoupled
> from the TM: a TM can safely be released when no computing task is running
> on it, and further shuffle read requests can be directed to the remote
> shuffle cluster. In addition, when a TM is lost, its completed data
> partitions on the remote shuffle cluster do not need to be reproduced.
>
> This issue arises because JobMasterPartitionTracker conflates a partition's
> locationID (where the partition is located) with its tmID (which TM produced
> the partition). In TM internal shuffle, a partition's locationID is the same
> as its tmID, but in remote shuffle it is not. JobMasterPartitionTracker, as
> an independent component, should be able to differentiate the locationID and
> tmID of a partition in order to handle the lifecycle of the partition
> properly.
> We propose that JobMasterPartitionTracker manage and index partitions by
> both locationID and tmID. The registration and unregistration process would
> be as follows:
> *A. Partition Registration*
> # Execution#registerProducedPartitions registers the partition with
> ShuffleMaster and gets a ShuffleDescriptor. Currently,
> ShuffleDescriptor#storesLocalResourcesOn only returns the location of the
> producing TM if the partition occupies local resources there.
> We propose renaming this method and having it always return the
> locationID of the partition. It might look like this:
> {code:java}
> ResourceID getLocationID(); {code}
> # Execution#registerProducedPartitions then registers the partition with
> JMPartitionTracker using the tmID (ResourceID of the TaskManager from
> TaskManagerLocation) and the locationID (acquired in step 1).
> JobMasterPartitionTracker will index the partition by both tmID and
> locationID.
> *B. Invocations from JM and ShuffleMaster*
> JobMasterPartitionTracker listens for invocations from both JM and
> ShuffleMaster.
> # When JMPartitionTracker is notified by JobMaster#disconnectTaskManager
> that a TM has disconnected, it checks whether the disconnected tmID equals
> the locationID of any partition. If so, tracking of the corresponding
> partition is stopped.
> # When JobMasterPartitionTracker is notified by ShuffleMaster that a data
> location has been lost, it unregisters the corresponding partitions by
> locationID.
> *C. Partition Unregistration*
> When unregistering a partition, JobMasterPartitionTracker first removes the
> corresponding index entries for tmID and locationID, and then releases the
> partition according to the shuffle service type:
> # If the locationID equals the tmID, the partition is hosted by the TM
> internal shuffle service, and JMPartitionTracker invokes
> TaskExecutorGateway for the release.
> # If the locationID does not equal the tmID, the partition is hosted by an
> external shuffle service, and JMPartitionTracker invokes ShuffleMaster for
> the release.
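The flow in sections A to C of the issue description can be sketched roughly as follows. This is an illustration under stated assumptions, not the proposed implementation: DualIndexTracker, ReleaseTarget and all method names are hypothetical, IDs are plain Strings, and the release calls to TaskExecutorGateway or ShuffleMaster are only recorded, not performed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical tracker that indexes partitions by both tmID and locationID.
final class DualIndexTracker {
    // Which component would be asked to release a partition (section C).
    enum ReleaseTarget { TASK_EXECUTOR, SHUFFLE_MASTER }

    private static final class Entry {
        final String tmId;
        final String locationId;

        Entry(String tmId, String locationId) {
            this.tmId = tmId;
            this.locationId = locationId;
        }
    }

    private final Map<String, Entry> partitions = new HashMap<>();
    private final Map<String, Set<String>> byTmId = new HashMap<>();
    private final Map<String, Set<String>> byLocationId = new HashMap<>();

    // Section A: register a partition under both IDs.
    void register(String partitionId, String tmId, String locationId) {
        partitions.put(partitionId, new Entry(tmId, locationId));
        byTmId.computeIfAbsent(tmId, k -> new HashSet<>()).add(partitionId);
        byLocationId.computeIfAbsent(locationId, k -> new HashSet<>()).add(partitionId);
    }

    boolean isTracked(String partitionId) {
        return partitions.containsKey(partitionId);
    }

    // Section B.1: stop tracking partitions whose locationID is the lost TM.
    List<String> onTaskManagerDisconnected(String tmId) {
        List<String> ids = new ArrayList<>(byLocationId.getOrDefault(tmId, new HashSet<>()));
        for (String id : ids) {
            removeIndexes(id);
        }
        return ids;
    }

    // Section C: drop both index entries, then pick the release path.
    // Returns null if the partition is not tracked.
    ReleaseTarget unregister(String partitionId) {
        Entry e = removeIndexes(partitionId);
        if (e == null) {
            return null;
        }
        return e.locationId.equals(e.tmId)
                ? ReleaseTarget.TASK_EXECUTOR
                : ReleaseTarget.SHUFFLE_MASTER;
    }

    private Entry removeIndexes(String partitionId) {
        Entry e = partitions.remove(partitionId);
        if (e != null) {
            removeFrom(byTmId, e.tmId, partitionId);
            removeFrom(byLocationId, e.locationId, partitionId);
        }
        return e;
    }

    private static void removeFrom(Map<String, Set<String>> index, String key, String id) {
        Set<String> ids = index.get(key);
        if (ids != null) {
            ids.remove(id);
            if (ids.isEmpty()) {
                index.remove(key);
            }
        }
    }

    public static void main(String[] args) {
        DualIndexTracker tracker = new DualIndexTracker();
        tracker.register("p-local", "tm-1", "tm-1");
        tracker.register("p-remote", "tm-1", "shuffle-worker-7");
        System.out.println(tracker.onTaskManagerDisconnected("tm-1"));
        System.out.println(tracker.unregister("p-remote"));
    }
}
```

Keeping a separate locationID index is what lets the TM-disconnect path leave remotely stored partitions alone even though they share a tmID with the lost TM.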