This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 54884a7ad4057863c2fc3fa956e2149edc904688 Author: Jiangjie (Becket) Qin <[email protected]> AuthorDate: Mon Nov 23 23:52:22 2020 +0800 [FLINK-20270][runtime] Add support for ExternallyInducedSource based on FLIP-27 to SourceOperatorStreamTask. --- .../source/ExternallyInducedSourceReader.java | 57 +++++++++ .../io/StreamTaskExternallyInducedSourceInput.java | 51 ++++++++ .../runtime/io/StreamTaskSourceInput.java | 2 +- .../runtime/tasks/SourceOperatorStreamTask.java | 57 ++++++++- .../tasks/SourceOperatorStreamTaskTest.java | 130 ++++++++++++++++++++- 5 files changed, 291 insertions(+), 6 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/ExternallyInducedSourceReader.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/ExternallyInducedSourceReader.java new file mode 100644 index 0000000..96ffd92 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/ExternallyInducedSourceReader.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.connector.source; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Optional; + +/** + * Sources that implement this interface do not trigger checkpoints when receiving a + * trigger message from the checkpoint coordinator, but when their input data/events + * indicate that a checkpoint should be triggered. + * + * <p>The ExternallyInducedSourceReader tells the Flink runtime that a checkpoint needs to + * be made by returning a checkpointId when shouldTriggerCheckpoint() is invoked. + * + * <p>Implementations typically work together with the SplitEnumerator, which informs + * the external system to trigger a checkpoint. + * + * @param <T> The type of records produced by the source. + * @param <SplitT> The type of splits handled by the source. + */ +@PublicEvolving +public interface ExternallyInducedSourceReader<T, SplitT extends SourceSplit> + extends SourceReader<T, SplitT> { + + /** + * A method that informs the Flink runtime whether a checkpoint should be triggered on + * this Source. + * + * <p>This method is invoked when the previous {@link #pollNext(ReaderOutput)} + * returns {@link org.apache.flink.core.io.InputStatus#NOTHING_AVAILABLE}, to check + * if the source needs to be checkpointed. + * + * <p>If a CheckpointId is returned, a checkpoint will be triggered on this source reader. + * Otherwise, Flink runtime will continue to process the records. + * + * @return An optional checkpoint ID that Flink runtime should take a checkpoint for.
+ */ + Optional<Long> shouldTriggerCheckpoint(); +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskExternallyInducedSourceInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskExternallyInducedSourceInput.java new file mode 100644 index 0000000..03581d5 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskExternallyInducedSourceInput.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.api.connector.source.ExternallyInducedSourceReader; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.streaming.api.operators.SourceOperator; + +import java.util.function.Consumer; + +/** + * A subclass of {@link StreamTaskSourceInput} for {@link ExternallyInducedSourceReader}. 
+ */ +public class StreamTaskExternallyInducedSourceInput<T> extends StreamTaskSourceInput<T> { + private final Consumer<Long> checkpointTriggeringHook; + private final ExternallyInducedSourceReader<T, ?> sourceReader; + + @SuppressWarnings("unchecked") + public StreamTaskExternallyInducedSourceInput( + SourceOperator<T, ?> operator, + Consumer<Long> checkpointTriggeringHook) { + super(operator); + this.checkpointTriggeringHook = checkpointTriggeringHook; + this.sourceReader = (ExternallyInducedSourceReader<T, ?>) operator.getSourceReader(); + } + + @Override + public InputStatus emitNext(DataOutput<T> output) throws Exception { + InputStatus status = super.emitNext(output); + if (status == InputStatus.NOTHING_AVAILABLE) { + sourceReader.shouldTriggerCheckpoint().ifPresent(checkpointTriggeringHook); + } + return status; + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java index 0b2d065..722afa9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java @@ -34,7 +34,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * unavailable or finished. 
*/ @Internal -public final class StreamTaskSourceInput<T> implements StreamTaskInput<T> { +public class StreamTaskSourceInput<T> implements StreamTaskInput<T> { private final SourceOperator<T, ?> operator; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java index bd4fe74..d55e542 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java @@ -19,6 +19,10 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.ExternallyInducedSourceReader; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.SourceOperator; @@ -26,12 +30,16 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.io.AbstractDataOutput; import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput; import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor; +import org.apache.flink.streaming.runtime.io.StreamTaskExternallyInducedSourceInput; import org.apache.flink.streaming.runtime.io.StreamTaskInput; import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; +import 
java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; + import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -39,7 +47,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ @Internal public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T, ?>> { + private AsyncDataOutputToOutput<T> output; + private boolean isExternallyInducedSource; public SourceOperatorStreamTask(Environment env) throws Exception { super(env); @@ -54,7 +64,21 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T, // input processors sourceOperator.initReader(); - StreamTaskInput<T> input = new StreamTaskSourceInput<>(headOperator); + final SourceReader<T, ?> sourceReader = headOperator.getSourceReader(); + final StreamTaskInput<T> input; + + if (sourceReader instanceof ExternallyInducedSourceReader) { + isExternallyInducedSource = true; + + input = new StreamTaskExternallyInducedSourceInput<>( + sourceOperator, + this::triggerCheckpointForExternallyInducedSource); + } else { + input = new StreamTaskSourceInput<>(sourceOperator); + } + + // The SourceOperatorStreamTask doesn't have any inputs, so there is no need for + // a WatermarkGauge on the input. 
output = new AsyncDataOutputToOutput<>( operatorChain.getChainEntryPoint(), getStreamStatusMaintainer()); @@ -66,6 +90,21 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T, } @Override + public Future<Boolean> triggerCheckpointAsync( + CheckpointMetaData checkpointMetaData, + CheckpointOptions checkpointOptions, + boolean advanceToEndOfEventTime) { + if (!isExternallyInducedSource) { + return super.triggerCheckpointAsync( + checkpointMetaData, + checkpointOptions, + advanceToEndOfEventTime); + } else { + return CompletableFuture.completedFuture(isRunning()); + } + } + + @Override protected void advanceToEndOfEventTime() { output.emitWatermark(Watermark.MAX_WATERMARK); } @@ -78,6 +117,22 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T, super.afterInvoke(); } + // -------------------------- + + private void triggerCheckpointForExternallyInducedSource(long checkpointId) { + final CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation( + configuration.isExactlyOnceCheckpointMode(), + configuration.isUnalignedCheckpointsEnabled()); + final long timestamp = System.currentTimeMillis(); + + final CheckpointMetaData checkpointMetaData = + new CheckpointMetaData(checkpointId, timestamp); + + super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, false); + } + + // --------------------------- + /** * Implementation of {@link DataOutput} that wraps a specific {@link Output}. 
*/ diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java index 8c4dc0a..ce60719 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java @@ -22,10 +22,15 @@ import org.apache.flink.api.common.eventtime.TimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.ExternallyInducedSourceReader; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.mocks.MockSource; import org.apache.flink.api.connector.source.mocks.MockSourceReader; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer; +import org.apache.flink.core.io.InputStatus; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointOptions; @@ -43,10 +48,13 @@ import org.apache.flink.util.SerializedValue; import org.junit.Test; +import java.io.Serializable; import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Optional; import java.util.Queue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.function.Supplier; import java.util.stream.IntStream; @@ -56,6 +64,7 @@ import static 
org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; /** * Tests for verifying that the {@link SourceOperator} as a task input can be integrated @@ -125,6 +134,26 @@ public class SourceOperatorStreamTaskTest { } } + @Test + public void testExternallyInducedSource() throws Exception { + final int numEventsBeforeCheckpoint = 10; + final int totalNumEvents = 20; + TestingExternallyInducedSourceReader testingReader = + new TestingExternallyInducedSourceReader(numEventsBeforeCheckpoint, totalNumEvents); + try (StreamTaskMailboxTestHarness<Integer> testHarness = + createTestHarness(new TestingExternallyInducedSource(testingReader), 0, null)) { + TestingExternallyInducedSourceReader runtimeTestingReader = + (TestingExternallyInducedSourceReader) ((SourceOperator) testHarness.getStreamTask().headOperator).getSourceReader(); + + testHarness.processWhileAvailable(); + + assertEquals(totalNumEvents, runtimeTestingReader.numEmittedEvents); + assertTrue(runtimeTestingReader.checkpointed); + assertEquals(TestingExternallyInducedSourceReader.CHECKPOINT_ID, runtimeTestingReader.checkpointedId); + assertEquals(numEventsBeforeCheckpoint, runtimeTestingReader.checkpointedAt); + } + } + private TaskStateSnapshot executeAndWaitForCheckpoint( long checkpointId, TaskStateSnapshot initialSnapshot, @@ -168,7 +197,7 @@ public class SourceOperatorStreamTaskTest { // Wait until the checkpoint finishes. // We have to mark the source reader as available here, otherwise the runMailboxStep() call after - // checkpiont is completed will block. + // checkpoint is completed will block. 
getSourceReaderFromTask(testHarness).markAvailable(); processUntil(testHarness, checkpointFuture::isDone); waitForAcknowledgeLatch.await(); @@ -187,10 +216,20 @@ public class SourceOperatorStreamTaskTest { private StreamTaskMailboxTestHarness<Integer> createTestHarness( long checkpointId, TaskStateSnapshot snapshot) throws Exception { + return createTestHarness( + new MockSource(Boundedness.BOUNDED, 1), + checkpointId, + snapshot + ); + } + + private StreamTaskMailboxTestHarness<Integer> createTestHarness( + MockSource source, + long checkpointId, + TaskStateSnapshot snapshot) throws Exception { // get a source operator. - SourceOperatorFactory<Integer> sourceOperatorFactory = new SourceOperatorFactory<>( - new MockSource(Boundedness.BOUNDED, 1), - WatermarkStrategy.noWatermarks()); + SourceOperatorFactory<Integer> sourceOperatorFactory = + new SourceOperatorFactory<>(source, WatermarkStrategy.noWatermarks()); // build a test harness. StreamTaskMailboxTestHarnessBuilder<Integer> builder = @@ -236,4 +275,87 @@ public class SourceOperatorStreamTaskTest { private MockSourceReader getSourceReaderFromTask(StreamTaskMailboxTestHarness<Integer> testHarness) { return (MockSourceReader) ((SourceOperator) testHarness.getStreamTask().headOperator).getSourceReader(); } + + // ------------- private testing classes ---------- + + private static class TestingExternallyInducedSource extends MockSource { + private static final long serialVersionUID = 3078454109555893721L; + private final TestingExternallyInducedSourceReader reader; + + private TestingExternallyInducedSource(TestingExternallyInducedSourceReader reader) { + super(Boundedness.CONTINUOUS_UNBOUNDED, 1); + this.reader = reader; + } + + @Override + public SourceReader<Integer, MockSourceSplit> createReader(SourceReaderContext readerContext) { + return reader; + } + } + + private static class TestingExternallyInducedSourceReader + implements ExternallyInducedSourceReader<Integer, MockSourceSplit>, Serializable { + private 
static final long CHECKPOINT_ID = 1234L; + private final int numEventsBeforeCheckpoint; + private final int totalNumEvents; + private int numEmittedEvents; + + private boolean checkpointed; + private int checkpointedAt; + private long checkpointedId; + + TestingExternallyInducedSourceReader(int numEventsBeforeCheckpoint, int totalNumEvents) { + this.numEventsBeforeCheckpoint = numEventsBeforeCheckpoint; + this.totalNumEvents = totalNumEvents; + this.numEmittedEvents = 0; + this.checkpointed = false; + this.checkpointedAt = -1; + } + + @Override + public Optional<Long> shouldTriggerCheckpoint() { + if (numEmittedEvents == numEventsBeforeCheckpoint && !checkpointed) { + return Optional.of(CHECKPOINT_ID); + } else { + return Optional.empty(); + } + } + + @Override + public void start() {} + + @Override + public InputStatus pollNext(ReaderOutput<Integer> output) throws Exception { + numEmittedEvents++; + if (numEmittedEvents == numEventsBeforeCheckpoint) { + return InputStatus.NOTHING_AVAILABLE; + } else if (numEmittedEvents < totalNumEvents) { + return InputStatus.MORE_AVAILABLE; + } else { + return InputStatus.END_OF_INPUT; + } + } + + @Override + public List<MockSourceSplit> snapshotState(long checkpointId) { + checkpointed = true; + checkpointedAt = numEmittedEvents; + checkpointedId = checkpointId; + return Collections.emptyList(); + } + + @Override + public CompletableFuture<Void> isAvailable() { + return CompletableFuture.completedFuture(null); + } + + @Override + public void addSplits(List<MockSourceSplit> splits) {} + + @Override + public void notifyNoMoreSplits() {} + + @Override + public void close() throws Exception {} + } }
