[ https://issues.apache.org/jira/browse/FLINK-14735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16978430#comment-16978430 ]
Stephan Ewen commented on FLINK-14735:
--------------------------------------
I am pushing the first part of your fix to master (the new fix for the new
scheduler) and am looking at the legacy scheduler fix now.
For 1.8 we only need the legacy scheduler fix, correct?
> Improve batch schedule check input consumable performance
> ---------------------------------------------------------
>
> Key: FLINK-14735
> URL: https://issues.apache.org/jira/browse/FLINK-14735
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Task
> Reporter: Jingsong Lee
> Priority: Major
> Fix For: 1.10.0
>
>
> When we launch a batch job with a parallelism of 1000 or more, the heartbeat
> is likely to time out, even with the akka timeout set to 2 minutes.
> The JobMaster is busy:
> {code:java}
> java.lang.Thread.State: RUNNABLE
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at java.util.Spliterators$ArraySpliterator.tryAdvance(Spliterators.java:958)
> at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
> at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
> at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.anyMatch(ReferencePipeline.java:449)
> at org.apache.flink.runtime.executiongraph.ExecutionVertex.isInputConsumable(ExecutionVertex.java:824)
> at org.apache.flink.runtime.executiongraph.ExecutionVertex$$Lambda$257/564237119.test(Unknown Source)
> at java.util.stream.MatchOps$2MatchSink.accept(MatchOps.java:119)
> at java.util.stream.Streams$RangeIntSpliterator.tryAdvance(Streams.java:89)
> at java.util.stream.IntPipeline.forEachWithCancel(IntPipeline.java:162)
> at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
> at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.IntPipeline.allMatch(IntPipeline.java:482)
> at org.apache.flink.runtime.executiongraph.ExecutionVertex.checkInputDependencyConstraints(ExecutionVertex.java:811)
> at org.apache.flink.runtime.executiongraph.Execution.scheduleOrUpdateConsumers(Execution.java:889)
> at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:1074)
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateStateInternal(ExecutionGraph.java:1597)
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1570)
> at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:424)
> at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
> {code}
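
The trace above shows an anyMatch over input partitions (isInputConsumable) running inside an allMatch over inputs (checkInputDependencyConstraints), triggered each time a task is marked finished. The following is a minimal, self-contained sketch of why that pattern can get expensive at high parallelism. It is not Flink code and not the fix discussed in this issue: the class InputConsumableSketch, the nested BlockingResult type, and all method names are hypothetical, and the model assumes an all-to-all blocking result whose partitions only become consumable once every producer has finished.

```java
import java.util.stream.IntStream;

// Hypothetical model, not Flink code: one blocking result with n producer
// partitions, consumed all-to-all by n consumer vertices.
class InputConsumableSketch {

    static final class BlockingResult {
        final int numPartitions;
        int finishedCount;      // updated once per finished producer
        long partitionChecks;   // instrumentation only: per-partition checks made

        BlockingResult(int numPartitions) {
            this.numPartitions = numPartitions;
        }

        void markProducerFinished() {
            finishedCount++;
        }

        // Assumption of this model: a partition of a blocking result is
        // consumable only when all partitions of the result are finished.
        boolean isPartitionConsumable(int partition) {
            return finishedCount == numPartitions;
        }

        // Scan-based check, shaped like the anyMatch in the trace:
        // walks the partitions until one reports consumable.
        boolean isInputConsumableByScan() {
            return IntStream.range(0, numPartitions).anyMatch(i -> {
                partitionChecks++;
                return isPartitionConsumable(i);
            });
        }

        // Constant-time alternative for this model: ask the result once.
        boolean isInputConsumableDirect() {
            return finishedCount == numPartitions;
        }
    }

    // Every producer finishes in turn and notifies every consumer.
    // Returns how many per-partition checks the scan-based variant performed.
    static long simulate(int parallelism) {
        BlockingResult result = new BlockingResult(parallelism);
        for (int producer = 0; producer < parallelism; producer++) {
            result.markProducerFinished();
            for (int consumer = 0; consumer < parallelism; consumer++) {
                boolean byScan = result.isInputConsumableByScan();
                boolean direct = result.isInputConsumableDirect();
                if (byScan != direct) {
                    throw new IllegalStateException("checks disagree");
                }
            }
        }
        return result.partitionChecks;
    }

    public static void main(String[] args) {
        for (int p : new int[] {10, 50, 200}) {
            System.out.println("parallelism " + p + ": " + simulate(p) + " partition checks");
        }
    }
}
```

In this model, each finished producer makes every consumer rescan all partitions, so the number of per-partition checks grows roughly with the cube of the parallelism, while the direct check stays constant per call. Whether the real scheduler behaves exactly this way depends on the result type and the Flink version.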