This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f10d5e3be03f8427bc3dd295ea08638234fdf534
Author: Piotr Nowojski <[email protected]>
AuthorDate: Mon Aug 5 15:24:46 2024 +0200

    [hotfix] Refactor SourceOperatorSplitWatermarkAlignmentTest and support 
pausing splits in MockSourceReader
---
 .../connector/source/mocks/MockSourceReader.java   | 25 ++++++++-
 .../SourceOperatorSplitWatermarkAlignmentTest.java | 64 +++++++++-------------
 2 files changed, 47 insertions(+), 42 deletions(-)

diff --git 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
index 2c1d4b40867..facfcf346cd 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
@@ -26,11 +26,15 @@ import org.apache.flink.core.io.InputStatus;
 import javax.annotation.concurrent.GuardedBy;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
 /** A mock {@link SourceReader} for unit tests. */
 public class MockSourceReader implements SourceReader<Integer, 
MockSourceSplit> {
+    private final Set<String> pausedSplits = new HashSet<>();
     private final List<MockSourceSplit> assignedSplits = new ArrayList<>();
     private final List<SourceEvent> receivedSourceEvents = new ArrayList<>();
     private final List<Long> completedCheckpoints = new ArrayList<>();
@@ -100,11 +104,16 @@ public class MockSourceReader implements 
SourceReader<Integer, MockSourceSplit>
                         || waitingForSplitsBehaviour == 
WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS;
         currentSplitIndex = 0;
         // Find first splits with available records.
-        while (currentSplitIndex < assignedSplits.size()
-                && !assignedSplits.get(currentSplitIndex).isAvailable()) {
-            finished &= assignedSplits.get(currentSplitIndex).isFinished();
+        for (MockSourceSplit assignedSplit : assignedSplits) {
+            finished &= assignedSplit.isFinished();
+            if (!pausedSplits.contains(assignedSplit.splitId())) {
+                if (assignedSplit.isAvailable()) {
+                    break;
+                }
+            }
             currentSplitIndex++;
         }
+
         // Read from the split with available record.
         if (currentSplitIndex < assignedSplits.size()) {
             if (idle) {
@@ -151,6 +160,12 @@ public class MockSourceReader implements 
SourceReader<Integer, MockSourceSplit>
         markAvailable();
     }
 
+    public void pauseOrResumeSplits(
+            Collection<String> splitsToPause, Collection<String> 
splitsToResume) {
+        pausedSplits.removeAll(splitsToResume);
+        pausedSplits.addAll(splitsToPause);
+    }
+
     @Override
     public void notifyNoMoreSplits() {
         splitsAssignmentState = SplitsAssignmentState.NO_MORE_SPLITS;
@@ -224,4 +239,8 @@ public class MockSourceReader implements 
SourceReader<Integer, MockSourceSplit>
     public List<Long> getAbortedCheckpoints() {
         return abortedCheckpoints;
     }
+
+    public Set<String> getPausedSplits() {
+        return pausedSplits;
+    }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
index de80d7f4ef5..8be17a021d5 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.api.common.eventtime.WatermarkGenerator;
 import org.apache.flink.api.common.eventtime.WatermarkOutput;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.mocks.MockSourceReader;
+import 
org.apache.flink.api.connector.source.mocks.MockSourceReader.WaitingForSplits;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
 import org.apache.flink.configuration.Configuration;
@@ -42,46 +43,27 @@ import 
org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.MockOutput;
 import org.apache.flink.streaming.util.MockStreamConfig;
 
+import org.assertj.core.api.Condition;
 import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unit test for split alignment in {@link SourceOperator}. */
-public class SourceOperatorSplitWatermarkAlignmentTest {
-    public static final WatermarkGenerator<Integer> WATERMARK_GENERATOR =
-            new WatermarkGenerator<Integer>() {
-
-                private long maxWatermark = Long.MIN_VALUE;
-
-                @Override
-                public void onEvent(Integer event, long eventTimestamp, 
WatermarkOutput output) {
-                    if (eventTimestamp > maxWatermark) {
-                        this.maxWatermark = eventTimestamp;
-                        output.emitWatermark(new Watermark(maxWatermark));
-                    }
-                }
-
-                @Override
-                public void onPeriodicEmit(WatermarkOutput output) {
-                    output.emitWatermark(new Watermark(maxWatermark));
-                }
-            };
+class SourceOperatorSplitWatermarkAlignmentTest {
 
     @Test
     public void testSplitWatermarkAlignment() throws Exception {
 
-        final SplitAligningSourceReader sourceReader = new 
SplitAligningSourceReader();
+        MockSourceReader sourceReader =
+                new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, 
false, true);
         SourceOperator<Integer, MockSourceSplit> operator =
                 new TestingSourceOperator<>(
                         sourceReader,
-                        WatermarkStrategy.forGenerator(ctx -> 
WATERMARK_GENERATOR)
+                        WatermarkStrategy.forGenerator(ctx -> new 
TestWatermarkGenerator())
                                 .withTimestampAssigner((r, l) -> r)
                                 .withWatermarkAlignment("group-1", 
Duration.ofMillis(1)),
                         new TestProcessingTimeService(),
@@ -97,8 +79,8 @@ public class SourceOperatorSplitWatermarkAlignmentTest {
         operator.initializeState(new StreamTaskStateInitializerImpl(env, new 
MemoryStateBackend()));
 
         operator.open();
-        final MockSourceSplit split1 = new MockSourceSplit(0, 0, 10);
-        final MockSourceSplit split2 = new MockSourceSplit(1, 10, 20);
+        MockSourceSplit split1 = new MockSourceSplit(0, 0, 10);
+        MockSourceSplit split2 = new MockSourceSplit(1, 10, 20);
         split1.addRecord(5);
         split1.addRecord(11);
         split2.addRecord(3);
@@ -107,25 +89,25 @@ public class SourceOperatorSplitWatermarkAlignmentTest {
         operator.handleOperatorEvent(
                 new AddSplitEvent<>(
                         Arrays.asList(split1, split2), new 
MockSourceSplitSerializer()));
-        final CollectingDataOutput<Integer> dataOutput = new 
CollectingDataOutput<>();
+        CollectingDataOutput<Integer> dataOutput = new 
CollectingDataOutput<>();
 
         operator.emitNext(dataOutput); // split 1 emits 5
 
         operator.handleOperatorEvent(
                 new WatermarkAlignmentEvent(4)); // pause by coordinator 
message
-        assertThat(sourceReader.pausedSplits).containsExactly("0");
+        assertThat(sourceReader.getPausedSplits()).containsExactly("0");
 
         operator.handleOperatorEvent(new WatermarkAlignmentEvent(5));
-        assertThat(sourceReader.pausedSplits).isEmpty();
+        assertThat(sourceReader.getPausedSplits()).isEmpty();
 
         operator.emitNext(dataOutput); // split 1 emits 11
         operator.emitNext(dataOutput); // split 2 emits 3
 
-        assertThat(sourceReader.pausedSplits).containsExactly("0");
+        assertThat(sourceReader.getPausedSplits()).containsExactly("0");
 
         operator.emitNext(dataOutput); // split 2 emits 6
 
-        assertThat(sourceReader.pausedSplits).containsExactly("0", "1");
+        assertThat(sourceReader.getPausedSplits()).containsExactly("0", "1");
     }
 
     private Environment getTestingEnvironment() {
@@ -139,17 +121,21 @@ public class SourceOperatorSplitWatermarkAlignmentTest {
                 new TestTaskStateManager());
     }
 
-    private static class SplitAligningSourceReader extends MockSourceReader {
-        Set<String> pausedSplits = new HashSet<>();
+    private static class TestWatermarkGenerator implements 
WatermarkGenerator<Integer> {
 
-        public SplitAligningSourceReader() {
-            super(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true);
+        private long maxWatermark = Long.MIN_VALUE;
+
+        @Override
+        public void onEvent(Integer event, long eventTimestamp, 
WatermarkOutput output) {
+            if (eventTimestamp > maxWatermark) {
+                this.maxWatermark = eventTimestamp;
+                output.emitWatermark(new Watermark(maxWatermark));
+            }
         }
 
-        public void pauseOrResumeSplits(
-                Collection<String> splitsToPause, Collection<String> 
splitsToResume) {
-            pausedSplits.removeAll(splitsToResume);
-            pausedSplits.addAll(splitsToPause);
+        @Override
+        public void onPeriodicEmit(WatermarkOutput output) {
+            output.emitWatermark(new Watermark(maxWatermark));
         }
     }
 }

Reply via email to