[ https://issues.apache.org/jira/browse/FLINK-14735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974150#comment-16974150 ]

Stephan Ewen edited comment on FLINK-14735 at 11/14/19 11:32 AM:
-----------------------------------------------------------------

[~zhuzh] Do you know what causes the heartbeat timeouts? Is it the mass of RPC 
events coming in and overwhelming the JobMaster? Or does the handling of 
individual events take very long?

As a separate thought: I think it is better not to use Java Streams in that 
part of the Execution Graph. This is performance-critical code, and streams are 
not the best match for that.
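
For illustration only, here is a minimal sketch of the kind of rewrite this suggests: replacing a stream-based {{anyMatch}} with a plain loop. The class and method names are hypothetical and do not mirror the actual ExecutionVertex code.

{code:java}
import java.util.List;

// Hypothetical sketch only; names and structure do not mirror ExecutionVertex.
final class InputConsumableSketch {

    // Stream-based form, similar in spirit to the anyMatch visible in the
    // stack trace in the issue description.
    static boolean anyPartitionConsumableWithStream(List<Boolean> partitionsConsumable) {
        return partitionsConsumable.stream().anyMatch(consumable -> consumable);
    }

    // Plain-loop form: same result, with no Spliterator or lambda machinery
    // on the hot path.
    static boolean anyPartitionConsumableWithLoop(List<Boolean> partitionsConsumable) {
        for (boolean consumable : partitionsConsumable) {
            if (consumable) {
                return true;
            }
        }
        return false;
    }
}
{code}

The loop form avoids allocating Spliterator and lambda objects on every check and tends to JIT-compile into a simple branch-and-return, which matters when the check runs many thousands of times on the JobMaster's main thread.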


was (Author: stephanewen):
[~zhuzh] Do you know what causes the heartbeat timeouts? Is it the mass of RPC 
events coming in and overwhelming the JobMaster? Or does the handling of 
individual events take very long?

As a separate thought: I think no Java Streams should be used in that part of 
the Execution Graph.
This is clearly performance critical code, and streams are not a good match for 
that.

> Improve batch schedule check input consumable performance
> ---------------------------------------------------------
>
>                 Key: FLINK-14735
>                 URL: https://issues.apache.org/jira/browse/FLINK-14735
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Task
>            Reporter: Jingsong Lee
>            Priority: Major
>             Fix For: 1.10.0
>
>
> Now if we launch a batch job with 1000+ parallelism:
> Even if we set the akka timeout to 2 minutes, the heartbeat is still likely to time out.
> The JobMaster is busy:
> {code:java}
> java.lang.Thread.State: RUNNABLE
>         at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>         at java.util.Spliterators$ArraySpliterator.tryAdvance(Spliterators.java:958)
>         at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
>         at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
>         at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
>         at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>         at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
>         at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
>         at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>         at java.util.stream.ReferencePipeline.anyMatch(ReferencePipeline.java:449)
>         at org.apache.flink.runtime.executiongraph.ExecutionVertex.isInputConsumable(ExecutionVertex.java:824)
>         at org.apache.flink.runtime.executiongraph.ExecutionVertex$$Lambda$257/564237119.test(Unknown Source)
>         at java.util.stream.MatchOps$2MatchSink.accept(MatchOps.java:119)
>         at java.util.stream.Streams$RangeIntSpliterator.tryAdvance(Streams.java:89)
>         at java.util.stream.IntPipeline.forEachWithCancel(IntPipeline.java:162)
>         at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
>         at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
>         at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>         at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
>         at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
>         at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>         at java.util.stream.IntPipeline.allMatch(IntPipeline.java:482)
>         at org.apache.flink.runtime.executiongraph.ExecutionVertex.checkInputDependencyConstraints(ExecutionVertex.java:811)
>         at org.apache.flink.runtime.executiongraph.Execution.scheduleOrUpdateConsumers(Execution.java:889)
>         at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:1074)
>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateStateInternal(ExecutionGraph.java:1597)
>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1570)
>         at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:424)
>         at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
> {code}
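
For context, a rough counting sketch of why this check can get so hot at high parallelism. The numbers below are assumptions for illustration (ALL-to-ALL connectivity, every producer-finish event re-checking each downstream consumer against all of its upstream partitions); this is not Flink code and not a measurement.

{code:java}
// Rough counting sketch only (assumptions, not taken from the Flink sources):
// every producer that finishes re-checks each of its downstream consumers, and each
// check scans all upstream partitions of that consumer via a stream anyMatch.
public final class ConsumableCheckCost {

    public static void main(String[] args) {
        long parallelism = 1_000L;                     // 1000+ parallelism as in the report
        long finishEvents = parallelism;               // one markFinished per producer
        long consumersPerEvent = parallelism;          // ALL-to-ALL edge: every consumer is affected
        long partitionsScannedPerCheck = parallelism;  // anyMatch over all upstream partitions

        long totalPartitionScans = finishEvents * consumersPerEvent * partitionsScannedPerCheck;
        System.out.println("Approximate partition scans on the JobMaster main thread: "
                + totalPartitionScans);                // on the order of 1e9 for parallelism 1000
    }
}
{code}

Even if each individual check is cheap, work on this order, all executed on the JobMaster's main thread, can easily crowd out heartbeat handling and cause the timeouts described above.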



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
