[ 
https://issues.apache.org/jira/browse/FLINK-21028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-21028:
----------------------------------
    Component/s: Runtime / Task

> Streaming application didn't stop properly 
> -------------------------------------------
>
>                 Key: FLINK-21028
>                 URL: https://issues.apache.org/jira/browse/FLINK-21028
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.11.2
>            Reporter: Theo Diefenthal
>            Priority: Major
>
> I have a Flink job running on YARN with a disjoint graph, i.e. a single job 
> contains two independent and isolated pipelines.
> From time to time, I stop the job with a savepoint like so:
> {code:java}
> flink stop -p ${SAVEPOINT_BASEDIR}/${FLINK_JOB_NAME}/SAVEPOINTS 
> --yarnapplicationId=${FLINK_YARN_APPID} ${ID}{code}
> A few days ago, this job suddenly didn't stop properly as usual but ran into 
> a possible race condition.
> On the CLI with stop, I received a simple timeout:
> {code:java}
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job 
> "f23290bf5fb0ecd49a4455e4a65f2eb6".
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:495)
>  at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:864)
>  at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:487)
>  at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:931)
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>  at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: java.util.concurrent.TimeoutException
>  at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:493)
>  ... 9 more{code}
>  
> The root of the problem however is that on a taskmanager, I received an 
> exception in shutdown, which lead to restarting (a part) of the pipeline and 
> put it back to running state, thus the console command for stopping timed out 
> (as the job was (partially) back in running state). the exception which looks 
> like a race condition for me in the logs is:
> {code:java}
> 2021-01-12T06:15:15.827877+01:00 WARN 
> org.apache.flink.runtime.taskmanager.Task Source: rawdata_source1 -> 
> validation_source1 -> enrich_source1 -> map_json_source1 -> Sink: 
> write_to_kafka_source1) (3/18) (bc68320cf69dd877782417a3298499d6) switched 
> from RUNNING to FAILED.
> java.util.concurrent.ExecutionException: 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
>  at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>  at java.lang.Thread.run(Thread.java:745)
> Caused by: 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
>  at 
> org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
>  at 
> org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(TimestampsAndWatermarksOperator.java:165)
>  at 
> org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter.onPeriodicEmit(AssignerWithPeriodicWatermarksAdapter.java:54)
>  at 
> org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java:125)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>  at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>  at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
>  ... 13 more
> Caused by: java.lang.RuntimeException
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
>  at 
> org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
>  at 
> org.apache.flink.streaming.api.operators.ProcessOperator.processWatermark(ProcessOperator.java:72)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
>  ... 25 more
> Caused by: java.lang.IllegalStateException
>  at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
>  at 
> org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:83)
>  at 
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer.java:90)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:136)
>  at 
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:80)
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
>  ... 29 more{code}
> I already raised a question regarding this bug on the user mailing list with 
> the conclusion to just open a ticket here. Original on user mailing list: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Bugs-in-Streaming-job-stopping-Weird-graceful-stop-restart-for-disjoint-job-graph-td40610.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to