[ 
https://issues.apache.org/jira/browse/FLINK-22676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354952#comment-17354952
 ] 

Till Rohrmann commented on FLINK-22676:
---------------------------------------

Copying my ML answer over to this ticket:

Thanks for starting this discussion and the initiative to implement a remote 
shuffle service. It has always been the idea of the ShuffleService 
abstraction to make this possible and we probably have overlooked some 
details.

From what I understand of your description, you would like to introduce a 
{{locationID}} which points to the location where the result partition is 
stored (potentially external). Comparing the {{locationID}} with the {{tmID}} 
then tells us whether the partition is stored externally or not.

I think that deciding whether the partition is stored externally or not (or 
more precisely whether the partition occupies resources on the TM) can be 
answered by using the {{ShuffleDescriptor.storesLocalResourcesOn}} method. If 
it returns some ResourceID then we have to tell the TM about the release. If 
not, then we only tell the shuffle master about the partition release. How the 
data can be accessed on the external system is then encapsulated by the 
{{ShuffleMaster}} and the {{ShuffleDescriptor}}. The logic for releasing the 
partitions on the TMs and the {{ShuffleMaster}} should already be implemented 
in the {{JobMasterPartitionTrackerImpl}}.
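
To make the decision above concrete, here is a minimal sketch. It does not 
use the real Flink classes: {{ReleaseRoutingSketch}}, {{Descriptor}} and 
{{releaseTargets}} are hypothetical stand-ins, and TM ids are plain strings. 
It also assumes that the shuffle master is told about the release in both 
cases, which is how I read "only" above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-ins, not Flink classes. Only the idea that
// storesLocalResourcesOn() yields an optional TM id comes from the text above.
final class ReleaseRoutingSketch {

    /** Reduced stand-in for a shuffle descriptor. */
    static final class Descriptor {
        private final Optional<String> localTm;

        Descriptor(Optional<String> localTm) {
            this.localTm = localTm;
        }

        /** Empty when the partition occupies no resources on any TM. */
        Optional<String> storesLocalResourcesOn() {
            return localTm;
        }
    }

    /** Lists, in order, who would be told about the release of the partition. */
    static List<String> releaseTargets(Descriptor descriptor) {
        List<String> targets = new ArrayList<>();
        // A TM only needs to hear about the release if it holds local resources.
        descriptor.storesLocalResourcesOn().ifPresent(tm -> targets.add("TM:" + tm));
        // Assumption: the shuffle master is informed in both cases.
        targets.add("ShuffleMaster");
        return targets;
    }

    public static void main(String[] args) {
        System.out.println(releaseTargets(new Descriptor(Optional.of("tm-1"))));
        System.out.println(releaseTargets(new Descriptor(Optional.empty())));
    }
}
```

With this shape no separate {{locationID}} is needed for the release 
decision; an empty result already says "nothing to clean up on a TM".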

I think what we need to change is that we should not stop tracking completed 
partitions when the TM on which their producers ran disconnects, provided the 
result partition is stored externally. This is required to make partitions 
survive TM failures. It also requires us to distinguish between finished and 
in-progress partitions, since only finished ones can be kept.
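
A rough sketch of the disconnect handling I have in mind is below. It is not 
the {{JobMasterPartitionTrackerImpl}}: {{DisconnectSketch}}, {{Tracked}} and 
the method names are made up for illustration, and ids are plain strings. The 
assumed rule is that a partition is dropped if it occupies resources on the 
lost TM, or if it was still being produced there.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical tracker sketch, not JobMasterPartitionTrackerImpl.
final class DisconnectSketch {

    private static final class Tracked {
        final String producerTm;                 // TM that runs or ran the producer
        final Optional<String> localResourcesOn; // empty if stored externally
        final boolean finished;

        Tracked(String producerTm, Optional<String> localResourcesOn, boolean finished) {
            this.producerTm = producerTm;
            this.localResourcesOn = localResourcesOn;
            this.finished = finished;
        }
    }

    private final Map<String, Tracked> partitions = new HashMap<>();

    void track(String partitionId, String producerTm,
               Optional<String> localResourcesOn, boolean finished) {
        partitions.put(partitionId, new Tracked(producerTm, localResourcesOn, finished));
    }

    /** Stops tracking what cannot survive the loss of the TM; returns the dropped ids. */
    Set<String> onTaskManagerDisconnected(String tm) {
        Set<String> dropped = new TreeSet<>();
        for (Map.Entry<String, Tracked> entry : partitions.entrySet()) {
            Tracked p = entry.getValue();
            // Data that lives on the lost TM is gone with it.
            boolean storedOnTm = p.localResourcesOn.filter(tm::equals).isPresent();
            // An in-progress partition cannot be completed once its producer is gone.
            boolean unfinishedOnTm = !p.finished && p.producerTm.equals(tm);
            if (storedOnTm || unfinishedOnTm) {
                dropped.add(entry.getKey());
            }
        }
        partitions.keySet().removeAll(dropped);
        return dropped;
    }

    boolean isTracked(String partitionId) {
        return partitions.containsKey(partitionId);
    }

    public static void main(String[] args) {
        DisconnectSketch tracker = new DisconnectSketch();
        tracker.track("p-local", "tm-1", Optional.of("tm-1"), true);
        tracker.track("p-ext-done", "tm-1", Optional.empty(), true);
        tracker.track("p-ext-running", "tm-1", Optional.empty(), false);
        System.out.println(tracker.onTaskManagerDisconnected("tm-1"));
    }
}
```

Only the finished, externally stored partition stays tracked here, which is 
the behaviour a remote shuffle service would need.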

What indeed is currently not implemented is the channel from the 
{{ShuffleMaster}} to the {{JobMasterPartitionTrackerImpl}}. This is, however, 
not a big problem at the moment. If the {{ShuffleMaster}} loses a result 
partition, a reading task should fail with a {{PartitionException}}, which 
invalidates the partition on the {{JobMasterPartitionTracker}} so that it is 
reproduced. Listening to the {{ShuffleMaster}} would be an optimization to 
learn about the loss more quickly and to avoid a restart cycle.

Did I understand you correctly, Jin, and do my comments make sense?

> The partition tracker should support remote shuffle properly
> ------------------------------------------------------------
>
>                 Key: FLINK-22676
>                 URL: https://issues.apache.org/jira/browse/FLINK-22676
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination, Runtime / Network
>            Reporter: Jin Xing
>            Priority: Major
>
> In current Flink, a data partition is bound to the ResourceID of a TM in 
> Execution#startTrackingPartitions, and the partition tracker stops tracking 
> the corresponding partitions when a TM disconnects 
> (JobMaster#disconnectTaskManager), i.e. the lifecycle of shuffle data is 
> bound to the computing resource (TM). This works fine for the internal 
> shuffle service, but not for a remote shuffle service. Since shuffle data is 
> stored remotely, the lifecycle of a completed partition can be decoupled 
> from the TM, i.e. a TM can safely be released when no computing task runs on 
> it, and further shuffle read requests can be directed to the remote shuffle 
> cluster. In addition, when a TM is lost, its completed data partitions on 
> the remote shuffle cluster do not need to be reproduced.
>  
> The issue above arises because the Flink JobMasterPartitionTracker mixes up 
> a partition's locationID (where the partition is located) and tmID (which TM 
> produced the partition). In TM internal shuffle, a partition's locationID is 
> the same as its tmID, but in remote shuffle it is not. 
> JobMasterPartitionTracker, as an independent component, should be able to 
> differentiate between the locationID and the tmID of a partition and thus 
> handle the lifecycle of a partition properly.
> We propose that JobMasterPartitionTracker manages and indexes partitions by 
> both locationID and tmID. Registration and unregistration would work as 
> follows:
> *A. Partition Registration*
>  # Execution#registerProducedPartitions registers the partition with 
> ShuffleMaster and gets a ShuffleDescriptor. The current 
> ShuffleDescriptor#storesLocalResourcesOn only returns the location of the 
> producing TM if the partition occupies local resources there.
>  We propose to give this method a more suitable name and to always return 
> the locationID of the partition. It might look as below:
> {code:java}
>  ResourceID getLocationID();  {code}
>  # Execution#registerProducedPartitions then registers the partition with 
> JMPartitionTracker using the tmID (ResourceID of TaskManager from 
> TaskManagerLocation) and the locationID (acquired in step 1). 
> JobMasterPartitionTracker will index a partition by both tmID and 
> locationID.
> *B. Calls from JM and ShuffleMaster*
>       JobMasterPartitionTracker listens for calls from both JM and 
> ShuffleMaster.
>  # When JMPartitionTracker hears from JobMaster#disconnectTaskManager that a 
> TM has disconnected, it checks whether the disconnected tmID equals the 
> locationID of any partition. If so, tracking of the corresponding 
> partition is stopped.
>  # When JobMasterPartitionTracker hears from ShuffleMaster that a data 
> location has been lost, it unregisters the corresponding partitions by 
> locationID.
> *C. Partition Unregistration*
> When unregistering a partition, JobMasterPartitionTracker first removes the 
> corresponding index entries for tmID and locationID, and then releases the 
> partition according to the shuffle service type:
>  # If the locationID equals the tmID, the partition is held by the TM 
> internal shuffle service, and JMPartitionTracker invokes 
> TaskExecutorGateway for the release.
>  # If the locationID does not equal the tmID, the partition is held by an 
> external shuffle service, and JMPartitionTracker invokes ShuffleMaster for 
> the release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
