This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.20 by this push:
     new 98045aad2c0 [FLINK-36751] Fix PausableRelativeClock does not pause 
when the source only has one split
98045aad2c0 is described below

commit 98045aad2c0ace527893bb20a9daa054aa48adbf
Author: haishui <[email protected]>
AuthorDate: Fri Nov 22 16:49:38 2024 +0800

    [FLINK-36751] Fix PausableRelativeClock does not pause when the source only 
has one split
---
 .../streaming/api/operators/SourceOperator.java    | 13 ++----
 .../SourceOperatorSplitWatermarkAlignmentTest.java | 46 +++++++++++++++++++++-
 2 files changed, 48 insertions(+), 11 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index b915ab5ba88..f02e79dd118 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -173,7 +173,6 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
 
     private final List<SplitT> splitsToInitializeOutput = new ArrayList<>();
 
-    private int numSplits;
     private final Map<String, Long> splitCurrentWatermarks = new HashMap<>();
     private final Set<String> currentlyPausedSplits = new HashSet<>();
 
@@ -615,7 +614,6 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
     private void handleAddSplitsEvent(AddSplitEvent<SplitT> event) {
         try {
             List<SplitT> newSplits = event.splits(splitSerializer);
-            numSplits += newSplits.size();
             if (operatingMode == OperatingMode.OUTPUT_NOT_INITIALIZED) {
                 // For splits arrived before the main output is initialized, 
store them into the
                 // pending list. Outputs of these splits will be created once 
the main output is
@@ -656,9 +654,7 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
     @Override
     public void updateCurrentSplitWatermark(String splitId, long watermark) {
         splitCurrentWatermarks.put(splitId, watermark);
-        if (numSplits > 1
-                && watermark > currentMaxDesiredWatermark
-                && !currentlyPausedSplits.contains(splitId)) {
+        if (watermark > currentMaxDesiredWatermark && 
!currentlyPausedSplits.contains(splitId)) {
             pauseOrResumeSplits(Collections.singletonList(splitId), 
Collections.emptyList());
             currentlyPausedSplits.add(splitId);
         }
@@ -676,11 +672,6 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
      * <p>Note: This takes effect only if there are multiple splits, otherwise 
it does nothing.
      */
     private void checkSplitWatermarkAlignment() {
-        if (numSplits <= 1) {
-            // A single split can't overtake any other splits assigned to this 
operator instance.
-            // It is sufficient for the source to stop processing.
-            return;
-        }
         Collection<String> splitsToPause = new ArrayList<>();
         Collection<String> splitsToResume = new ArrayList<>();
         splitCurrentWatermarks.forEach(
@@ -717,12 +708,14 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
             if (shouldWaitForAlignment()) {
                 operatingMode = OperatingMode.WAITING_FOR_ALIGNMENT;
                 waitingForAlignmentFuture = new CompletableFuture<>();
+                mainInputActivityClock.pause();
             }
         } else if (operatingMode == OperatingMode.WAITING_FOR_ALIGNMENT) {
             checkState(!waitingForAlignmentFuture.isDone());
             if (!shouldWaitForAlignment()) {
                 operatingMode = OperatingMode.READING;
                 waitingForAlignmentFuture.complete(null);
+                mainInputActivityClock.unPause();
             }
         }
     }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
index 3031801b07c..80bfb95a898 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
@@ -48,10 +48,13 @@ import org.apache.flink.streaming.util.MockStreamConfig;
 
 import org.assertj.core.api.Condition;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -172,8 +175,49 @@ class SourceOperatorSplitWatermarkAlignmentTest {
         assertThat(dataOutput.getEvents()).doNotHave(new AnyWatermark());
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testSingleSplitWatermarkAlignmentAndIdleness(boolean 
usePerSplitOutputs) throws Exception {
+        long idleTimeout = 100;
+        MockSourceReader sourceReader =
+                new MockSourceReader(
+                        WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, 
usePerSplitOutputs);
+        TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
+        processingTimeService.setCurrentTime(1);
+        SourceOperator<Integer, MockSourceSplit> operator =
+                createAndOpenSourceOperatorWithIdleness(
+                        sourceReader, processingTimeService, idleTimeout);
+
+        MockSourceSplit split0 = new MockSourceSplit(0, 0, 10);
+        int maxAllowedWatermark = 4;
+        int maxEmittedWatermark = maxAllowedWatermark + 1;
+        // enough records should be emitted from split0 to make the mainSplit or perSplit idle,
+        // then split0 gets blocked and record (maxEmittedWatermark + 100) is never emitted from
+        // split0
+        split0.addRecord(1)
+                .addRecord(1)
+                .addRecord(1)
+                .addRecord(1)
+                .addRecord(maxEmittedWatermark)
+                .addRecord(maxEmittedWatermark + 100);
+
+        operator.handleOperatorEvent(
+                new AddSplitEvent<>(
+                        Collections.singletonList(split0), new 
MockSourceSplitSerializer()));
+        CollectingDataOutput<Integer> dataOutput = new 
CollectingDataOutput<>();
+
+        operator.handleOperatorEvent(
+                new WatermarkAlignmentEvent(maxAllowedWatermark)); // blocks 
split0
+
+        for (int i = 0; i < 10; i++) {
+            operator.emitNext(dataOutput);
+            processingTimeService.advance(idleTimeout);
+        }
+        
assertThat(dataOutput.getEvents()).doesNotContain(WatermarkStatus.IDLE);
+    }
+
     @Test
-    void testSplitWatermarkAlignmentAndIdleness() throws Exception {
+    void testMultiSplitWatermarkAlignmentAndIdleness() throws Exception {
         long idleTimeout = 100;
         MockSourceReader sourceReader =
                 new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, 
false, true);

Reply via email to