[
https://issues.apache.org/jira/browse/FLINK-21028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17267989#comment-17267989
]
Theo Diefenthal commented on FLINK-21028:
-----------------------------------------
Hi Till, thanks for your feedback.
Originally on the mailing list, I split it up into three different issue in
order to focus on different anspects as for me, those issues occured at the
same time but still are somehow different.
In this issue here, I assume definitely a bug in Flinks shutdown behavior,
something like a race condition. Some Buffer was already closed even though it
was still assumed to be opened. (No hardware failure or anything else, just
flink internal)
In FLINK-21029, I raise a more general question: If _anything unexpected_
occurs during a requested job shutdown: Should we force shutdown or try to
restart? So this is not directly related to the bug here but general behavior.
In FLINK-21030, I see a follow up error on top of this bug: If a failing job
starts to recover, it seems to don't recover the entire pipeline but only the
connected components.
I wasn't sure whether to open a single ticket for all of them or just creating
one, but seeing your comment on FLINK-21029, I think it was a good decision to
split them up: We can discuss there how Flink should handle any failure in
shutdown whereas this bug here stays "clean" and focused on why under this
special circumstances there was this race condition.
> Streaming application didn't stop properly
> -------------------------------------------
>
> Key: FLINK-21028
> URL: https://issues.apache.org/jira/browse/FLINK-21028
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.11.2
> Reporter: Theo Diefenthal
> Priority: Major
>
> I have a Flink job running on YARN with a disjoint graph, i.e. a single job
> contains two independent and isolated pipelines.
> From time to time, I stop the job with a savepoint like so:
> {code:java}
> flink stop -p ${SAVEPOINT_BASEDIR}/${FLINK_JOB_NAME}/SAVEPOINTS
> --yarnapplicationId=${FLINK_YARN_APPID} ${ID}{code}
> A few days ago, this job suddenly didn't stop properly as usual but ran into
> a possible race condition.
> On the CLI with stop, I received a simple timeout:
> {code:java}
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job
> "f23290bf5fb0ecd49a4455e4a65f2eb6".
> at
> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:495)
> at
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:864)
> at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:487)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:931)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: java.util.concurrent.TimeoutException
> at
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:493)
> ... 9 more{code}
>
> The root of the problem however is that on a taskmanager, I received an
> exception in shutdown, which lead to restarting (a part) of the pipeline and
> put it back to running state, thus the console command for stopping timed out
> (as the job was (partially) back in running state). the exception which looks
> like a race condition for me in the logs is:
> {code:java}
> 2021-01-12T06:15:15.827877+01:00 WARN
> org.apache.flink.runtime.taskmanager.Task Source: rawdata_source1 ->
> validation_source1 -> enrich_source1 -> map_json_source1 -> Sink:
> write_to_kafka_source1) (3/18) (bc68320cf69dd877782417a3298499d6) switched
> from RUNNING to FAILED.
> java.util.concurrent.ExecutionException:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
> at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
> at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
> at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
> at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
> at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
> at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
> at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:745)
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
> at
> org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(TimestampsAndWatermarksOperator.java:165)
> at
> org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter.onPeriodicEmit(AssignerWithPeriodicWatermarksAdapter.java:54)
> at
> org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java:125)
> at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
> at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
> at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
> ... 13 more
> Caused by: java.lang.RuntimeException
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
> at
> org.apache.flink.streaming.api.operators.ProcessOperator.processWatermark(ProcessOperator.java:72)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
> ... 25 more
> Caused by: java.lang.IllegalStateException
> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
> at
> org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:83)
> at
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer.java:90)
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:136)
> at
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:80)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
> ... 29 more{code}
> I already raised a question regarding this bug on the user mailing list with
> the conclusion to just open a ticket here. Original on user mailing list:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Bugs-in-Streaming-job-stopping-Weird-graceful-stop-restart-for-disjoint-job-graph-td40610.html
--
This message was sent by Atlassian Jira
(v8.3.4#803005)