This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c71582d5837b067f26f8aec610193bacf6ccb26b
Author: Qingsheng Ren <[email protected]>
AuthorDate: Fri Feb 11 18:19:10 2022 +0800

    [FLINK-26018][connector/common] Create per-split output on split addition 
in SourceOperator
    
    This change avoids the watermark being pushed forward by records from the 
first split in the first fetch when multiple splits are assigned to the source 
operator.
---
 flink-connectors/flink-connector-base/pom.xml      |   8 ++
 .../base/source/reader/SourceReaderBase.java       |   3 +
 .../source/reader/splitreader/SplitReader.java     |   5 +
 .../base/source/reader/SourceReaderBaseTest.java   | 130 +++++++++++++++++++++
 .../connector/kafka/source/KafkaSourceITCase.java  |  64 ++++++++++
 .../streaming/api/operators/SourceOperator.java    |  48 ++++++--
 .../source/SourceOperatorEventTimeTest.java        | 121 ++++++++-----------
 .../operators/source/TestingSourceOperator.java    |  58 +++++++++
 8 files changed, 352 insertions(+), 85 deletions(-)

diff --git a/flink-connectors/flink-connector-base/pom.xml 
b/flink-connectors/flink-connector-base/pom.xml
index 1af4fc8..4f8ff0e 100644
--- a/flink-connectors/flink-connector-base/pom.xml
+++ b/flink-connectors/flink-connector-base/pom.xml
@@ -72,6 +72,14 @@
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
                <!-- ArchUit test dependencies -->
 
                <dependency>
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 416dbb2..18d49f3 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -324,6 +324,9 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
 
         SourceOutput<T> getOrCreateSplitOutput(ReaderOutput<T> mainOutput) {
             if (sourceOutput == null) {
+                // The split output should have been created when 
AddSplitsEvent was processed in
+                // SourceOperator. Here we just use this method to get the 
previously created
+                // output.
                 sourceOutput = mainOutput.createOutputForSplit(splitId);
             }
             return sourceOutput;
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
index 4f2ff6a..550cb95 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
@@ -50,6 +50,11 @@ public interface SplitReader<E, SplitT extends SourceSplit> {
     /**
      * Handle the split changes. This call should be non-blocking.
      *
+     * <p>For the consistency of internal state in SourceReaderBase, if an 
invalid split is added to
+     * the reader (for example splits without any records), it should be put 
back into {@link
+     * RecordsWithSplitIds} as finished splits so that SourceReaderBase can 
clean up
+     * resources created for it.
+     *
      * @param splitsChanges the split changes that the SplitReader needs to 
handle.
      */
     void handleSplitsChanges(SplitsChange<SplitT> splitsChanges);
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index 4a5544d..d17e1cf 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -18,10 +18,14 @@
 
 package org.apache.flink.connector.base.source.reader;
 
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
 import org.apache.flink.connector.base.source.reader.mocks.MockSourceReader;
@@ -37,9 +41,22 @@ import 
org.apache.flink.connector.testutils.source.reader.SourceReaderTestBase;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
 import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.streaming.api.operators.SourceOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -48,12 +65,15 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 
+import static 
org.apache.flink.streaming.api.operators.source.TestingSourceOperator.createTestOperator;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** A unit test class for {@link SourceReaderBase}. */
 public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit> {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(SourceReaderBaseTest.class);
+
     @Test
     void testExceptionInSplitReader() {
         assertThatThrownBy(
@@ -239,6 +259,83 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
                 .isEqualTo(InputStatus.MORE_AVAILABLE);
     }
 
+    @ParameterizedTest(name = "Emit record before split addition: {0}")
+    @ValueSource(booleans = {true, false})
+    void testPerSplitWatermark(boolean emitRecordBeforeSplitAddition) throws 
Exception {
+        MockSplitReader mockSplitReader =
+                MockSplitReader.newBuilder()
+                        .setNumRecordsPerSplitPerFetch(3)
+                        .setBlockingFetch(true)
+                        .build();
+
+        MockSourceReader reader =
+                new MockSourceReader(
+                        new FutureCompletingBlockingQueue<>(),
+                        () -> mockSplitReader,
+                        new Configuration(),
+                        new TestingReaderContext());
+
+        SourceOperator<Integer, MockSourceSplit> sourceOperator =
+                createTestOperator(
+                        reader,
+                        WatermarkStrategy.forGenerator(
+                                (context) -> new OnEventWatermarkGenerator()),
+                        true);
+
+        MockSourceSplit splitA = new MockSourceSplit(0, 0, 3);
+        splitA.addRecord(100);
+        splitA.addRecord(200);
+        splitA.addRecord(300);
+
+        MockSourceSplit splitB = new MockSourceSplit(1, 0, 3);
+        splitB.addRecord(150);
+        splitB.addRecord(250);
+        splitB.addRecord(350);
+
+        WatermarkCollectingDataOutput output = new 
WatermarkCollectingDataOutput();
+
+        if (emitRecordBeforeSplitAddition) {
+            sourceOperator.emitNext(output);
+        }
+
+        AddSplitEvent<MockSourceSplit> addSplitsEvent =
+                new AddSplitEvent<>(Arrays.asList(splitA, splitB), new 
MockSourceSplitSerializer());
+        sourceOperator.handleOperatorEvent(addSplitsEvent);
+
+        // First 3 records from split A should not generate any watermarks
+        CommonTestUtils.waitUtil(
+                () -> {
+                    try {
+                        sourceOperator.emitNext(output);
+                    } catch (Exception e) {
+                        LOG.warn("Exception caught at emitting records", e);
+                        return false;
+                    }
+                    return output.numRecords == 3;
+                },
+                Duration.ofSeconds(10),
+                String.format(
+                        "Only %d out of 3 records were received within timeout", 
output.numRecords));
+        assertThat(output.watermarks).isEmpty();
+
+        CommonTestUtils.waitUtil(
+                () -> {
+                    try {
+                        sourceOperator.emitNext(output);
+                    } catch (Exception e) {
+                        LOG.warn("Exception caught at emitting records", e);
+                        return false;
+                    }
+                    return output.numRecords == 6;
+                },
+                Duration.ofSeconds(10),
+                String.format(
+                        "Only %d out of 6 records were received within timeout", 
output.numRecords));
+
+        assertThat(output.watermarks).hasSize(3);
+        assertThat(output.watermarks).containsExactly(150L, 250L, 300L);
+    }
+
     // ---------------- helper methods -----------------
 
     @Override
@@ -379,4 +476,37 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
             }
         }
     }
+
+    private static class OnEventWatermarkGenerator implements 
WatermarkGenerator<Integer> {
+
+        @Override
+        public void onEvent(Integer event, long eventTimestamp, 
WatermarkOutput output) {
+            output.emitWatermark(new 
org.apache.flink.api.common.eventtime.Watermark(event));
+        }
+
+        @Override
+        public void onPeriodicEmit(WatermarkOutput output) {}
+    }
+
+    private static class WatermarkCollectingDataOutput
+            implements PushingAsyncDataInput.DataOutput<Integer> {
+        int numRecords = 0;
+        final List<Long> watermarks = new ArrayList<>();
+
+        @Override
+        public void emitRecord(StreamRecord<Integer> streamRecord) {
+            numRecords++;
+        }
+
+        @Override
+        public void emitWatermark(Watermark watermark) throws Exception {
+            watermarks.add(watermark.getTimestamp());
+        }
+
+        @Override
+        public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {}
+
+        @Override
+        public void emitLatencyMarker(LatencyMarker latencyMarker) {}
+    }
 }
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
index 71108d2..7269096 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
@@ -20,6 +20,9 @@ package org.apache.flink.connector.kafka.source;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.ListAccumulator;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -38,6 +41,7 @@ import 
org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.operators.StreamMap;
@@ -70,11 +74,13 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.flink.connector.kafka.testutils.KafkaSourceExternalContext.SplitMappingMode.PARTITION;
 import static 
