This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 476e27a37ed2da4acb954244dc2dacedafbf20b2
Author: Piotr Nowojski <[email protected]>
AuthorDate: Fri Apr 4 08:41:35 2025 +0200

    [hotfix][tests] Refactor SourceOperatorSplitWatermarkAlignmentTest
---
 .../SourceOperatorSplitWatermarkAlignmentTest.java | 42 +++++++++-------------
 1 file changed, 16 insertions(+), 26 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
index b5246f7b518..2c26b920a77 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
@@ -76,6 +76,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Unit test for split alignment in {@link SourceOperator}. */
 class SourceOperatorSplitWatermarkAlignmentTest {
 
+    private static final int updateIntervalMillis = 1;
+
     @Test
     void testSplitWatermarkAlignment() throws Exception {
 
@@ -83,29 +85,9 @@ class SourceOperatorSplitWatermarkAlignmentTest {
                 new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true);
         Environment env = getTestingEnvironment();
         SourceOperator<Integer, MockSourceSplit> operator =
-                new TestingSourceOperator<>(
-                        new StreamOperatorParameters<>(
-                                new SourceOperatorStreamTask<Integer>(env),
-                                new MockStreamConfig(new Configuration(), 1),
-                                new MockOutput<>(new ArrayList<>()),
-                                TestProcessingTimeService::new,
-                                null,
-                                null),
-                        sourceReader,
-                        WatermarkStrategy.forGenerator(ctx -> new TestWatermarkGenerator())
-                                .withTimestampAssigner((r, l) -> r)
-                                .withWatermarkAlignment("group-1", Duration.ofMillis(1)),
-                        new TestProcessingTimeService(),
-                        new MockOperatorEventGateway(),
-                        1,
-                        5,
-                        true,
-                        false,
-                        false);
-        operator.initializeState(
-                new StreamTaskStateInitializerImpl(env, new HashMapStateBackend()));
+                createAndOpenSourceOperatorWithIdleness(
+                        sourceReader, new TestProcessingTimeService(), 0);
 
-        operator.open();
         MockSourceSplit split1 = new MockSourceSplit(0, 0, 10);
         MockSourceSplit split2 = new MockSourceSplit(1, 10, 20);
         split1.addRecord(5);
@@ -531,6 +513,17 @@ class SourceOperatorSplitWatermarkAlignmentTest {
             long idleTimeout,
             Environment env)
             throws Exception {
+
+        WatermarkStrategy<Integer> watermarkStrategy =
+                WatermarkStrategy.forGenerator(ctx -> new TestWatermarkGenerator())
+                        .withTimestampAssigner((r, l) -> r)
+                        .withWatermarkAlignment(
+                                "group-1",
+                                Duration.ofMillis(1),
+                                Duration.ofMillis(updateIntervalMillis));
+        if (idleTimeout > 0) {
+            watermarkStrategy = watermarkStrategy.withIdleness(Duration.ofMillis(idleTimeout));
+        }
         SourceOperator<Integer, MockSourceSplit> operator =
                 new TestingSourceOperator<>(
                         new StreamOperatorParameters<>(
@@ -541,10 +534,7 @@ class SourceOperatorSplitWatermarkAlignmentTest {
                                 null,
                                 null),
                         sourceReader,
-                        WatermarkStrategy.forGenerator(ctx -> new TestWatermarkGenerator())
-                                .withTimestampAssigner((r, l) -> r)
-                                .withWatermarkAlignment("group-1", Duration.ofMillis(1))
-                                .withIdleness(Duration.ofMillis(idleTimeout)),
+                        watermarkStrategy,
                         processingTimeService,
                         new MockOperatorEventGateway(),
                         1,

Reply via email to