This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f4d8e4b543aeef4898295b81bce0f32131807d5a
Author: Piotr Nowojski <[email protected]>
AuthorDate: Fri Apr 4 14:44:27 2025 +0200

    [hotfix][tests] Create Builder for SourceOperatorTestContext
---
 .../api/operators/SourceOperatorAlignmentTest.java |  42 ++++----
 .../api/operators/SourceOperatorIdleTest.java      |   2 +-
 .../api/operators/SourceOperatorTest.java          |  67 ++++++------
 .../api/operators/SourceOperatorTestContext.java   | 117 +++++++++++----------
 .../operators/SourceOperatorWatermarksTest.java    |  20 ++--
 5 files changed, 130 insertions(+), 118 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
index 5c50a044430..67f271994d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
@@ -63,15 +63,15 @@ class SourceOperatorAlignmentTest {
     @BeforeEach
     void setup() throws Exception {
         context =
-                new SourceOperatorTestContext(
-                        false,
-                        WatermarkStrategy.forGenerator(ctx -> new PunctuatedGenerator())
-                                .withTimestampAssigner((r, t) -> r)
-                                .withWatermarkAlignment(
-                                        "group1",
-                                        Duration.ofMillis(100),
-                                        Duration.ofMillis(updateIntervalMillis)),
-                        false);
+                SourceOperatorTestContext.builder()
+                        .setWatermarkStrategy(
+                                WatermarkStrategy.forGenerator(ctx -> new PunctuatedGenerator())
+                                        .withTimestampAssigner((r, t) -> r)
+                                        .withWatermarkAlignment(
+                                                "group1",
+                                                Duration.ofMillis(100),
+                                                Duration.ofMillis(updateIntervalMillis)))
+                        .build();
         operator = context.getOperator();
     }
 
@@ -137,16 +137,20 @@ class SourceOperatorAlignmentTest {
     void testWatermarkAlignmentWithIdleness(boolean allSubtasksIdle) throws Exception {
         // we use a separate context, because we need to enable idleness
         try (SourceOperatorTestContext context =
-                new SourceOperatorTestContext(
-                        true,
-                        WatermarkStrategy.forGenerator(
-                                        ctx ->
-                                                new PunctuatedGenerator(
-                                                        PunctuatedGenerator.GenerationMode.ODD))
-                                .withWatermarkAlignment(
-                                        "group1", Duration.ofMillis(100), Duration.ofMillis(1))
-                                .withTimestampAssigner((r, t) -> r),
-                        false)) {
+                SourceOperatorTestContext.builder()
+                        .setIdle(true)
+                        .setWatermarkStrategy(
+                                WatermarkStrategy.forGenerator(
+                                                ctx ->
+                                                        new PunctuatedGenerator(
+                                                                PunctuatedGenerator.GenerationMode
+                                                                        .ODD))
+                                        .withWatermarkAlignment(
+                                                "group1",
+                                                Duration.ofMillis(100),
+                                                Duration.ofMillis(1))
+                                        .withTimestampAssigner((r, t) -> r))
+                        .build()) {
             final SourceOperator<Integer, MockSourceSplit> operator = context.getOperator();
             operator.initializeState(context.createStateContext());
             operator.open();
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorIdleTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorIdleTest.java
index 6069ffcb497..8684272518c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorIdleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorIdleTest.java
@@ -41,7 +41,7 @@ class SourceOperatorIdleTest {
 
     @BeforeEach
     void setup() throws Exception {
-        context = new SourceOperatorTestContext();
+        context = SourceOperatorTestContext.builder().build();
         operator = context.getOperator();
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index cd44bc08700..554e253e1fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -93,7 +93,10 @@ class SourceOperatorTest {
 
     @BeforeEach
     void setup() throws Exception {
-        context = new SourceOperatorTestContext(false, pauseSourcesUntilCheckpoint);
+        context =
+                SourceOperatorTestContext.builder()
+                        .setPauseSourcesUntilFirstCheckpoint(pauseSourcesUntilCheckpoint)
+                        .build();
         operator = context.getOperator();
         mockSourceReader = context.getSourceReader();
         mockGateway = context.getGateway();
@@ -249,28 +252,29 @@ class SourceOperatorTest {
     public void testPausingUntilCheckpoint() throws Exception {
         final List<StreamElement> out = new ArrayList<>();
         try (SourceOperatorTestContext context =
-                new SourceOperatorTestContext(
-                        false,
-                        false,
-                        WatermarkStrategy.<Integer>forMonotonousTimestamps()
-                                .withTimestampAssigner((element, recordTimestamp) -> element),
-                        new CollectorOutput<>(out),
-                        false,
-                        pauseSourcesUntilCheckpoint,
-                        // recover with some state, so the source will pause until a checkpoint
-                        // to speedup recovery (if pauseSourcesUntilCheckpoint)
-                        (stateManager, operatorID) -> {
-                            long checkpointID = 1L;
-                            stateManager.setReportedCheckpointId(checkpointID);
-                            stateManager.setJobManagerTaskStateSnapshotsByCheckpointId(
-                                    singletonMap(
-                                            checkpointID,
-                                            new TaskStateSnapshot(
-                                                    singletonMap(
-                                                            operatorID,
-                                                            OperatorSubtaskState.builder()
-                                                                    .build()))));
-                        })) {
+                SourceOperatorTestContext.builder()
+                        .setWatermarkStrategy(
+                                WatermarkStrategy.<Integer>forMonotonousTimestamps()
+                                        .withTimestampAssigner(
+                                                (element, recordTimestamp) -> element))
+                        .setOutput(new CollectorOutput<>(out))
+                        .setPauseSourcesUntilFirstCheckpoint(pauseSourcesUntilCheckpoint)
+                        .setPreInit(
+                                // recover with some state, so the source will pause until a checkpoint
+                                // to speedup recovery (if pauseSourcesUntilCheckpoint)
+                                (stateManager, operatorID) -> {
+                                    long checkpointID = 1L;
+                                    stateManager.setReportedCheckpointId(checkpointID);
+                                    stateManager.setJobManagerTaskStateSnapshotsByCheckpointId(
+                                            singletonMap(
+                                                    checkpointID,
+                                                    new TaskStateSnapshot(
+                                                            singletonMap(
+                                                                    operatorID,
+                                                                    OperatorSubtaskState.builder()
+                                                                            .build()))));
+                                })
+                        .build()) {
 
             final SourceOperator<Integer, MockSourceSplit> operator = context.getOperator();
             operator.open();
@@ -307,14 +311,15 @@ class SourceOperatorTest {
     void testHandleBacklogEvent() throws Exception {
         List<StreamElement> outputStreamElements = new ArrayList<>();
         context =
-                new SourceOperatorTestContext(
-                        false,
-                        false,
-                        WatermarkStrategy.<Integer>forMonotonousTimestamps()
-                                .withTimestampAssigner((element, recordTimestamp) -> element),
-                        new CollectorOutput<>(outputStreamElements),
-                        false,
-                        pauseSourcesUntilCheckpoint);
+                SourceOperatorTestContext.builder()
+                        .setWatermarkStrategy(
+                                WatermarkStrategy.<Integer>forMonotonousTimestamps()
+                                        .withTimestampAssigner(
+                                                (element, recordTimestamp) -> element))
+                        .setOutput(new CollectorOutput<>(outputStreamElements))
+                        .setPauseSourcesUntilFirstCheckpoint(pauseSourcesUntilCheckpoint)
+                        .build();
+
         operator = context.getOperator();
         operator.initializeState(context.createStateContext());
         operator.open();
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
index c0286a53405..2f88a71036a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
@@ -62,70 +62,73 @@ public class SourceOperatorTestContext implements AutoCloseable {
     public static final int SUBTASK_INDEX = 1;
     public static final MockSourceSplit MOCK_SPLIT = new MockSourceSplit(1234, 10);
 
-    private MockSourceReader mockSourceReader;
-    private MockOperatorEventGateway mockGateway;
-    private TestProcessingTimeService timeService;
-    private SourceOperator<Integer, MockSourceSplit> operator;
-    public Output<StreamRecord<Integer>> output;
-
-    public SourceOperatorTestContext() throws Exception {
-        this(false, false);
+    public static Builder builder() {
+        return new Builder();
     }
 
-    public SourceOperatorTestContext(boolean idle, boolean pauseSourcesUntilFirstCheckpoint)
-            throws Exception {
-        this(idle, WatermarkStrategy.noWatermarks(), pauseSourcesUntilFirstCheckpoint);
-    }
+    public static class Builder {
+        private boolean idle = false;
+        private boolean usePerSplitOutputs = false;
+        private WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategy.noWatermarks();
+        private Output<StreamRecord<Integer>> output = new MockOutput<>(new ArrayList<>());
+        private boolean supportsSplitReassignmentOnRecovery = false;
+        private boolean pauseSourcesUntilFirstCheckpoint = false;
+        private BiConsumer<TestTaskStateManager, OperatorID> preInit = (ign0, ign1) -> {};
+
+        public Builder setIdle(boolean idle) {
+            this.idle = idle;
+            return this;
+        }
 
-    public SourceOperatorTestContext(
-            boolean idle,
-            WatermarkStrategy<Integer> watermarkStrategy,
-            boolean pauseSourcesUntilFirstCheckpoint)
-            throws Exception {
-        this(
-                idle,
-                false,
-                watermarkStrategy,
-                new MockOutput<>(new ArrayList<>()),
-                false,
-                pauseSourcesUntilFirstCheckpoint);
-    }
+        public Builder setUsePerSplitOutputs(boolean usePerSplitOutputs) {
+            this.usePerSplitOutputs = usePerSplitOutputs;
+            return this;
+        }
 
-    public SourceOperatorTestContext(
-            boolean idle,
-            boolean usePerSplitOutputs,
-            WatermarkStrategy<Integer> watermarkStrategy,
-            Output<StreamRecord<Integer>> output,
-            boolean supportsSplitReassignmentOnRecovery)
-            throws Exception {
-        this(
-                idle,
-                usePerSplitOutputs,
-                watermarkStrategy,
-                output,
-                supportsSplitReassignmentOnRecovery,
-                false,
-                (ign0, ign1) -> {});
-    }
+        public Builder setWatermarkStrategy(WatermarkStrategy<Integer> watermarkStrategy) {
+            this.watermarkStrategy = watermarkStrategy;
+            return this;
+        }
 
-    public SourceOperatorTestContext(
-            boolean idle,
-            boolean usePerSplitOutputs,
-            WatermarkStrategy<Integer> watermarkStrategy,
-            Output<StreamRecord<Integer>> output,
-            boolean supportsSplitReassignmentOnRecovery,
-            boolean pauseSourcesUntilFirstCheckpoint)
-            throws Exception {
-        this(
-                idle,
-                usePerSplitOutputs,
-                watermarkStrategy,
-                output,
-                supportsSplitReassignmentOnRecovery,
-                pauseSourcesUntilFirstCheckpoint,
-                (ign0, ign1) -> {});
+        public Builder setOutput(Output<StreamRecord<Integer>> output) {
+            this.output = output;
+            return this;
+        }
+
+        public Builder setSupportsSplitReassignmentOnRecovery(boolean supportsSplitReassignmentOnRecovery) {
+            this.supportsSplitReassignmentOnRecovery = supportsSplitReassignmentOnRecovery;
+            return this;
+        }
+
+        public Builder setPauseSourcesUntilFirstCheckpoint(
+                boolean pauseSourcesUntilFirstCheckpoint) {
+            this.pauseSourcesUntilFirstCheckpoint = pauseSourcesUntilFirstCheckpoint;
+            return this;
+        }
+
+        public Builder setPreInit(BiConsumer<TestTaskStateManager, OperatorID> preInit) {
+            this.preInit = preInit;
+            return this;
+        }
+
+        public SourceOperatorTestContext build() throws Exception {
+            return new SourceOperatorTestContext(
+                    idle,
+                    usePerSplitOutputs,
+                    watermarkStrategy,
+                    output,
+                    supportsSplitReassignmentOnRecovery,
+                    pauseSourcesUntilFirstCheckpoint,
+                    preInit);
+        }
     }
 
+    private MockSourceReader mockSourceReader;
+    private MockOperatorEventGateway mockGateway;
+    private TestProcessingTimeService timeService;
+    private SourceOperator<Integer, MockSourceSplit> operator;
+    public Output<StreamRecord<Integer>> output;
+
     public SourceOperatorTestContext(
             boolean idle,
             boolean usePerSplitOutputs,
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorWatermarksTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorWatermarksTest.java
index 7bef5d80c6c..2bedbc07fb5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorWatermarksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorWatermarksTest.java
@@ -46,16 +46,16 @@ class SourceOperatorWatermarksTest {
     @BeforeEach
     void setup() throws Exception {
         context =
-                new SourceOperatorTestContext(
-                        false,
-                        true,
-                        WatermarkStrategy.forGenerator(
-                                        ctx ->
-                                                new SourceOperatorAlignmentTest
-                                                        .PunctuatedGenerator())
-                                .withTimestampAssigner((r, t) -> r),
-                        new MockOutput<>(new ArrayList<>()),
-                        false);
+                SourceOperatorTestContext.builder()
+                        .setUsePerSplitOutputs(true)
+                        .setWatermarkStrategy(
+                                WatermarkStrategy.forGenerator(
+                                                ctx ->
+                                                        new SourceOperatorAlignmentTest
+                                                                .PunctuatedGenerator())
+                                        .withTimestampAssigner((r, t) -> r))
+                        .setOutput(new MockOutput<>(new ArrayList<>()))
+                        .build();
         operator = context.getOperator();
     }
 

Reply via email to