org.apache.flink.connector.kafka.testutils.KafkaSourceExternalContext.SplitMappingMode.TOPIC;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /** Unite test class for {@link KafkaSource}. */
@@ -256,6 +262,53 @@ public class KafkaSourceITCase {
                             "testBasicReadWithoutGroupId");
             executeAndVerify(env, stream);
         }
+
+        @Test
+        public void testPerPartitionWatermark() throws Throwable {
+            String watermarkTopic = "watermarkTestTopic-" + UUID.randomUUID();
+            KafkaSourceTestEnv.createTestTopic(watermarkTopic, 2, 1);
+            List<ProducerRecord<String, Integer>> records =
+                    Arrays.asList(
+                            new ProducerRecord<>(watermarkTopic, 0, 100L, 
null, 100),
+                            new ProducerRecord<>(watermarkTopic, 0, 200L, 
null, 200),
+                            new ProducerRecord<>(watermarkTopic, 0, 300L, 
null, 300),
+                            new ProducerRecord<>(watermarkTopic, 1, 150L, 
null, 150),
+                            new ProducerRecord<>(watermarkTopic, 1, 250L, 
null, 250),
+                            new ProducerRecord<>(watermarkTopic, 1, 350L, 
null, 350));
+            KafkaSourceTestEnv.produceToKafka(records);
+            KafkaSource<PartitionAndValue> source =
+                    KafkaSource.<PartitionAndValue>builder()
+                            
.setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+                            .setTopics(watermarkTopic)
+                            .setGroupId("watermark-test")
+                            .setDeserializer(new 
TestingKafkaRecordDeserializationSchema(false))
+                            .setStartingOffsets(OffsetsInitializer.earliest())
+                            .setBounded(OffsetsInitializer.latest())
+                            .build();
+            StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+            env.setParallelism(1);
+            env.fromSource(
+                            source,
+                            WatermarkStrategy.forGenerator(
+                                    (context) -> new 
OnEventWatermarkGenerator()),
+                            "testPerPartitionWatermark")
+                    .process(
+                            new ProcessFunction<PartitionAndValue, Long>() {
+                                @Override
+                                public void processElement(
+                                        PartitionAndValue value,
+                                        ProcessFunction<PartitionAndValue, 
Long>.Context ctx,
+                                        Collector<Long> out) {
+                                    assertThat(ctx.timestamp())
+                                            .as(
+                                                    "Event time should never be 
behind watermark "
+                                                            + "because of 
per-split watermark multiplexing logic")
+                                            .isGreaterThanOrEqualTo(
+                                                    
ctx.timerService().currentWatermark());
+                                }
+                            });
+            env.execute();
+        }
     }
 
     /** Integration test based on connector testing framework. */
