[ 
https://issues.apache.org/jira/browse/FLINK-7721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reopened FLINK-7721:
----------------------------------------

> StatusWatermarkValve should output a new min watermark only if it was 
> aggregated from aligned chhanels
> ------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7721
>                 URL: https://issues.apache.org/jira/browse/FLINK-7721
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.1, 1.4.0, 1.3.2
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.3
>
>
> Context:
> {code}
> long newMinWatermark = Long.MAX_VALUE;
> for (InputChannelStatus channelStatus : channelStatuses) {
>     if (channelStatus.isWatermarkAligned) {
>         newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
>     }
> }
> {code}
> In the calculation of the new min watermark in 
> {{StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels()}}, 
> there is not verification that the calculated new min watermark 
> {{newMinWatermark}} really is aggregated from some aligned channel.
> In the corner case where all input channels are currently not aligned but 
> actually some are active, we would then incorrectly determine that the final 
> aggregation of {{newMinWatermark}} is {{Long.MAX_VALUE}} and emit that.
> The fix would simply be to only emit the aggregated watermark IFF it was 
> really calculated from some aligned input channel (as well as the already 
> existing constraint that it needs to be larger than the last emitted 
> watermark). This change should also safely cover the case that a 
> {{Long.MAX_VALUE}} was genuinely aggregated from one of the input channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to