This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit deadc2154f81529acbd3a257d9c9b61438d8c31a
Author: Piotr Nowojski <[email protected]>
AuthorDate: Thu Apr 3 18:41:43 2025 +0200

    [hotfix][tests] Extract updateIntervalMillis in SourceOperatorAlignmentTest
---
 .../api/operators/SourceOperatorAlignmentTest.java | 28 ++++++++++++----------
 1 file changed, 16 insertions(+), 12 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
index b531d9b5d18..5c50a044430 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
@@ -55,6 +55,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 @SuppressWarnings("serial")
 class SourceOperatorAlignmentTest {
 
+    private static final int updateIntervalMillis = 1;
+
     @Nullable private SourceOperatorTestContext context;
     @Nullable private SourceOperator<Integer, MockSourceSplit> operator;
 
@@ -66,7 +68,9 @@ class SourceOperatorAlignmentTest {
                         WatermarkStrategy.forGenerator(ctx -> new PunctuatedGenerator())
                                 .withTimestampAssigner((r, t) -> r)
                                 .withWatermarkAlignment(
-                                        "group1", Duration.ofMillis(100), Duration.ofMillis(1)),
+                                        "group1",
+                                        Duration.ofMillis(100),
+                                        Duration.ofMillis(updateIntervalMillis)),
                         false);
         operator = context.getOperator();
     }
@@ -99,7 +103,7 @@ class SourceOperatorAlignmentTest {
 
         
assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.MORE_AVAILABLE);
         expectedOutput.add(record1);
-        context.getTimeService().advance(1);
+        context.getTimeService().advance(updateIntervalMillis);
         assertLatestReportedWatermarkEvent(record1);
         assertOutput(actualOutput, expectedOutput);
         assertThat(operator.isAvailable()).isTrue();
@@ -122,7 +126,7 @@ class SourceOperatorAlignmentTest {
         // operator must be unavailable.
         
assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.NOTHING_AVAILABLE);
         expectedOutput.add(record2);
-        context.getTimeService().advance(1);
+        context.getTimeService().advance(updateIntervalMillis);
         assertLatestReportedWatermarkEvent(record2);
         assertOutput(actualOutput, expectedOutput);
         assertThat(operator.isAvailable()).isFalse();
@@ -159,7 +163,7 @@ class SourceOperatorAlignmentTest {
 
             
assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.MORE_AVAILABLE);
             expectedOutput.add(record1);
-            context.getTimeService().advance(1);
+            context.getTimeService().advance(updateIntervalMillis);
             assertLatestReportedWatermarkEvent(context, record1);
             // mock WatermarkAlignmentEvent from SourceCoordinator
             operator.handleOperatorEvent(new WatermarkAlignmentEvent(record1 + 100));
@@ -169,7 +173,7 @@ class SourceOperatorAlignmentTest {
             // source becomes idle, it should report Long.MAX_VALUE as the watermark
             assertThat(operator.emitNext(actualOutput))
                     .isEqualTo(DataInputStatus.NOTHING_AVAILABLE);
-            context.getTimeService().advance(1);
+            context.getTimeService().advance(updateIntervalMillis);
             assertLatestReportedWatermarkEvent(context, Long.MAX_VALUE);
 
             if (allSubtasksIdle) {
@@ -195,7 +199,7 @@ class SourceOperatorAlignmentTest {
 
             
assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.MORE_AVAILABLE);
             expectedOutput.add(record2);
-            context.getTimeService().advance(1);
+            context.getTimeService().advance(updateIntervalMillis);
             // becomes active again, should go back to the previously emitted
             // watermark, as the record2 does not emit watermarks
             assertLatestReportedWatermarkEvent(context, record1);
@@ -214,10 +218,10 @@ class SourceOperatorAlignmentTest {
         
assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.NOTHING_AVAILABLE);
 
         // Don't report any ReportedWatermarkEvent
-        context.getTimeService().advance(1);
+        context.getTimeService().advance(updateIntervalMillis);
         assertNoReportedWatermarkEvent(context);
 
-        context.getTimeService().advance(1);
+        context.getTimeService().advance(updateIntervalMillis);
         assertNoReportedWatermarkEvent(context);
 
         
assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.NOTHING_AVAILABLE);
@@ -233,7 +237,7 @@ class SourceOperatorAlignmentTest {
         
assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.MORE_AVAILABLE);
         expectedOutput.add(record);
 
-        context.getTimeService().advance(1);
+        context.getTimeService().advance(updateIntervalMillis);
         assertLatestReportedWatermarkEvent(record);
         assertOutput(actualOutput, expectedOutput);
     }
@@ -267,7 +271,7 @@ class SourceOperatorAlignmentTest {
         CollectingDataOutput<Integer> actualOutput = new CollectingDataOutput<>();
 
         operator.emitNext(actualOutput);
-        context.getTimeService().advance(1);
+        context.getTimeService().advance(updateIntervalMillis);
         assertLatestReportedWatermarkEvent(record1);
 
         operator.handleOperatorEvent(
@@ -275,7 +279,7 @@ class SourceOperatorAlignmentTest {
                         Collections.singletonList(split2), new MockSourceSplitSerializer()));
 
         operator.emitNext(actualOutput);
-        context.getTimeService().advance(1);
+        context.getTimeService().advance(updateIntervalMillis);
         assertLatestReportedWatermarkEvent(record1);
     }
 
@@ -305,7 +309,7 @@ class SourceOperatorAlignmentTest {
         
assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.END_OF_DATA);
 
         
assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.END_OF_INPUT);
-        context.getTimeService().advance(1);
+        context.getTimeService().advance(updateIntervalMillis);
         
assertLatestReportedWatermarkEvent(Watermark.MAX_WATERMARK.getTimestamp());
     }
 

Reply via email to