[
https://issues.apache.org/jira/browse/FLINK-21028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17282489#comment-17282489
]
Piotr Nowojski edited comment on FLINK-21028 at 2/10/21, 2:40 PM:
------------------------------------------------------------------
Yes, thanks [~kezhuw], good catch. I think you are right, and that explains the
problem. For a clean shutdown we should never interrupt threads. However, the
call chain
StreamTask.notifyCheckpointComplete ==> SourceStreamTask.finishTask ==>
SourceStreamTask.cancelTask ==> SourceStreamTask.sourceThread.interrupt
does just that, leaving the network stack in an inconsistent state. Later we
enter the "clean shutdown procedure", which attempts to write more records to
the network stack. In 1.11.x this manifests as:
{noformat}
Caused by: java.lang.IllegalStateException
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:83)
{noformat}
In Flink 1.12.x, by contrast, all kinds of stream corruption exceptions can
happen. I'm not saying those are the only potential outcomes. If an
InterruptedException is thrown or caught in user code or a 3rd party library,
that code can also be left in an invalid state, causing a whole bunch of
different problems.
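
To illustrate the difference between interrupting a thread and asking it to
stop cooperatively, here is a minimal sketch. It is not Flink code: the class
SourceLoop, the field writeInProgress and the latch bufferAvailable are
hypothetical names, and the "write" is only a stand-in for a source thread
blocked while appending a record to a network buffer. It assumes nothing about
how the actual fix should look.

```java
import java.util.concurrent.CountDownLatch;

final class CleanShutdownSketch {

    /** Hypothetical stand-in for a source thread that writes records to a buffer. */
    static final class SourceLoop implements Runnable {
        private volatile boolean running = true;
        // Both fields are read by the caller only after Thread.join().
        boolean writeInProgress = false;
        int recordsWritten = 0;
        final CountDownLatch writeStarted = new CountDownLatch(1);
        final CountDownLatch bufferAvailable = new CountDownLatch(1);

        /** Cooperative stop: only honoured between two complete writes. */
        void stopGracefully() {
            running = false;
        }

        @Override
        public void run() {
            while (running) {
                writeInProgress = true;      // the write has begun
                writeStarted.countDown();
                try {
                    bufferAvailable.await(); // blocks, like waiting for buffer space
                } catch (InterruptedException e) {
                    return;                  // aborted mid-write, state stays inconsistent
                }
                recordsWritten++;
                writeInProgress = false;     // the write is complete
            }
        }
    }

    /** Stops the loop with Thread.interrupt(); returns true if a write was left half done. */
    static boolean stopWithInterrupt() throws InterruptedException {
        SourceLoop loop = new SourceLoop();
        Thread thread = new Thread(loop, "source-thread");
        thread.start();
        loop.writeStarted.await();
        thread.interrupt();
        thread.join();
        return loop.writeInProgress;
    }

    /** Stops the loop with a flag; returns true if a write was left half done. */
    static boolean stopWithFlag() throws InterruptedException {
        SourceLoop loop = new SourceLoop();
        Thread thread = new Thread(loop, "source-thread");
        thread.start();
        loop.writeStarted.await();
        loop.stopGracefully();
        loop.bufferAvailable.countDown();    // buffer space arrives, the write can finish
        thread.join();
        return loop.writeInProgress;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("interrupt left a write half done: " + stopWithInterrupt()); // true
        System.out.println("flag left a write half done: " + stopWithFlag());           // false
    }
}
```

In the interrupt case, any later attempt to write through the same
half-finished state would hit whatever consistency check guards it, which is
roughly the situation the IllegalStateException above points at.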
> Streaming application didn't stop properly
> -------------------------------------------
>
> Key: FLINK-21028
> URL: https://issues.apache.org/jira/browse/FLINK-21028
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.11.2
> Reporter: Theo Diefenthal
> Priority: Major
>
> I have a Flink job running on YARN with a disjoint graph, i.e. a single job
> contains two independent and isolated pipelines.
> From time to time, I stop the job with a savepoint like so:
> {code:java}
> flink stop -p ${SAVEPOINT_BASEDIR}/${FLINK_JOB_NAME}/SAVEPOINTS --yarnapplicationId=${FLINK_YARN_APPID} ${ID}{code}
> A few days ago, the job did not stop properly as it usually does, but
> apparently ran into a race condition.
> On the CLI with stop, I received a simple timeout:
> {code:java}
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job "f23290bf5fb0ecd49a4455e4a65f2eb6".
> at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:495)
> at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:864)
> at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:487)
> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:931)
> at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: java.util.concurrent.TimeoutException
> at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:493)
> ... 9 more{code}
>
> The root of the problem, however, is that one TaskManager threw an
> exception during shutdown, which led to a restart of (part of) the pipeline
> and put it back into the RUNNING state. The console stop command therefore
> timed out (as the job was partially running again). The exception in the
> logs, which looks like a race condition to me, is:
> {code:java}
> 2021-01-12T06:15:15.827877+01:00 WARN org.apache.flink.runtime.taskmanager.Task Source: rawdata_source1 -> validation_source1 -> enrich_source1 -> map_json_source1 -> Sink: write_to_kafka_source1) (3/18) (bc68320cf69dd877782417a3298499d6) switched from RUNNING to FAILED.
> java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
> at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
> at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(TimestampsAndWatermarksOperator.java:165)
> at org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter.onPeriodicEmit(AssignerWithPeriodicWatermarksAdapter.java:54)
> at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java:125)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
> ... 13 more
> Caused by: java.lang.RuntimeException
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
> at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
> at org.apache.flink.streaming.api.operators.ProcessOperator.processWatermark(ProcessOperator.java:72)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
> ... 25 more
> Caused by: java.lang.IllegalStateException
> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
> at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:83)
> at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer.java:90)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:136)
> at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:80)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
> ... 29 more{code}
> I already raised a question about this bug on the user mailing list, where
> the conclusion was to open a ticket here. Original thread on the user mailing list:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Bugs-in-Streaming-job-stopping-Weird-graceful-stop-restart-for-disjoint-job-graph-td40610.html