[ 
https://issues.apache.org/jira/browse/FLINK-14163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17011841#comment-17011841
 ] 

Chesnay Schepler commented on FLINK-14163:
------------------------------------------

Right.

So [~gjy], [~azagrebin] and I had an offline discussion about this.

Let me summarize our conclusion:

The Problem:
The DefaultScheduler does not wait for the partition registration to complete 
before issuing the deploy call.
As a result:
 - the TaskDeploymentDescriptor (TDD) of the deployed task may be missing 
output descriptors, since these are derived from the 
{{ResultPartitionDeploymentDescriptor}} returned by the ShuffleMaster
 - if the job is canceled or fails before the partition registration is 
complete, we are
 a) not issuing release calls, since nothing has been tracked yet
 b) leaking partition tracking entries, since the registration process isn't 
canceled and may start tracking partitions after the execution was canceled

The raised issue about input descriptors for downstream tasks should not be a 
problem, since the runtime handles this case via notifications about consumable 
partitions.

The solution:
Ensuring that we wait for the registration to complete (in 
{{DefaultScheduler#assignResourceOrHandleError}}) should resolve the issue. 
The partition registration and deployment are then executed as a single 
scheduling unit, preventing issues with concurrent cancel calls, and it 
naturally ensures that partition descriptors are available during deployment.
We prefer this option as it should be relatively simple and does not require 
changes to the ShuffleMaster API.
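
To make the intended ordering concrete, here is a minimal sketch in plain Java. 
It is not Flink code: {{SchedulingSketch}}, {{registerPartition}} and 
{{deployAfterRegistration}} are hypothetical names, and the future passed in 
only stands in for whatever a ShuffleMaster implementation might return. It 
shows the ordering only; cancellation handling is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration, not part of Flink.
class SchedulingSketch {
    /** Records the order of events so the sequencing can be inspected. */
    final List<String> events = new ArrayList<>();

    /** Stand-in for an asynchronous partition registration. */
    CompletableFuture<String> registerPartition(CompletableFuture<String> pending) {
        return pending.thenApply(descriptor -> {
            events.add("registered:" + descriptor);
            return descriptor;
        });
    }

    /** Issues the (simulated) deploy call only once registration has completed. */
    CompletableFuture<Void> deployAfterRegistration(CompletableFuture<String> pending) {
        return registerPartition(pending)
            .thenAccept(descriptor -> events.add("deployed:" + descriptor));
    }

    public static void main(String[] args) {
        SchedulingSketch sketch = new SchedulingSketch();
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<Void> deployment = sketch.deployAfterRegistration(pending);
        // Nothing is deployed while the registration is still pending.
        System.out.println("before completion: " + sketch.events);
        pending.complete("partition-1");
        System.out.println("after completion: " + sketch.events
            + ", deployed=" + deployment.isDone());
    }
}
```

Because the deploy step is chained onto the registration future, it cannot run 
with a missing descriptor, even if the registration completes much later.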


Another, orthogonal issue we stumbled on is that if the registration of a single 
partition fails, the remaining partitions are not cleaned up on the 
ShuffleMaster, since the cleanup only occurs after partitions have been 
tracked, and partitions are only tracked if all registrations succeed. This 
seems to be a separate issue entirely and I'll open a separate ticket.
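
A rough model of this gap, again in plain Java rather than actual Flink code 
({{PartialRegistrationSketch}} and its fields are hypothetical names): 
partitions are tracked only if every registration succeeds, so a partition 
that registered successfully next to a failed one is never tracked and thus 
never released.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration, not part of Flink.
class PartialRegistrationSketch {
    /** Partitions that were tracked; cleanup would only ever consider these. */
    final List<String> tracked = new ArrayList<>();
    /** Partitions that registered successfully but were never tracked. */
    final List<String> untracked = new ArrayList<>();

    void registerAll(List<CompletableFuture<String>> registrations) {
        CompletableFuture
            .allOf(registrations.toArray(new CompletableFuture<?>[0]))
            .whenComplete((ignored, failure) -> {
                // allOf completes only after every registration has finished.
                for (CompletableFuture<String> registration : registrations) {
                    if (registration.isCompletedExceptionally()) {
                        continue; // this registration itself failed
                    }
                    String partition = registration.join();
                    if (failure == null) {
                        tracked.add(partition);   // all succeeded: track everything
                    } else {
                        untracked.add(partition); // one failed: nothing is tracked
                    }
                }
            });
    }
}
```

In this model, anything that ends up in {{untracked}} corresponds to a 
partition left behind on the ShuffleMaster.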

> Execution#producedPartitions is possibly not assigned when used
> ---------------------------------------------------------------
>
>                 Key: FLINK-14163
>                 URL: https://issues.apache.org/jira/browse/FLINK-14163
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.9.0, 1.10.0
>            Reporter: Zhu Zhu
>            Assignee: Yuan Mei
>            Priority: Major
>             Fix For: 1.10.0
>
>
> Currently {{Execution#producedPartitions}} is assigned after the partitions 
> have completed the registration with the shuffle master in 
> {{Execution#registerProducedPartitions(...)}}.
> The partition registration is an async interface 
> ({{ShuffleMaster#registerPartitionWithProducer(...)}}), so 
> {{Execution#producedPartitions}} is possibly[1] not set when used. 
> Usages include:
> 1. deploying this task, so that the task may be deployed without its result 
> partitions assigned, and the job would hang. (DefaultScheduler issue only, 
> since legacy scheduler handled this case)
> 2. generating input descriptors for downstream tasks: 
> 3. retrieving {{ResultPartitionID}} for partition releasing: 
> [1] If a user uses Flink's default shuffle master {{NettyShuffleMaster}}, it 
> is not problematic at the moment, since it returns a completed future on 
> registration, so that it is effectively a synchronous process. However, if 
> users implement their own shuffle service in which 
> {{ShuffleMaster#registerPartitionWithProducer}} returns a pending future, it 
> can be a problem. This is possible since the customizable shuffle service has 
> been open to users since 1.9 (via config "shuffle-service-factory.class").
> To avoid such issues, we may either 
> 1. fix all the usages of {{Execution#producedPartitions}} regarding the async 
> assigning, or 
> 2. change {{ShuffleMaster#registerPartitionWithProducer(...)}} to a sync 
> interface



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
