This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.20 by this push:
new 6e01fa651ee [FLINK-37545][metrics] Fix StackOverflowError when using
MetricGroup in custom WatermarkStrategy
6e01fa651ee is described below
commit 6e01fa651ee4459f715c1b6c60b8c2331cabf233
Author: Piotr Nowojski <[email protected]>
AuthorDate: Thu May 8 14:59:11 2025 +0200
[FLINK-37545][metrics] Fix StackOverflowError when using MetricGroup in
custom WatermarkStrategy
Previously StackOverflowError was being thrown
---
.../operators/TimestampsAndWatermarksOperator.java | 3 +-
.../TimestampsAndWatermarksOperatorTest.java | 41 ++++++++++++++++++++++
2 files changed, 43 insertions(+), 1 deletion(-)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
index a636ea9ab82..690631d7311 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
@@ -97,7 +97,8 @@ public class TimestampsAndWatermarksOperator<T> extends
AbstractStreamOperator<T
new WatermarkGeneratorSupplier.Context() {
@Override
public MetricGroup getMetricGroup() {
- return this.getMetricGroup();
+ return
TimestampsAndWatermarksOperator.this
+ .getMetricGroup();
}
@Override
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
index 3f05af39cf3..7f3d74ec43a 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
@@ -24,6 +24,8 @@ import
org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
@@ -33,6 +35,7 @@ import org.junit.jupiter.api.Test;
import java.io.Serializable;
import java.time.Duration;
+import java.util.concurrent.atomic.AtomicReference;
import static
org.apache.flink.streaming.util.StreamRecordMatchers.streamRecord;
import static
org.apache.flink.streaming.util.WatermarkMatchers.legacyWatermark;
@@ -242,6 +245,25 @@ class TimestampsAndWatermarksOperatorTest {
}
}
+ @Test
+ void testGetMetricGroup() throws Exception {
+ AtomicReference<Gauge<Long>> lastTimestampGauge = new
AtomicReference<>();
+ OneInputStreamOperatorTestHarness<Long, Long> testHarness =
+ createTestHarness(
+ WatermarkStrategy.forGenerator(
+ (ctx) -> {
+ WatermarkGeneratorWithMetrics
generator =
+ new
WatermarkGeneratorWithMetrics(
+
ctx.getMetricGroup());
+
lastTimestampGauge.set(generator.lastTimestampGauge);
+ return generator;
+ })
+ .withTimestampAssigner((ctx) -> new
LongExtractor()));
+
+ testHarness.processElement(new StreamRecord<>(42L));
+ assertThat(lastTimestampGauge.get().getValue()).isEqualTo(42L);
+ }
+
@Test
void watermarksWithIdlenessUnderBackpressure() throws Exception {
long idleTimeout = 100;
@@ -392,4 +414,23 @@ class TimestampsAndWatermarksOperatorTest {
@Override
public void onPeriodicEmit(WatermarkOutput output) {}
}
+
+ private static class WatermarkGeneratorWithMetrics
+ implements WatermarkGenerator<Long>, Serializable {
+
+ private long lastTimestamp;
+ Gauge<Long> lastTimestampGauge;
+
+ public WatermarkGeneratorWithMetrics(MetricGroup metricGroup) {
+ lastTimestampGauge = metricGroup.gauge("lastTimestamp", () ->
lastTimestamp);
+ }
+
+ @Override
+ public void onEvent(Long event, long eventTimestamp, WatermarkOutput
output) {
+ lastTimestamp = eventTimestamp;
+ }
+
+ @Override
+ public void onPeriodicEmit(WatermarkOutput output) {}
+ }
}