[
https://issues.apache.org/jira/browse/FLINK-14735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974896#comment-16974896
]
Zhu Zhu commented on FLINK-14735:
---------------------------------
I gave it another thought, and it looks like we can improve this in a much simpler way.
*The changes to expose the ALL-to-ALL {{DistributionPattern}} are not needed for
this improvement.*
The {{LazyFromSourcesSchedulingStrategy}} of the NG scheduler already deduplicates the
consumer vertices. However, it is still slow in this case because the input
check meant for the legacy scheduler is still invoked (the schedule block
in Execution#scheduleOrUpdateConsumers). Skipping that block by checking the scheduler
type up front would improve performance a lot, as in this one-line
[change|https://github.com/zhuzhurk/flink/commit/81908ba35b8c087cd9fab108857927e88d3d2b1c].
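To make the idea concrete, here is a minimal sketch of guarding the schedule block with a scheduler check. It is not the linked commit: {{SketchState}}, {{SketchConsumer}}, {{SketchExecution}} and the {{legacyScheduling}} flag are hypothetical stand-ins, and the real {{Execution}} class does considerably more.

```java
import java.util.List;

// Hypothetical stand-ins for illustration only; these are not the real Flink classes.
enum SketchState { CREATED, RUNNING }

final class SketchConsumer {
    SketchState state = SketchState.CREATED;
    int inputChecks = 0;      // how often the expensive input check ran
    int partitionUpdates = 0; // how often a running consumer was updated
    boolean scheduled = false;

    // Stands in for the per-consumer input check that dominates the stack trace.
    boolean checkInputDependencyConstraints() {
        inputChecks++;
        return true;
    }
}

final class SketchExecution {
    private final boolean legacyScheduling; // assumed flag telling which scheduler is active
    private final List<SketchConsumer> consumers;

    SketchExecution(boolean legacyScheduling, List<SketchConsumer> consumers) {
        this.legacyScheduling = legacyScheduling;
        this.consumers = consumers;
    }

    void scheduleOrUpdateConsumers() {
        for (SketchConsumer consumer : consumers) {
            if (consumer.state == SketchState.CREATED) {
                // Check the scheduler first: the short-circuit keeps the input
                // check from running at all when the NG scheduler is in charge.
                if (legacyScheduling && consumer.checkInputDependencyConstraints()) {
                    consumer.scheduled = true;
                }
            } else if (consumer.state == SketchState.RUNNING) {
                // Running consumers are updated regardless of the scheduler.
                consumer.partitionUpdates++;
            }
        }
    }
}
```

In this sketch the NG path never touches the input check, so the cost of the schedule block no longer grows with the number of consumers; the update path for running consumers is unchanged.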
If we also want this improvement for the legacy scheduler, we need to split
{{scheduleOrUpdateConsumers}} into {{scheduleConsumers(vertices)}} and
{{updateConsumers(edges)}}. We can then deduplicate the vertices passed to
{{scheduleConsumers(vertices)}}. {{updateConsumers(edges)}} needs to build the
partition info from each edge, so its parameter cannot be simplified. Here's a
sample
[change|https://github.com/zhuzhurk/flink/commit/17caa528b0e480c0d773f6b5fb24ae264af8a290].
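The split described above could look roughly like the following sketch. It is an illustration under assumptions, not the linked commit: {{DemoVertex}}, {{DemoEdge}} and {{DemoConsumerNotifier}} are hypothetical types, and the string-based partition info merely stands in for whatever the real code builds per edge.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins for illustration only; these are not the real Flink classes.
final class DemoVertex {
    final String name;
    final boolean running;
    int scheduleCalls = 0;

    DemoVertex(String name, boolean running) {
        this.name = name;
        this.running = running;
    }
}

final class DemoEdge {
    final int partitionIndex;
    final DemoVertex target;

    DemoEdge(int partitionIndex, DemoVertex target) {
        this.partitionIndex = partitionIndex;
        this.target = target;
    }
}

final class DemoConsumerNotifier {
    final List<String> partitionInfos = new ArrayList<>();

    // Entry point: sorts the consumers into "to schedule" and "to update".
    void scheduleOrUpdateConsumers(List<List<DemoEdge>> allConsumers) {
        // A set deduplicates vertices that are reachable through several edges.
        Set<DemoVertex> verticesToSchedule = new LinkedHashSet<>();
        List<DemoEdge> edgesToUpdate = new ArrayList<>();
        for (List<DemoEdge> edges : allConsumers) {
            for (DemoEdge edge : edges) {
                if (edge.target.running) {
                    edgesToUpdate.add(edge);
                } else {
                    verticesToSchedule.add(edge.target);
                }
            }
        }
        scheduleConsumers(verticesToSchedule);
        updateConsumers(edgesToUpdate);
    }

    // Each vertex is handled once, no matter how many edges lead to it.
    void scheduleConsumers(Set<DemoVertex> vertices) {
        for (DemoVertex vertex : vertices) {
            vertex.scheduleCalls++;
        }
    }

    // Partition info depends on the edge, so every edge must be kept.
    void updateConsumers(List<DemoEdge> edges) {
        for (DemoEdge edge : edges) {
            partitionInfos.add(edge.partitionIndex + "->" + edge.target.name);
        }
    }
}
```

With two partitions that both feed the same not-yet-running vertex, that vertex is handled once in {{scheduleConsumers}}, while a running consumer still receives one update per edge.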
Given that we plan to make the NG scheduler the default and deprecate the legacy scheduler,
maybe we can do this for the NG scheduler only.
[~sewen] WDYT?
> Improve batch schedule check input consumable performance
> ---------------------------------------------------------
>
> Key: FLINK-14735
> URL: https://issues.apache.org/jira/browse/FLINK-14735
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Task
> Reporter: Jingsong Lee
> Priority: Major
> Fix For: 1.10.0
>
>
> When we launch a batch job with a parallelism of 1000+, the heartbeat is
> likely to time out even if we set the akka timeout to 2 minutes.
> The JobMaster is busy:
> {code:java}
> java.lang.Thread.State: RUNNABLE
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at java.util.Spliterators$ArraySpliterator.tryAdvance(Spliterators.java:958)
> at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
> at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
> at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.anyMatch(ReferencePipeline.java:449)
> at org.apache.flink.runtime.executiongraph.ExecutionVertex.isInputConsumable(ExecutionVertex.java:824)
> at org.apache.flink.runtime.executiongraph.ExecutionVertex$$Lambda$257/564237119.test(Unknown Source)
> at java.util.stream.MatchOps$2MatchSink.accept(MatchOps.java:119)
> at java.util.stream.Streams$RangeIntSpliterator.tryAdvance(Streams.java:89)
> at java.util.stream.IntPipeline.forEachWithCancel(IntPipeline.java:162)
> at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
> at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.IntPipeline.allMatch(IntPipeline.java:482)
> at org.apache.flink.runtime.executiongraph.ExecutionVertex.checkInputDependencyConstraints(ExecutionVertex.java:811)
> at org.apache.flink.runtime.executiongraph.Execution.scheduleOrUpdateConsumers(Execution.java:889)
> at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:1074)
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateStateInternal(ExecutionGraph.java:1597)
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1570)
> at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:424)
> at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
> {code}