@@ -400,4 +453,15 @@ public class KafkaSourceITCase {
                     }
                 });
     }
+
+    private static class OnEventWatermarkGenerator
+            implements WatermarkGenerator<PartitionAndValue> {
+        @Override
+        public void onEvent(PartitionAndValue event, long eventTimestamp, 
WatermarkOutput output) {
+            output.emitWatermark(new Watermark(eventTimestamp));
+        }
+
+        @Override
+        public void onPeriodicEmit(WatermarkOutput output) {}
+    }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 5660bc1..53621bd 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -67,6 +67,7 @@ import org.apache.flink.util.function.FunctionWithException;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
@@ -154,6 +155,8 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
     private final SourceOperatorAvailabilityHelper availabilityHelper =
             new SourceOperatorAvailabilityHelper();
 
+    private final List<SplitT> outputPendingSplits = new ArrayList<>();
+
     private enum OperatingMode {
         READING,
         WAITING_FOR_ALIGNMENT,
@@ -396,11 +399,7 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
                             watermarkAlignmentParams.getUpdateInterval(),
                             watermarkAlignmentParams.getUpdateInterval());
                 }
-                currentMainOutput =
-                        eventTimeLogic.createMainOutput(output, 
this::onWatermarkEmitted);
-                initializeLatencyMarkerEmitter(output);
-                lastInvokedOutput = output;
-                this.operatingMode = OperatingMode.READING;
+                initializeMainOutput(output);
                 return 
convertToInternalStatus(sourceReader.pollNext(currentMainOutput));
             case SOURCE_STOPPED:
                 this.operatingMode = OperatingMode.DATA_FINISHED;
