zhuzhurk commented on code in PR #21588:
URL: https://github.com/apache/flink/pull/21588#discussion_r1061255684


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValve.java:
##########
@@ -226,13 +239,22 @@ private void findAndOutputMaxWatermarkAcrossAllChannels(DataOutput<?> output) th
      *   <li>the watermark status has resumed to be active, but the watermark of the channel hasn't
      *       caught up to the last output watermark from the valve yet.
      * </ul>
+     *
+     * <p>NOTE: This class implements {@link HeapPriorityQueueElement} to be managed by {@link
+     * #alignedChannelStatuses} to help find minimum watermark.
      */
     @VisibleForTesting
-    protected static class InputChannelStatus {
+    protected static class InputChannelStatus implements HeapPriorityQueueElement {
         protected long watermark;
         protected WatermarkStatus watermarkStatus;
         protected boolean isWatermarkAligned;
 
+        /**
+         * This field holds the current physical index of this channel status when it is managed by
+         * a {@link HeapPriorityQueue}.
+         */
+        private transient int heapIndex = HeapPriorityQueueElement.NOT_CONTAINED;

Review Comment:
   This field does not need to be `transient` because `InputChannelStatus` is not serializable.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValve.java:
##########
@@ -94,11 +103,12 @@ public void inputWatermark(Watermark watermark, int channelIndex, DataOutput<?>
             if (watermarkMillis > channelStatuses[channelIndex].watermark) {
                 channelStatuses[channelIndex].watermark = watermarkMillis;
 
-                // previously unaligned input channels are now aligned if its watermark has caught
-                // up
-                if (!channelStatuses[channelIndex].isWatermarkAligned
-                        && watermarkMillis >= lastOutputWatermark) {
-                    channelStatuses[channelIndex].isWatermarkAligned = true;
+                if (channelStatuses[channelIndex].isWatermarkAligned) {
+                    alignedChannelStatuses.adjustModifiedElement(channelStatuses[channelIndex]);

Review Comment:
   It would be better to introduce a method like `adjustAlignedChannelStatuses(...)` 
and add some comments to it. That would make the code easier to understand.
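
A rough sketch of what such a helper could look like is given here, only to illustrate the suggestion. It is not the PR's code: `AlignedChannelSketch`, `ChannelStatus`, and `minAlignedWatermark()` are hypothetical names, and `java.util.PriorityQueue` stands in for Flink's `HeapPriorityQueue`. Because `PriorityQueue` has no `adjustModifiedElement`, the sketch removes and re-adds the element instead, and the helper's parameters are an assumption.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

/** Hypothetical stand-in for the valve; not Flink's StatusWatermarkValve. */
public class AlignedChannelSketch {

    /** Simplified channel status with only the fields this sketch needs. */
    static final class ChannelStatus {
        long watermark = Long.MIN_VALUE;
        boolean isWatermarkAligned = true;
    }

    private final ChannelStatus[] channelStatuses;

    // Stand-in for alignedChannelStatuses: a min-heap ordered by watermark.
    private final PriorityQueue<ChannelStatus> alignedChannelStatuses =
            new PriorityQueue<>(Comparator.comparingLong((ChannelStatus s) -> s.watermark));

    public AlignedChannelSketch(int numChannels) {
        channelStatuses = new ChannelStatus[numChannels];
        for (int i = 0; i < numChannels; i++) {
            channelStatuses[i] = new ChannelStatus();
            alignedChannelStatuses.add(channelStatuses[i]);
        }
    }

    public void inputWatermark(long watermarkMillis, int channelIndex) {
        ChannelStatus status = channelStatuses[channelIndex];
        if (watermarkMillis > status.watermark) {
            if (status.isWatermarkAligned) {
                adjustAlignedChannelStatuses(status, watermarkMillis);
            } else {
                // Unaligned channels are not in the queue, so a plain update is enough.
                status.watermark = watermarkMillis;
            }
        }
    }

    /**
     * Updates the watermark of an aligned channel and restores the queue order.
     * The queue is ordered by watermark, so the element is taken out before its
     * key changes and put back afterwards (assumed replacement for
     * adjustModifiedElement, which java.util.PriorityQueue does not offer).
     */
    private void adjustAlignedChannelStatuses(ChannelStatus status, long newWatermark) {
        alignedChannelStatuses.remove(status);
        status.watermark = newWatermark;
        alignedChannelStatuses.add(status);
    }

    /** Minimum watermark across aligned channels, or Long.MAX_VALUE if none are aligned. */
    public long minAlignedWatermark() {
        ChannelStatus head = alignedChannelStatuses.peek();
        return head == null ? Long.MAX_VALUE : head.watermark;
    }

    public static void main(String[] args) {
        AlignedChannelSketch valve = new AlignedChannelSketch(2);
        valve.inputWatermark(10L, 0);
        valve.inputWatermark(5L, 1);
        System.out.println(valve.minAlignedWatermark());
    }
}
```

Keeping the queue maintenance in one commented method means callers such as `inputWatermark` do not need to know how the ordering is restored.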



