[ 
https://issues.apache.org/jira/browse/FLINK-14163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17008606#comment-17008606
 ] 

Zhu Zhu commented on FLINK-14163:
---------------------------------

Thanks [~zjwang] for the feedbacks.
I think partition releasing could also be a problem for both legacy and 
DefaultScheduler. If a task is released before its partitions are successfully 
registered to shuffle master, JM will see no partition to release when 
cancelling the task, so that the partitions could be leaked in shuffle master.
And agree that making {{ShuffleMaster#registerPartitionWithProducer}} a sync 
interface would be much easier to avoid such possible issues at the moment.
[~gjy] do you think we need to fix this in 1.10? The major problem (streaming 
job tasks get deployed without knowing its inputs) is only possible to happen 
with DefaultScheduler (with custom shuffle service). I'd prefer to fix it in 
1.10 if we can have an easy and low risk fix.

> Execution#producedPartitions is possibly not assigned when used
> ---------------------------------------------------------------
>
>                 Key: FLINK-14163
>                 URL: https://issues.apache.org/jira/browse/FLINK-14163
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.9.0, 1.10.0
>            Reporter: Zhu Zhu
>            Priority: Major
>             Fix For: 1.10.0
>
>
> Currently {{Execution#producedPartitions}} is assigned after the partitions 
> have completed the registration to shuffle master in 
> {{Execution#registerProducedPartitions(...)}}.
> The partition registration is an async interface 
> ({{ShuffleMaster#registerPartitionWithProducer(...)}}), so 
> {{Execution#producedPartitions}} is possible[1] not set when used. 
> Usages includes:
> 1. deploying this task, so that the task may be deployed without its result 
> partitions assigned, and the job would hang. (DefaultScheduler issue only, 
> since legacy scheduler handled this case)
> 2. generating input descriptors for downstream tasks: 
> 3. retrieve {{ResultPartitionID}} for partition releasing: 
> [1] If a user uses Flink default shuffle master {{NettyShuffleMaster}}, it is 
> not problematic at the moment since it returns a completed future on 
> registration, so that it would be a synchronized process. However, if users 
> implement their own shuffle service in which the 
> {{ShuffleMaster#registerPartitionWithProducer}} returns an pending future, it 
> can be a problem. This is possible since customizable shuffle service is open 
> to users since 1.9 (via config "shuffle-service-factory.class").
> To avoid issues to happen, we may either 
> 1. fix all the usages of {{Execution#producedPartitions}} regarding the async 
> assigning, or 
> 2. change {{ShuffleMaster#registerPartitionWithProducer(...)}} to a sync 
> interface



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to