@@ -423,6 +422,15 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
         }
     }
 
+    private void initializeMainOutput(DataOutput<OUT> output) {
+        currentMainOutput = eventTimeLogic.createMainOutput(output, 
this::onWatermarkEmitted);
+        initializeLatencyMarkerEmitter(output);
+        lastInvokedOutput = output;
+        // Create per-split output for pending splits added before main output 
is initialized
+        createOutputForSplits(outputPendingSplits);
+        this.operatingMode = OperatingMode.READING;
+    }
+
     private void initializeLatencyMarkerEmitter(DataOutput<OUT> output) {
         long latencyTrackingInterval =
                 getExecutionConfig().isLatencyTrackingConfigured()
@@ -515,11 +523,7 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
             updateMaxDesiredWatermark((WatermarkAlignmentEvent) event);
             checkWatermarkAlignment();
         } else if (event instanceof AddSplitEvent) {
-            try {
-                sourceReader.addSplits(((AddSplitEvent<SplitT>) 
event).splits(splitSerializer));
-            } catch (IOException e) {
-                throw new FlinkRuntimeException("Failed to deserialize the 
splits.", e);
-            }
+            handleAddSplitsEvent(((AddSplitEvent<SplitT>) event));
         } else if (event instanceof SourceEventWrapper) {
             sourceReader.handleSourceEvents(((SourceEventWrapper) 
event).getSourceEvent());
         } else if (event instanceof NoMoreSplitsEvent) {
@@ -529,6 +533,30 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
         }
     }
 
+    private void handleAddSplitsEvent(AddSplitEvent<SplitT> event) {
+        try {
+            List<SplitT> newSplits = event.splits(splitSerializer);
+            if (operatingMode == OperatingMode.OUTPUT_NOT_INITIALIZED) {
+                // For splits arrived before the main output is initialized, 
store them into the
+                // pending list. Outputs of these splits will be created once 
the main output is
+                // ready.
+                outputPendingSplits.addAll(newSplits);
+            } else {
+                // Create output directly for new splits if the main output is 
already initialized.
+                createOutputForSplits(newSplits);
+            }
+            sourceReader.addSplits(newSplits);
+        } catch (IOException e) {
+            throw new FlinkRuntimeException("Failed to deserialize the 
splits.", e);
+        }
+    }
+
+    private void createOutputForSplits(List<SplitT> newSplits) {
+        for (SplitT split : newSplits) {
+            currentMainOutput.createOutputForSplit(split.splitId());
+        }
+    }
+
     private void updateMaxDesiredWatermark(WatermarkAlignmentEvent event) {
         currentMaxDesiredWatermark = event.getMaxWatermark();
         
sourceMetricGroup.updateMaxDesiredWatermark(currentMaxDesiredWatermark);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
index 41b1bdf..959bc82 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
@@ -18,30 +18,17 @@
 
 package org.apache.flink.streaming.api.operators.source;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
 import org.apache.flink.core.io.InputStatus;
-import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateInitializationContextImpl;
-import org.apache.flink.runtime.state.TestTaskStateManager;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
 import org.apache.flink.streaming.api.operators.SourceOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.DataInputStatus;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.apache.flink.streaming.util.MockOutput;
-import org.apache.flink.streaming.util.MockStreamConfig;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 
@@ -52,13 +39,13 @@ import org.junit.runners.Parameterized;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.streaming.api.operators.source.TestingSourceOperator.createTestOperator;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertThat;
@@ -159,6 +146,35 @@ public class SourceOperatorEventTimeTest {
                 result, new Watermark(100L), new Watermark(150L), new 
Watermark(200L));
     }
 
+    @Test
+    public void testCreatingPerSplitOutputOnSplitAddition() throws Exception {
+        final WatermarkStrategy<Integer> watermarkStrategy =
+                WatermarkStrategy.forGenerator((ctx) -> new 
OnEventTestWatermarkGenerator<>());
+
+        InterpretingSourceReader reader =
+                new InterpretingSourceReader(
+                        // No watermark (no record from split 2, whose 
watermark is Long.MIN_VALUE)
+                        (output) -> 
output.createOutputForSplit("1").collect(0, 100L),
+                        (output) -> 
output.createOutputForSplit("1").collect(0, 200L),
+                        (output) -> 
output.createOutputForSplit("1").collect(0, 300L),
+                        // Emit watermark 150 (from the 1st record of split 2)
+                        (output) -> 
output.createOutputForSplit("2").collect(0, 150L),
+                        // Emit watermark 300 (from the 3rd record in split 1)
+                        (output) -> 
output.createOutputForSplit("2").collect(0, 400L));
+        SourceOperator<Integer, MockSourceSplit> sourceOperator =
+                createTestOperator(reader, watermarkStrategy, 
emitProgressiveWatermarks);
+
+        // Add two splits to SourceOperator. Output for two splits should be 
created during event
+        // handling.
+        sourceOperator.handleOperatorEvent(
+                new AddSplitEvent<>(
+                        Arrays.asList(new MockSourceSplit(1), new 
MockSourceSplit(2)),
+                        new MockSourceSplitSerializer()));
+
+        final List<Watermark> result = 
testSequenceOfWatermarks(sourceOperator);
+        assertWatermarksOrEmpty(result, new Watermark(150L), new 
Watermark(300L));
+    }
+
     // ------------------------------------------------------------------------
     //   test execution helpers
     // ------------------------------------------------------------------------
