This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.20 in repository https://gitbox.apache.org/repos/asf/flink.git
commit c7dcee149eb4b7b5bc8f5921b2de34f61b5a5b78 Author: Piotr Nowojski <[email protected]> AuthorDate: Fri Aug 9 11:10:40 2024 +0200 [FLINK-35886][task] Hide backpressure from idleness detection in WatermarkAssignerOperator --- .../wmassigners/WatermarkAssignerOperator.java | 21 ++++++++++++++-- .../wmassigners/WatermarkAssignerOperatorTest.java | 29 ++++++++++++++++++++++ 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java index 5fd7c8a22e1..486ee5274e9 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.util.PausableRelativeClock; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; @@ -70,6 +71,9 @@ public class WatermarkAssignerOperator extends AbstractStreamOperator<RowData> private transient long lastIdleCheckProcessedElements = -1; + /** {@link PausableRelativeClock} that will be paused in case of backpressure. */ + private transient PausableRelativeClock inputActivityClock; + /** * Create a watermark assigner operator. * @@ -95,6 +99,12 @@ public class WatermarkAssignerOperator extends AbstractStreamOperator<RowData> @Override public void open() throws Exception { super.open(); + inputActivityClock = new PausableRelativeClock(getProcessingTimeService().getClock()); + getContainingTask() + .getEnvironment() + .getMetricGroup() + .getIOMetricGroup() + .registerBackPressureListener(inputActivityClock); // watermark and timestamp should start from 0 this.currentWatermark = 0; @@ -154,7 +164,9 @@ public class WatermarkAssignerOperator extends AbstractStreamOperator<RowData> @Override public void onProcessingTime(long timestamp) throws Exception { // timestamp and now can be off in case TM is heavily overloaded. + // now and inputActivityNow are using different clocks and can have very different values. long now = getProcessingTimeService().getCurrentProcessingTime(); + long inputActivityNow = inputActivityClock.relativeTimeMillis(); if (watermarkInterval > 0 && lastWatermarkPeriodicEmitTime + watermarkInterval <= now) { lastWatermarkPeriodicEmitTime = now; @@ -162,13 +174,13 @@ public class WatermarkAssignerOperator extends AbstractStreamOperator<RowData> } if (processedElements != lastIdleCheckProcessedElements) { - timeSinceLastIdleCheck = now; + timeSinceLastIdleCheck = inputActivityNow; lastIdleCheckProcessedElements = processedElements; } if (isIdlenessEnabled() && currentStatus.equals(WatermarkStatus.ACTIVE) - && timeSinceLastIdleCheck + idleTimeout <= now) { + && timeSinceLastIdleCheck + idleTimeout <= inputActivityNow) { // mark the channel as idle to ignore watermarks from this channel emitWatermarkStatus(WatermarkStatus.IDLE); } @@ -213,6 +225,11 @@ public class WatermarkAssignerOperator extends AbstractStreamOperator<RowData> @Override public void close() throws Exception { + getContainingTask() + .getEnvironment() + .getMetricGroup() + .getIOMetricGroup() + .unregisterBackPressureListener(inputActivityClock); FunctionUtils.closeFunction(watermarkGenerator); super.close(); } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperatorTest.java index 9cd558629bc..2a5419eafbb 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperatorTest.java @@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.operators.wmassigners; import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; @@ -139,6 +140,34 @@ public class WatermarkAssignerOperatorTest extends WatermarkAssignerOperatorTest assertThat(extractWatermarkStatuses(output)).doesNotContain(WatermarkStatus.IDLE); } + @Test + public void testIdleTimeoutUnderBackpressure() throws Exception { + long idleTimeout = 100; + + OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = + createTestHarness(0, WATERMARK_GENERATOR, idleTimeout); + testHarness.getExecutionConfig().setAutoWatermarkInterval(idleTimeout); + testHarness.open(); + + TaskIOMetricGroup taskIOMetricGroup = + testHarness.getEnvironment().getMetricGroup().getIOMetricGroup(); + taskIOMetricGroup.getHardBackPressuredTimePerSecond().markStart(); + + stepProcessingTime(testHarness, 0, idleTimeout * 10, idleTimeout / 10); + assertThat(testHarness.getOutput()).isEmpty(); + + taskIOMetricGroup.getHardBackPressuredTimePerSecond().markEnd(); + taskIOMetricGroup.getSoftBackPressuredTimePerSecond().markStart(); + + stepProcessingTime(testHarness, idleTimeout * 10, idleTimeout * 20, idleTimeout / 10); + assertThat(testHarness.getOutput()).isEmpty(); + + taskIOMetricGroup.getSoftBackPressuredTimePerSecond().markEnd(); + + stepProcessingTime(testHarness, idleTimeout * 20, idleTimeout * 30, idleTimeout / 10); + assertThat(testHarness.getOutput()).containsExactly(WatermarkStatus.IDLE); + } + private void stepProcessingTime( OneInputStreamOperatorTestHarness<?, ?> testHarness, long fromInclusive,
