This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.20 by this push:
new 98045aad2c0 [FLINK-36751] Fix PausableRelativeClock does not pause
when the source only has one split
98045aad2c0 is described below
commit 98045aad2c0ace527893bb20a9daa054aa48adbf
Author: haishui <[email protected]>
AuthorDate: Fri Nov 22 16:49:38 2024 +0800
[FLINK-36751] Fix PausableRelativeClock does not pause when the source only
has one split
---
.../streaming/api/operators/SourceOperator.java | 13 ++----
.../SourceOperatorSplitWatermarkAlignmentTest.java | 46 +++++++++++++++++++++-
2 files changed, 48 insertions(+), 11 deletions(-)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index b915ab5ba88..f02e79dd118 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -173,7 +173,6 @@ public class SourceOperator<OUT, SplitT extends
SourceSplit> extends AbstractStr
private final List<SplitT> splitsToInitializeOutput = new ArrayList<>();
- private int numSplits;
private final Map<String, Long> splitCurrentWatermarks = new HashMap<>();
private final Set<String> currentlyPausedSplits = new HashSet<>();
@@ -615,7 +614,6 @@ public class SourceOperator<OUT, SplitT extends
SourceSplit> extends AbstractStr
private void handleAddSplitsEvent(AddSplitEvent<SplitT> event) {
try {
List<SplitT> newSplits = event.splits(splitSerializer);
- numSplits += newSplits.size();
if (operatingMode == OperatingMode.OUTPUT_NOT_INITIALIZED) {
// For splits arrived before the main output is initialized,
store them into the
// pending list. Outputs of these splits will be created once
the main output is
@@ -656,9 +654,7 @@ public class SourceOperator<OUT, SplitT extends
SourceSplit> extends AbstractStr
@Override
public void updateCurrentSplitWatermark(String splitId, long watermark) {
splitCurrentWatermarks.put(splitId, watermark);
- if (numSplits > 1
- && watermark > currentMaxDesiredWatermark
- && !currentlyPausedSplits.contains(splitId)) {
+ if (watermark > currentMaxDesiredWatermark &&
!currentlyPausedSplits.contains(splitId)) {
pauseOrResumeSplits(Collections.singletonList(splitId),
Collections.emptyList());
currentlyPausedSplits.add(splitId);
}
@@ -676,11 +672,6 @@ public class SourceOperator<OUT, SplitT extends
SourceSplit> extends AbstractStr
* <p>Note: This takes effect only if there are multiple splits, otherwise
it does nothing.
*/
private void checkSplitWatermarkAlignment() {
- if (numSplits <= 1) {
- // A single split can't overtake any other splits assigned to this
operator instance.
- // It is sufficient for the source to stop processing.
- return;
- }
Collection<String> splitsToPause = new ArrayList<>();
Collection<String> splitsToResume = new ArrayList<>();
splitCurrentWatermarks.forEach(
@@ -717,12 +708,14 @@ public class SourceOperator<OUT, SplitT extends
SourceSplit> extends AbstractStr
if (shouldWaitForAlignment()) {
operatingMode = OperatingMode.WAITING_FOR_ALIGNMENT;
waitingForAlignmentFuture = new CompletableFuture<>();
+ mainInputActivityClock.pause();
}
} else if (operatingMode == OperatingMode.WAITING_FOR_ALIGNMENT) {
checkState(!waitingForAlignmentFuture.isDone());
if (!shouldWaitForAlignment()) {
operatingMode = OperatingMode.READING;
waitingForAlignmentFuture.complete(null);
+ mainInputActivityClock.unPause();
}
}
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
index 3031801b07c..80bfb95a898 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
@@ -48,10 +48,13 @@ import org.apache.flink.streaming.util.MockStreamConfig;
import org.assertj.core.api.Condition;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import static org.assertj.core.api.Assertions.assertThat;
@@ -172,8 +175,49 @@ class SourceOperatorSplitWatermarkAlignmentTest {
assertThat(dataOutput.getEvents()).doNotHave(new AnyWatermark());
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testSingleSplitWatermarkAlignmentAndIdleness(boolean
usePerSplitOutputs) throws Exception {
+ long idleTimeout = 100;
+ MockSourceReader sourceReader =
+ new MockSourceReader(
+ WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false,
usePerSplitOutputs);
+ TestProcessingTimeService processingTimeService = new
TestProcessingTimeService();
+ processingTimeService.setCurrentTime(1);
+ SourceOperator<Integer, MockSourceSplit> operator =
+ createAndOpenSourceOperatorWithIdleness(
+ sourceReader, processingTimeService, idleTimeout);
+
+ MockSourceSplit split0 = new MockSourceSplit(0, 0, 10);
+ int maxAllowedWatermark = 4;
+ int maxEmittedWatermark = maxAllowedWatermark + 1;
+ // enough records should be emitted from split0 to make the mainSplit or
perSplit idle,
+ // then split0 gets blocked and record (maxEmittedWatermark + 100) is
never emitted from
+ // split0
+ split0.addRecord(1)
+ .addRecord(1)
+ .addRecord(1)
+ .addRecord(1)
+ .addRecord(maxEmittedWatermark)
+ .addRecord(maxEmittedWatermark + 100);
+
+ operator.handleOperatorEvent(
+ new AddSplitEvent<>(
+ Collections.singletonList(split0), new
MockSourceSplitSerializer()));
+ CollectingDataOutput<Integer> dataOutput = new
CollectingDataOutput<>();
+
+ operator.handleOperatorEvent(
+ new WatermarkAlignmentEvent(maxAllowedWatermark)); // blocks
split0
+
+ for (int i = 0; i < 10; i++) {
+ operator.emitNext(dataOutput);
+ processingTimeService.advance(idleTimeout);
+ }
+
assertThat(dataOutput.getEvents()).doesNotContain(WatermarkStatus.IDLE);
+ }
+
@Test
- void testSplitWatermarkAlignmentAndIdleness() throws Exception {
+ void testMultiSplitWatermarkAlignmentAndIdleness() throws Exception {
long idleTimeout = 100;
MockSourceReader sourceReader =
new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS,
false, true);