[ https://issues.apache.org/jira/browse/FLINK-14735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16973508#comment-16973508 ]
Zhu Zhu edited comment on FLINK-14735 at 11/13/19 4:47 PM:
-----------------------------------------------------------
This issue happens because of the high computational complexity of checking a
vertex's inputs to decide whether the vertex can be scheduled. It can occur
when the job runs at a large scale, with ALL-to-ALL job edges and
InputDependencyConstraint==ALL.
Imagine this case:
{{A(p=1000) -- (ALL-to-ALL & BLOCKING) --> B(p=1000), C - (BLOCKING) -> B,
InputDependencyConstraint==ALL}}
When all sub-tasks of A finish, the scheduler tries to schedule the downstream
tasks by checking whether their inputs are ready. If C is not finished, there
would be {{1000 (num partitions) x 1000 (num consumers per partition) x 1000
(num consumed partitions per consumer) = 1,000,000,000}} checks, which can take
a lot of time, block the JM main RPC thread, and lead to RPC timeouts.
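For illustration, the nested checks arise roughly as in the following
self-contained sketch (all names here are illustrative, not Flink's actual
classes):
{code:java}
// Hypothetical sketch of why the readiness check is cubic in the parallelism.
public class InputConsumableCheckSketch {

    public static void main(String[] args) {
        final int parallelism = 1000;
        long checks = 0;

        // Each finished partition of A triggers a consumer notification.
        for (int finishedPartition = 0; finishedPartition < parallelism; finishedPartition++) {
            // ALL-to-ALL: every partition has all 1000 sub-tasks of B as consumers.
            for (int consumer = 0; consumer < parallelism; consumer++) {
                // InputDependencyConstraint==ALL: each consumer of B re-scans all of
                // its consumed partitions of A; since C is unfinished, B can never be
                // scheduled, yet the checks are still performed every time.
                for (int consumedPartition = 0; consumedPartition < parallelism; consumedPartition++) {
                    checks++; // one isFinished()-style check
                }
            }
        }

        System.out.println(checks); // prints 1000000000
    }
}
{code}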
However, for consumers on an ALL-to-ALL edge, it is not necessary to invoke
{{scheduleOrUpdateConsumers}} for each partition, since all partitions share
the same set of consumers.
1. For the legacy scheduler, we can reduce the complexity in
{{Execution#markFinished}} by invoking {{scheduleOrUpdateConsumers}} on only
one {{finishedPartition}} of an {{IntermediateResult}} with an ALL-to-ALL out
edge. The computational complexity would then be 1/1000 of that without this
improvement. A rough sketch of the idea follows.
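This is only an illustration of the idea, not the actual patch; in particular,
{{result.getDistributionPattern()}} is an assumed accessor here:
{code:java}
// Hypothetical sketch for Execution#markFinished. Today the loop notifies the
// consumers of every partition of the result; for an ALL-to-ALL result all
// partitions share the same consumer vertices, so notifying the consumers of
// the just-finished partition alone is equivalent.
for (IntermediateResultPartition finishedPartition :
        getVertex().finishAllBlockingPartitions()) {

    IntermediateResult result = finishedPartition.getIntermediateResult();

    if (result.getDistributionPattern() == DistributionPattern.ALL_TO_ALL) {
        // One partition's consumer set stands in for all of them.
        scheduleOrUpdateConsumers(finishedPartition.getConsumers());
    } else {
        // Keep the existing per-partition notification for other patterns.
        for (IntermediateResultPartition partition : result.getPartitions()) {
            scheduleOrUpdateConsumers(partition.getConsumers());
        }
    }
}
{code}
This drops one factor of 1000 from the ALL-to-ALL case, which is where the
1/1000 estimate above comes from.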
2. For the NG scheduler, a few prerequisites for this improvement are missing:
- An interface method {{getDistributionPattern()}} is needed in {{Result}},
under the assumption that each {{IntermediateDataSet}} has only one consumer
{{JobEdge}}
- The {{DistributionPattern}} is not yet stored in the {{ExecutionGraph}}
To implement it in the NG scheduler, the following changes are needed (see the
sketch after this list):
1) Store the {{DistributionPattern}} in {{IntermediateResult}}
2) Introduce {{getDistributionPattern()}} in {{Result}}, and implement it
3) Update {{LazyFromSourcesSchedulingStrategy}} with the improvement for
ALL-to-ALL result partitions
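For illustration, the three changes could look roughly like this; the names
and signatures are assumptions for the sketch, not the final API:
{code:java}
// Hypothetical sketch of changes 1)-3); all signatures are illustrative.

enum DistributionPattern { ALL_TO_ALL, POINTWISE }

// 2) Interface method on the scheduling topology's Result, assuming each
//    IntermediateDataSet has exactly one consumer JobEdge.
interface Result {
    DistributionPattern getDistributionPattern();
}

// 1) IntermediateResult stores the pattern taken from its consumer JobEdge.
class IntermediateResult implements Result {
    private final DistributionPattern distributionPattern;

    IntermediateResult(DistributionPattern distributionPattern) {
        this.distributionPattern = distributionPattern;
    }

    @Override
    public DistributionPattern getDistributionPattern() {
        return distributionPattern;
    }
}

// 3) LazyFromSourcesSchedulingStrategy can then treat all partitions of an
//    ALL_TO_ALL result as one group: their consumer set is identical, so the
//    input-consumable check runs once per result instead of once per finished
//    partition.
{code}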
[~gjy] WDYT?
> Improve batch schedule check input consumable performance
> ---------------------------------------------------------
>
> Key: FLINK-14735
> URL: https://issues.apache.org/jira/browse/FLINK-14735
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Task
> Reporter: Jingsong Lee
> Priority: Major
> Fix For: 1.10.0
>
>
> Now if we launch a batch job with 1000+ parallelism:
> Even if we set the akka timeout to 2 minutes, the heartbeat is likely to
> time out.
> The JobMaster is busy:
> {code:java}
> java.lang.Thread.State: RUNNABLE
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>     at java.util.Spliterators$ArraySpliterator.tryAdvance(Spliterators.java:958)
>     at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
>     at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>     at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
>     at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.util.stream.ReferencePipeline.anyMatch(ReferencePipeline.java:449)
>     at org.apache.flink.runtime.executiongraph.ExecutionVertex.isInputConsumable(ExecutionVertex.java:824)
>     at org.apache.flink.runtime.executiongraph.ExecutionVertex$$Lambda$257/564237119.test(Unknown Source)
>     at java.util.stream.MatchOps$2MatchSink.accept(MatchOps.java:119)
>     at java.util.stream.Streams$RangeIntSpliterator.tryAdvance(Streams.java:89)
>     at java.util.stream.IntPipeline.forEachWithCancel(IntPipeline.java:162)
>     at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>     at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
>     at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.util.stream.IntPipeline.allMatch(IntPipeline.java:482)
>     at org.apache.flink.runtime.executiongraph.ExecutionVertex.checkInputDependencyConstraints(ExecutionVertex.java:811)
>     at org.apache.flink.runtime.executiongraph.Execution.scheduleOrUpdateConsumers(Execution.java:889)
>     at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:1074)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateStateInternal(ExecutionGraph.java:1597)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1570)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:424)
>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
> {code}