This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 774fd077bd037232ab9a8104931712cf41a57383
Author: Piotr Nowojski <[email protected]>
AuthorDate: Mon Apr 7 18:54:44 2025 +0200

    [FLINK-37399][tests] Randomize watermark alignment buffer size in ITCases
---
 .../connector/base/source/reader/AlignedWatermarksITCase.java    | 9 ++++++++-
 .../org/apache/flink/streaming/util/TestStreamEnvironment.java   | 3 +++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/AlignedWatermarksITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/AlignedWatermarksITCase.java
index 3f7ae1d4dd3..10a8c12e7ef 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/AlignedWatermarksITCase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/AlignedWatermarksITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -76,7 +77,13 @@ class AlignedWatermarksITCase {
             new MiniClusterExtension(
                     new MiniClusterResourceConfiguration.Builder()
                             .setNumberTaskManagers(1)
-                            .setConfiguration(reporter.addToConfiguration(new Configuration()))
+                            .setConfiguration(
+                                    reporter.addToConfiguration(
+                                            new Configuration()
+                                                    .set(
+                                                            PipelineOptions
+                                                                    .WATERMARK_ALIGNMENT_BUFFER_SIZE,
+                                                            0)))
                             .build());
 
     @Test
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index cbf2e9b8f83..c8284090264 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.configuration.StateChangelogOptions;
 import org.apache.flink.core.execution.JobClient;
@@ -171,6 +172,8 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
                 true,
                 false);
 
+        randomize(conf, PipelineOptions.WATERMARK_ALIGNMENT_BUFFER_SIZE, 0, 1, 2);
+
         // randomize ITTests for enabling state change log
         // TODO: remove the file merging check after FLINK-32085
         if (!conf.contains(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)

Reply via email to