[
https://issues.apache.org/jira/browse/FLINK-19109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17188635#comment-17188635
]
Roman Khachatryan edited comment on FLINK-19109 at 9/2/20, 4:49 PM:
--------------------------------------------------------------------
I see that with chaining enabled, TimestampsAndWatermarksOperator works as
expected until ContinuousFileReaderOperator starts reading the first elements.
After that, it schedules a timer which is executed with a 1-2 second delay.
This delay is caused by MailboxProcessor not picking up the mail of an already
fired timer (the timer services are OK).
This seems reasonable, since the priority of an operator's MailboxExecutor is
defined by its chain index
(the chain is ContinuousFileReaderOperator -> Map ->
TimestampsAndWatermarksOperator).
[~pnowojski] does it make sense to you?
Do you have any idea how to fix this?
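To illustrate the starvation described above, here is a toy model, not Flink's actual MailboxProcessor. It assumes, purely for illustration, that mails enqueued by an operator with a lower chain index are served first; the class ToyPriorityMailbox and all of its members are hypothetical names. The reader at index 0 re-enqueues itself for every split, so the timer mail at index 2, although already fired, only runs once the reader has nothing left to do.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model only; hypothetical names, not Flink's MailboxProcessor. */
final class ToyPriorityMailbox {

    /** A mail tagged with the chain index of the operator that enqueued it. */
    static final class Mail {
        final int chainIndex;
        final long seq;
        final String name;

        Mail(int chainIndex, long seq, String name) {
            this.chainIndex = chainIndex;
            this.seq = seq;
            this.name = name;
        }
    }

    // ASSUMPTION: a lower chain index is served first; ties are broken in FIFO order.
    private final PriorityQueue<Mail> queue = new PriorityQueue<>(
            Comparator.<Mail>comparingInt(m -> m.chainIndex)
                    .thenComparingLong(m -> m.seq));
    private long nextSeq = 0;

    void put(int chainIndex, String name) {
        queue.add(new Mail(chainIndex, nextSeq++, name));
    }

    /**
     * The timer mail (index 2) is enqueued before any read mail, yet the
     * reader (index 0) keeps enqueuing follow-up mails that are served first.
     * Returns the names of the mails in the order they were executed.
     */
    static List<String> run(int splits) {
        ToyPriorityMailbox mailbox = new ToyPriorityMailbox();
        List<String> executed = new ArrayList<>();
        mailbox.put(2, "timer");
        mailbox.put(0, "read-split-0");
        int next = 1;
        while (!mailbox.queue.isEmpty()) {
            Mail mail = mailbox.queue.poll();
            executed.add(mail.name);
            // The reader schedules its next read as a new mail.
            if (mail.chainIndex == 0 && next < splits) {
                mailbox.put(0, "read-split-" + next);
                next++;
            }
        }
        return executed;
    }
}
```

In this sketch, ToyPriorityMailbox.run(3) executes the three read mails before the timer mail, even though the timer mail was enqueued first.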
Update:
In 1.10, upon getting END_OF_INPUT (which is received early, after all
splits have been added to a queue), StreamTask closes all operators.
ContinuousFileReaderOperator.close calls waitSplitReaderFinished(), which
doesn't allow the task thread to process any mails. Once it's done, the mails
are drained and the remaining timer callback is executed.
In 1.11, ContinuousFileReaderOperator.close should allow timers to execute and
(thanks to FLINK-14231) services shouldn't be closed. But as
ContinuousFileReaderOperator ignores mails if the mailbox loop is stopped, it
doesn't help.
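The difference between a close() that blocks the task thread and one that lets pending mails run can be sketched with a single-threaded toy model. This is not Flink code: ToyClose, closeBlocking and closeYielding are hypothetical names, and the mailbox is reduced to a plain queue of strings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy, single-threaded model of an operator's close(); not Flink code. */
final class ToyClose {

    /**
     * Blocking variant (the 1.10 behaviour as described above): all splits
     * are read first, and pending mails are drained only afterwards.
     */
    static List<String> closeBlocking(int splits, Deque<String> mailbox) {
        List<String> log = new ArrayList<>();
        for (int i = 0; i < splits; i++) {
            log.add("split-" + i);
        }
        while (!mailbox.isEmpty()) {
            log.add(mailbox.poll());
        }
        return log;
    }

    /**
     * Yielding variant (ASSUMPTION: one possible shape of the intended
     * behaviour): after each split, at most one pending mail may run.
     */
    static List<String> closeYielding(int splits, Deque<String> mailbox) {
        List<String> log = new ArrayList<>();
        for (int i = 0; i < splits; i++) {
            log.add("split-" + i);
            String mail = mailbox.poll();
            if (mail != null) {
                log.add(mail);
            }
        }
        // Drain whatever is still pending once reading has finished.
        while (!mailbox.isEmpty()) {
            log.add(mailbox.poll());
        }
        return log;
    }

    /** Convenience helper building a mailbox that holds one fired timer mail. */
    static Deque<String> mailboxWithTimer() {
        Deque<String> mailbox = new ArrayDeque<>();
        mailbox.add("timer");
        return mailbox;
    }
}
```

With two splits and one pending timer mail, the blocking variant runs the timer last, while the yielding variant runs it right after the first split.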
> Split Reader eats chained periodic watermarks
> ---------------------------------------------
>
> Key: FLINK-19109
> URL: https://issues.apache.org/jira/browse/FLINK-19109
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.10.0, 1.10.1, 1.11.0, 1.10.2, 1.11.1
> Reporter: David Anderson
> Assignee: Roman Khachatryan
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.12.0, 1.11.2, 1.10.3
>
>
> Attempting to generate watermarks chained to the Split Reader /
> ContinuousFileReaderOperator, as in
> {code:java}
> SingleOutputStreamOperator<Event> results = env
>     .readTextFile(...)
>     .map(...)
>     .assignTimestampsAndWatermarks(bounded)
>     .keyBy(...)
>     .process(...);{code}
> leads to the Watermarks failing to be produced. Breaking the chain, via
> {{disableOperatorChaining()}} or a {{rebalance}}, works around the bug. Using
> punctuated watermarks also avoids the issue.
> Looking at this in the debugger reveals that the timer service is being
> prematurely quiesced.
> In many respects this is FLINK-7666 brought back to life.
> The problem is not present in 1.9.3.
> There's a minimal reproducible example in
> [https://github.com/alpinegizmo/flink-question-001/tree/bug].