pnowojski commented on code in PR #24941:
URL: https://github.com/apache/flink/pull/24941#discussion_r1642415271


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java:
##########
@@ -136,19 +154,28 @@ private void advanceWatermark() {
 
     @Override
     public void onProcessingTime(long timestamp) throws Exception {
-        advanceWatermark();
+        // timestamp and now can be off in case TM is heavily overloaded.
+        long now = getProcessingTimeService().getCurrentProcessingTime();
 
-        if (idleTimeout > 0 && currentStatus.equals(WatermarkStatus.ACTIVE)) {
-            final long currentTime = getProcessingTimeService().getCurrentProcessingTime();
-            if (currentTime - lastRecordTime > idleTimeout) {
-                // mark the channel as idle to ignore watermarks from this channel
-                emitWatermarkStatus(WatermarkStatus.IDLE);
-            }
+        if (watermarkInterval > 0
+                && lastWatermarkPeriodicEmitTime + watermarkInterval <= timestamp) {
+            lastWatermarkPeriodicEmitTime = now;
+            advanceWatermark();
+        }
+        if (processedElements != lastIdleCheckProcessedElements) {
+            idleSince = now;
+        }
+        lastIdleCheckProcessedElements = processedElements;

Review Comment:
   It doesn't matter, does it? But anyway 👍 



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java:
##########
@@ -52,12 +53,22 @@ public class WatermarkAssignerOperator extends AbstractStreamOperator<RowData>
 
     private transient long watermarkInterval;
 
+    private transient long timerInterval;
+
     private transient long currentWatermark;
 
-    private transient long lastRecordTime;
+    // Last time watermark have been (periodically) emitted
+    private transient long lastWatermarkPeriodicEmitTime;
+
+    // Last time idleness status has been checked
+    private transient long idleSince;

Review Comment:
   I've changed it to `timeSinceLastIdleCheck`. Or do you have another suggestion?



