Chesnay Schepler created FLINK-22211:
----------------------------------------
Summary: DataStream.collect() logs warnings if job is not
initialized yet
Key: FLINK-22211
URL: https://issues.apache.org/jira/browse/FLINK-22211
Project: Flink
Issue Type: Sub-task
Components: Client / Job Submission
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
Fix For: 1.13.0
When using {{DataStream.collect()}} we always have an excpetion in the log for
the first fetch attempt, before the JM is ready.
The loop retries and the program succeeds, but the exception in the log raises
confusion about whether there is a swallowed but impactful error.
{code}
7199 [main] WARN
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - An
exception occurs when fetching query results
java.util.concurrent.ExecutionException:
org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException:
Unable to get JobMasterGateway for initializing job. The requested operation is
not available while the JobManager is initializing.
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
~[?:?]
at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?]
at
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:155)
~[classes/:?]
at
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:126)
[classes/:?]
at
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
[classes/:?]
at
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
[classes/:?]
at
org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1320)
[classes/:?]
at
org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1303)
[classes/:?]
at
org.apache.flink.runtime.operators.coordination.OperatorEventSendingCheckpointITCase.testLostOperatorEventsLeadsToRecovery(OperatorEventSendingCheckpointITCase.java:88)
[test-classes/:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:?]
at
jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:?]
at
jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
[junit-4.12.jar:4.12]
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
[junit-4.12.jar:4.12]
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
[junit-4.12.jar:4.12]
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
[junit-4.12.jar:4.12]
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
[junit-4.12.jar:4.12]
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
[junit-4.12.jar:4.12]
at
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
[junit-4.12.jar:4.12]
at
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
[junit-4.12.jar:4.12]
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
[junit-4.12.jar:4.12]
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
[junit-rt.jar:?]
at
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
[junit-rt.jar:?]
at
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
[junit-rt.jar:?]
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
[junit-rt.jar:?]
Caused by:
org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException:
Unable to get JobMasterGateway for initializing job. The requested operation is
not available while the JobManager is initializing.
at
org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGateway(Dispatcher.java:892)
~[classes/:?]
at
org.apache.flink.runtime.dispatcher.Dispatcher.performOperationOnJobMasterGateway(Dispatcher.java:902)
~[classes/:?]
at
org.apache.flink.runtime.dispatcher.Dispatcher.deliverCoordinationRequestToCoordinator(Dispatcher.java:724)
~[classes/:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:?]
at
jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:?]
at
jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
~[classes/:?]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
~[classes/:?]
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
~[classes/:?]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
~[classes/:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
~[scala-library-2.11.12.jar:?]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
~[scala-library-2.11.12.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
~[scala-library-2.11.12.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
~[scala-library-2.11.12.jar:?]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)