[ 
https://issues.apache.org/jira/browse/FLINK-21028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17282297#comment-17282297
 ] 

Piotr Nowojski commented on FLINK-21028:
----------------------------------------

{quote}
Can this be related to the disjoint job graph setup? 
{quote}
[~TheoD] I don't think so. At least I do not see how, but as I don't understand 
how it is happening, I might be wrong.

{quote}
After FLINK-19297, symptom could be different. A null but not full buffer with 
later EndOfPartitionEvent could introduce a corrupted record.
{quote}
[~kezhuw] you might be right. Also keep in mind that, judging by the stack 
trace here, this "later" doesn't necessarily have to be an 
{{EndOfPartitionEvent}}; it could also be records that are emitted 
during the closing procedure:

{noformat}
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
 at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
 at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
 at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
 at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
 at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
 at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
 at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
{noformat}
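
To illustrate the kind of failure being discussed, here is a minimal toy sketch. It is not Flink code: {{ToyBuffer}} and {{ToyOperator}} are hypothetical stand-ins, and it assumes that the failing {{checkState}} in {{BufferBuilder.append}} is of the "buffer already finished" kind, which the stack trace alone does not confirm. It only shows the shape of the problem: an operator that emits one more element from {{close()}} after its output buffer has been finished.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: NOT Flink's classes, just the same shape of failure. */
public class LateEmitSketch {

    /** Hypothetical stand-in for a buffer that refuses writes once finished. */
    static final class ToyBuffer {
        private final List<String> records = new ArrayList<>();
        private boolean finished;

        void append(String record) {
            // Assumed precondition, analogous in spirit to a checkState(...) call.
            if (finished) {
                throw new IllegalStateException("append after finish");
            }
            records.add(record);
        }

        void finish() {
            finished = true;
        }
    }

    /** Hypothetical operator that emits one last element when it is closed. */
    static final class ToyOperator {
        private final ToyBuffer output;

        ToyOperator(ToyBuffer output) {
            this.output = output;
        }

        void close() {
            // Emission during the closing procedure, e.g. a final watermark.
            output.append("final watermark");
        }
    }

    /** Returns true if closing the operator after the buffer was finished fails. */
    static boolean closeAfterFinishFails() {
        ToyBuffer buffer = new ToyBuffer();
        buffer.append("record");
        buffer.finish();
        try {
            new ToyOperator(buffer).close();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("close after finish fails: " + closeAfterFinishFails());
    }
}
```

Whether the real buffer was actually finished at that point is exactly what the logs would need to show.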
{quote}
Actually, I am kind of confused about the pipeline. Theo Diefenthal I saw the 
chained task was named as Source: rawdata_source1 -> validation_source1 -> 
enrich_source1 -> map_json_source1 -> Sink: write_to_kafka_source1). If this is 
a source to sink chained task, how this could happen ? There is no output at 
all.
{quote}
Good question. I second that. It is also suspicious that the stack trace 
shows 6 operator wrappers, while the task name suggests there should be at most 
5.
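
The counting can be made explicit with a small sketch. The frames and the task name are copied from the report; the class and method names are made up for illustration. It assumes that the outermost {{close}} frame (line 80) is only the entry point and that every remaining {{StreamOperatorWrapper.close}} frame belongs to one wrapper in the chain.

```java
import java.util.Arrays;
import java.util.List;

public class WrapperFrameCount {

    // close(...) frames from the stack trace, innermost first (package names omitted).
    static final List<String> FRAMES = Arrays.asList(
            "StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)",
            "StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)",
            "StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)",
            "StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)",
            "StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)",
            "StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)",
            "StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)");

    // Task name as it appears in the log line of the report.
    static final String TASK_NAME =
            "Source: rawdata_source1 -> validation_source1 -> enrich_source1"
                    + " -> map_json_source1 -> Sink: write_to_kafka_source1)";

    /** Counts wrappers, assuming one close frame is the entry point (assumption). */
    static long countWrappers(List<String> frames) {
        long closeFrames = frames.stream()
                .filter(f -> f.contains("StreamOperatorWrapper.close("))
                .count();
        return closeFrames - 1;
    }

    /** Counts the operators listed in a chained task name. */
    static int countChainedOperators(String taskName) {
        return taskName.split(" -> ").length;
    }

    public static void main(String[] args) {
        System.out.println("wrappers in trace: " + countWrappers(FRAMES));
        System.out.println("operators in task name: " + countChainedOperators(TASK_NAME));
    }
}
```

Under that assumption the trace yields one wrapper more than the task name lists, which is the mismatch mentioned above.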

[~TheoD], please share the logs with us; we might spot something that you 
missed, or at the very least we would understand the context better. Could you 
also share the JobGraph with us?


> Streaming application didn't stop properly 
> -------------------------------------------
>
>                 Key: FLINK-21028
>                 URL: https://issues.apache.org/jira/browse/FLINK-21028
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.11.2
>            Reporter: Theo Diefenthal
>            Priority: Major
>
> I have a Flink job running on YARN with a disjoint graph, i.e. a single job 
> contains two independent and isolated pipelines.
> From time to time, I stop the job with a savepoint like so:
> {code:java}
> flink stop -p ${SAVEPOINT_BASEDIR}/${FLINK_JOB_NAME}/SAVEPOINTS --yarnapplicationId=${FLINK_YARN_APPID} ${ID}{code}
> A few days ago, this job suddenly didn't stop properly as usual but ran into 
> a possible race condition.
> On the CLI with stop, I received a simple timeout:
> {code:java}
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job "f23290bf5fb0ecd49a4455e4a65f2eb6".
>  at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:495)
>  at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:864)
>  at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:487)
>  at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:931)
>  at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>  at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: java.util.concurrent.TimeoutException
>  at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>  at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:493)
>  ... 9 more{code}
>  
> The root of the problem, however, is that on a taskmanager I received an 
> exception during shutdown, which led to a restart of (a part of) the pipeline 
> and put it back into the running state; thus the console command for stopping 
> timed out (as the job was (partially) back in the running state). The exception 
> in the logs, which looks like a race condition to me, is:
> {code:java}
> 2021-01-12T06:15:15.827877+01:00 WARN org.apache.flink.runtime.taskmanager.Task Source: rawdata_source1 -> validation_source1 -> enrich_source1 -> map_json_source1 -> Sink: write_to_kafka_source1) (3/18) (bc68320cf69dd877782417a3298499d6) switched from RUNNING to FAILED.
> java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>  at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>  at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
>  at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
>  at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>  at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>  at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>  at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>  at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>  at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
>  at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
>  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>  at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
>  at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
>  at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(TimestampsAndWatermarksOperator.java:165)
>  at org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter.onPeriodicEmit(AssignerWithPeriodicWatermarksAdapter.java:54)
>  at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java:125)
>  at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
>  at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>  at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
>  at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
>  at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>  at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>  at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
>  at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
>  ... 13 more
> Caused by: java.lang.RuntimeException
>  at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
>  at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
>  at org.apache.flink.streaming.api.operators.ProcessOperator.processWatermark(ProcessOperator.java:72)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
>  ... 25 more
> Caused by: java.lang.IllegalStateException
>  at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
>  at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:83)
>  at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer.java:90)
>  at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:136)
>  at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:80)
>  at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
>  ... 29 more{code}
> I already raised a question regarding this bug on the user mailing list, with 
> the conclusion to just open a ticket here. Original thread on the user mailing list: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Bugs-in-Streaming-job-stopping-Weird-graceful-stop-restart-for-disjoint-job-graph-td40610.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
