This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 476e27a37ed2da4acb954244dc2dacedafbf20b2 Author: Piotr Nowojski <[email protected]> AuthorDate: Fri Apr 4 08:41:35 2025 +0200 [hotfix][tests] Refactor SourceOperatorSplitWatermarkAlignmentTest --- .../SourceOperatorSplitWatermarkAlignmentTest.java | 42 +++++++++------------- 1 file changed, 16 insertions(+), 26 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java index b5246f7b518..2c26b920a77 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java @@ -76,6 +76,8 @@ import static org.assertj.core.api.Assertions.assertThat; /** Unit test for split alignment in {@link SourceOperator}. */ class SourceOperatorSplitWatermarkAlignmentTest { + private static final int updateIntervalMillis = 1; + @Test void testSplitWatermarkAlignment() throws Exception { @@ -83,29 +85,9 @@ class SourceOperatorSplitWatermarkAlignmentTest { new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true); Environment env = getTestingEnvironment(); SourceOperator<Integer, MockSourceSplit> operator = - new TestingSourceOperator<>( - new StreamOperatorParameters<>( - new SourceOperatorStreamTask<Integer>(env), - new MockStreamConfig(new Configuration(), 1), - new MockOutput<>(new ArrayList<>()), - TestProcessingTimeService::new, - null, - null), - sourceReader, - WatermarkStrategy.forGenerator(ctx -> new TestWatermarkGenerator()) - .withTimestampAssigner((r, l) -> r) - .withWatermarkAlignment("group-1", Duration.ofMillis(1)), - new TestProcessingTimeService(), - new MockOperatorEventGateway(), - 1, - 5, - true, - false, - false); - operator.initializeState( - new StreamTaskStateInitializerImpl(env, new HashMapStateBackend())); + createAndOpenSourceOperatorWithIdleness( + sourceReader, new TestProcessingTimeService(), 0); - operator.open(); MockSourceSplit split1 = new MockSourceSplit(0, 0, 10); MockSourceSplit split2 = new MockSourceSplit(1, 10, 20); split1.addRecord(5); @@ -531,6 +513,17 @@ class SourceOperatorSplitWatermarkAlignmentTest { long idleTimeout, Environment env) throws Exception { + + WatermarkStrategy<Integer> watermarkStrategy = + WatermarkStrategy.forGenerator(ctx -> new TestWatermarkGenerator()) + .withTimestampAssigner((r, l) -> r) + .withWatermarkAlignment( + "group-1", + Duration.ofMillis(1), + Duration.ofMillis(updateIntervalMillis)); + if (idleTimeout > 0) { + watermarkStrategy = watermarkStrategy.withIdleness(Duration.ofMillis(idleTimeout)); + } SourceOperator<Integer, MockSourceSplit> operator = new TestingSourceOperator<>( new StreamOperatorParameters<>( @@ -541,10 +534,7 @@ class SourceOperatorSplitWatermarkAlignmentTest { null, null), sourceReader, - WatermarkStrategy.forGenerator(ctx -> new TestWatermarkGenerator()) - .withTimestampAssigner((r, l) -> r) - .withWatermarkAlignment("group-1", Duration.ofMillis(1)) - .withIdleness(Duration.ofMillis(idleTimeout)), + watermarkStrategy, processingTimeService, new MockOperatorEventGateway(), 1,
