This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 06d4c04324962826c47698d432fa5e839cb34889
Author: 1996fanrui <1996fan...@gmail.com>
AuthorDate: Thu Jun 22 23:39:45 2023 +0800

    [FLINK-32414][connectors/common] Don't emitLatestWatermark when lastEmittedWatermark is UNINITIALIZED
---
 .../streaming/api/operators/SourceOperator.java    |  3 ++
 .../api/operators/SourceOperatorAlignmentTest.java | 41 ++++++++++++++++++++++
 2 files changed, 44 insertions(+)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 948d33b7081..e636b9a7bba 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -469,6 +469,9 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
 
     private void emitLatestWatermark(long time) {
         checkState(currentMainOutput != null);
+        if (lastEmittedWatermark == Watermark.UNINITIALIZED.getTimestamp()) {
+            return;
+        }
         operatorEventGateway.sendEventToCoordinator(
                 new ReportedWatermarkEvent(lastEmittedWatermark));
     }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
index 3f8849c5011..ce7637ceb25 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
@@ -192,6 +192,39 @@ class SourceOperatorAlignmentTest {
         }
     }
 
+    @Test
+    void testWatermarkAlignmentWithoutSplit() throws Exception {
+        operator.initializeState(context.createStateContext());
+        operator.open();
+
+        CollectingDataOutput<Integer> actualOutput = new CollectingDataOutput<>();
+        assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.NOTHING_AVAILABLE);
+
+        // Don't report any ReportedWatermarkEvent
+        context.getTimeService().advance(1);
+        assertNoReportedWatermarkEvent(context);
+
+        context.getTimeService().advance(1);
+        assertNoReportedWatermarkEvent(context);
+
+        assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.NOTHING_AVAILABLE);
+
+        MockSourceSplit newSplit = new MockSourceSplit(2);
+        int record = 10;
+        newSplit.addRecord(record);
+        operator.handleOperatorEvent(
+                new AddSplitEvent<>(
+                        Collections.singletonList(newSplit), new MockSourceSplitSerializer()));
+
+        List<Integer> expectedOutput = new ArrayList<>();
+        assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.MORE_AVAILABLE);
+        expectedOutput.add(record);
+
+        context.getTimeService().advance(1);
+        assertLatestReportedWatermarkEvent(record);
+        assertOutput(actualOutput, expectedOutput);
+    }
+
     @Test
     void testStopWhileWaitingForWatermarkAlignment() throws Exception {
         testWatermarkAlignment();
@@ -260,6 +293,14 @@ class SourceOperatorAlignmentTest {
                 .isEqualTo(new ReportedWatermarkEvent(expectedWatermark));
     }
 
+    private void assertNoReportedWatermarkEvent(SourceOperatorTestContext context) {
+        List<OperatorEvent> events =
+                context.getGateway().getEventsSent().stream()
+                        .filter(event -> event instanceof ReportedWatermarkEvent)
+                        .collect(Collectors.toList());
+        assertThat(events).isEmpty();
+    }
+
     private static class PunctuatedGenerator implements WatermarkGenerator<Integer> {
 
         private enum GenerationMode {

Reply via email to