This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 54884a7ad4057863c2fc3fa956e2149edc904688
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Mon Nov 23 23:52:22 2020 +0800

    [FLINK-20270][runtime] Add support for ExternallyInducedSource based on 
FLIP-27 to SourceOperatorStreamTask.
---
 .../source/ExternallyInducedSourceReader.java      |  57 +++++++++
 .../io/StreamTaskExternallyInducedSourceInput.java |  51 ++++++++
 .../runtime/io/StreamTaskSourceInput.java          |   2 +-
 .../runtime/tasks/SourceOperatorStreamTask.java    |  57 ++++++++-
 .../tasks/SourceOperatorStreamTaskTest.java        | 130 ++++++++++++++++++++-
 5 files changed, 291 insertions(+), 6 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/ExternallyInducedSourceReader.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/ExternallyInducedSourceReader.java
new file mode 100644
index 0000000..96ffd92
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/ExternallyInducedSourceReader.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Optional;
+
+/**
+ * Sources that implement this interface do not trigger checkpoints when 
receiving a
+ * trigger message from the checkpoint coordinator, but when their input 
data/events
+ * indicate that a checkpoint should be triggered.
+ *
+ * <p>The ExternallyInducedSourceReader tells the Flink runtime that a 
checkpoint needs to
+ * be made by returning a checkpointId when shouldTriggerCheckpoint() is 
invoked.
+ *
+ * <p>The implementations typically works together with the SplitEnumerator 
which informs
+ * the external system to trigger a checkpoint.
+ *
+ * @param <T>        The type of records produced by the source.
+ * @param <SplitT>   The type of splits handled by the source.
+ */
+@PublicEvolving
+public interface ExternallyInducedSourceReader<T, SplitT extends SourceSplit>
+               extends SourceReader<T, SplitT> {
+
+       /**
+        * A method that informs the Flink runtime whether a checkpoint should 
be triggered on
+        * this Source.
+        *
+        * <p>This method is invoked when the previous {@link 
#pollNext(ReaderOutput)}
+        * returns {@link 
org.apache.flink.core.io.InputStatus#NOTHING_AVAILABLE}, to check
+        * if the source needs to be checkpointed.
+        *
+        * <p>If a CheckpointId is returned, a checkpoint will be triggered on 
this source reader.
+        * Otherwise, Flink runtime will continue to process the records.
+        *
+        * @return An optional checkpoint ID that Flink runtime should take a 
checkpoint for.
+        */
+       Optional<Long> shouldTriggerCheckpoint();
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskExternallyInducedSourceInput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskExternallyInducedSourceInput.java
new file mode 100644
index 0000000..03581d5
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskExternallyInducedSourceInput.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.api.connector.source.ExternallyInducedSourceReader;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.streaming.api.operators.SourceOperator;
+
+import java.util.function.Consumer;
+
+/**
+ * A subclass of {@link StreamTaskSourceInput} for {@link 
ExternallyInducedSourceReader}.
+ */
+public class StreamTaskExternallyInducedSourceInput<T> extends 
StreamTaskSourceInput<T> {
+       private final Consumer<Long> checkpointTriggeringHook;
+       private final ExternallyInducedSourceReader<T, ?> sourceReader;
+
+       @SuppressWarnings("unchecked")
+       public StreamTaskExternallyInducedSourceInput(
+                       SourceOperator<T, ?> operator,
+                       Consumer<Long> checkpointTriggeringHook) {
+               super(operator);
+               this.checkpointTriggeringHook = checkpointTriggeringHook;
+               this.sourceReader = (ExternallyInducedSourceReader<T, ?>) 
operator.getSourceReader();
+       }
+
+       @Override
+       public InputStatus emitNext(DataOutput<T> output) throws Exception {
+               InputStatus status = super.emitNext(output);
+               if (status == InputStatus.NOTHING_AVAILABLE) {
+                       
sourceReader.shouldTriggerCheckpoint().ifPresent(checkpointTriggeringHook);
+               }
+               return status;
+       }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java
index 0b2d065..722afa9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java
@@ -34,7 +34,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * unavailable or finished.
  */
 @Internal
-public final class StreamTaskSourceInput<T> implements StreamTaskInput<T> {
+public class StreamTaskSourceInput<T> implements StreamTaskInput<T> {
 
        private final SourceOperator<T, ?> operator;
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
index bd4fe74..d55e542 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
@@ -19,6 +19,10 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.ExternallyInducedSourceReader;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.SourceOperator;
@@ -26,12 +30,16 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.AbstractDataOutput;
 import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput;
 import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor;
+import 
org.apache.flink.streaming.runtime.io.StreamTaskExternallyInducedSourceInput;
 import org.apache.flink.streaming.runtime.io.StreamTaskInput;
 import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -39,7 +47,9 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 @Internal
 public class SourceOperatorStreamTask<T> extends StreamTask<T, 
SourceOperator<T, ?>> {
+
        private AsyncDataOutputToOutput<T> output;
+       private boolean isExternallyInducedSource;
 
        public SourceOperatorStreamTask(Environment env) throws Exception {
                super(env);
@@ -54,7 +64,21 @@ public class SourceOperatorStreamTask<T> extends 
StreamTask<T, SourceOperator<T,
                // input processors
                sourceOperator.initReader();
 
-               StreamTaskInput<T> input = new 
StreamTaskSourceInput<>(headOperator);
+               final SourceReader<T, ?> sourceReader = 
headOperator.getSourceReader();
+               final StreamTaskInput<T> input;
+
+               if (sourceReader instanceof ExternallyInducedSourceReader) {
+                       isExternallyInducedSource = true;
+
+                       input = new StreamTaskExternallyInducedSourceInput<>(
+                               sourceOperator,
+                               
this::triggerCheckpointForExternallyInducedSource);
+               } else {
+                       input = new StreamTaskSourceInput<>(sourceOperator);
+               }
+
+               // The SourceOperatorStreamTask doesn't have any inputs, so 
there is no need for
+               // a WatermarkGauge on the input.
                output = new AsyncDataOutputToOutput<>(
                        operatorChain.getChainEntryPoint(),
                        getStreamStatusMaintainer());
@@ -66,6 +90,21 @@ public class SourceOperatorStreamTask<T> extends 
StreamTask<T, SourceOperator<T,
        }
 
        @Override
+       public Future<Boolean> triggerCheckpointAsync(
+                       CheckpointMetaData checkpointMetaData,
+                       CheckpointOptions checkpointOptions,
+                       boolean advanceToEndOfEventTime) {
+               if (!isExternallyInducedSource) {
+                       return super.triggerCheckpointAsync(
+                               checkpointMetaData,
+                               checkpointOptions,
+                               advanceToEndOfEventTime);
+               } else {
+                       return CompletableFuture.completedFuture(isRunning());
+               }
+       }
+
+       @Override
        protected void advanceToEndOfEventTime() {
                output.emitWatermark(Watermark.MAX_WATERMARK);
        }
@@ -78,6 +117,22 @@ public class SourceOperatorStreamTask<T> extends 
StreamTask<T, SourceOperator<T,
                super.afterInvoke();
        }
 
+       // --------------------------
+
+       private void triggerCheckpointForExternallyInducedSource(long 
checkpointId) {
+               final CheckpointOptions checkpointOptions = 
CheckpointOptions.forCheckpointWithDefaultLocation(
+                       configuration.isExactlyOnceCheckpointMode(),
+                       configuration.isUnalignedCheckpointsEnabled());
+               final long timestamp = System.currentTimeMillis();
+
+               final CheckpointMetaData checkpointMetaData =
+                       new CheckpointMetaData(checkpointId, timestamp);
+
+               super.triggerCheckpointAsync(checkpointMetaData, 
checkpointOptions, false);
+       }
+
+       // ---------------------------
+
        /**
         * Implementation of {@link DataOutput} that wraps a specific {@link 
Output}.
         */
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
index 8c4dc0a..ce60719 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
@@ -22,10 +22,15 @@ import 
org.apache.flink.api.common.eventtime.TimestampAssigner;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ExternallyInducedSourceReader;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.mocks.MockSource;
 import org.apache.flink.api.connector.source.mocks.MockSourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
+import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -43,10 +48,13 @@ import org.apache.flink.util.SerializedValue;
 
 import org.junit.Test;
 
+import java.io.Serializable;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Optional;
 import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.function.Supplier;
 import java.util.stream.IntStream;
@@ -56,6 +64,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for verifying that the {@link SourceOperator} as a task input can be 
integrated
@@ -125,6 +134,26 @@ public class SourceOperatorStreamTaskTest {
                }
        }
 
+       @Test
+       public void testExternallyInducedSource() throws Exception {
+               final int numEventsBeforeCheckpoint = 10;
+               final int totalNumEvents = 20;
+               TestingExternallyInducedSourceReader testingReader =
+                       new 
TestingExternallyInducedSourceReader(numEventsBeforeCheckpoint, totalNumEvents);
+               try (StreamTaskMailboxTestHarness<Integer> testHarness =
+                                       createTestHarness(new 
TestingExternallyInducedSource(testingReader), 0, null)) {
+                       TestingExternallyInducedSourceReader 
runtimeTestingReader =
+                               (TestingExternallyInducedSourceReader) 
((SourceOperator) testHarness.getStreamTask().headOperator).getSourceReader();
+
+                       testHarness.processWhileAvailable();
+
+                       assertEquals(totalNumEvents, 
runtimeTestingReader.numEmittedEvents);
+                       assertTrue(runtimeTestingReader.checkpointed);
+                       
assertEquals(TestingExternallyInducedSourceReader.CHECKPOINT_ID, 
runtimeTestingReader.checkpointedId);
+                       assertEquals(numEventsBeforeCheckpoint, 
runtimeTestingReader.checkpointedAt);
+               }
+       }
+
        private TaskStateSnapshot executeAndWaitForCheckpoint(
                        long checkpointId,
                        TaskStateSnapshot initialSnapshot,
@@ -168,7 +197,7 @@ public class SourceOperatorStreamTaskTest {
 
                // Wait until the checkpoint finishes.
                // We have to mark the source reader as available here, 
otherwise the runMailboxStep() call after
-               // checkpiont is completed will block.
+               // checkpoint is completed will block.
                getSourceReaderFromTask(testHarness).markAvailable();
                processUntil(testHarness, checkpointFuture::isDone);
                waitForAcknowledgeLatch.await();
@@ -187,10 +216,20 @@ public class SourceOperatorStreamTaskTest {
        private StreamTaskMailboxTestHarness<Integer> createTestHarness(
                        long checkpointId,
                        TaskStateSnapshot snapshot) throws Exception {
+               return createTestHarness(
+                       new MockSource(Boundedness.BOUNDED, 1),
+                       checkpointId,
+                       snapshot
+               );
+       }
+
+       private StreamTaskMailboxTestHarness<Integer> createTestHarness(
+                       MockSource source,
+                       long checkpointId,
+                       TaskStateSnapshot snapshot) throws Exception {
                // get a source operator.
-               SourceOperatorFactory<Integer> sourceOperatorFactory = new 
SourceOperatorFactory<>(
-                               new MockSource(Boundedness.BOUNDED, 1),
-                               WatermarkStrategy.noWatermarks());
+               SourceOperatorFactory<Integer> sourceOperatorFactory =
+                       new SourceOperatorFactory<>(source, 
WatermarkStrategy.noWatermarks());
 
                // build a test harness.
                StreamTaskMailboxTestHarnessBuilder<Integer> builder =
@@ -236,4 +275,87 @@ public class SourceOperatorStreamTaskTest {
        private MockSourceReader 
getSourceReaderFromTask(StreamTaskMailboxTestHarness<Integer> testHarness) {
                return (MockSourceReader) ((SourceOperator) 
testHarness.getStreamTask().headOperator).getSourceReader();
        }
+
+       // ------------- private testing classes ----------
+
+       private static class TestingExternallyInducedSource extends MockSource {
+               private static final long serialVersionUID = 
3078454109555893721L;
+               private final TestingExternallyInducedSourceReader reader;
+
+               private 
TestingExternallyInducedSource(TestingExternallyInducedSourceReader reader) {
+                       super(Boundedness.CONTINUOUS_UNBOUNDED, 1);
+                       this.reader = reader;
+               }
+
+               @Override
+               public SourceReader<Integer, MockSourceSplit> 
createReader(SourceReaderContext readerContext) {
+                       return reader;
+               }
+       }
+
+       private static class TestingExternallyInducedSourceReader
+                       implements ExternallyInducedSourceReader<Integer, 
MockSourceSplit>, Serializable {
+               private static final long CHECKPOINT_ID = 1234L;
+               private final int numEventsBeforeCheckpoint;
+               private final int totalNumEvents;
+               private int numEmittedEvents;
+
+               private boolean checkpointed;
+               private int checkpointedAt;
+               private long checkpointedId;
+
+               TestingExternallyInducedSourceReader(int 
numEventsBeforeCheckpoint, int totalNumEvents) {
+                       this.numEventsBeforeCheckpoint = 
numEventsBeforeCheckpoint;
+                       this.totalNumEvents = totalNumEvents;
+                       this.numEmittedEvents = 0;
+                       this.checkpointed = false;
+                       this.checkpointedAt = -1;
+               }
+
+               @Override
+               public Optional<Long> shouldTriggerCheckpoint() {
+                       if (numEmittedEvents == numEventsBeforeCheckpoint && 
!checkpointed) {
+                               return Optional.of(CHECKPOINT_ID);
+                       } else {
+                               return Optional.empty();
+                       }
+               }
+
+               @Override
+               public void start() {}
+
+               @Override
+               public InputStatus pollNext(ReaderOutput<Integer> output) 
throws Exception {
+                       numEmittedEvents++;
+                       if (numEmittedEvents == numEventsBeforeCheckpoint) {
+                               return InputStatus.NOTHING_AVAILABLE;
+                       } else if (numEmittedEvents < totalNumEvents) {
+                               return InputStatus.MORE_AVAILABLE;
+                       } else {
+                               return InputStatus.END_OF_INPUT;
+                       }
+               }
+
+               @Override
+               public List<MockSourceSplit> snapshotState(long checkpointId) {
+                       checkpointed = true;
+                       checkpointedAt = numEmittedEvents;
+                       checkpointedId = checkpointId;
+                       return Collections.emptyList();
+               }
+
+               @Override
+               public CompletableFuture<Void> isAvailable() {
+                       return CompletableFuture.completedFuture(null);
+               }
+
+               @Override
+               public void addSplits(List<MockSourceSplit> splits) {}
+
+               @Override
+               public void notifyNoMoreSplits() {}
+
+               @Override
+               public void close() throws Exception {}
+       }
 }

Reply via email to