[
https://issues.apache.org/jira/browse/FLINK-21028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17268016#comment-17268016
]
Theo Diefenthal edited comment on FLINK-21028 at 1/19/21, 4:34 PM:
-------------------------------------------------------------------
Sadly, I don't have any DEBUG logs here as I can't reproduce the issue. This
job has been running for about 2 years and is stopped and restarted once a
week, and this is the first time that this has happened.
There isn't anything meaningful in the INFO logs: on all task managers, I can
see that all checkpoints succeeded up to the time when I called "stop".
Afterwards, there are only messages from various tasks that they switched their
state from RUNNING to FINISHED. Only this one task printed the exception
already shown above and switched its state to FAILING.
It might be helpful to note that the job runs on 18 task managers, each having
one slot. The cluster has 10 or 15 nodes, so definitely fewer than 18. Some
machines thus run multiple task managers (this shouldn't be a problem, but
might be helpful to know).
Note also the nature of the "disjoint graph" here: We have a parallelism of 18
(18 task managers, 1 slot each), with one particular subgraph at parallelism
18 and some other subgraphs at lower parallelism. In total, there are about
200 tasks, and some task managers run multiple subgraphs in parallel even
though they only have one slot. Maybe that is part of the issue here?
> Streaming application didn't stop properly
> -------------------------------------------
>
> Key: FLINK-21028
> URL: https://issues.apache.org/jira/browse/FLINK-21028
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.11.2
> Reporter: Theo Diefenthal
> Priority: Major
>
> I have a Flink job running on YARN with a disjoint graph, i.e. a single job
> contains two independent and isolated pipelines.
> From time to time, I stop the job with a savepoint like so:
> {code:java}
> flink stop -p ${SAVEPOINT_BASEDIR}/${FLINK_JOB_NAME}/SAVEPOINTS --yarnapplicationId=${FLINK_YARN_APPID} ${ID}{code}
> A few days ago, the job did not stop properly as usual but instead ran into
> what looks like a race condition.
> On the CLI with stop, I received a simple timeout:
> {code:java}
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job "f23290bf5fb0ecd49a4455e4a65f2eb6".
> at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:495)
> at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:864)
> at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:487)
> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:931)
> at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: java.util.concurrent.TimeoutException
> at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:493)
> ... 9 more{code}
>
> The root of the problem, however, is that on a taskmanager, I received an
> exception during shutdown, which led to (a part of) the pipeline restarting
> and returning to running state. The console command for stopping therefore
> timed out (as the job was (partially) back in running state). The exception,
> which looks like a race condition to me, appears in the logs as:
> {code:java}
> 2021-01-12T06:15:15.827877+01:00 WARN org.apache.flink.runtime.taskmanager.Task Source: rawdata_source1 -> validation_source1 -> enrich_source1 -> map_json_source1 -> Sink: write_to_kafka_source1) (3/18) (bc68320cf69dd877782417a3298499d6) switched from RUNNING to FAILED.
> java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
> at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
> at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(TimestampsAndWatermarksOperator.java:165)
> at org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter.onPeriodicEmit(AssignerWithPeriodicWatermarksAdapter.java:54)
> at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java:125)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
> ... 13 more
> Caused by: java.lang.RuntimeException
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
> at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
> at org.apache.flink.streaming.api.operators.ProcessOperator.processWatermark(ProcessOperator.java:72)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
> ... 25 more
> Caused by: java.lang.IllegalStateException
> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
> at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:83)
> at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer.java:90)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:136)
> at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:80)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
> ... 29 more{code}
> I already raised this bug on the user mailing list, where the conclusion was
> to open a ticket here. Original thread on the user mailing list:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Bugs-in-Streaming-job-stopping-Weird-graceful-stop-restart-for-disjoint-job-graph-td40610.html