This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.20 by this push:
     new 6e01fa651ee [FLINK-37545][metrics] Fix StackOverflowError when using MetricGroup in custom WatermarkStrategy
6e01fa651ee is described below

commit 6e01fa651ee4459f715c1b6c60b8c2331cabf233
Author: Piotr Nowojski <[email protected]>
AuthorDate: Thu May 8 14:59:11 2025 +0200

    [FLINK-37545][metrics] Fix StackOverflowError when using MetricGroup in custom WatermarkStrategy
    
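    Previously, a StackOverflowError was thrown because getMetricGroup() in the
    anonymous WatermarkGeneratorSupplier.Context called this.getMetricGroup(),
    i.e. itself, instead of TimestampsAndWatermarksOperator.this.getMetricGroup().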
---
 .../operators/TimestampsAndWatermarksOperator.java |  3 +-
 .../TimestampsAndWatermarksOperatorTest.java       | 41 ++++++++++++++++++++++
 2 files changed, 43 insertions(+), 1 deletion(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
index a636ea9ab82..690631d7311 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
@@ -97,7 +97,8 @@ public class TimestampsAndWatermarksOperator<T> extends 
AbstractStreamOperator<T
                                 new WatermarkGeneratorSupplier.Context() {
                                     @Override
                                     public MetricGroup getMetricGroup() {
-                                        return this.getMetricGroup();
+                                        return 
TimestampsAndWatermarksOperator.this
+                                                .getMetricGroup();
                                     }
 
                                     @Override
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
index 3f05af39cf3..7f3d74ec43a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
@@ -24,6 +24,8 @@ import 
org.apache.flink.api.common.eventtime.WatermarkGenerator;
 import org.apache.flink.api.common.eventtime.WatermarkOutput;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
@@ -33,6 +35,7 @@ import org.junit.jupiter.api.Test;
 
 import java.io.Serializable;
 import java.time.Duration;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static 
org.apache.flink.streaming.util.StreamRecordMatchers.streamRecord;
 import static 
org.apache.flink.streaming.util.WatermarkMatchers.legacyWatermark;
@@ -242,6 +245,25 @@ class TimestampsAndWatermarksOperatorTest {
         }
     }
 
+    @Test
+    void testGetMetricGroup() throws Exception {
+        AtomicReference<Gauge<Long>> lastTimestampGauge = new 
AtomicReference<>();
+        OneInputStreamOperatorTestHarness<Long, Long> testHarness =
+                createTestHarness(
+                        WatermarkStrategy.forGenerator(
+                                        (ctx) -> {
+                                            WatermarkGeneratorWithMetrics 
generator =
+                                                    new 
WatermarkGeneratorWithMetrics(
+                                                            
ctx.getMetricGroup());
+                                            
lastTimestampGauge.set(generator.lastTimestampGauge);
+                                            return generator;
+                                        })
+                                .withTimestampAssigner((ctx) -> new 
LongExtractor()));
+
+        testHarness.processElement(new StreamRecord<>(42L));
+        assertThat(lastTimestampGauge.get().getValue()).isEqualTo(42L);
+    }
+
     @Test
     void watermarksWithIdlenessUnderBackpressure() throws Exception {
         long idleTimeout = 100;
@@ -392,4 +414,23 @@ class TimestampsAndWatermarksOperatorTest {
         @Override
         public void onPeriodicEmit(WatermarkOutput output) {}
     }
+
+    private static class WatermarkGeneratorWithMetrics
+            implements WatermarkGenerator<Long>, Serializable {
+
+        private long lastTimestamp;
+        Gauge<Long> lastTimestampGauge;
+
+        public WatermarkGeneratorWithMetrics(MetricGroup metricGroup) {
+            lastTimestampGauge = metricGroup.gauge("lastTimestamp", () -> 
lastTimestamp);
+        }
+
+        @Override
+        public void onEvent(Long event, long eventTimestamp, WatermarkOutput 
output) {
+            lastTimestamp = eventTimestamp;
+        }
+
+        @Override
+        public void onPeriodicEmit(WatermarkOutput output) {}
+    }
 }

Reply via email to