This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f4d8e4b543aeef4898295b81bce0f32131807d5a Author: Piotr Nowojski <[email protected]> AuthorDate: Fri Apr 4 14:44:27 2025 +0200 [hotfix][tests] Create Builder for SourceOperatorTestContext --- .../api/operators/SourceOperatorAlignmentTest.java | 42 ++++---- .../api/operators/SourceOperatorIdleTest.java | 2 +- .../api/operators/SourceOperatorTest.java | 67 ++++++------ .../api/operators/SourceOperatorTestContext.java | 117 +++++++++++---------- .../operators/SourceOperatorWatermarksTest.java | 20 ++-- 5 files changed, 130 insertions(+), 118 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java index 5c50a044430..67f271994d5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java @@ -63,15 +63,15 @@ class SourceOperatorAlignmentTest { @BeforeEach void setup() throws Exception { context = - new SourceOperatorTestContext( - false, - WatermarkStrategy.forGenerator(ctx -> new PunctuatedGenerator()) - .withTimestampAssigner((r, t) -> r) - .withWatermarkAlignment( - "group1", - Duration.ofMillis(100), - Duration.ofMillis(updateIntervalMillis)), - false); + SourceOperatorTestContext.builder() + .setWatermarkStrategy( + WatermarkStrategy.forGenerator(ctx -> new PunctuatedGenerator()) + .withTimestampAssigner((r, t) -> r) + .withWatermarkAlignment( + "group1", + Duration.ofMillis(100), + Duration.ofMillis(updateIntervalMillis))) + .build(); operator = context.getOperator(); } @@ -137,16 +137,20 @@ class SourceOperatorAlignmentTest { void testWatermarkAlignmentWithIdleness(boolean allSubtasksIdle) throws Exception { // we use a separate context, because we need to enable idleness try (SourceOperatorTestContext context = - new SourceOperatorTestContext( - true, - WatermarkStrategy.forGenerator( - ctx -> - new PunctuatedGenerator( - PunctuatedGenerator.GenerationMode.ODD)) - .withWatermarkAlignment( - "group1", Duration.ofMillis(100), Duration.ofMillis(1)) - .withTimestampAssigner((r, t) -> r), - false)) { + SourceOperatorTestContext.builder() + .setIdle(true) + .setWatermarkStrategy( + WatermarkStrategy.forGenerator( + ctx -> + new PunctuatedGenerator( + PunctuatedGenerator.GenerationMode + .ODD)) + .withWatermarkAlignment( + "group1", + Duration.ofMillis(100), + Duration.ofMillis(1)) + .withTimestampAssigner((r, t) -> r)) + .build()) { final SourceOperator<Integer, MockSourceSplit> operator = context.getOperator(); operator.initializeState(context.createStateContext()); operator.open(); diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorIdleTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorIdleTest.java index 6069ffcb497..8684272518c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorIdleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorIdleTest.java @@ -41,7 +41,7 @@ class SourceOperatorIdleTest { @BeforeEach void setup() throws Exception { - context = new SourceOperatorTestContext(); + context = SourceOperatorTestContext.builder().build(); operator = context.getOperator(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java index cd44bc08700..554e253e1fa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java @@ -93,7 +93,10 @@ class SourceOperatorTest { @BeforeEach void setup() throws Exception { - context = new SourceOperatorTestContext(false, pauseSourcesUntilCheckpoint); + context = + SourceOperatorTestContext.builder() + .setPauseSourcesUntilFirstCheckpoint(pauseSourcesUntilCheckpoint) + .build(); operator = context.getOperator(); mockSourceReader = context.getSourceReader(); mockGateway = context.getGateway(); @@ -249,28 +252,29 @@ class SourceOperatorTest { public void testPausingUntilCheckpoint() throws Exception { final List<StreamElement> out = new ArrayList<>(); try (SourceOperatorTestContext context = - new SourceOperatorTestContext( - false, - false, - WatermarkStrategy.<Integer>forMonotonousTimestamps() - .withTimestampAssigner((element, recordTimestamp) -> element), - new CollectorOutput<>(out), - false, - pauseSourcesUntilCheckpoint, - // recover with some state, so the source will pause until a checkpoint - // to speedup recovery (if pauseSourcesUntilCheckpoint) - (stateManager, operatorID) -> { - long checkpointID = 1L; - stateManager.setReportedCheckpointId(checkpointID); - stateManager.setJobManagerTaskStateSnapshotsByCheckpointId( - singletonMap( - checkpointID, - new TaskStateSnapshot( - singletonMap( - operatorID, - OperatorSubtaskState.builder() - .build())))); - })) { + SourceOperatorTestContext.builder() + .setWatermarkStrategy( + WatermarkStrategy.<Integer>forMonotonousTimestamps() + .withTimestampAssigner( + (element, recordTimestamp) -> element)) + .setOutput(new CollectorOutput<>(out)) + .setPauseSourcesUntilFirstCheckpoint(pauseSourcesUntilCheckpoint) + .setPreInit( + // recover with some state, so the source will pause until a checkpoint + // to speedup recovery (if pauseSourcesUntilCheckpoint) + (stateManager, operatorID) -> { + long checkpointID = 1L; + stateManager.setReportedCheckpointId(checkpointID); + stateManager.setJobManagerTaskStateSnapshotsByCheckpointId( + singletonMap( + checkpointID, + new TaskStateSnapshot( + singletonMap( + operatorID, + OperatorSubtaskState.builder() + .build())))); + }) + .build()) { final SourceOperator<Integer, MockSourceSplit> operator = context.getOperator(); operator.open(); @@ -307,14 +311,15 @@ class SourceOperatorTest { void testHandleBacklogEvent() throws Exception { List<StreamElement> outputStreamElements = new ArrayList<>(); context = - new SourceOperatorTestContext( - false, - false, - WatermarkStrategy.<Integer>forMonotonousTimestamps() - .withTimestampAssigner((element, recordTimestamp) -> element), - new CollectorOutput<>(outputStreamElements), - false, - pauseSourcesUntilCheckpoint); + SourceOperatorTestContext.builder() + .setWatermarkStrategy( + WatermarkStrategy.<Integer>forMonotonousTimestamps() + .withTimestampAssigner( + (element, recordTimestamp) -> element)) + .setOutput(new CollectorOutput<>(outputStreamElements)) + .setPauseSourcesUntilFirstCheckpoint(pauseSourcesUntilCheckpoint) + .build(); + operator = context.getOperator(); operator.initializeState(context.createStateContext()); operator.open(); diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java index c0286a53405..2f88a71036a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java @@ -62,70 +62,73 @@ public class SourceOperatorTestContext implements AutoCloseable { public static final int SUBTASK_INDEX = 1; public static final MockSourceSplit MOCK_SPLIT = new MockSourceSplit(1234, 10); - private MockSourceReader mockSourceReader; - private MockOperatorEventGateway mockGateway; - private TestProcessingTimeService timeService; - private SourceOperator<Integer, MockSourceSplit> operator; - public Output<StreamRecord<Integer>> output; - - public SourceOperatorTestContext() throws Exception { - this(false, false); + public static Builder builder() { + return new Builder(); } - public SourceOperatorTestContext(boolean idle, boolean pauseSourcesUntilFirstCheckpoint) - throws Exception { - this(idle, WatermarkStrategy.noWatermarks(), pauseSourcesUntilFirstCheckpoint); - } + public static class Builder { + private boolean idle = false; + private boolean usePerSplitOutputs = false; + private WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategy.noWatermarks(); + private Output<StreamRecord<Integer>> output = new MockOutput<>(new ArrayList<>()); + private boolean supportsSplitReassignmentOnRecovery = false; + private boolean pauseSourcesUntilFirstCheckpoint = false; + private BiConsumer<TestTaskStateManager, OperatorID> preInit = (ign0, ign1) -> {}; + + public Builder setIdle(boolean idle) { + this.idle = idle; + return this; + } - public SourceOperatorTestContext( - boolean idle, - WatermarkStrategy<Integer> watermarkStrategy, - boolean pauseSourcesUntilFirstCheckpoint) - throws Exception { - this( - idle, - false, - watermarkStrategy, - new MockOutput<>(new ArrayList<>()), - false, - pauseSourcesUntilFirstCheckpoint); - } + public Builder setUsePerSplitOutputs(boolean usePerSplitOutputs) { + this.usePerSplitOutputs = usePerSplitOutputs; + return this; + } - public SourceOperatorTestContext( - boolean idle, - boolean usePerSplitOutputs, - WatermarkStrategy<Integer> watermarkStrategy, - Output<StreamRecord<Integer>> output, - boolean supportsSplitReassignmentOnRecovery) - throws Exception { - this( - idle, - usePerSplitOutputs, - watermarkStrategy, - output, - supportsSplitReassignmentOnRecovery, - false, - (ign0, ign1) -> {}); - } + public Builder setWatermarkStrategy(WatermarkStrategy<Integer> watermarkStrategy) { + this.watermarkStrategy = watermarkStrategy; + return this; + } - public SourceOperatorTestContext( - boolean idle, - boolean usePerSplitOutputs, - WatermarkStrategy<Integer> watermarkStrategy, - Output<StreamRecord<Integer>> output, - boolean supportsSplitReassignmentOnRecovery, - boolean pauseSourcesUntilFirstCheckpoint) - throws Exception { - this( - idle, - usePerSplitOutputs, - watermarkStrategy, - output, - supportsSplitReassignmentOnRecovery, - pauseSourcesUntilFirstCheckpoint, - (ign0, ign1) -> {}); + public Builder setOutput(Output<StreamRecord<Integer>> output) { + this.output = output; + return this; + } + + public Builder setSupportsSplitReassignmentOnRecovery(boolean supportsSplitReassignmentOnRecovery) { + this.supportsSplitReassignmentOnRecovery = supportsSplitReassignmentOnRecovery; + return this; + } + + public Builder setPauseSourcesUntilFirstCheckpoint( + boolean pauseSourcesUntilFirstCheckpoint) { + this.pauseSourcesUntilFirstCheckpoint = pauseSourcesUntilFirstCheckpoint; + return this; + } + + public Builder setPreInit(BiConsumer<TestTaskStateManager, OperatorID> preInit) { + this.preInit = preInit; + return this; + } + + public SourceOperatorTestContext build() throws Exception { + return new SourceOperatorTestContext( + idle, + usePerSplitOutputs, + watermarkStrategy, + output, + supportsSplitReassignmentOnRecovery, + pauseSourcesUntilFirstCheckpoint, + preInit); + } } + private MockSourceReader mockSourceReader; + private MockOperatorEventGateway mockGateway; + private TestProcessingTimeService timeService; + private SourceOperator<Integer, MockSourceSplit> operator; + public Output<StreamRecord<Integer>> output; + public SourceOperatorTestContext( boolean idle, boolean usePerSplitOutputs, diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorWatermarksTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorWatermarksTest.java index 7bef5d80c6c..2bedbc07fb5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorWatermarksTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorWatermarksTest.java @@ -46,16 +46,16 @@ class SourceOperatorWatermarksTest { @BeforeEach void setup() throws Exception { context = - new SourceOperatorTestContext( - false, - true, - WatermarkStrategy.forGenerator( - ctx -> - new SourceOperatorAlignmentTest - .PunctuatedGenerator()) - .withTimestampAssigner((r, t) -> r), - new MockOutput<>(new ArrayList<>()), - false); + SourceOperatorTestContext.builder() + .setUsePerSplitOutputs(true) + .setWatermarkStrategy( + WatermarkStrategy.forGenerator( + ctx -> + new SourceOperatorAlignmentTest + .PunctuatedGenerator()) + .withTimestampAssigner((r, t) -> r)) + .setOutput(new MockOutput<>(new ArrayList<>())) + .build(); operator = context.getOperator(); }
