This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git
commit f10d5e3be03f8427bc3dd295ea08638234fdf534 Author: Piotr Nowojski <[email protected]> AuthorDate: Mon Aug 5 15:24:46 2024 +0200 [hotfix] Refactor SourceOperatorSplitWatermarkAlignmentTest and support pausing splits in MockSourceReader --- .../connector/source/mocks/MockSourceReader.java | 25 ++++++++- .../SourceOperatorSplitWatermarkAlignmentTest.java | 64 +++++++++------------- 2 files changed, 47 insertions(+), 42 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java index 2c1d4b40867..facfcf346cd 100644 --- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java @@ -26,11 +26,15 @@ import org.apache.flink.core.io.InputStatus; import javax.annotation.concurrent.GuardedBy; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; /** A mock {@link SourceReader} for unit tests. */ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> { + private final Set<String> pausedSplits = new HashSet<>(); private final List<MockSourceSplit> assignedSplits = new ArrayList<>(); private final List<SourceEvent> receivedSourceEvents = new ArrayList<>(); private final List<Long> completedCheckpoints = new ArrayList<>(); @@ -100,11 +104,16 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> || waitingForSplitsBehaviour == WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS; currentSplitIndex = 0; // Find first splits with available records. 
- while (currentSplitIndex < assignedSplits.size() - && !assignedSplits.get(currentSplitIndex).isAvailable()) { - finished &= assignedSplits.get(currentSplitIndex).isFinished(); + for (MockSourceSplit assignedSplit : assignedSplits) { + finished &= assignedSplit.isFinished(); + if (!pausedSplits.contains(assignedSplit.splitId())) { + if (assignedSplit.isAvailable()) { + break; + } + } currentSplitIndex++; } + // Read from the split with available record. if (currentSplitIndex < assignedSplits.size()) { if (idle) { @@ -151,6 +160,12 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> markAvailable(); } + public void pauseOrResumeSplits( + Collection<String> splitsToPause, Collection<String> splitsToResume) { + pausedSplits.removeAll(splitsToResume); + pausedSplits.addAll(splitsToPause); + } + @Override public void notifyNoMoreSplits() { splitsAssignmentState = SplitsAssignmentState.NO_MORE_SPLITS; @@ -224,4 +239,8 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> public List<Long> getAbortedCheckpoints() { return abortedCheckpoints; } + + public Set<String> getPausedSplits() { + return pausedSplits; + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java index de80d7f4ef5..8be17a021d5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.eventtime.WatermarkGenerator; import org.apache.flink.api.common.eventtime.WatermarkOutput; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import 
org.apache.flink.api.connector.source.mocks.MockSourceReader; +import org.apache.flink.api.connector.source.mocks.MockSourceReader.WaitingForSplits; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer; import org.apache.flink.configuration.Configuration; @@ -42,46 +43,27 @@ import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.streaming.util.MockOutput; import org.apache.flink.streaming.util.MockStreamConfig; +import org.assertj.core.api.Condition; import org.junit.jupiter.api.Test; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.Set; import static org.assertj.core.api.Assertions.assertThat; /** Unit test for split alignment in {@link SourceOperator}. */ -public class SourceOperatorSplitWatermarkAlignmentTest { - public static final WatermarkGenerator<Integer> WATERMARK_GENERATOR = - new WatermarkGenerator<Integer>() { - - private long maxWatermark = Long.MIN_VALUE; - - @Override - public void onEvent(Integer event, long eventTimestamp, WatermarkOutput output) { - if (eventTimestamp > maxWatermark) { - this.maxWatermark = eventTimestamp; - output.emitWatermark(new Watermark(maxWatermark)); - } - } - - @Override - public void onPeriodicEmit(WatermarkOutput output) { - output.emitWatermark(new Watermark(maxWatermark)); - } - }; +class SourceOperatorSplitWatermarkAlignmentTest { @Test public void testSplitWatermarkAlignment() throws Exception { - final SplitAligningSourceReader sourceReader = new SplitAligningSourceReader(); + MockSourceReader sourceReader = + new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true); SourceOperator<Integer, MockSourceSplit> operator = new TestingSourceOperator<>( sourceReader, - WatermarkStrategy.forGenerator(ctx -> WATERMARK_GENERATOR) + WatermarkStrategy.forGenerator(ctx -> 
new TestWatermarkGenerator()) .withTimestampAssigner((r, l) -> r) .withWatermarkAlignment("group-1", Duration.ofMillis(1)), new TestProcessingTimeService(), @@ -97,8 +79,8 @@ public class SourceOperatorSplitWatermarkAlignmentTest { operator.initializeState(new StreamTaskStateInitializerImpl(env, new MemoryStateBackend())); operator.open(); - final MockSourceSplit split1 = new MockSourceSplit(0, 0, 10); - final MockSourceSplit split2 = new MockSourceSplit(1, 10, 20); + MockSourceSplit split1 = new MockSourceSplit(0, 0, 10); + MockSourceSplit split2 = new MockSourceSplit(1, 10, 20); split1.addRecord(5); split1.addRecord(11); split2.addRecord(3); @@ -107,25 +89,25 @@ public class SourceOperatorSplitWatermarkAlignmentTest { operator.handleOperatorEvent( new AddSplitEvent<>( Arrays.asList(split1, split2), new MockSourceSplitSerializer())); - final CollectingDataOutput<Integer> dataOutput = new CollectingDataOutput<>(); + CollectingDataOutput<Integer> dataOutput = new CollectingDataOutput<>(); operator.emitNext(dataOutput); // split 1 emits 5 operator.handleOperatorEvent( new WatermarkAlignmentEvent(4)); // pause by coordinator message - assertThat(sourceReader.pausedSplits).containsExactly("0"); + assertThat(sourceReader.getPausedSplits()).containsExactly("0"); operator.handleOperatorEvent(new WatermarkAlignmentEvent(5)); - assertThat(sourceReader.pausedSplits).isEmpty(); + assertThat(sourceReader.getPausedSplits()).isEmpty(); operator.emitNext(dataOutput); // split 1 emits 11 operator.emitNext(dataOutput); // split 2 emits 3 - assertThat(sourceReader.pausedSplits).containsExactly("0"); + assertThat(sourceReader.getPausedSplits()).containsExactly("0"); operator.emitNext(dataOutput); // split 2 emits 6 - assertThat(sourceReader.pausedSplits).containsExactly("0", "1"); + assertThat(sourceReader.getPausedSplits()).containsExactly("0", "1"); } private Environment getTestingEnvironment() { @@ -139,17 +121,21 @@ public class SourceOperatorSplitWatermarkAlignmentTest { new 
TestTaskStateManager()); } - private static class SplitAligningSourceReader extends MockSourceReader { - Set<String> pausedSplits = new HashSet<>(); + private static class TestWatermarkGenerator implements WatermarkGenerator<Integer> { - public SplitAligningSourceReader() { - super(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true); + private long maxWatermark = Long.MIN_VALUE; + + @Override + public void onEvent(Integer event, long eventTimestamp, WatermarkOutput output) { + if (eventTimestamp > maxWatermark) { + this.maxWatermark = eventTimestamp; + output.emitWatermark(new Watermark(maxWatermark)); + } } - public void pauseOrResumeSplits( - Collection<String> splitsToPause, Collection<String> splitsToResume) { - pausedSplits.removeAll(splitsToResume); - pausedSplits.addAll(splitsToPause); + @Override + public void onPeriodicEmit(WatermarkOutput output) { + output.emitWatermark(new Watermark(maxWatermark)); } } }
