[ https://issues.apache.org/jira/browse/FLINK-14228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Haibo Sun updated FLINK-14228:
------------------------------
Description:
Currently, the runtime support for
{{Bounded[One|Multi]Input#endInput}} has the following problems:
* The runtime propagates {{endInput}} along the operator chain immediately
when the input of the head operator is finished. Because some operators flush
their buffered data in {{close}}, downstream operators may still receive
records after their {{endInput}} has been called. This forces operators to
flush their buffered data in {{endInput}} instead of {{close}}, as done in the
PRs fixing issue#13491 and issue#13376.
* Timers are not taken into account.
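To illustrate the first problem, here is a minimal Java sketch of a two-operator chain. It uses a hypothetical model ({{Op}}, {{BufferingOp}} and {{CountingSink}} are made-up names, not Flink's {{StreamOperator}} or {{BoundedOneInput}} API): OP1 buffers records and only flushes them in {{close}}, and the driver propagates {{endInput}} eagerly before closing anything.

```java
import java.util.ArrayList;
import java.util.List;

public class EagerEndInputSketch {

    // Hypothetical minimal operator interface (not Flink's StreamOperator API).
    interface Op {
        void processElement(String record);
        void endInput();
        void close();
    }

    // OP2: tail operator that counts records arriving after its endInput().
    static class CountingSink implements Op {
        private boolean inputEnded = false;
        private int lateRecords = 0;

        public void processElement(String record) {
            if (inputEnded) {
                lateRecords++; // violates the endInput contract
            }
        }

        public void endInput() {
            inputEnded = true;
        }

        public void close() {
        }
    }

    // OP1: buffers every record and only flushes the buffer in close().
    static class BufferingOp implements Op {
        private final Op downstream;
        private final List<String> buffer = new ArrayList<>();

        BufferingOp(Op downstream) {
            this.downstream = downstream;
        }

        public void processElement(String record) {
            buffer.add(record);
        }

        public void endInput() {
            // Nothing is flushed here; that is left to close().
        }

        public void close() {
            for (String record : buffer) {
                downstream.processElement(record);
            }
            buffer.clear();
        }
    }

    /** Returns how many records OP2 receives after its endInput() was called. */
    static int lateRecordsWithEagerPropagation(int inputSize) {
        CountingSink op2 = new CountingSink();
        BufferingOp op1 = new BufferingOp(op2);
        for (int i = 0; i < inputSize; i++) {
            op1.processElement("record-" + i);
        }
        // OP1's input is finished: endInput is propagated along the whole
        // chain immediately, and the operators are closed only afterwards.
        op1.endInput();
        op2.endInput();
        op1.close(); // flushes OP1's buffer into OP2 after OP2#endInput
        op2.close();
        return op2.lateRecords;
    }

    public static void main(String[] args) {
        System.out.println(lateRecordsWithEagerPropagation(3));
    }
}
```

With three input records, {{lateRecordsWithEagerPropagation(3)}} returns 3: every buffered record reaches OP2 after its {{endInput}}.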
In fact, {{StreamOperator#close}} tells the operator to finish all its
processing and flush its output (all remaining buffered data), while
{{endInput}} indicates that no more data will arrive on a given input of the
operator. In other words, for each non-tail operator in the chain, the input
of its downstream operator reaches its end only when that operator is closed.
So for an operator chain {{OP1 -> OP2 -> ...}}, the logic should be:
# The {{(Source/Network)Input}} of {{OP1}} is finished.
# call {{OP1#endInput}}
# quiesce {{ProcessingTimeService}} to prevent {{OP1}} from registering new
timers.
# wait for the pending timers of {{OP1}} that are already being processed to
finish.
# call {{OP1#close}}
# call {{OP2#endInput}}
# quiesce {{ProcessingTimeService}} to prevent {{OP2}} from registering new
timers.
# ...
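The sequence above can be sketched in Java as a simplified, single-threaded driver. This is only an illustration under assumptions, not the actual fix: each operator is assumed to own a separate timer service, and {{TimerService}}, {{LoggingOperator}} and {{endChain}} are hypothetical names. Quiescing is modeled with {{ScheduledThreadPoolExecutor#shutdown}} (no new timers, not-yet-started timers dropped), and waiting for in-flight timers with {{awaitTermination}}; Flink's real {{ProcessingTimeService}} may behave differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ChainEndInputSketch {

    // Hypothetical per-operator timer service standing in for Flink's
    // ProcessingTimeService; modeled here with a ScheduledThreadPoolExecutor.
    static class TimerService {
        private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);

        TimerService() {
            // Once quiesced, timers that have not started yet are dropped.
            executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        }

        /** Registers a timer; returns false if the service is already quiesced. */
        boolean registerTimer(long delayMillis, Runnable callback) {
            try {
                executor.schedule(callback, delayMillis, TimeUnit.MILLISECONDS);
                return true;
            } catch (RejectedExecutionException e) {
                return false;
            }
        }

        /** Disallows new timers; timers already running may still complete. */
        void quiesce() {
            executor.shutdown();
        }

        /** Blocks until the timers that were being processed have finished. */
        boolean awaitPendingTimers() {
            try {
                return executor.awaitTermination(1, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    // Hypothetical chained operator that only records its lifecycle calls.
    static class LoggingOperator {
        final String name;
        final TimerService timers = new TimerService();
        private final List<String> trace;

        LoggingOperator(String name, List<String> trace) {
            this.name = name;
            this.trace = trace;
        }

        void endInput() {
            trace.add(name + "#endInput");
        }

        void close() {
            // A real operator would flush its remaining buffered output here.
            trace.add(name + "#close");
        }
    }

    /** Ends the chain operator by operator, in the order proposed above. */
    static void endChain(List<LoggingOperator> chain, List<String> trace) {
        for (LoggingOperator op : chain) {
            op.endInput();                  // no more input for this operator
            op.timers.quiesce();            // it may not register new timers
            trace.add(op.name + "#quiesce");
            op.timers.awaitPendingTimers(); // in-flight timers finish
            trace.add(op.name + "#timersDone");
            op.close();                     // remaining output is flushed
            // Only after close() has the next operator's input really ended.
        }
    }

    static String runTwoOperatorChain() {
        List<String> trace = new ArrayList<>();
        List<LoggingOperator> chain = List.of(
                new LoggingOperator("OP1", trace),
                new LoggingOperator("OP2", trace));
        endChain(chain, trace);
        return String.join(",", trace);
    }

    static boolean timerRejectedAfterQuiesce() {
        TimerService service = new TimerService();
        service.quiesce();
        return !service.registerTimer(0, () -> { });
    }

    public static void main(String[] args) {
        System.out.println(runTwoOperatorChain());
        System.out.println(timerRejectedAfterQuiesce());
    }
}
```

{{runTwoOperatorChain()}} returns {{OP1#endInput,OP1#quiesce,OP1#timersDone,OP1#close,OP2#endInput,OP2#quiesce,OP2#timersDone,OP2#close}}, so {{OP2#endInput}} is only called after {{OP1#close}} has flushed its output, and a quiesced service rejects new timers.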
> The runtime support for Bounded[One|Multi]Input#endInput does not properly
> implement their semantics
> ----------------------------------------------------------------------------------------------------
>
> Key: FLINK-14228
> URL: https://issues.apache.org/jira/browse/FLINK-14228
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.9.0
> Reporter: Haibo Sun
> Assignee: Haibo Sun
> Priority: Major
> Fix For: 1.10.0
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)