[
https://issues.apache.org/jira/browse/FLINK-14735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974150#comment-16974150
]
Stephan Ewen edited comment on FLINK-14735 at 11/14/19 11:32 AM:
-----------------------------------------------------------------
[~zhuzh] Do you know what causes the heartbeat timeouts? Is it the mass of RPC
events coming in and overwhelming the JobMaster? Or does the handling of
individual events take very long?
As a separate thought: I think it is better not to use Java Streams in that
part of the Execution Graph. This is performance-critical code, and streams are
not a good match for that.
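For illustration, the frames in the issue's stack trace suggest a nested pattern: `IntStream.range(...).allMatch(...)` in `checkInputDependencyConstraints` calls `isInputConsumable`, which in turn runs an `anyMatch` over an array-backed stream. The following is a minimal sketch, under assumptions, that contrasts that shape with a plain-loop equivalent. The `Partition` class, its `isConsumable()` flag, and the method names are hypothetical stand-ins, not Flink's actual `ExecutionVertex` code.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

final class InputConsumableCheck {

    /** Hypothetical stand-in for a result partition that may or may not be consumable. */
    static final class Partition {
        private final boolean consumable;

        Partition(boolean consumable) {
            this.consumable = consumable;
        }

        boolean isConsumable() {
            return consumable;
        }
    }

    /** Stream shape: every input must have at least one consumable partition. */
    static boolean allInputsConsumableStreams(Partition[][] inputs) {
        return IntStream.range(0, inputs.length)
                .allMatch(i -> Arrays.stream(inputs[i]).anyMatch(Partition::isConsumable));
    }

    /** Loop equivalent with the same short-circuiting, without stream pipelines or lambdas. */
    static boolean allInputsConsumableLoops(Partition[][] inputs) {
        for (Partition[] input : inputs) {
            if (!anyConsumable(input)) {
                return false; // stops at the first non-consumable input, like allMatch
            }
        }
        return true; // also true for zero inputs, matching allMatch on an empty stream
    }

    private static boolean anyConsumable(Partition[] partitions) {
        for (Partition partition : partitions) {
            if (partition.isConsumable()) {
                return true; // stops at the first consumable partition, like anyMatch
            }
        }
        return false; // false for zero partitions, matching anyMatch on an empty stream
    }

    public static void main(String[] args) {
        Partition consumable = new Partition(true);
        Partition pending = new Partition(false);
        Partition[][] inputs = {{pending, consumable}, {pending, pending}};
        System.out.println(allInputsConsumableStreams(inputs)); // false
        System.out.println(allInputsConsumableLoops(inputs));   // false

        inputs[1][1] = consumable;
        System.out.println(allInputsConsumableStreams(inputs)); // true
        System.out.println(allInputsConsumableLoops(inputs));   // true
    }
}
```

Both variants short-circuit at the same points and visit the same partitions, so the loop version mainly removes per-call overhead such as pipeline, spliterator, and lambda objects; it does not reduce the number of partitions scanned per check.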
was (Author: stephanewen):
[~zhuzh] Do you know what causes the heartbeat timeouts? Is it the mass of RPC
events coming in and overwhelming the JobMaster? Or does the handling of
individual events take very long?
As a separate thought: I think no Java Streams should be used in that part of
the Execution Graph.
This is clearly performance critical code, and streams are not a good match for
that.
> Improve batch schedule check input consumable performance
> ---------------------------------------------------------
>
> Key: FLINK-14735
> URL: https://issues.apache.org/jira/browse/FLINK-14735
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Task
> Reporter: Jingsong Lee
> Priority: Major
> Fix For: 1.10.0
>
>
> Currently, if we launch a batch job with a parallelism of 1000+, the
> heartbeat is likely to time out, even if the Akka timeout is set to 2 minutes.
> The JobMaster is busy, as this thread dump shows:
> {code:java}
> java.lang.Thread.State: RUNNABLE
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at java.util.Spliterators$ArraySpliterator.tryAdvance(Spliterators.java:958)
> at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
> at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
> at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.anyMatch(ReferencePipeline.java:449)
> at org.apache.flink.runtime.executiongraph.ExecutionVertex.isInputConsumable(ExecutionVertex.java:824)
> at org.apache.flink.runtime.executiongraph.ExecutionVertex$$Lambda$257/564237119.test(Unknown Source)
> at java.util.stream.MatchOps$2MatchSink.accept(MatchOps.java:119)
> at java.util.stream.Streams$RangeIntSpliterator.tryAdvance(Streams.java:89)
> at java.util.stream.IntPipeline.forEachWithCancel(IntPipeline.java:162)
> at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
> at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.IntPipeline.allMatch(IntPipeline.java:482)
> at org.apache.flink.runtime.executiongraph.ExecutionVertex.checkInputDependencyConstraints(ExecutionVertex.java:811)
> at org.apache.flink.runtime.executiongraph.Execution.scheduleOrUpdateConsumers(Execution.java:889)
> at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:1074)
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateStateInternal(ExecutionGraph.java:1597)
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1570)
> at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:424)
> at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
> {code}
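The stack trace above shows the JobMaster thread running these checks while handling `updateTaskExecutionState` → `markFinished` → `scheduleOrUpdateConsumers`, so each finished task can trigger fresh scans over consumer inputs. One hypothetical way to make each check O(1) is to keep counters that are updated when a partition becomes consumable, instead of rescanning every partition on every check. The `ConsumableInputTracker` class below is an illustrative sketch of that idea under stated assumptions; it is not Flink's API and not necessarily how FLINK-14735 was resolved.

```java
/**
 * Hypothetical incremental tracker for one consuming vertex. Instead of
 * rescanning all input partitions on each check, it maintains counters that
 * are updated when a partition becomes consumable, so each check is O(1).
 */
final class ConsumableInputTracker {

    // Number of consumable partitions seen so far, per input.
    private final int[] consumablePartitionsPerInput;
    // Number of inputs that have at least one consumable partition.
    private int consumableInputs;

    ConsumableInputTracker(int numInputs) {
        this.consumablePartitionsPerInput = new int[numInputs];
    }

    /** Assumed to be called exactly once when a partition feeding {@code inputIndex} becomes consumable. */
    void onPartitionConsumable(int inputIndex) {
        if (consumablePartitionsPerInput[inputIndex]++ == 0) {
            consumableInputs++; // this input just gained its first consumable partition
        }
    }

    /** Counter-based equivalent of "any partition of this input is consumable". */
    boolean isInputConsumable(int inputIndex) {
        return consumablePartitionsPerInput[inputIndex] > 0;
    }

    /** Counter-based equivalent of requiring ALL inputs to be consumable. */
    boolean allInputsConsumable() {
        return consumableInputs == consumablePartitionsPerInput.length;
    }

    /** Counter-based equivalent of requiring ANY input to be consumable. */
    boolean anyInputConsumable() {
        return consumableInputs > 0;
    }

    public static void main(String[] args) {
        ConsumableInputTracker tracker = new ConsumableInputTracker(2);
        System.out.println(tracker.anyInputConsumable());  // false
        tracker.onPartitionConsumable(0);
        tracker.onPartitionConsumable(0);
        System.out.println(tracker.anyInputConsumable());  // true
        System.out.println(tracker.allInputsConsumable()); // false
        tracker.onPartitionConsumable(1);
        System.out.println(tracker.allInputsConsumable()); // true
    }
}
```

The sketch assumes a partition becomes consumable at most once and never reverts; a real implementation would also have to reset or decrement the counters when partitions are reset, for example on failover.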