[
https://issues.apache.org/jira/browse/FLINK-23808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17403846#comment-17403846
]
Yun Gao commented on FLINK-23808:
---------------------------------
Hi [~pnowojski], this is indeed discovered by our IT Case, and the stack is
{code:java}
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
~[?:1.8.0_271]
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
~[?:1.8.0_271]
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.completeProcessing(SourceStreamTask.java:362)
~[classes/:?]
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:332)
~[classes/:?]
Caused by:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
at
org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:114)
~[classes/:?]
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.advanceToEndOfEventTime(SourceStreamTask.java:164)
~[classes/:?]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:530)
~[classes/:?]
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.lambda$completeProcessing$0(SourceStreamTask.java:358)
~[classes/:?]
at
org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:126)
~[classes/:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
~[?:1.8.0_271]
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
~[classes/:?]
at
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
~[classes/:?]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
~[classes/:?]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
~[classes/:?]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
~[classes/:?]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:818)
~[classes/:?]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:745)
~[classes/:?]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:784)
~[classes/:?]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:727)
~[classes/:?]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:786)
~[classes/:?]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:572)
~[classes/:?]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_271]
Caused by: java.lang.NullPointerException
at
org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processWatermark(TimestampsAndWatermarksOperator.java:125)
~[classes/:?]
at
org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:112)
~[classes/:?]
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.advanceToEndOfEventTime(SourceStreamTask.java:164)
~[classes/:?]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:530)
~[classes/:?]
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.lambda$completeProcessing$0(SourceStreamTask.java:358)
~[classes/:?]
at
org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:126)
~[classes/:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
~[?:1.8.0_271]
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
~[classes/:?]
at
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
~[classes/:?]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
~[classes/:?]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
~[classes/:?]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
~[classes/:?]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:818)
~[classes/:?]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:745)
~[classes/:?]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:784)
~[classes/:?]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:727)
~[classes/:?]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:786)
~[classes/:?]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:572)
~[classes/:?]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_271]
{code}
> Bypass operators when advanceToEndOfEventTime for both legacy and new source
> tasks
> ----------------------------------------------------------------------------------
>
> Key: FLINK-23808
> URL: https://issues.apache.org/jira/browse/FLINK-23808
> Project: Flink
> Issue Type: Sub-task
> Affects Versions: 1.14.0
> Reporter: Yun Gao
> Assignee: Yun Gao
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.14.0
>
>
> Currently when end of data processing, the sources would advance to max
> watermark and emit MAX_WATERMARK. However, currently the output used is the
> mainOperatorOutput, namely the output of the source operator. If the source
> operator is chained with other operators and the tasks are finished on
> restore, there would be problems since the following operators should be
> skipped, but some operators still have actions when the watermark is
> advanced, like WatermarkAssignerOperator, which might cause problems.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)