This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e03da5e053888512d85a40620bf52c300b233fc3
Author: Piotr Nowojski <[email protected]>
AuthorDate: Fri Apr 4 15:28:14 2025 +0200

    [FLINK-37399][runtime] Add SamplingWatermarkRingBuffer
---
 .../api/operators/source/WatermarkSampler.java     | 66 +++++++++++++++
 .../api/operators/source/WatermarkSamplerTest.java | 97 ++++++++++++++++++++++
 2 files changed, 163 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkSampler.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkSampler.java
new file mode 100644
index 00000000000..ae99e9cd97b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkSampler.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import java.util.ArrayDeque;
+
+/**
+ * Allows to sample latest watermark values and store those samples in a ring 
buffer. Returns {@link
+ * Watermark.UNINITIALIZED} until fully initialized.
+ *
+ * <p>Special case for capacity = 0, then sampling is not used, and then 
always latest value is
+ * reported as oldest
+ */
+@Internal
+public class WatermarkSampler {
+    private final ArrayDeque<Long> watermarksRingBuffer;
+    private final int capacity;
+    private long latestWatermark = Watermark.UNINITIALIZED.getTimestamp();
+
+    public WatermarkSampler(int capacity) {
+        this.capacity = capacity;
+        watermarksRingBuffer = new ArrayDeque<>(capacity);
+        for (int i = 0; i < capacity; i++) {
+            watermarksRingBuffer.add(Watermark.UNINITIALIZED.getTimestamp());
+        }
+    }
+
+    public void addLatest(long watermark) {
+        latestWatermark = watermark;
+    }
+
+    public void sample() {
+        watermarksRingBuffer.add(latestWatermark);
+        watermarksRingBuffer.remove();
+    }
+
+    public long getLatest() {
+        return latestWatermark;
+    }
+
+    public long getOldestSample() {
+        if (capacity == 0) {
+            return getLatest();
+        }
+        return watermarksRingBuffer.getFirst();
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/WatermarkSamplerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/WatermarkSamplerTest.java
new file mode 100644
index 00000000000..b67f54a404e
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/WatermarkSamplerTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for the {@link WatermarkSampler}. */
+public class WatermarkSamplerTest {
+
+    @Test
+    public void testCapacity0() {
+        WatermarkSampler ringBuffer = new WatermarkSampler(0);
+
+        
assertThat(ringBuffer.getOldestSample()).isEqualTo(Watermark.UNINITIALIZED.getTimestamp());
+        
assertThat(ringBuffer.getLatest()).isEqualTo(Watermark.UNINITIALIZED.getTimestamp());
+
+        ringBuffer.addLatest(13L);
+        assertThat(ringBuffer.getOldestSample()).isEqualTo(13L);
+        assertThat(ringBuffer.getLatest()).isEqualTo(13L);
+        ringBuffer.addLatest(42L);
+        ringBuffer.sample();
+        assertThat(ringBuffer.getOldestSample()).isEqualTo(42L);
+        assertThat(ringBuffer.getLatest()).isEqualTo(42L);
+    }
+
+    @Test
+    public void testCapacity2() {
+        WatermarkSampler ringBuffer = new WatermarkSampler(2);
+
+        
assertThat(ringBuffer.getOldestSample()).isEqualTo(Watermark.UNINITIALIZED.getTimestamp());
+        
assertThat(ringBuffer.getLatest()).isEqualTo(Watermark.UNINITIALIZED.getTimestamp());
+
+        ringBuffer.addLatest(13L);
+        ringBuffer.addLatest(42L);
+        ringBuffer.sample();
+        
assertThat(ringBuffer.getOldestSample()).isEqualTo(Watermark.UNINITIALIZED.getTimestamp());
+        assertThat(ringBuffer.getLatest()).isEqualTo(42L);
+
+        ringBuffer.addLatest(44L);
+        ringBuffer.sample();
+        assertThat(ringBuffer.getOldestSample()).isEqualTo(42L);
+        assertThat(ringBuffer.getLatest()).isEqualTo(44L);
+
+        ringBuffer.addLatest(1337L);
+        ringBuffer.sample();
+        assertThat(ringBuffer.getOldestSample()).isEqualTo(44L);
+        assertThat(ringBuffer.getLatest()).isEqualTo(1337L);
+    }
+
+    @Test
+    public void testCapacity3() {
+        WatermarkSampler ringBuffer = new WatermarkSampler(3);
+
+        
assertThat(ringBuffer.getOldestSample()).isEqualTo(Watermark.UNINITIALIZED.getTimestamp());
+        
assertThat(ringBuffer.getLatest()).isEqualTo(Watermark.UNINITIALIZED.getTimestamp());
+
+        ringBuffer.addLatest(42L);
+        ringBuffer.sample();
+        
assertThat(ringBuffer.getOldestSample()).isEqualTo(Watermark.UNINITIALIZED.getTimestamp());
+        assertThat(ringBuffer.getLatest()).isEqualTo(42L);
+
+        ringBuffer.addLatest(44L);
+        ringBuffer.sample();
+        
assertThat(ringBuffer.getOldestSample()).isEqualTo(Watermark.UNINITIALIZED.getTimestamp());
+        assertThat(ringBuffer.getLatest()).isEqualTo(44L);
+
+        ringBuffer.addLatest(1337L);
+        ringBuffer.sample();
+        assertThat(ringBuffer.getOldestSample()).isEqualTo(42L);
+        assertThat(ringBuffer.getLatest()).isEqualTo(1337L);
+
+        ringBuffer.addLatest(0L);
+        ringBuffer.sample();
+        assertThat(ringBuffer.getOldestSample()).isEqualTo(44L);
+        assertThat(ringBuffer.getLatest()).isEqualTo(0L);
+    }
+}

Reply via email to