[ https://issues.apache.org/jira/browse/FLINK-28033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553848#comment-17553848 ]
YeAble commented on FLINK-28033:
--------------------------------
I think it could be done like this:
{code:java}
long newMinWatermark = Long.MAX_VALUE;
boolean hasAlignedChannels = false;

// determine new overall watermark by considering only watermark-aligned
// channels across all channels
for (InputChannelStatus channelStatus : channelStatuses) {
    if (channelStatus.isWatermarkAligned
            && channelStatus.watermark != Long.MIN_VALUE) {
        hasAlignedChannels = true;
        newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
    }
}

// we acknowledge and output the new overall watermark if it really is aggregated
// from some remaining aligned channel, and is also larger than the last output watermark
if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
    lastOutputWatermark = newMinWatermark;
    output.emitWatermark(new Watermark(lastOutputWatermark));
}
{code}
I'm not sure this is right, so I am reporting it in Jira and asking for help.
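
To make the difference concrete, here is a minimal standalone sketch that compares the two aggregation rules on two aligned channels, one of which has not reported a watermark yet. It does not use Flink: `MinWatermarkSketch`, `ChannelStatus`, `minIncludingUnreported` and `minSkippingUnreported` are hypothetical names made up for this illustration, and `ChannelStatus` only mimics the two fields of `InputChannelStatus` that the snippets above read.

```java
import java.util.List;

public class MinWatermarkSketch {

    // Hypothetical stand-in for InputChannelStatus (assumption, not Flink code).
    static final class ChannelStatus {
        final long watermark;
        final boolean isWatermarkAligned;

        ChannelStatus(long watermark, boolean isWatermarkAligned) {
            this.watermark = watermark;
            this.isWatermarkAligned = isWatermarkAligned;
        }
    }

    // Rule from the issue description: every aligned channel counts, including
    // channels whose watermark is still the initial Long.MIN_VALUE.
    // Returns Long.MAX_VALUE if there is no aligned channel at all.
    static long minIncludingUnreported(List<ChannelStatus> channels) {
        long min = Long.MAX_VALUE;
        for (ChannelStatus c : channels) {
            if (c.isWatermarkAligned) {
                min = Math.min(c.watermark, min);
            }
        }
        return min;
    }

    // Rule proposed in this comment: aligned channels that are still at
    // Long.MIN_VALUE are skipped.
    // Returns Long.MAX_VALUE if no channel qualifies.
    static long minSkippingUnreported(List<ChannelStatus> channels) {
        long min = Long.MAX_VALUE;
        for (ChannelStatus c : channels) {
            if (c.isWatermarkAligned && c.watermark != Long.MIN_VALUE) {
                min = Math.min(c.watermark, min);
            }
        }
        return min;
    }

    public static void main(String[] args) {
        List<ChannelStatus> channels = List.of(
                new ChannelStatus(1000L, true),            // has reported a watermark
                new ChannelStatus(Long.MIN_VALUE, true));  // has not reported yet

        System.out.println(minIncludingUnreported(channels)); // -9223372036854775808
        System.out.println(minSkippingUnreported(channels));  // 1000
    }
}
```

With the second rule a watermark of 1000 would be emitted before the second channel has reported anything, so records that later arrive on that channel with smaller timestamps could be treated as late. That trade-off is worth checking before changing the behavior.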
> find and output new min watermark mybe wrong when in multichannel
> -----------------------------------------------------------------
>
> Key: FLINK-28033
> URL: https://issues.apache.org/jira/browse/FLINK-28033
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Reporter: YeAble
> Priority: Blocker
>
> File: StatusWatermarkValve.java
> Method: findAndOutputNewMinWatermarkAcrossAlignedChannels
> {code:java}
> long newMinWatermark = Long.MAX_VALUE;
> boolean hasAlignedChannels = false;
>
> // determine new overall watermark by considering only watermark-aligned
> // channels across all channels
> for (InputChannelStatus channelStatus : channelStatuses) {
>     if (channelStatus.isWatermarkAligned) {
>         hasAlignedChannels = true;
>         newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
>     }
> }
>
> // we acknowledge and output the new overall watermark if it really is aggregated
> // from some remaining aligned channel, and is also larger than the last output watermark
> if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
>     lastOutputWatermark = newMinWatermark;
>     output.emitWatermark(new Watermark(lastOutputWatermark));
> }
> {code}
> Each channelStatus's watermark is initialized to Long.MIN_VALUE. When one
> channelStatus's watermark changes but the other channels' watermarks have
> not changed yet, newMinWatermark is always Long.MIN_VALUE, so no watermark
> is emitted.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)