[ https://issues.apache.org/jira/browse/FLINK-14735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16973508#comment-16973508 ]

Zhu Zhu edited comment on FLINK-14735 at 11/14/19 2:49 AM:
-----------------------------------------------------------

This issue happens because of the high computational complexity of checking a 
vertex's inputs to decide whether the vertex can be scheduled. It can occur 
when the job runs at large scale, with ALL-to-ALL job edges and 
InputDependencyConstraint==ALL.

Imagine this case:
{{A(p=1000) -- (ALL-to-ALL & BLOCKING) --> B(p=1000), C -- (BLOCKING) --> B, 
InputDependencyConstraint==ALL}}
When all sub-tasks of A finish, the scheduler tries to schedule the downstream 
tasks by checking whether their inputs are ready. If C is not finished yet, 
this results in {{1000 (num partitions) x 1000 (num consumers per partition) x 
1000 (num consumed partitions per consumer) = 1,000,000,000}} checks, which can 
take a long time, block the JM main RPC thread, and lead to RPC timeouts.
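
To illustrate where the {{1,000,000,000}} comes from, here is a minimal sketch 
of the nested checking (all types and method names here are hypothetical, for 
illustration only; they are not Flink's actual classes):
{code:java}
import java.util.List;

// Hypothetical types for illustration only; these are not Flink's classes.
interface Partition {
    boolean isFinished();
    List<Consumer> getConsumers();
}

interface Consumer {
    List<Partition> getConsumedPartitions();
}

class InputCheckCostSketch {

    // K isFinished() checks per call (K = partitions consumed per consumer).
    static boolean inputsConsumable(Consumer consumer) {
        return consumer.getConsumedPartitions().stream().allMatch(Partition::isFinished);
    }

    // Invoked when the last sub-tasks of A finish and all P partitions become available.
    static void onPartitionsFinished(List<Partition> finishedPartitions) {
        for (Partition partition : finishedPartitions) {          // P = 1000
            for (Consumer consumer : partition.getConsumers()) {  // C = 1000 per partition
                if (inputsConsumable(consumer)) {                  // K = 1000 per consumer
                    // schedule(consumer) would happen here
                }
            }
        }
        // P x C x K = 1,000,000,000 checks, all on the JM main RPC thread.
    }
}
{code}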

However, for ALL-to-ALL edges it is not necessary to invoke 
{{scheduleOrUpdateConsumers}} for each partition, since the consumers of all 
the partitions are the same. 
1. For the legacy scheduler, we can reduce the complexity in 
{{Execution#markFinished}} by invoking {{scheduleOrUpdateConsumers}} on only 
one {{finishedPartition}} of an {{IntermediateResult}} with an {{ALL-to-ALL}} 
out edge (see the sketch after this list). The computational complexity would 
then be 1/1000 of that without this improvement. 
This, however, requires the 
{{IntermediateResultPartition}}/{{IntermediateResult}} to know its 
{{DistributionPattern}}. The {{DistributionPattern}} of an 
{{IntermediateResult}} is the same as that of its out {{JobEdge}} (each 
{{IntermediateDataSet}} has one and only one {{JobEdge}}, according to the 
current implementation and existing assumptions).
2. For the NG scheduler, to apply this improvement, an interface 
{{getDistributionPattern()}} is needed in {{Result}} (sketched after the 
change list below).
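
A rough sketch of the point-1 change in {{Execution#markFinished}} (schematic 
only: the loop shape, the variable names, and the {{getDistributionPattern()}} 
accessor are assumptions that depend on the changes listed below):
{code:java}
// Schematic fragment, not the actual Flink code: 'finishedBlockingPartitions'
// stands for the blocking partitions this execution just finished, and
// getDistributionPattern() does not exist yet (see changes 1) and 2) below).
Set<IntermediateResult> notifiedAllToAllResults = new HashSet<>();
for (IntermediateResultPartition finishedPartition : finishedBlockingPartitions) {
    IntermediateResult result = finishedPartition.getIntermediateResult();
    boolean allToAll = result.getDistributionPattern() == DistributionPattern.ALL_TO_ALL;
    if (allToAll && !notifiedAllToAllResults.add(result)) {
        // All partitions of an ALL-to-ALL result have the same consumers, so
        // they were already notified for this result's first finished
        // partition; skipping the other p-1 partitions cuts the checks to
        // ~1/p (1/1000 in the example above).
        continue;
    }
    scheduleOrUpdateConsumers(finishedPartition);
}
{code}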

To make this work, the changes below are needed:
 1) Store the {{DistributionPattern}} in {{IntermediateResult}}
 2) Introduce {{getDistributionPattern()}} in {{Result}}, and implement it
 3) Update {{Execution#markFinished}} with the improvement for ALL-to-ALL 
result partitions
 4) Update {{LazyFromSourcesSchedulingStrategy}} with the improvement for 
ALL-to-ALL result partitions
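
On the NG scheduler side, a minimal sketch of changes 2) and 4) (the type and 
method names around the scheduling topology are assumptions, to be aligned 
with the actual interfaces):
{code:java}
// Change 2): expose the pattern on the scheduling topology's Result
// (the exact signature is an assumption; the value would be copied from the
// JobEdge via IntermediateResult, see change 1)).
public interface Result {
    // ... existing methods ...
    DistributionPattern getDistributionPattern();
}
{code}
And the corresponding dedup in {{LazyFromSourcesSchedulingStrategy}} when a 
producer vertex finishes:
{code:java}
// Change 4), schematic fragment: collect consumer vertices to check, but
// visit the consumers of only one partition per ALL-to-ALL result, since
// all partitions of such a result have identical consumers.
Set<Result> visitedAllToAllResults = new HashSet<>();
Set<SchedulingExecutionVertex> verticesToCheck = new HashSet<>();
for (SchedulingResultPartition partition : finishedVertex.getProducedResultPartitions()) {
    Result result = partition.getResult(); // hypothetical accessor
    if (result.getDistributionPattern() == DistributionPattern.ALL_TO_ALL
            && !visitedAllToAllResults.add(result)) {
        continue; // consumers already collected via another partition of this result
    }
    verticesToCheck.addAll(partition.getConsumers());
}
{code}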

[~gjy] WDYT?


> Improve batch schedule check input consumable performance
> ---------------------------------------------------------
>
>                 Key: FLINK-14735
>                 URL: https://issues.apache.org/jira/browse/FLINK-14735
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Task
>            Reporter: Jingsong Lee
>            Priority: Major
>             Fix For: 1.10.0
>
>
> Now if we launch a batch job with 1000+ parallelism:
> Even if we set the akka timeout to 2 minutes, the heartbeat is likely to 
> time out.
>  JobMaster is busy:
> {code:java}
> java.lang.Thread.State: RUNNABLE
>         at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>         at java.util.Spliterators$ArraySpliterator.tryAdvance(Spliterators.java:958)
>         at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
>         at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
>         at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
>         at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>         at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
>         at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
>         at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>         at java.util.stream.ReferencePipeline.anyMatch(ReferencePipeline.java:449)
>         at org.apache.flink.runtime.executiongraph.ExecutionVertex.isInputConsumable(ExecutionVertex.java:824)
>         at org.apache.flink.runtime.executiongraph.ExecutionVertex$$Lambda$257/564237119.test(Unknown Source)
>         at java.util.stream.MatchOps$2MatchSink.accept(MatchOps.java:119)
>         at java.util.stream.Streams$RangeIntSpliterator.tryAdvance(Streams.java:89)
>         at java.util.stream.IntPipeline.forEachWithCancel(IntPipeline.java:162)
>         at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
>         at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
>         at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>         at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
>         at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
>         at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>         at java.util.stream.IntPipeline.allMatch(IntPipeline.java:482)
>         at org.apache.flink.runtime.executiongraph.ExecutionVertex.checkInputDependencyConstraints(ExecutionVertex.java:811)
>         at org.apache.flink.runtime.executiongraph.Execution.scheduleOrUpdateConsumers(Execution.java:889)
>         at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:1074)
>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateStateInternal(ExecutionGraph.java:1597)
>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1570)
>         at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:424)
>         at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
> {code}


