This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4ade9f8f8a1659cac3a635221b94b7b20d61d831 Author: Roman Khachatryan <[email protected]> AuthorDate: Thu Oct 31 17:26:34 2024 +0100 [FLINK-39167][runtime] Initialize source output before emitting final watermark After FLINK-38939 / #27440, if the source operator was stopped while waiting for the first checkpoint, then the output needs to be initialized so that the final watermark can be emitted; otherwise, the final checkpoint might fail with java.lang.IllegalStateException. This commit fixes the issue by calling initializeMainOutput if necessary. --- .../streaming/api/operators/SourceOperator.java | 5 ++ .../api/operators/SourceOperatorTest.java | 82 +++++++++++++++------- 2 files changed, 61 insertions(+), 26 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index 433fdfd3110..4b3691db8a1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -563,6 +563,11 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr return DataInputStatus.END_OF_DATA; case DATA_FINISHED: if (watermarkAlignmentParams.isEnabled()) { + if (currentMainOutput == null) { + // if the source operator was stopped while waiting for the first checkpoint + // then the output needs to be initialized so final watermark can be emitted + initializeMainOutput(output); + } this.sampledLatestWatermark.addLatest(Watermark.MAX_WATERMARK.getTimestamp()); sampleAndEmitLatestWatermark(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java index 8ba9304344b..96b87a5afc1 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java @@ -53,6 +53,7 @@ import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.function.BiConsumerWithException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -61,6 +62,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import javax.annotation.Nullable; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -69,6 +71,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import static java.util.Collections.singletonMap; +import static org.apache.flink.streaming.runtime.io.DataInputStatus.END_OF_INPUT; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -248,11 +251,63 @@ class SourceOperatorTest { @TestTemplate public void testPausingUntilCheckpoint() throws Exception { + testRestoredSourceOperator( + (operator, out) -> { + MockSourceSplit split = new MockSourceSplit(0); + split.addRecord(0); + operator.handleOperatorEvent( + new AddSplitEvent<>( + Collections.singletonList(split), + new MockSourceSplitSerializer())); + + operator.emitNext(new DataOutputToOutput<>(operator.output)); + + if (pauseSourcesUntilCheckpoint) { + assertThat(out).isEmpty(); + assertThat(operator.isAvailable()).isFalse(); + // un-pause + operator.snapshotState( + 2L, + 2L, + CheckpointOptions.alignedNoTimeout( + CheckpointType.CHECKPOINT, + CheckpointStorageLocationReference.getDefault()), + new 
MemCheckpointStreamFactory(10240)); + operator.emitNext(new DataOutputToOutput<>(operator.output)); + } + + assertThat(operator.isAvailable()).isTrue(); + assertThat(out.stream().map(element -> element.asRecord().getValue())) + .containsExactly(0); + }); + } + + @TestTemplate + public void testFinalSavepointRestoredSourceOperator() throws Exception { + testRestoredSourceOperator( + (operator, out) -> { + DataOutputToOutput<Integer> output = new DataOutputToOutput<>(operator.output); + operator.stop(StopMode.NO_DRAIN); // emulate stop with savepoint + DataInputStatus status; + do { + status = operator.emitNext(output); // should not fail + } while (status != END_OF_INPUT); + }); + } + + private void testRestoredSourceOperator( + BiConsumerWithException< + SourceOperator<Integer, MockSourceSplit>, + List<StreamElement>, + Exception> + test) + throws Exception { final List<StreamElement> out = new ArrayList<>(); try (SourceOperatorTestContext context = SourceOperatorTestContext.builder() .setWatermarkStrategy( WatermarkStrategy.<Integer>forMonotonousTimestamps() + .withWatermarkAlignment("ag-1", Duration.ofMillis(50)) .withTimestampAssigner( (element, recordTimestamp) -> element)) .setOutput(new CollectorOutput<>(out)) @@ -277,32 +332,7 @@ class SourceOperatorTest { final SourceOperator<Integer, MockSourceSplit> operator = context.getOperator(); operator.open(); - - MockSourceSplit split = new MockSourceSplit(0); - split.addRecord(0); - operator.handleOperatorEvent( - new AddSplitEvent<>( - Collections.singletonList(split), new MockSourceSplitSerializer())); - - operator.emitNext(new DataOutputToOutput<>(operator.output)); - - if (pauseSourcesUntilCheckpoint) { - assertThat(out).isEmpty(); - assertThat(operator.isAvailable()).isFalse(); - // un-pause - operator.snapshotState( - 2L, - 2L, - CheckpointOptions.alignedNoTimeout( - CheckpointType.CHECKPOINT, - CheckpointStorageLocationReference.getDefault()), - new MemCheckpointStreamFactory(10240)); - 
operator.emitNext(new DataOutputToOutput<>(operator.output)); - } - - assertThat(operator.isAvailable()).isTrue(); - assertThat(out.stream().map(element -> element.asRecord().getValue())) - .containsExactly(0); + test.accept(operator, out); } }
