This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit deadc2154f81529acbd3a257d9c9b61438d8c31a Author: Piotr Nowojski <[email protected]> AuthorDate: Thu Apr 3 18:41:43 2025 +0200 [hotfix][tests] Extract updateIntervalMillis in SourceOperatorAlignmentTest --- .../api/operators/SourceOperatorAlignmentTest.java | 28 ++++++++++++---------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java index b531d9b5d18..5c50a044430 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java @@ -55,6 +55,8 @@ import static org.assertj.core.api.Assertions.assertThat; @SuppressWarnings("serial") class SourceOperatorAlignmentTest { + private static final int updateIntervalMillis = 1; + @Nullable private SourceOperatorTestContext context; @Nullable private SourceOperator<Integer, MockSourceSplit> operator; @@ -66,7 +68,9 @@ class SourceOperatorAlignmentTest { WatermarkStrategy.forGenerator(ctx -> new PunctuatedGenerator()) .withTimestampAssigner((r, t) -> r) .withWatermarkAlignment( - "group1", Duration.ofMillis(100), Duration.ofMillis(1)), + "group1", + Duration.ofMillis(100), + Duration.ofMillis(updateIntervalMillis)), false); operator = context.getOperator(); } @@ -99,7 +103,7 @@ class SourceOperatorAlignmentTest { assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.MORE_AVAILABLE); expectedOutput.add(record1); - context.getTimeService().advance(1); + context.getTimeService().advance(updateIntervalMillis); assertLatestReportedWatermarkEvent(record1); assertOutput(actualOutput, expectedOutput); assertThat(operator.isAvailable()).isTrue(); @@ -122,7 +126,7 @@ class SourceOperatorAlignmentTest { // operator must be unavailable. 
assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.NOTHING_AVAILABLE); expectedOutput.add(record2); - context.getTimeService().advance(1); + context.getTimeService().advance(updateIntervalMillis); assertLatestReportedWatermarkEvent(record2); assertOutput(actualOutput, expectedOutput); assertThat(operator.isAvailable()).isFalse(); @@ -159,7 +163,7 @@ class SourceOperatorAlignmentTest { assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.MORE_AVAILABLE); expectedOutput.add(record1); - context.getTimeService().advance(1); + context.getTimeService().advance(updateIntervalMillis); assertLatestReportedWatermarkEvent(context, record1); // mock WatermarkAlignmentEvent from SourceCoordinator operator.handleOperatorEvent(new WatermarkAlignmentEvent(record1 + 100)); @@ -169,7 +173,7 @@ class SourceOperatorAlignmentTest { // source becomes idle, it should report Long.MAX_VALUE as the watermark assertThat(operator.emitNext(actualOutput)) .isEqualTo(DataInputStatus.NOTHING_AVAILABLE); - context.getTimeService().advance(1); + context.getTimeService().advance(updateIntervalMillis); assertLatestReportedWatermarkEvent(context, Long.MAX_VALUE); if (allSubtasksIdle) { @@ -195,7 +199,7 @@ class SourceOperatorAlignmentTest { assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.MORE_AVAILABLE); expectedOutput.add(record2); - context.getTimeService().advance(1); + context.getTimeService().advance(updateIntervalMillis); // becomes active again, should go back to the previously emitted // watermark, as the record2 does not emit watermarks assertLatestReportedWatermarkEvent(context, record1); @@ -214,10 +218,10 @@ class SourceOperatorAlignmentTest { assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.NOTHING_AVAILABLE); // Don't report any ReportedWatermarkEvent - context.getTimeService().advance(1); + context.getTimeService().advance(updateIntervalMillis); assertNoReportedWatermarkEvent(context); - 
context.getTimeService().advance(1); + context.getTimeService().advance(updateIntervalMillis); assertNoReportedWatermarkEvent(context); assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.NOTHING_AVAILABLE); @@ -233,7 +237,7 @@ class SourceOperatorAlignmentTest { assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.MORE_AVAILABLE); expectedOutput.add(record); - context.getTimeService().advance(1); + context.getTimeService().advance(updateIntervalMillis); assertLatestReportedWatermarkEvent(record); assertOutput(actualOutput, expectedOutput); } @@ -267,7 +271,7 @@ class SourceOperatorAlignmentTest { CollectingDataOutput<Integer> actualOutput = new CollectingDataOutput<>(); operator.emitNext(actualOutput); - context.getTimeService().advance(1); + context.getTimeService().advance(updateIntervalMillis); assertLatestReportedWatermarkEvent(record1); operator.handleOperatorEvent( @@ -275,7 +279,7 @@ class SourceOperatorAlignmentTest { Collections.singletonList(split2), new MockSourceSplitSerializer())); operator.emitNext(actualOutput); - context.getTimeService().advance(1); + context.getTimeService().advance(updateIntervalMillis); assertLatestReportedWatermarkEvent(record1); } @@ -305,7 +309,7 @@ class SourceOperatorAlignmentTest { assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.END_OF_DATA); assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.END_OF_INPUT); - context.getTimeService().advance(1); + context.getTimeService().advance(updateIntervalMillis); assertLatestReportedWatermarkEvent(Watermark.MAX_WATERMARK.getTimestamp()); }
