This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.20 in repository https://gitbox.apache.org/repos/asf/flink.git
commit deecdc1b8dd2e5cf8a058492fecac573f59bbd4e Author: Piotr Nowojski <[email protected]> AuthorDate: Tue Aug 6 17:52:12 2024 +0200 [FLINK-35886][task] Hide backpressure from idleness detection in TimestampsAndWatermarksOperator --- .../runtime/metrics/groups/TaskIOMetricGroup.java | 10 ++++++ .../operators/TimestampsAndWatermarksOperator.java | 23 ++++++++++-- .../TimestampsAndWatermarksOperatorTest.java | 42 ++++++++++++++++++++++ .../util/AbstractStreamOperatorTestHarness.java | 4 +++ 4 files changed, 77 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java index 69ead952442..12ba23cea21 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java @@ -312,6 +312,16 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { return mailboxSize; } + public void registerBackPressureListener(TimerGauge.StartStopListener backPressureListener) { + hardBackPressuredTimePerSecond.registerListener(backPressureListener); + softBackPressuredTimePerSecond.registerListener(backPressureListener); + } + + public void unregisterBackPressureListener(TimerGauge.StartStopListener backPressureListener) { + hardBackPressuredTimePerSecond.unregisterListener(backPressureListener); + softBackPressuredTimePerSecond.unregisterListener(backPressureListener); + } + // ============================================================================================ // Metric Reuse // ============================================================================================ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java index 02f72f3ba32..a636ea9ab82 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java @@ -29,10 +29,10 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.util.PausableRelativeClock; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.util.clock.RelativeClock; -import org.apache.flink.util.clock.SystemClock; import static org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -70,6 +70,9 @@ public class TimestampsAndWatermarksOperator<T> extends AbstractStreamOperator<T /** Whether to emit intermediate watermarks or only one final watermark at the end of input. */ private final boolean emitProgressiveWatermarks; + /** {@link PausableRelativeClock} that will be paused in case of backpressure. */ + private transient PausableRelativeClock inputActivityClock; + public TimestampsAndWatermarksOperator( WatermarkStrategy<T> watermarkStrategy, boolean emitProgressiveWatermarks) { this.watermarkStrategy = checkNotNull(watermarkStrategy); @@ -80,6 +83,12 @@ public class TimestampsAndWatermarksOperator<T> extends AbstractStreamOperator<T @Override public void open() throws Exception { super.open(); + inputActivityClock = new PausableRelativeClock(getProcessingTimeService().getClock()); + getContainingTask() + .getEnvironment() + .getMetricGroup() + .getIOMetricGroup() + .registerBackPressureListener(inputActivityClock); timestampAssigner = watermarkStrategy.createTimestampAssigner(this::getMetricGroup); watermarkGenerator = @@ -93,7 +102,7 @@ public class TimestampsAndWatermarksOperator<T> extends AbstractStreamOperator<T @Override public RelativeClock getInputActivityClock() { - return SystemClock.getInstance(); + return inputActivityClock; } }) : new NoWatermarksGenerator<>(); @@ -107,6 +116,16 @@ public class TimestampsAndWatermarksOperator<T> extends AbstractStreamOperator<T } } + @Override + public void close() throws Exception { + getContainingTask() + .getEnvironment() + .getMetricGroup() + .getIOMetricGroup() + .unregisterBackPressureListener(inputActivityClock); + super.close(); + } + @Override public void processElement(final StreamRecord<T> element) throws Exception { final T event = element.getValue(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java index 3f718efe3f7..3f05af39cf3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.eventtime.WatermarkGenerator; import org.apache.flink.api.common.eventtime.WatermarkOutput; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -31,6 +32,7 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.junit.jupiter.api.Test; import java.io.Serializable; +import java.time.Duration; import static org.apache.flink.streaming.util.StreamRecordMatchers.streamRecord; import static org.apache.flink.streaming.util.WatermarkMatchers.legacyWatermark; @@ -240,6 +242,46 @@ class TimestampsAndWatermarksOperatorTest { } } + @Test + void watermarksWithIdlenessUnderBackpressure() throws Exception { + long idleTimeout = 100; + + TimestampsAndWatermarksOperator<Tuple2<Boolean, Long>> operator = + new TimestampsAndWatermarksOperator<>( + WatermarkStrategy.forGenerator((ctx) -> new PunctuatedWatermarkGenerator()) + .withTimestampAssigner((ctx) -> new TupleExtractor()) + .withIdleness(Duration.ofMillis(idleTimeout)), + true); + + OneInputStreamOperatorTestHarness<Tuple2<Boolean, Long>, Tuple2<Boolean, Long>> + testHarness = new OneInputStreamOperatorTestHarness<>(operator); + testHarness.open(); + + TaskIOMetricGroup taskIOMetricGroup = + testHarness.getEnvironment().getMetricGroup().getIOMetricGroup(); + taskIOMetricGroup.getHardBackPressuredTimePerSecond().markStart(); + + for (int i = 0; i < 10; i++) { + testHarness.advanceTime(idleTimeout); + } + assertThat(testHarness.getOutput()).isEmpty(); + + taskIOMetricGroup.getHardBackPressuredTimePerSecond().markEnd(); + taskIOMetricGroup.getSoftBackPressuredTimePerSecond().markStart(); + + for (int i = 10; i < 20; i++) { + testHarness.advanceTime(idleTimeout); + } + assertThat(testHarness.getOutput()).isEmpty(); + + taskIOMetricGroup.getSoftBackPressuredTimePerSecond().markEnd(); + + for (int i = 20; i < 30; i++) { + testHarness.advanceTime(idleTimeout); + } + assertThat(testHarness.getOutput()).containsExactly(WatermarkStatus.IDLE); + } + private static <T> OneInputStreamOperatorTestHarness<T, T> createTestHarness( WatermarkStrategy<T> watermarkStrategy) throws Exception { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index b2e7a6b169e..d689f009842 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -776,6 +776,10 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { return factory; } + public void advanceTime(long delta) throws Exception { + processingTimeService.advance(delta); + } + public void setProcessingTime(long time) throws Exception { processingTimeService.setCurrentTime(time); }
