This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 774fd077bd037232ab9a8104931712cf41a57383
Author: Piotr Nowojski <[email protected]>
AuthorDate: Mon Apr 7 18:54:44 2025 +0200

    [FLINK-37399][tests] Randomize watermark alignment buffer size in ITCases
---
 .../connector/base/source/reader/AlignedWatermarksITCase.java    | 9 ++++++++-
 .../org/apache/flink/streaming/util/TestStreamEnvironment.java   | 3 +++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/AlignedWatermarksITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/AlignedWatermarksITCase.java
index 3f7ae1d4dd3..10a8c12e7ef 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/AlignedWatermarksITCase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/AlignedWatermarksITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -76,7 +77,13 @@ class AlignedWatermarksITCase {
             new MiniClusterExtension(
                     new MiniClusterResourceConfiguration.Builder()
                             .setNumberTaskManagers(1)
-                            .setConfiguration(reporter.addToConfiguration(new Configuration()))
+                            .setConfiguration(
+                                    reporter.addToConfiguration(
+                                            new Configuration()
+                                                    .set(
+                                                            PipelineOptions
+                                                                    .WATERMARK_ALIGNMENT_BUFFER_SIZE,
+                                                            0)))
                             .build());
 
     @Test
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index cbf2e9b8f83..c8284090264 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.configuration.StateChangelogOptions;
 import org.apache.flink.core.execution.JobClient;
@@ -171,6 +172,8 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
                     true,
                     false);
 
+            randomize(conf, PipelineOptions.WATERMARK_ALIGNMENT_BUFFER_SIZE, 0, 1, 2);
+
             // randomize ITTests for enabling state change log
             // TODO: remove the file merging check after FLINK-32085
             if (!conf.contains(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)
