[
https://issues.apache.org/jira/browse/FLINK-32876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Zhu Zhu updated FLINK-32876:
----------------------------
Affects Version/s: 1.17.1
> ExecutionTimeBasedSlowTaskDetector treats unscheduled tasks as slow tasks and
> causes speculative execution to fail.
> -------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-32876
> URL: https://issues.apache.org/jira/browse/FLINK-32876
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.18.0, 1.17.1
> Reporter: Junrui Li
> Assignee: Junrui Li
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.18.0
>
>
> When we enable speculative execution and configure the job with the following
> configuration:
> {code:java}
> execution.batch.speculative.enabled: true
> slow-task-detector.execution-time.baseline-ratio: 0.0
> slow-task-detector.execution-time.baseline-lower-bound: 0s{code}
> The ExecutionTimeBasedSlowTaskDetector identifies the tasks of an
> ExecutionJobVertex that has not yet been scheduled as slow tasks and reports
> them to the SpeculativeScheduler. However, the SpeculativeScheduler requires
> that the corresponding ExecutionVertex has entered the scheduled state before
> it schedules backup tasks. If this requirement is not met, speculative
> execution fails.
> The exception stack trace is as follows:
> {code:java}
> java.lang.IllegalStateException: Execution vertex
> b3f44e8b1dc132ff2a47f7955c75ef7d_0 does not have a recorded version at
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
> ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT] at
> org.apache.flink.runtime.scheduler.ExecutionVertexVersioner.getCurrentVersion(ExecutionVertexVersioner.java:71)
> ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT] at
> org.apache.flink.runtime.scheduler.ExecutionVertexVersioner.lambda$getExecutionVertexVersions$1(ExecutionVertexVersioner.java:89)
> ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT] at
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> ~[?:1.8.0_333] at
> java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1580)
> ~[?:1.8.0_333] at
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> ~[?:1.8.0_333] at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> ~[?:1.8.0_333] at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> ~[?:1.8.0_333] at
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> ~[?:1.8.0_333] at
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> ~[?:1.8.0_333] at
> org.apache.flink.runtime.scheduler.ExecutionVertexVersioner.getExecutionVertexVersions(ExecutionVertexVersioner.java:90)
> ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT] at
> org.apache.flink.runtime.scheduler.adaptivebatch.SpeculativeScheduler.notifySlowTasks(SpeculativeScheduler.java:377)
> ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT] at
> org.apache.flink.runtime.scheduler.slowtaskdetector.ExecutionTimeBasedSlowTaskDetector.lambda$scheduleTask$1(ExecutionTimeBasedSlowTaskDetector.java:129)
> ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT] at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> ~[?:1.8.0_333] at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> ~[?:1.8.0_333] at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)
> ~[flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT] at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)
> ~[flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)
> ~[flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at
> org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
> ~[flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
> ~[flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at
> scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at
> org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at
> org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at
> org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at
> org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at
> org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at
> org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at
> org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at
> org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at
> org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_333]
> at
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
> [?:1.8.0_333] at
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
> [?:1.8.0_333] at
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
> [?:1.8.0_333] {code}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)