[ https://issues.apache.org/jira/browse/FLINK-14228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Haibo Sun updated FLINK-14228:
------------------------------
    Description: 
Currently, the runtime support for {{Bounded[One|Multi]Input#endInput}} has
the following problems:
 * The runtime propagates {{endInput}} through the whole operator chain as soon
as the input of the head operator is finished. Because some operators flush
their buffered data in {{close}}, downstream operators can still receive
records after their {{endInput}} has been called. Working around this requires
operators to flush buffered data in {{endInput}} instead of {{close}}, as done
in the PRs fixing issue#13491 and issue#13376.

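 * Timers are not taken into account: an operator can still register new timers
after its {{endInput}}, and its pending timers may still be running when it is
closed.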
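
To illustrate the first problem, here is a minimal, single-threaded Java sketch
of a two-operator chain. The {{Op}}, {{BufferingOp}} and {{LoggingSink}} classes
are hypothetical stand-ins, not Flink's {{StreamOperator}} API, and the ordering
(every {{endInput}} first, then {{close}} from head to tail) is only a
simplified model of the eager propagation described above. The sink's log shows
records arriving after its {{endInput}}.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of eager endInput propagation; not Flink's runtime code. */
public class EagerEndInputDemo {

    /** Minimal stand-in for a chained operator (hypothetical interface). */
    interface Op {
        void processElement(String record);
        void endInput();
        void close();
    }

    /** Buffers records and only flushes them downstream in close(). */
    static final class BufferingOp implements Op {
        private final Op downstream;
        private final List<String> buffer = new ArrayList<>();

        BufferingOp(Op downstream) { this.downstream = downstream; }

        @Override public void processElement(String record) { buffer.add(record); }
        @Override public void endInput() { /* nothing is flushed here */ }
        @Override public void close() {
            for (String r : buffer) {
                downstream.processElement(r);
            }
            buffer.clear();
        }
    }

    /** Tail operator that logs every call so the ordering is visible. */
    static final class LoggingSink implements Op {
        final List<String> log = new ArrayList<>();
        @Override public void processElement(String record) { log.add("record:" + record); }
        @Override public void endInput() { log.add("endInput"); }
        @Override public void close() { log.add("close"); }
    }

    /** Runs the eager ordering and returns the sink's log. */
    static List<String> run() {
        LoggingSink sink = new LoggingSink();
        BufferingOp head = new BufferingOp(sink);
        List<Op> chain = List.of(head, sink);

        head.processElement("a");
        head.processElement("b");

        // Eager propagation: endInput on every operator of the chain first...
        for (Op op : chain) {
            op.endInput();
        }
        // ...then close in chain order; the head flushes its buffer only now.
        for (Op op : chain) {
            op.close();
        }
        return sink.log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running {{main}} prints {{[endInput, record:a, record:b, close]}}: the sink
receives both records after its input was declared finished.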

In fact, {{StreamOperator#close}} tells the operator to finish all its
processing and flush its output (all remaining buffered data), while
{{endInput}} indicates that no more data will arrive on one of the operator's
inputs. In other words, for each non-tail operator on the chain, the input of
its downstream operator only reaches its end once that operator has been
closed. So for an operator chain {{OP1 -> OP2 -> ... }}, the logic should be:
 # {{(Source/Network)Input}} of {{OP1}} is finished.
 # call {{OP1#endInput}}
 # quiesce {{ProcessingTimeService}} to prevent {{OP1}} from registering new
timers.
 # wait for {{OP1}}'s pending timers (those already being processed) to
finish.
 # call {{OP1#close}}
 # call {{OP2#endInput}}
 # quiesce {{ProcessingTimeService}} to prevent {{OP2}} from registering new
timers.
 # ...
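
The steps above can be sketched as a small, single-threaded Java model.
Everything in it is hypothetical: {{TimerService}} is a stand-in for
{{ProcessingTimeService}} (its {{register}} simply returns false once
quiesced), {{Op}} is not Flink's {{StreamOperator}}, and "waiting for pending
timers" is approximated by running the timers registered before the quiesce,
since nothing runs concurrently in this model. It only demonstrates the
ordering: a timer of {{OP1}} finishes, its attempt to register a follow-up
timer is rejected, and {{OP1#close}} happens before {{OP2#endInput}}.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the proposed end-of-input ordering; not Flink's StreamTask. */
public class ChainEndInputSketch {

    /** Hypothetical timer service; NOT Flink's ProcessingTimeService. */
    static final class TimerService {
        private boolean quiesced;
        private final List<Runnable> pending = new ArrayList<>();

        /** Registers a timer unless the service has been quiesced. */
        boolean register(Runnable timer) {
            if (quiesced) {
                return false;
            }
            pending.add(timer);
            return true;
        }

        /** Disallows any further timer registration. */
        void quiesce() {
            quiesced = true;
        }

        /**
         * Single-threaded stand-in for "wait for pending timers to finish":
         * runs the timers registered before quiesce, in registration order.
         */
        void awaitPendingTimers() {
            List<Runnable> toRun = new ArrayList<>(pending);
            pending.clear();
            toRun.forEach(Runnable::run);
        }
    }

    /** Hypothetical chained operator that only logs its lifecycle calls. */
    static class Op {
        final String name;
        final List<String> log;
        final TimerService timers = new TimerService();

        Op(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        void endInput() { log.add(name + "#endInput"); }
        void close() { log.add(name + "#close"); }
    }

    /** Applies the proposed ordering to each operator, head to tail. */
    static void finishChain(List<Op> chain) {
        for (Op op : chain) {
            op.endInput();                  // no more input for this operator
            op.timers.quiesce();            // no new timers from here on
            op.timers.awaitPendingTimers(); // let already-pending timers finish
            op.close();                     // a real operator would flush buffered output here
            // The next iteration calls endInput on the downstream operator,
            // which can no longer receive records from this closed operator.
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Op op1 = new Op("OP1", log);
        Op op2 = new Op("OP2", log);

        // OP1 has a pending timer that tries to register a follow-up timer.
        op1.timers.register(() -> {
            log.add("OP1 timer fired");
            boolean accepted = op1.timers.register(() -> log.add("OP1 follow-up timer"));
            log.add("OP1 follow-up accepted=" + accepted);
        });

        finishChain(List.of(op1, op2));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running {{main}} prints {{OP1#endInput}}, {{OP1 timer fired}},
{{OP1 follow-up accepted=false}}, {{OP1#close}}, {{OP2#endInput}} and
{{OP2#close}}, one per line. In the real runtime, where timers fire on a
separate thread, the wait step would have to block until the in-flight timer
callbacks complete rather than run them inline.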


> The runtime support for Bounded[One|Multi]Input#endInput does not properly 
> implement their semantics
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14228
>                 URL: https://issues.apache.org/jira/browse/FLINK-14228
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.9.0
>            Reporter: Haibo Sun
>            Assignee: Haibo Sun
>            Priority: Major
>             Fix For: 1.10.0
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
