This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4ade9f8f8a1659cac3a635221b94b7b20d61d831
Author: Roman Khachatryan <[email protected]>
AuthorDate: Thu Oct 31 17:26:34 2024 +0100

    [FLINK-39167][runtime] Initialize source output before emitting final 
watermark
    
    After FLINK-38939 / #27440, if the source operator was stopped while 
waiting for
    the first checkpoint then the output needs to be initialized so final 
watermark
    can be emitted; otherwise, final checkpoint might fail with 
java.lang.IllegalStateException
    
    This commit fixes the issue by calling initializeMainOutput if necessary.
---
 .../streaming/api/operators/SourceOperator.java    |  5 ++
 .../api/operators/SourceOperatorTest.java          | 82 +++++++++++++++-------
 2 files changed, 61 insertions(+), 26 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 433fdfd3110..4b3691db8a1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -563,6 +563,11 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
                 return DataInputStatus.END_OF_DATA;
             case DATA_FINISHED:
                 if (watermarkAlignmentParams.isEnabled()) {
+                    if (currentMainOutput == null) {
+                        // if the source operator was stopped while waiting 
for the first checkpoint
+                        // then the output needs to be initialized so final 
watermark can be emitted
+                        initializeMainOutput(output);
+                    }
                     
this.sampledLatestWatermark.addLatest(Watermark.MAX_WATERMARK.getTimestamp());
                     sampleAndEmitLatestWatermark();
                 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index 8ba9304344b..96b87a5afc1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -53,6 +53,7 @@ import 
org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
 import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
 import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.function.BiConsumerWithException;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -61,6 +62,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -69,6 +71,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 import static java.util.Collections.singletonMap;
+import static 
org.apache.flink.streaming.runtime.io.DataInputStatus.END_OF_INPUT;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -248,11 +251,63 @@ class SourceOperatorTest {
 
     @TestTemplate
     public void testPausingUntilCheckpoint() throws Exception {
+        testRestoredSourceOperator(
+                (operator, out) -> {
+                    MockSourceSplit split = new MockSourceSplit(0);
+                    split.addRecord(0);
+                    operator.handleOperatorEvent(
+                            new AddSplitEvent<>(
+                                    Collections.singletonList(split),
+                                    new MockSourceSplitSerializer()));
+
+                    operator.emitNext(new 
DataOutputToOutput<>(operator.output));
+
+                    if (pauseSourcesUntilCheckpoint) {
+                        assertThat(out).isEmpty();
+                        assertThat(operator.isAvailable()).isFalse();
+                        // un-pause
+                        operator.snapshotState(
+                                2L,
+                                2L,
+                                CheckpointOptions.alignedNoTimeout(
+                                        CheckpointType.CHECKPOINT,
+                                        
CheckpointStorageLocationReference.getDefault()),
+                                new MemCheckpointStreamFactory(10240));
+                        operator.emitNext(new 
DataOutputToOutput<>(operator.output));
+                    }
+
+                    assertThat(operator.isAvailable()).isTrue();
+                    assertThat(out.stream().map(element -> 
element.asRecord().getValue()))
+                            .containsExactly(0);
+                });
+    }
+
+    @TestTemplate
+    public void testFinalSavepointRestoredSourceOperator() throws Exception {
+        testRestoredSourceOperator(
+                (operator, out) -> {
+                    DataOutputToOutput<Integer> output = new 
DataOutputToOutput<>(operator.output);
+                    operator.stop(StopMode.NO_DRAIN); // emulate stop with 
savepoint
+                    DataInputStatus status;
+                    do {
+                        status = operator.emitNext(output); // should not fail
+                    } while (status != END_OF_INPUT);
+                });
+    }
+
+    private void testRestoredSourceOperator(
+            BiConsumerWithException<
+                            SourceOperator<Integer, MockSourceSplit>,
+                            List<StreamElement>,
+                            Exception>
+                    test)
+            throws Exception {
         final List<StreamElement> out = new ArrayList<>();
         try (SourceOperatorTestContext context =
                 SourceOperatorTestContext.builder()
                         .setWatermarkStrategy(
                                 
WatermarkStrategy.<Integer>forMonotonousTimestamps()
+                                        .withWatermarkAlignment("ag-1", 
Duration.ofMillis(50))
                                         .withTimestampAssigner(
                                                 (element, recordTimestamp) -> 
element))
                         .setOutput(new CollectorOutput<>(out))
@@ -277,32 +332,7 @@ class SourceOperatorTest {
 
             final SourceOperator<Integer, MockSourceSplit> operator = 
context.getOperator();
             operator.open();
-
-            MockSourceSplit split = new MockSourceSplit(0);
-            split.addRecord(0);
-            operator.handleOperatorEvent(
-                    new AddSplitEvent<>(
-                            Collections.singletonList(split), new 
MockSourceSplitSerializer()));
-
-            operator.emitNext(new DataOutputToOutput<>(operator.output));
-
-            if (pauseSourcesUntilCheckpoint) {
-                assertThat(out).isEmpty();
-                assertThat(operator.isAvailable()).isFalse();
-                // un-pause
-                operator.snapshotState(
-                        2L,
-                        2L,
-                        CheckpointOptions.alignedNoTimeout(
-                                CheckpointType.CHECKPOINT,
-                                
CheckpointStorageLocationReference.getDefault()),
-                        new MemCheckpointStreamFactory(10240));
-                operator.emitNext(new DataOutputToOutput<>(operator.output));
-            }
-
-            assertThat(operator.isAvailable()).isTrue();
-            assertThat(out.stream().map(element -> 
element.asRecord().getValue()))
-                    .containsExactly(0);
+            test.accept(operator, out);
         }
     }
 

Reply via email to