[ 
https://issues.apache.org/jira/browse/FLINK-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16706811#comment-16706811
 ] 

Zhu Zhu commented on FLINK-10945:
---------------------------------

Here is the design doc. Comments are welcome.

https://docs.google.com/document/d/1jpqC7OW_nLOSVOg06_QCWelicVtV6Au0Krg5m_S4kjY/edit?usp=sharing

> Avoid resource deadlocks for finite stream jobs when resources are limited
> --------------------------------------------------------------------------
>
>                 Key: FLINK-10945
>                 URL: https://issues.apache.org/jira/browse/FLINK-10945
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Coordination
>    Affects Versions: 1.7.1
>            Reporter: Zhu Zhu
>            Assignee: Zhu Zhu
>            Priority: Major
>
> Currently, *resource deadlocks* can occur in finite stream jobs (or batch 
> jobs) when resources are limited. There are 2 cases:
>  # Task Y is a pipelined downstream task of task X. When X takes all 
> resources (slots), Y cannot acquire slots to start, so back pressure 
> prevents X from finishing.
>  # Task Y is an upstream task of task X. When X takes all resources (slots) 
> and Y cannot start, X cannot finish because some of its inputs are not 
> finished.
>  
> We can avoid case 1 by setting all edges to BLOCKING, which removes pipelined 
> back pressure. However, case 2 cannot be avoided this way, because X (the 
> downstream task) is launched as soon as any of its input results is ready.
> In detail, suppose task X has BLOCKING upstream tasks Y and Z. X can be 
> launched when Z finishes, even though Y has not been launched yet. This 
> pre-launch behaviour is beneficial when resources are plentiful, since X can 
> start processing data from Z before Y finishes its data processing. However, 
> resource deadlocks may happen when resources are limited, e.g. in small 
> sessions.
>  
> I propose introducing a constraint named *InputDependencyConstraint* to 
> control the scheduling of vertices. It has 2 values:
>  # *ANY*. The vertex can be scheduled when any of its inputs is consumable.
>  # *ALL*. The vertex can be scheduled when all of its inputs are consumable.
> Here one input corresponds to an *IntermediateResult*. The constraint can be 
> configured for *a specific vertex or job-wide*. When the constraint is *ALL*, 
> a task is launched only after all its upstream tasks have been deployed, 
> which avoids case 2 of the resource deadlocks. By setting all job edges to 
> *BLOCKING* and the InputDependencyConstraint of every vertex to *ALL*, we can 
> make sure that all tasks can finish once they are deployed, so no resource 
> deadlock will happen.
> Setting the constraint to *ANY* job-wide keeps job scheduling behaving the 
> same way as in the current version.
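
To make the ANY/ALL rule in the description above concrete, here is a minimal sketch in Java. It is not Flink's scheduler code: the class InputDependencySketch, the method canSchedule, and the modelling of each input as a single boolean ("is this IntermediateResult consumable?") are all assumptions made for illustration. Treating a vertex with no inputs as always schedulable is also an assumption.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; names and structure are not taken from Flink.
class InputDependencySketch {

    /** The two values proposed in the issue description. */
    enum InputDependencyConstraint { ANY, ALL }

    /**
     * Decides whether a vertex may be scheduled.
     * Each list element states whether one input (one IntermediateResult)
     * of the vertex is currently consumable.
     */
    static boolean canSchedule(InputDependencyConstraint constraint,
                               List<Boolean> inputsConsumable) {
        if (inputsConsumable.isEmpty()) {
            // Assumption: a vertex without inputs (a source) is always schedulable.
            return true;
        }
        switch (constraint) {
            case ANY:
                // At least one input is consumable.
                return inputsConsumable.contains(Boolean.TRUE);
            case ALL:
                // No input is still unconsumable.
                return !inputsConsumable.contains(Boolean.FALSE);
            default:
                throw new IllegalArgumentException("Unknown constraint: " + constraint);
        }
    }

    public static void main(String[] args) {
        // X consumes BLOCKING results from Y and Z; Z has finished, Y has not started.
        List<Boolean> inputsOfX = Arrays.asList(false, true);
        System.out.println(canSchedule(InputDependencyConstraint.ANY, inputsOfX)); // true
        System.out.println(canSchedule(InputDependencyConstraint.ALL, inputsOfX)); // false
    }
}
```

With ANY, X would take a slot while Y is still waiting for one, which is the situation of case 2. With ALL, X stays unscheduled until the result of Y is consumable as well.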



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
