This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c7dcee149eb4b7b5bc8f5921b2de34f61b5a5b78
Author: Piotr Nowojski <[email protected]>
AuthorDate: Fri Aug 9 11:10:40 2024 +0200

    [FLINK-35886][task] Hide backpressure from idleness detection in 
WatermarkAssignerOperator
---
 .../wmassigners/WatermarkAssignerOperator.java     | 21 ++++++++++++++--
 .../wmassigners/WatermarkAssignerOperatorTest.java | 29 ++++++++++++++++++++++
 2 files changed, 48 insertions(+), 2 deletions(-)

diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java
index 5fd7c8a22e1..486ee5274e9 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.PausableRelativeClock;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -70,6 +71,9 @@ public class WatermarkAssignerOperator extends 
AbstractStreamOperator<RowData>
 
     private transient long lastIdleCheckProcessedElements = -1;
 
+    /** {@link PausableRelativeClock} that will be paused in case of 
backpressure. */
+    private transient PausableRelativeClock inputActivityClock;
+
     /**
      * Create a watermark assigner operator.
      *
@@ -95,6 +99,12 @@ public class WatermarkAssignerOperator extends 
AbstractStreamOperator<RowData>
     @Override
     public void open() throws Exception {
         super.open();
+        inputActivityClock = new 
PausableRelativeClock(getProcessingTimeService().getClock());
+        getContainingTask()
+                .getEnvironment()
+                .getMetricGroup()
+                .getIOMetricGroup()
+                .registerBackPressureListener(inputActivityClock);
 
         // watermark and timestamp should start from 0
         this.currentWatermark = 0;
@@ -154,7 +164,9 @@ public class WatermarkAssignerOperator extends 
AbstractStreamOperator<RowData>
     @Override
     public void onProcessingTime(long timestamp) throws Exception {
         // timestamp and now can be off in case TM is heavily overloaded.
+        // now and inputActivityNow are using different clocks and can have 
very different values.
         long now = getProcessingTimeService().getCurrentProcessingTime();
+        long inputActivityNow = inputActivityClock.relativeTimeMillis();
 
         if (watermarkInterval > 0 && lastWatermarkPeriodicEmitTime + 
watermarkInterval <= now) {
             lastWatermarkPeriodicEmitTime = now;
@@ -162,13 +174,13 @@ public class WatermarkAssignerOperator extends 
AbstractStreamOperator<RowData>
         }
 
         if (processedElements != lastIdleCheckProcessedElements) {
-            timeSinceLastIdleCheck = now;
+            timeSinceLastIdleCheck = inputActivityNow;
             lastIdleCheckProcessedElements = processedElements;
         }
 
         if (isIdlenessEnabled()
                 && currentStatus.equals(WatermarkStatus.ACTIVE)
-                && timeSinceLastIdleCheck + idleTimeout <= now) {
+                && timeSinceLastIdleCheck + idleTimeout <= inputActivityNow) {
             // mark the channel as idle to ignore watermarks from this channel
             emitWatermarkStatus(WatermarkStatus.IDLE);
         }
@@ -213,6 +225,11 @@ public class WatermarkAssignerOperator extends 
AbstractStreamOperator<RowData>
 
     @Override
     public void close() throws Exception {
+        getContainingTask()
+                .getEnvironment()
+                .getMetricGroup()
+                .getIOMetricGroup()
+                .unregisterBackPressureListener(inputActivityClock);
         FunctionUtils.closeFunction(watermarkGenerator);
         super.close();
     }
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperatorTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperatorTest.java
index 9cd558629bc..2a5419eafbb 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperatorTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperatorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.operators.wmassigners;
 
 import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
@@ -139,6 +140,34 @@ public class WatermarkAssignerOperatorTest extends 
WatermarkAssignerOperatorTest
         
assertThat(extractWatermarkStatuses(output)).doesNotContain(WatermarkStatus.IDLE);
     }
 
+    @Test
+    public void testIdleTimeoutUnderBackpressure() throws Exception {
+        long idleTimeout = 100;
+
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createTestHarness(0, WATERMARK_GENERATOR, idleTimeout);
+        testHarness.getExecutionConfig().setAutoWatermarkInterval(idleTimeout);
+        testHarness.open();
+
+        TaskIOMetricGroup taskIOMetricGroup =
+                
testHarness.getEnvironment().getMetricGroup().getIOMetricGroup();
+        taskIOMetricGroup.getHardBackPressuredTimePerSecond().markStart();
+
+        stepProcessingTime(testHarness, 0, idleTimeout * 10, idleTimeout / 10);
+        assertThat(testHarness.getOutput()).isEmpty();
+
+        taskIOMetricGroup.getHardBackPressuredTimePerSecond().markEnd();
+        taskIOMetricGroup.getSoftBackPressuredTimePerSecond().markStart();
+
+        stepProcessingTime(testHarness, idleTimeout * 10, idleTimeout * 20, 
idleTimeout / 10);
+        assertThat(testHarness.getOutput()).isEmpty();
+
+        taskIOMetricGroup.getSoftBackPressuredTimePerSecond().markEnd();
+
+        stepProcessingTime(testHarness, idleTimeout * 20, idleTimeout * 30, 
idleTimeout / 10);
+        
assertThat(testHarness.getOutput()).containsExactly(WatermarkStatus.IDLE);
+    }
+
     private void stepProcessingTime(
             OneInputStreamOperatorTestHarness<?, ?> testHarness,
             long fromInclusive,

Reply via email to