@@ -186,9 +202,18 @@ public class SourceOperatorEventTimeTest {
             final WatermarkStrategy<Integer> watermarkStrategy,
             final Consumer<ReaderOutput<Integer>>... actions)
             throws Exception {
+        final SourceReader<Integer, MockSourceSplit> reader = new 
InterpretingSourceReader(actions);
+        final SourceOperator<Integer, MockSourceSplit> sourceOperator =
+                createTestOperator(reader, watermarkStrategy, 
emitProgressiveWatermarks);
 
-        final List<Object> allEvents =
-                testSequenceOfEvents(emitProgressiveWatermarks, 
watermarkStrategy, actions);
+        return testSequenceOfWatermarks(sourceOperator);
+    }
+
+    @SuppressWarnings("FinalPrivateMethod")
+    private final List<Watermark> testSequenceOfWatermarks(
+            SourceOperator<Integer, MockSourceSplit> sourceOperator) throws 
Exception {
+
+        final List<Object> allEvents = testSequenceOfEvents(sourceOperator);
 
         return allEvents.stream()
                 .filter((evt) -> evt instanceof Watermark)
@@ -197,23 +222,13 @@ public class SourceOperatorEventTimeTest {
     }
 
     @SuppressWarnings("FinalPrivateMethod")
-    @SafeVarargs
     private final List<Object> testSequenceOfEvents(
-            final boolean emitProgressiveWatermarks,
-            final WatermarkStrategy<Integer> watermarkStrategy,
-            final Consumer<ReaderOutput<Integer>>... actions)
-            throws Exception {
+            final SourceOperator<Integer, MockSourceSplit> sourceOperator) 
throws Exception {
 
         final CollectingDataOutput<Integer> out = new CollectingDataOutput<>();
 
-        final TestProcessingTimeService timeService = new 
TestProcessingTimeService();
-        timeService.setCurrentTime(Integer.MAX_VALUE); // start somewhere that 
is not zero
-
-        final SourceReader<Integer, MockSourceSplit> reader = new 
InterpretingSourceReader(actions);
-
-        final SourceOperator<Integer, MockSourceSplit> sourceOperator =
-                createTestOperator(
-                        reader, watermarkStrategy, timeService, 
emitProgressiveWatermarks);
+        final TestProcessingTimeService timeService =
+                ((TestProcessingTimeService) 
sourceOperator.getProcessingTimeService());
 
         while (sourceOperator.emitNext(out) != DataInputStatus.END_OF_INPUT) {
             timeService.setCurrentTime(timeService.getCurrentProcessingTime() 
+ 100);
@@ -223,50 +238,6 @@ public class SourceOperatorEventTimeTest {
     }
 
     // ------------------------------------------------------------------------
-    //   test setup helpers
-    // ------------------------------------------------------------------------
-
-    private static <T> SourceOperator<T, MockSourceSplit> createTestOperator(
-            SourceReader<T, MockSourceSplit> reader,
-            WatermarkStrategy<T> watermarkStrategy,
-            ProcessingTimeService timeService,
-            boolean emitProgressiveWatermarks)
-            throws Exception {
-
-        final OperatorStateStore operatorStateStore =
-                new MemoryStateBackend()
-                        .createOperatorStateBackend(
-                                new MockEnvironmentBuilder().build(),
-                                "test-operator",
-                                Collections.emptyList(),
-                                new CloseableRegistry());
-
-        final StateInitializationContext stateContext =
-                new StateInitializationContextImpl(null, operatorStateStore, 
null, null, null);
-
-        final SourceOperator<T, MockSourceSplit> sourceOperator =
-                new TestingSourceOperator<>(
-                        reader, watermarkStrategy, timeService, 
emitProgressiveWatermarks);
-
-        sourceOperator.setup(
-                new SourceOperatorStreamTask<Integer>(
-                        new StreamMockEnvironment(
-                                new Configuration(),
-                                new Configuration(),
-                                new ExecutionConfig(),
-                                1L,
-                                new MockInputSplitProvider(),
-                                1,
-                                new TestTaskStateManager())),
-                new MockStreamConfig(new Configuration(), 1),
-                new MockOutput<>(new ArrayList<>()));
-        sourceOperator.initializeState(stateContext);
-        sourceOperator.open();
-
-        return sourceOperator;
-    }
-
-    // ------------------------------------------------------------------------
     //   test mocks
     // ------------------------------------------------------------------------
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
index 5fbfd7c..d25226e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
@@ -20,18 +20,34 @@ package org.apache.flink.streaming.api.operators.source;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.streaming.api.operators.SourceOperator;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.util.MockOutput;
+import org.apache.flink.streaming.util.MockStreamConfig;
 import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
 
+import java.util.ArrayList;
+import java.util.Collections;
+
 /** A SourceOperator extension to simplify test setup. */
 public class TestingSourceOperator<T> extends SourceOperator<T, MockSourceSplit> {
 
@@ -100,4 +116,46 @@ public class TestingSourceOperator<T> extends SourceOperator<T, MockSourceSplit>
         cfg.setAutoWatermarkInterval(100);
         return cfg;
     }
+
+    public static <T> SourceOperator<T, MockSourceSplit> createTestOperator(
+            SourceReader<T, MockSourceSplit> reader,
+            WatermarkStrategy<T> watermarkStrategy,
+            boolean emitProgressiveWatermarks)
+            throws Exception {
+
+        final OperatorStateStore operatorStateStore =
+                new HashMapStateBackend()
+                        .createOperatorStateBackend(
+                                new MockEnvironmentBuilder().build(),
+                                "test-operator",
+                                Collections.emptyList(),
+                                new CloseableRegistry());
+
+        final StateInitializationContext stateContext =
+                new StateInitializationContextImpl(null, operatorStateStore, null, null, null);
+
+        TestProcessingTimeService timeService = new TestProcessingTimeService();
+        timeService.setCurrentTime(Integer.MAX_VALUE); // start somewhere that is not zero
+
+        final SourceOperator<T, MockSourceSplit> sourceOperator =
+                new TestingSourceOperator<>(
+                        reader, watermarkStrategy, timeService, emitProgressiveWatermarks);
+
+        sourceOperator.setup(
+                new SourceOperatorStreamTask<Integer>(
+                        new StreamMockEnvironment(
+                                new Configuration(),
+                                new Configuration(),
+                                new ExecutionConfig(),
+                                1L,
+                                new MockInputSplitProvider(),
+                                1,
+                                new TestTaskStateManager())),
+                new MockStreamConfig(new Configuration(), 1),
+                new MockOutput<>(new ArrayList<>()));
+        sourceOperator.initializeState(stateContext);
+        sourceOperator.open();
+
+        return sourceOperator;
+    }
 }

Reply via email to