[ 
https://issues.apache.org/jira/browse/FLINK-28033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553848#comment-17553848
 ] 

YeAble commented on FLINK-28033:
--------------------------------

In my opinion, it could be done like this:
{code:java}
long newMinWatermark = Long.MAX_VALUE;
boolean hasAlignedChannels = false;

// determine new overall watermark by considering only watermark-aligned
// channels across all channels
for (InputChannelStatus channelStatus : channelStatuses) {
    if (channelStatus.isWatermarkAligned  
        && channelStatus.watermark != Long.MIN_VALUE) {
        hasAlignedChannels = true;
        newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
    }
}

// we acknowledge and output the new overall watermark if it really is
// aggregated from some remaining aligned channel, and is also larger than
// the last output watermark
if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
    lastOutputWatermark = newMinWatermark;
    output.emitWatermark(new Watermark(lastOutputWatermark));
}
{code}
I'm not sure this is right, so I reported the question in Jira to ask for help.
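
To make the difference concrete, here is a minimal standalone sketch that compares the two ways of computing the minimum. It does not use Flink: ChannelStatus is a hypothetical, simplified stand-in for InputChannelStatus, and the class and method names are made up for illustration. It only shows the arithmetic, not whether skipping channels that are still at Long.MIN_VALUE is the right semantics for the valve.

```java
public class WatermarkMinSketch {

    /** Hypothetical, simplified stand-in for Flink's InputChannelStatus. */
    public static final class ChannelStatus {
        final long watermark;
        final boolean isWatermarkAligned;

        public ChannelStatus(long watermark, boolean isWatermarkAligned) {
            this.watermark = watermark;
            this.isWatermarkAligned = isWatermarkAligned;
        }
    }

    /**
     * Minimum over all aligned channels, as in the reported method.
     * Returns Long.MAX_VALUE if no channel is aligned.
     */
    public static long minOverAligned(ChannelStatus[] statuses) {
        long min = Long.MAX_VALUE;
        for (ChannelStatus s : statuses) {
            if (s.isWatermarkAligned) {
                min = Math.min(s.watermark, min);
            }
        }
        return min;
    }

    /**
     * Minimum over aligned channels whose watermark is no longer the
     * initial Long.MIN_VALUE, as in the change proposed in this comment.
     * Returns Long.MAX_VALUE if no channel qualifies.
     */
    public static long minOverAlignedWithWatermark(ChannelStatus[] statuses) {
        long min = Long.MAX_VALUE;
        for (ChannelStatus s : statuses) {
            if (s.isWatermarkAligned && s.watermark != Long.MIN_VALUE) {
                min = Math.min(s.watermark, min);
            }
        }
        return min;
    }

    public static void main(String[] args) {
        ChannelStatus[] statuses = {
            new ChannelStatus(1000L, true),          // has received a watermark
            new ChannelStatus(Long.MIN_VALUE, true)  // still at its initial value
        };
        System.out.println(minOverAligned(statuses));              // -9223372036854775808
        System.out.println(minOverAlignedWithWatermark(statuses)); // 1000
    }
}
```

With the first variant the result can never exceed lastOutputWatermark while any aligned channel is still at Long.MIN_VALUE, so nothing is emitted. With the second variant the watermark 1000 would be emitted.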

> find and output new min watermark mybe wrong when in multichannel
> -----------------------------------------------------------------
>
>                 Key: FLINK-28033
>                 URL: https://issues.apache.org/jira/browse/FLINK-28033
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>            Reporter: YeAble
>            Priority: Blocker
>
> File: StatusWatermarkValve.java
> Method: findAndOutputNewMinWatermarkAcrossAlignedChannels
> {code:java}
> long newMinWatermark = Long.MAX_VALUE;
> boolean hasAlignedChannels = false;
> // determine new overall watermark by considering only watermark-aligned
> // channels across all channels
> for (InputChannelStatus channelStatus : channelStatuses) {
>     if (channelStatus.isWatermarkAligned) {
>         hasAlignedChannels = true;
>         newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
>     }
> }
> // we acknowledge and output the new overall watermark if it really is
> // aggregated from some remaining aligned channel, and is also larger than
> // the last output watermark
> if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
>     lastOutputWatermark = newMinWatermark;
>     output.emitWatermark(new Watermark(lastOutputWatermark));
> }
> {code}
> Each channelStatus's watermark is initialized to Long.MIN_VALUE. When one 
> channelStatus's watermark has advanced but the others have not, 
> newMinWatermark is always Long.MIN_VALUE, so the condition 
> newMinWatermark > lastOutputWatermark never holds and no watermark is emitted.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
