This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 65047113eceefd377df7017b6b8d553dda9a8758 Author: Qingsheng Ren <[email protected]> AuthorDate: Fri Feb 11 18:19:10 2022 +0800 [FLINK-26018][connector/common] Create per-split output on split addition in SourceOperator This change prevents the watermark from being pushed forward by records of the first split in the first fetch when multiple splits are assigned to the source operator. --- flink-connectors/flink-connector-base/pom.xml | 8 ++ .../base/source/reader/SourceReaderBase.java | 3 + .../source/reader/splitreader/SplitReader.java | 5 + .../base/source/reader/SourceReaderBaseTest.java | 140 ++++++++++++++++++++- .../connector/kafka/source/KafkaSourceITCase.java | 63 ++++++++++ .../streaming/api/operators/SourceOperator.java | 47 +++++-- .../source/SourceOperatorEventTimeTest.java | 121 +++++++----------- .../operators/source/TestingSourceOperator.java | 57 +++++++++ 8 files changed, 359 insertions(+), 85 deletions(-) diff --git a/flink-connectors/flink-connector-base/pom.xml b/flink-connectors/flink-connector-base/pom.xml index e9d8d7d..fc72977 100644 --- a/flink-connectors/flink-connector-base/pom.xml +++ b/flink-connectors/flink-connector-base/pom.xml @@ -65,5 +65,13 @@ <version>${project.version}</version> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> </dependencies> </project> diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java index 416dbb2..18d49f3 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java +++
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java @@ -324,6 +324,9 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt SourceOutput<T> getOrCreateSplitOutput(ReaderOutput<T> mainOutput) { if (sourceOutput == null) { + // The split output should have been created when the AddSplitEvent was processed in + // SourceOperator. Here we only use this method to get the previously created + // output. sourceOutput = mainOutput.createOutputForSplit(splitId); } return sourceOutput; diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java index 4f2ff6a..550cb95 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java @@ -50,6 +50,11 @@ public interface SplitReader<E, SplitT extends SourceSplit> { /** * Handle the split changes. This call should be non-blocking. * + * <p>For the consistency of internal state in SourceReaderBase, if an invalid split is added to + * the reader (for example, a split without any records), it should be put back into {@link + * RecordsWithSplitIds} as a finished split so that SourceReaderBase can clean up the resources + * created for it. + * * @param splitsChanges the split changes that the SplitReader needs to handle.
*/ void handleSplitsChanges(SplitsChange<SplitT> splitsChanges); diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java index 07353c2..71ce413 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java @@ -18,10 +18,14 @@ package org.apache.flink.connector.base.source.reader; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; +import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; import org.apache.flink.connector.base.source.reader.mocks.MockSourceReader; @@ -37,11 +41,23 @@ import org.apache.flink.connector.testutils.source.reader.SourceReaderTestBase; import org.apache.flink.connector.testutils.source.reader.TestingReaderContext; import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput; import org.apache.flink.core.io.InputStatus; - +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.source.event.AddSplitEvent; +import org.apache.flink.streaming.api.operators.SourceOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import 
org.apache.flink.streaming.runtime.io.PushingAsyncDataInput; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; + +import org.hamcrest.MatcherAssert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -50,6 +66,8 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; +import static org.apache.flink.streaming.api.operators.source.TestingSourceOperator.createTestOperator; +import static org.hamcrest.collection.IsIterableContainingInOrder.contains; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -58,6 +76,8 @@ import static org.junit.Assert.assertTrue; /** A unit test class for {@link SourceReaderBase}. 
*/ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> { + private static final Logger LOG = LoggerFactory.getLogger(SourceReaderBaseTest.class); + @Rule public ExpectedException expectedException = ExpectedException.none(); @Test @@ -235,6 +255,91 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> InputStatus.MORE_AVAILABLE, sourceReader.pollNext(new TestingReaderOutput<>())); } + @Test + public void testPerSplitWatermarkWithEmitBeforeSplitAddition() throws Exception { + testPerSplitWatermark(true); + } + + @Test + public void testPerSplitWatermarkWithoutEmitBeforeSplitAddition() throws Exception { + testPerSplitWatermark(false); + } + + private void testPerSplitWatermark(boolean emitRecordBeforeSplitAddition) throws Exception { + MockSplitReader mockSplitReader = + MockSplitReader.newBuilder() + .setNumRecordsPerSplitPerFetch(3) + .setBlockingFetch(true) + .build(); + + MockSourceReader reader = + new MockSourceReader( + new FutureCompletingBlockingQueue<>(), + () -> mockSplitReader, + new Configuration(), + new TestingReaderContext()); + + SourceOperator<Integer, MockSourceSplit> sourceOperator = + createTestOperator( + reader, + WatermarkStrategy.forGenerator( + (context) -> new OnEventWatermarkGenerator()), + true); + + MockSourceSplit splitA = new MockSourceSplit(0, 0, 3); + splitA.addRecord(100); + splitA.addRecord(200); + splitA.addRecord(300); + + MockSourceSplit splitB = new MockSourceSplit(1, 0, 3); + splitB.addRecord(150); + splitB.addRecord(250); + splitB.addRecord(350); + + WatermarkCollectingDataOutput output = new WatermarkCollectingDataOutput(); + + if (emitRecordBeforeSplitAddition) { + sourceOperator.emitNext(output); + } + + AddSplitEvent<MockSourceSplit> addSplitsEvent = + new AddSplitEvent<>(Arrays.asList(splitA, splitB), new MockSourceSplitSerializer()); + sourceOperator.handleOperatorEvent(addSplitsEvent); + + // First 3 records from split A should not generate any watermarks + 
CommonTestUtils.waitUtil( + () -> { + try { + sourceOperator.emitNext(output); + } catch (Exception e) { + LOG.warn("Exception caught at emitting records", e); + return false; + } + return output.numRecords == 3; + }, + Duration.ofSeconds(10), + String.format( + "%d out of 3 records are received within timeout", output.numRecords)); + assertTrue(output.watermarks.isEmpty()); + + CommonTestUtils.waitUtil( + () -> { + try { + sourceOperator.emitNext(output); + } catch (Exception e) { + LOG.warn("Exception caught at emitting records", e); + return false; + } + return output.numRecords == 6; + }, + Duration.ofSeconds(10), + String.format( + "%d out of 6 records are received within timeout", output.numRecords)); + + assertEquals(3, output.watermarks.size()); + MatcherAssert.assertThat(output.watermarks, contains(150L, 250L, 300L)); + } + // ---------------- helper methods ----------------- @Override @@ -375,4 +480,37 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> } } } + + private static class OnEventWatermarkGenerator implements WatermarkGenerator<Integer> { + + @Override + public void onEvent(Integer event, long eventTimestamp, WatermarkOutput output) { + output.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark(event)); + } + + @Override + public void onPeriodicEmit(WatermarkOutput output) {} + } + + private static class WatermarkCollectingDataOutput + implements PushingAsyncDataInput.DataOutput<Integer> { + int numRecords = 0; + final List<Long> watermarks = new ArrayList<>(); + + @Override + public void emitRecord(StreamRecord<Integer> streamRecord) { + numRecords++; + } + + @Override + public void emitWatermark(Watermark watermark) throws Exception { + watermarks.add(watermark.getTimestamp()); + } + + @Override + public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {} + + @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) {} + } } diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java index cf8e501..1f740b5 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java @@ -20,6 +20,9 @@ package org.apache.flink.connector.kafka.source; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.accumulators.ListAccumulator; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkOutput; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -37,6 +40,7 @@ import org.apache.flink.connectors.test.common.junit.annotations.TestEnv; import org.apache.flink.connectors.test.common.testsuites.SourceTestSuiteBase; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.operators.StreamMap; @@ -68,10 +72,12 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import static org.junit.jupiter.api.Assertions.assertEquals; +import static 
org.junit.jupiter.api.Assertions.assertTrue; /** Unite test class for {@link KafkaSource}. */ public class KafkaSourceITCase { @@ -252,6 +258,52 @@ public class KafkaSourceITCase { "testBasicReadWithoutGroupId"); executeAndVerify(env, stream); } + + @Test + public void testPerPartitionWatermark() throws Throwable { + String watermarkTopic = "watermarkTestTopic-" + UUID.randomUUID(); + KafkaSourceTestEnv.createTestTopic(watermarkTopic, 2, 1); + List<ProducerRecord<String, Integer>> records = + Arrays.asList( + new ProducerRecord<>(watermarkTopic, 0, 100L, null, 100), + new ProducerRecord<>(watermarkTopic, 0, 200L, null, 200), + new ProducerRecord<>(watermarkTopic, 0, 300L, null, 300), + new ProducerRecord<>(watermarkTopic, 1, 150L, null, 150), + new ProducerRecord<>(watermarkTopic, 1, 250L, null, 250), + new ProducerRecord<>(watermarkTopic, 1, 350L, null, 350)); + KafkaSourceTestEnv.produceToKafka(records); + KafkaSource<PartitionAndValue> source = + KafkaSource.<PartitionAndValue>builder() + .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings) + .setTopics(watermarkTopic) + .setGroupId("watermark-test") + .setDeserializer(new TestingKafkaRecordDeserializationSchema(false)) + .setStartingOffsets(OffsetsInitializer.earliest()) + .setBounded(OffsetsInitializer.latest()) + .build(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.fromSource( + source, + WatermarkStrategy.forGenerator( + (context) -> new OnEventWatermarkGenerator()), + "testPerPartitionWatermark") + .process( + new ProcessFunction<PartitionAndValue, Long>() { + @Override + public void processElement( + PartitionAndValue value, + ProcessFunction<PartitionAndValue, Long>.Context ctx, + Collector<Long> out) { + assertTrue( + ctx.timestamp() + >= ctx.timerService().currentWatermark(), + "Event time should never behind watermark " + + "because of per-split watermark multiplexing logic"); + } + }); + env.execute(); + } } /** 
Integration test based on connector testing framework. */ @@ -391,4 +443,15 @@ public class KafkaSourceITCase { } }); } + + private static class OnEventWatermarkGenerator + implements WatermarkGenerator<PartitionAndValue> { + @Override + public void onEvent(PartitionAndValue event, long eventTimestamp, WatermarkOutput output) { + output.emitWatermark(new Watermark(eventTimestamp)); + } + + @Override + public void onPeriodicEmit(WatermarkOutput output) {} + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index 796defb..a89e66a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -62,6 +62,7 @@ import org.apache.flink.util.function.FunctionWithException; import javax.annotation.Nullable; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -144,6 +145,8 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr private final SourceOperatorAvailabilityHelper availabilityHelper = new SourceOperatorAvailabilityHelper(); + private final List<SplitT> outputPendingSplits = new ArrayList<>(); + private enum OperatingMode { READING, OUTPUT_NOT_INITIALIZED, @@ -356,10 +359,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr private DataInputStatus emitNextNotReading(DataOutput<OUT> output) throws Exception { switch (operatingMode) { case OUTPUT_NOT_INITIALIZED: - currentMainOutput = eventTimeLogic.createMainOutput(output); - initializeLatencyMarkerEmitter(output); - lastInvokedOutput = output; - this.operatingMode = OperatingMode.READING; + initializeMainOutput(output); return 
convertToInternalStatus(sourceReader.pollNext(currentMainOutput)); case SOURCE_STOPPED: this.operatingMode = OperatingMode.DATA_FINISHED; @@ -374,6 +374,21 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr } } + private void initializeMainOutput(DataOutput<OUT> output) { + currentMainOutput = eventTimeLogic.createMainOutput(output); + initializeLatencyMarkerEmitter(output); + lastInvokedOutput = output; + // Create per-split outputs for splits added before the main output was initialized + createOutputForSplits(outputPendingSplits); + this.operatingMode = OperatingMode.READING; + } + + private void createOutputForSplits(List<SplitT> newSplits) { + for (SplitT split : newSplits) { + currentMainOutput.createOutputForSplit(split.splitId()); + } + } + private void initializeLatencyMarkerEmitter(DataOutput<OUT> output) { long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured() @@ -454,11 +469,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr @SuppressWarnings("unchecked") public void handleOperatorEvent(OperatorEvent event) { if (event instanceof AddSplitEvent) { - try { - sourceReader.addSplits(((AddSplitEvent<SplitT>) event).splits(splitSerializer)); - } catch (IOException e) { - throw new FlinkRuntimeException("Failed to deserialize the splits.", e); - } + handleAddSplitsEvent(((AddSplitEvent<SplitT>) event)); } else if (event instanceof SourceEventWrapper) { sourceReader.handleSourceEvents(((SourceEventWrapper) event).getSourceEvent()); } else if (event instanceof NoMoreSplitsEvent) { @@ -468,6 +479,24 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr } } + private void handleAddSplitsEvent(AddSplitEvent<SplitT> event) { + try { + List<SplitT> newSplits = event.splits(splitSerializer); + if (operatingMode == OperatingMode.OUTPUT_NOT_INITIALIZED) { + // Splits that arrive before the main output is initialized are stored in the + //
pending list. Outputs of these splits will be created once the main output is + // ready. + outputPendingSplits.addAll(newSplits); + } else { + // Create output directly for new splits if the main output is already initialized. + createOutputForSplits(newSplits); + } + sourceReader.addSplits(newSplits); + } catch (IOException e) { + throw new FlinkRuntimeException("Failed to deserialize the splits.", e); + } + } + private void registerReader() { operatorEventGateway.sendEventToCoordinator( new ReaderRegistrationEvent( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java index c0a84f9..3b120a0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java @@ -18,30 +18,17 @@ package org.apache.flink.streaming.api.operators.source; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.eventtime.Watermark; import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.connector.source.ReaderOutput; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer; import org.apache.flink.core.io.InputStatus; -import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; -import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; -import org.apache.flink.runtime.state.StateInitializationContext; 
-import org.apache.flink.runtime.state.StateInitializationContextImpl; -import org.apache.flink.runtime.state.TestTaskStateManager; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.source.event.AddSplitEvent; import org.apache.flink.streaming.api.operators.SourceOperator; import org.apache.flink.streaming.runtime.io.DataInputStatus; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; -import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask; -import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; -import org.apache.flink.streaming.util.MockOutput; -import org.apache.flink.streaming.util.MockStreamConfig; import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; @@ -52,13 +39,13 @@ import org.junit.runners.Parameterized; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.stream.Collectors; +import static org.apache.flink.streaming.api.operators.source.TestingSourceOperator.createTestOperator; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.hasSize; import static org.junit.Assert.assertThat; @@ -159,6 +146,35 @@ public class SourceOperatorEventTimeTest { result, new Watermark(100L), new Watermark(150L), new Watermark(200L)); } + @Test + public void testCreatingPerSplitOutputOnSplitAddition() throws Exception { + final WatermarkStrategy<Integer> watermarkStrategy = + WatermarkStrategy.forGenerator((ctx) -> new OnEventTestWatermarkGenerator<>()); + + InterpretingSourceReader reader = + new InterpretingSourceReader( + // No watermark (no record from split 2, whose watermark is Long.MIN_VALUE) + (output) -> 
output.createOutputForSplit("1").collect(0, 100L), + (output) -> output.createOutputForSplit("1").collect(0, 200L), + (output) -> output.createOutputForSplit("1").collect(0, 300L), + // Emit watermark 150 (from the 1st record of split 2) + (output) -> output.createOutputForSplit("2").collect(0, 150L), + // Emit watermark 300 (from the 3rd record in split 1) + (output) -> output.createOutputForSplit("2").collect(0, 400L)); + SourceOperator<Integer, MockSourceSplit> sourceOperator = + createTestOperator(reader, watermarkStrategy, emitProgressiveWatermarks); + + // Add two splits to SourceOperator. Output for two splits should be created during event + // handling. + sourceOperator.handleOperatorEvent( + new AddSplitEvent<>( + Arrays.asList(new MockSourceSplit(1), new MockSourceSplit(2)), + new MockSourceSplitSerializer())); + + final List<Watermark> result = testSequenceOfWatermarks(sourceOperator); + assertWatermarksOrEmpty(result, new Watermark(150L), new Watermark(300L)); + } + // ------------------------------------------------------------------------ // test execution helpers // ------------------------------------------------------------------------ @@ -186,9 +202,18 @@ public class SourceOperatorEventTimeTest { final WatermarkStrategy<Integer> watermarkStrategy, final Consumer<ReaderOutput<Integer>>... 
actions) throws Exception { + final SourceReader<Integer, MockSourceSplit> reader = new InterpretingSourceReader(actions); + final SourceOperator<Integer, MockSourceSplit> sourceOperator = + createTestOperator(reader, watermarkStrategy, emitProgressiveWatermarks); - final List<Object> allEvents = - testSequenceOfEvents(emitProgressiveWatermarks, watermarkStrategy, actions); + return testSequenceOfWatermarks(sourceOperator); + } + + @SuppressWarnings("FinalPrivateMethod") + private final List<Watermark> testSequenceOfWatermarks( + SourceOperator<Integer, MockSourceSplit> sourceOperator) throws Exception { + + final List<Object> allEvents = testSequenceOfEvents(sourceOperator); return allEvents.stream() .filter((evt) -> evt instanceof org.apache.flink.streaming.api.watermark.Watermark) @@ -201,23 +226,13 @@ public class SourceOperatorEventTimeTest { } @SuppressWarnings("FinalPrivateMethod") - @SafeVarargs private final List<Object> testSequenceOfEvents( - final boolean emitProgressiveWatermarks, - final WatermarkStrategy<Integer> watermarkStrategy, - final Consumer<ReaderOutput<Integer>>... 
actions) - throws Exception { + final SourceOperator<Integer, MockSourceSplit> sourceOperator) throws Exception { final CollectingDataOutput<Integer> out = new CollectingDataOutput<>(); - final TestProcessingTimeService timeService = new TestProcessingTimeService(); - timeService.setCurrentTime(Integer.MAX_VALUE); // start somewhere that is not zero - - final SourceReader<Integer, MockSourceSplit> reader = new InterpretingSourceReader(actions); - - final SourceOperator<Integer, MockSourceSplit> sourceOperator = - createTestOperator( - reader, watermarkStrategy, timeService, emitProgressiveWatermarks); + final TestProcessingTimeService timeService = + ((TestProcessingTimeService) sourceOperator.getProcessingTimeService()); while (sourceOperator.emitNext(out) != DataInputStatus.END_OF_INPUT) { timeService.setCurrentTime(timeService.getCurrentProcessingTime() + 100); @@ -227,50 +242,6 @@ public class SourceOperatorEventTimeTest { } // ------------------------------------------------------------------------ - // test setup helpers - // ------------------------------------------------------------------------ - - private static <T> SourceOperator<T, MockSourceSplit> createTestOperator( - SourceReader<T, MockSourceSplit> reader, - WatermarkStrategy<T> watermarkStrategy, - ProcessingTimeService timeService, - boolean emitProgressiveWatermarks) - throws Exception { - - final OperatorStateStore operatorStateStore = - new MemoryStateBackend() - .createOperatorStateBackend( - new MockEnvironmentBuilder().build(), - "test-operator", - Collections.emptyList(), - new CloseableRegistry()); - - final StateInitializationContext stateContext = - new StateInitializationContextImpl(null, operatorStateStore, null, null, null); - - final SourceOperator<T, MockSourceSplit> sourceOperator = - new TestingSourceOperator<>( - reader, watermarkStrategy, timeService, emitProgressiveWatermarks); - - sourceOperator.setup( - new SourceOperatorStreamTask<Integer>( - new StreamMockEnvironment( - new 
Configuration(), - new Configuration(), - new ExecutionConfig(), - 1L, - new MockInputSplitProvider(), - 1, - new TestTaskStateManager())), - new MockStreamConfig(new Configuration(), 1), - new MockOutput<>(new ArrayList<>())); - sourceOperator.initializeState(stateContext); - sourceOperator.open(); - - return sourceOperator; - } - - // ------------------------------------------------------------------------ // test mocks // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java index 7becd40..281b572 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java @@ -20,19 +20,34 @@ package org.apache.flink.streaming.api.operators.source; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway; import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import 
org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.TestTaskStateManager; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.streaming.api.operators.SourceOperator; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask; +import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.streaming.util.MockOutput; +import org.apache.flink.streaming.util.MockStreamConfig; import org.apache.flink.streaming.util.MockStreamingRuntimeContext; +import java.util.ArrayList; +import java.util.Collections; + /** A SourceOperator extension to simplify test setup. */ public class TestingSourceOperator<T> extends SourceOperator<T, MockSourceSplit> { @@ -117,4 +132,46 @@ public class TestingSourceOperator<T> extends SourceOperator<T, MockSourceSplit> cfg.setAutoWatermarkInterval(100); return cfg; } + + public static <T> SourceOperator<T, MockSourceSplit> createTestOperator( + SourceReader<T, MockSourceSplit> reader, + WatermarkStrategy<T> watermarkStrategy, + boolean emitProgressiveWatermarks) + throws Exception { + + final OperatorStateStore operatorStateStore = + new HashMapStateBackend() + .createOperatorStateBackend( + new MockEnvironmentBuilder().build(), + "test-operator", + Collections.emptyList(), + new CloseableRegistry()); + + final StateInitializationContext stateContext = + new StateInitializationContextImpl(null, operatorStateStore, null, null, null); + + TestProcessingTimeService timeService = new TestProcessingTimeService(); + timeService.setCurrentTime(Integer.MAX_VALUE); // start somewhere that is not zero + + final SourceOperator<T, 
MockSourceSplit> sourceOperator = + new TestingSourceOperator<>( + reader, watermarkStrategy, timeService, emitProgressiveWatermarks); + + sourceOperator.setup( + new SourceOperatorStreamTask<Integer>( + new StreamMockEnvironment( + new Configuration(), + new Configuration(), + new ExecutionConfig(), + 1L, + new MockInputSplitProvider(), + 1, + new TestTaskStateManager())), + new MockStreamConfig(new Configuration(), 1), + new MockOutput<>(new ArrayList<>())); + sourceOperator.initializeState(stateContext); + sourceOperator.open(); + + return sourceOperator; + } }
