This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 06d4c04324962826c47698d432fa5e839cb34889 Author: 1996fanrui <1996fan...@gmail.com> AuthorDate: Thu Jun 22 23:39:45 2023 +0800 [FLINK-32414][connectors/common] Don't emitLatestWatermark when lastEmittedWatermark is UNINITIALIZED --- .../streaming/api/operators/SourceOperator.java | 3 ++ .../api/operators/SourceOperatorAlignmentTest.java | 41 ++++++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index 948d33b7081..e636b9a7bba 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -469,6 +469,9 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr private void emitLatestWatermark(long time) { checkState(currentMainOutput != null); + if (lastEmittedWatermark == Watermark.UNINITIALIZED.getTimestamp()) { + return; + } operatorEventGateway.sendEventToCoordinator( new ReportedWatermarkEvent(lastEmittedWatermark)); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java index 3f8849c5011..ce7637ceb25 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java @@ -192,6 +192,39 @@ class SourceOperatorAlignmentTest { } } + @Test + void testWatermarkAlignmentWithoutSplit() throws Exception { + operator.initializeState(context.createStateContext()); + operator.open(); + + CollectingDataOutput<Integer> actualOutput = new CollectingDataOutput<>(); + assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.NOTHING_AVAILABLE); + + // Don't report any ReportedWatermarkEvent + context.getTimeService().advance(1); + assertNoReportedWatermarkEvent(context); + + context.getTimeService().advance(1); + assertNoReportedWatermarkEvent(context); + + assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.NOTHING_AVAILABLE); + + MockSourceSplit newSplit = new MockSourceSplit(2); + int record = 10; + newSplit.addRecord(record); + operator.handleOperatorEvent( + new AddSplitEvent<>( + Collections.singletonList(newSplit), new MockSourceSplitSerializer())); + + List<Integer> expectedOutput = new ArrayList<>(); + assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.MORE_AVAILABLE); + expectedOutput.add(record); + + context.getTimeService().advance(1); + assertLatestReportedWatermarkEvent(record); + assertOutput(actualOutput, expectedOutput); + } + @Test void testStopWhileWaitingForWatermarkAlignment() throws Exception { testWatermarkAlignment(); @@ -260,6 +293,14 @@ class SourceOperatorAlignmentTest { .isEqualTo(new ReportedWatermarkEvent(expectedWatermark)); } + private void assertNoReportedWatermarkEvent(SourceOperatorTestContext context) { + List<OperatorEvent> events = + context.getGateway().getEventsSent().stream() + .filter(event -> event instanceof ReportedWatermarkEvent) + .collect(Collectors.toList()); + assertThat(events).isEmpty(); + } + private static class PunctuatedGenerator implements WatermarkGenerator<Integer> { private enum GenerationMode {