zhuzhurk commented on code in PR #21588:
URL: https://github.com/apache/flink/pull/21588#discussion_r1061255684
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValve.java:
##########
@@ -226,13 +239,22 @@ private void
findAndOutputMaxWatermarkAcrossAllChannels(DataOutput<?> output) th
* <li>the watermark status has resumed to be active, but the watermark
of the channel hasn't
* caught up to the last output watermark from the valve yet.
* </ul>
+ *
+ * <p>NOTE: This class implements {@link HeapPriorityQueueElement} to be
managed by {@link
+ * #alignedChannelStatuses} to help find minimum watermark.
*/
@VisibleForTesting
- protected static class InputChannelStatus {
+ protected static class InputChannelStatus implements
HeapPriorityQueueElement {
protected long watermark;
protected WatermarkStatus watermarkStatus;
protected boolean isWatermarkAligned;
+ /**
+ * This field holds the current physical index of this channel status
when it is managed by
+ * a {@link HeapPriorityQueue}.
+ */
+ private transient int heapIndex =
HeapPriorityQueueElement.NOT_CONTAINED;
Review Comment:
No need to be `transient` because it is not serializable.
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValve.java:
##########
@@ -94,11 +103,12 @@ public void inputWatermark(Watermark watermark, int
channelIndex, DataOutput<?>
if (watermarkMillis > channelStatuses[channelIndex].watermark) {
channelStatuses[channelIndex].watermark = watermarkMillis;
- // previously unaligned input channels are now aligned if its
watermark has caught
- // up
- if (!channelStatuses[channelIndex].isWatermarkAligned
- && watermarkMillis >= lastOutputWatermark) {
- channelStatuses[channelIndex].isWatermarkAligned = true;
+ if (channelStatuses[channelIndex].isWatermarkAligned) {
+
alignedChannelStatuses.adjustModifiedElement(channelStatuses[channelIndex]);
Review Comment:
It's better to introduce a method like `adjustAlignedChannelStatuses(...)`
and add some comments for it. It can be helpful to make the code easier for
understanding.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]