[ https://issues.apache.org/jira/browse/FLINK-23808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17403846#comment-17403846 ]

Yun Gao commented on FLINK-23808:
---------------------------------

Hi [~pnowojski], this was indeed discovered by one of our IT cases, and the stack trace is:


{code:java}
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_271]
        at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_271]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.completeProcessing(SourceStreamTask.java:362) ~[classes/:?]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:332) ~[classes/:?]
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:114) ~[classes/:?]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.advanceToEndOfEventTime(SourceStreamTask.java:164) ~[classes/:?]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:530) ~[classes/:?]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.lambda$completeProcessing$0(SourceStreamTask.java:358) ~[classes/:?]
        at org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:126) ~[classes/:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_271]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[classes/:?]
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[classes/:?]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) ~[classes/:?]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) ~[classes/:?]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) ~[classes/:?]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:818) ~[classes/:?]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:745) ~[classes/:?]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:784) ~[classes/:?]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:727) ~[classes/:?]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:786) ~[classes/:?]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:572) ~[classes/:?]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_271]
Caused by: java.lang.NullPointerException
        at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processWatermark(TimestampsAndWatermarksOperator.java:125) ~[classes/:?]
        at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:112) ~[classes/:?]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.advanceToEndOfEventTime(SourceStreamTask.java:164) ~[classes/:?]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:530) ~[classes/:?]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.lambda$completeProcessing$0(SourceStreamTask.java:358) ~[classes/:?]
        at org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:126) ~[classes/:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_271]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[classes/:?]
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[classes/:?]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) ~[classes/:?]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) ~[classes/:?]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) ~[classes/:?]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:818) ~[classes/:?]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:745) ~[classes/:?]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:784) ~[classes/:?]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:727) ~[classes/:?]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:786) ~[classes/:?]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:572) ~[classes/:?]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_271]
{code}


> Bypass operators when advanceToEndOfEventTime for both legacy and new source 
> tasks
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-23808
>                 URL: https://issues.apache.org/jira/browse/FLINK-23808
>             Project: Flink
>          Issue Type: Sub-task
>    Affects Versions: 1.14.0
>            Reporter: Yun Gao
>            Assignee: Yun Gao
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.14.0
>
>
> Currently, when data processing ends, the sources advance to the max
> watermark and emit MAX_WATERMARK. However, the output used for this is the
> mainOperatorOutput, namely the output of the source operator. If the source
> operator is chained with other operators and the tasks are finished on
> restore, the following operators should be skipped, but some of them, such
> as WatermarkAssignerOperator, still take actions when the watermark is
> advanced, which might cause problems.
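
The failure mode described above can be illustrated with a small toy model. This is only a sketch under stated assumptions, not Flink's code: `EndOfDataSketch`, `WatermarkOutput`, `ChainedOperator`, and `endData(boolean, boolean)` are hypothetical names invented for the illustration, and the assumption that a skipped operator is simply never opened is a simplification. It shows why sending MAX_WATERMARK through the chained operator can hit a NullPointerException, while writing to the output behind the chain does not.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: none of these types are Flink classes. */
final class EndOfDataSketch {

    static final long MAX_WATERMARK = Long.MAX_VALUE;

    /** Hypothetical stand-in for anything that accepts watermarks. */
    interface WatermarkOutput {
        void emitWatermark(long timestamp);
    }

    /** Hypothetical chained operator whose state exists only after open(). */
    static final class ChainedOperator implements WatermarkOutput {
        private final WatermarkOutput downstream;
        private List<Long> seen; // stays null if the operator was skipped

        ChainedOperator(WatermarkOutput downstream) {
            this.downstream = downstream;
        }

        void open() {
            seen = new ArrayList<>();
        }

        @Override
        public void emitWatermark(long timestamp) {
            seen.add(timestamp); // throws NullPointerException if open() never ran
            downstream.emitWatermark(timestamp);
        }
    }

    /**
     * Emits MAX_WATERMARK at end of data and returns what reached the task output.
     * Assumption of this sketch: a task that finished on restore skips open().
     */
    static List<Long> endData(boolean finishedOnRestore, boolean bypassChain) {
        List<Long> taskOutput = new ArrayList<>();
        WatermarkOutput tail = ts -> taskOutput.add(ts);
        ChainedOperator chained = new ChainedOperator(tail);
        if (!finishedOnRestore) {
            chained.open();
        }
        // Bypassing writes to the output behind the chain instead of the operator.
        WatermarkOutput target = bypassChain ? tail : chained;
        target.emitWatermark(MAX_WATERMARK);
        return taskOutput;
    }

    public static void main(String[] args) {
        System.out.println(endData(false, false)); // normal run through the chain
        System.out.println(endData(true, true));   // finished on restore, chain bypassed
        try {
            endData(true, false);                  // finished on restore, chain not bypassed
        } catch (NullPointerException e) {
            System.out.println("NPE: skipped operator still received the watermark");
        }
    }
}
```

In this model the bypass is a plain choice of target output; how the actual change selects the output inside the task is not shown in this message.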



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
