This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6452dc79822 [Flink Runner] Add new Source classes that are based on
FLIP-27 Source API. (#25525)
6452dc79822 is described below
commit 6452dc7982240819a763aaf9ff3efc4a01fc1d2b
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Tue Mar 7 02:12:58 2023 +0800
[Flink Runner] Add new Source classes that are based on FLIP-27 Source API.
(#25525)
---
.../io/source/compat/FlinkSourceCompat.java | 31 ++
.../io/source/compat/SplitEnumeratorCompat.java | 27 ++
.../streaming/io/source/compat/package-info.java | 20 +
.../streaming/io/source/SourceTestCompat.java | 62 ++++
.../io/source/compat/SplitEnumeratorCompat.java | 27 ++
.../io/source/compat/FlinkSourceCompat.java | 28 ++
.../streaming/io/source/SourceTestCompat.java | 93 +++++
runners/flink/flink_runner.gradle | 2 +
.../flink/translation/utils/SerdeUtils.java | 85 +++++
.../wrappers/streaming/io/source/FlinkSource.java | 152 ++++++++
.../streaming/io/source/FlinkSourceReaderBase.java | 399 ++++++++++++++++++++
.../streaming/io/source/FlinkSourceSplit.java | 76 ++++
.../io/source/FlinkSourceSplitEnumerator.java | 181 +++++++++
.../io/source/bounded/FlinkBoundedSource.java | 67 ++++
.../source/bounded/FlinkBoundedSourceReader.java | 146 ++++++++
.../streaming/io/source/bounded/package-info.java | 21 ++
.../io/source/impulse/BeamImpulseSource.java | 107 ++++++
.../streaming/io/source/impulse/package-info.java | 21 ++
.../wrappers/streaming/io/source/package-info.java | 21 ++
.../io/source/unbounded/FlinkUnboundedSource.java | 64 ++++
.../unbounded/FlinkUnboundedSourceReader.java | 272 ++++++++++++++
.../io/source/unbounded/package-info.java | 21 ++
.../streaming/io/TestBoundedCountingSource.java | 150 ++++++++
.../wrappers/streaming/io/TestCountingSource.java | 63 +++-
.../io/source/FlinkSourceReaderTestBase.java | 403 +++++++++++++++++++++
.../io/source/FlinkSourceSplitEnumeratorTest.java | 151 ++++++++
.../wrappers/streaming/io/source/TestSource.java | 38 ++
.../bounded/FlinkBoundedSourceReaderTest.java | 146 ++++++++
.../unbounded/FlinkUnboundedSourceReaderTest.java | 318 ++++++++++++++++
29 files changed, 3186 insertions(+), 6 deletions(-)
diff --git
a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java
b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java
new file mode 100644
index 00000000000..2f9e69fe4f7
--- /dev/null
+++
b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+
+public class FlinkSourceCompat {
+
+ public static Counter getNumRecordsInCounter(SourceReaderContext context) {
+ return ((OperatorMetricGroup) context.metricGroup())
+ .getIOMetricGroup()
+ .getNumRecordsInCounter();
+ }
+}
diff --git
a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java
b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java
new file mode 100644
index 00000000000..d6bed940470
--- /dev/null
+++
b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+
+public interface SplitEnumeratorCompat<SplitT extends SourceSplit, CheckpointT>
+ extends SplitEnumerator<SplitT, CheckpointT> {
+
+ CheckpointT snapshotState(long checkpointId) throws Exception;
+}
diff --git
a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/package-info.java
b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/package-info.java
new file mode 100644
index 00000000000..08bba20e576
--- /dev/null
+++
b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Classes helping maintain backwards compatibility across Flink versions. */
+package
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat;
diff --git
a/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java
b/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java
new file mode 100644
index 00000000000..1ddc2a957b7
--- /dev/null
+++
b/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+
+public class SourceTestCompat {
+
+ /** A MetricGroup implementation which records the registered gauge. */
+ public static class TestMetricGroup
+ extends UnregisteredMetricGroups.UnregisteredOperatorMetricGroup {
+ public final Map<String, Gauge<?>> registeredGauge = new HashMap<>();
+ public final Counter numRecordsInCounter = new SimpleCounter();
+
+ @Override
+ public <T, GaugeT extends Gauge<T>> GaugeT gauge(String name, GaugeT
gauge) {
+ registeredGauge.put(name, gauge);
+ return gauge;
+ }
+
+ @Override
+ public OperatorIOMetricGroup getIOMetricGroup() {
+ return new OperatorIOMetricGroup(this) {
+ @Override
+ public Counter getNumRecordsInCounter() {
+ return numRecordsInCounter;
+ }
+ };
+ }
+ }
+
+ public interface ReaderOutputCompat<T> extends ReaderOutput<T> {
+ void markActive();
+ }
+
+ public interface SourceOutputCompat<T> extends SourceOutput<T> {
+ void markActive();
+ }
+}
diff --git
a/runners/flink/1.13/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java
b/runners/flink/1.13/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java
new file mode 100644
index 00000000000..06fdd781fc5
--- /dev/null
+++
b/runners/flink/1.13/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+
+public interface SplitEnumeratorCompat<SplitT extends SourceSplit, CheckpointT>
+ extends SplitEnumerator<SplitT, CheckpointT> {
+
+ CheckpointT snapshotState() throws Exception;
+}
diff --git
a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java
b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java
new file mode 100644
index 00000000000..f68ae75d38e
--- /dev/null
+++
b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.metrics.Counter;
+
+public class FlinkSourceCompat {
+
+ public static Counter getNumRecordsInCounter(SourceReaderContext context) {
+ return context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
+ }
+}
diff --git
a/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java
b/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java
new file mode 100644
index 00000000000..62b16eedca0
--- /dev/null
+++
b/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+public class SourceTestCompat {
+
+ /** A MetricGroup implementation which records the registered gauge. */
+ public static class TestMetricGroup extends UnregisteredMetricsGroup
+ implements SourceReaderMetricGroup {
+ public final Map<String, Gauge<?>> registeredGauge = new HashMap<>();
+ public final Counter numRecordsInCounter = new SimpleCounter();
+
+ @Override
+ public OperatorIOMetricGroup getIOMetricGroup() {
+ return new UnregisteredOperatorIOMetricGroup() {
+ @Override
+ public Counter getNumRecordsInCounter() {
+ return numRecordsInCounter;
+ }
+ };
+ }
+
+ @Override
+ public <T, GaugeT extends Gauge<T>> GaugeT gauge(String name, GaugeT
gauge) {
+ registeredGauge.put(name, gauge);
+ return gauge;
+ }
+
+ @Override
+ public Counter getNumRecordsInErrorsCounter() {
+ return new SimpleCounter();
+ }
+
+ @Override
+ public void setPendingBytesGauge(Gauge<Long> pendingBytesGauge) {}
+
+ @Override
+ public void setPendingRecordsGauge(Gauge<Long> pendingRecordsGauge) {}
+ }
+
+ private static class UnregisteredOperatorIOMetricGroup extends
UnregisteredMetricsGroup
+ implements OperatorIOMetricGroup {
+ @Override
+ public Counter getNumRecordsInCounter() {
+ return new SimpleCounter();
+ }
+
+ @Override
+ public Counter getNumRecordsOutCounter() {
+ return new SimpleCounter();
+ }
+
+ @Override
+ public Counter getNumBytesInCounter() {
+ return new SimpleCounter();
+ }
+
+ @Override
+ public Counter getNumBytesOutCounter() {
+ return new SimpleCounter();
+ }
+ }
+
+ public interface ReaderOutputCompat<T> extends ReaderOutput<T> {}
+
+ public interface SourceOutputCompat<T> extends SourceOutput<T> {}
+}
diff --git a/runners/flink/flink_runner.gradle
b/runners/flink/flink_runner.gradle
index ccd4f75d3b7..cdfe921c1b2 100644
--- a/runners/flink/flink_runner.gradle
+++ b/runners/flink/flink_runner.gradle
@@ -179,6 +179,7 @@ dependencies {
if (flink_version.compareTo("1.14") >= 0) {
implementation "org.apache.flink:flink-runtime:$flink_version"
implementation "org.apache.flink:flink-optimizer:$flink_version"
+ implementation "org.apache.flink:flink-metrics-core:$flink_version"
testImplementation "org.apache.flink:flink-runtime:$flink_version:tests"
testImplementation "org.apache.flink:flink-rpc-akka:$flink_version"
} else {
@@ -197,6 +198,7 @@ dependencies {
testImplementation project(":sdks:java:io:google-cloud-platform")
testImplementation library.java.jackson_dataformat_yaml
testImplementation "org.apache.flink:flink-core:$flink_version:tests"
+ testImplementation
"org.apache.flink:flink-connector-test-utils:$flink_version"
testImplementation project(":sdks:java:harness")
testRuntimeOnly library.java.slf4j_simple
validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerdeUtils.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerdeUtils.java
new file mode 100644
index 00000000000..c502faeb7a6
--- /dev/null
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerdeUtils.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/** Util methods to help with serialization / deserialization. */
+public class SerdeUtils {
+
+ // Private constructor for a util class.
+ private SerdeUtils() {}
+
+ public static @Nonnull byte[] serializeObject(@Nullable Object obj) throws
IOException {
+ if (obj == null) {
+ return new byte[0];
+ }
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(obj);
+ oos.close();
+ return baos.toByteArray();
+ }
+
+ @SuppressWarnings("unchecked")
+ public static @Nullable Object deserializeObject(byte[] serialized) throws
IOException {
+ if (serialized == null || serialized.length == 0) {
+ return null;
+ }
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+ ObjectInputStream ois = new ObjectInputStream(bais)) {
+ return ois.readObject();
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public static <T> SimpleVersionedSerializer<T> getNaiveObjectSerializer() {
+ return new SimpleVersionedSerializer<T>() {
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+
+ @Override
+ public byte[] serialize(T obj) throws IOException {
+ return serializeObject(obj);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public T deserialize(int version, byte[] serialized) throws IOException {
+ if (version > getVersion()) {
+ throw new IOException(
+ String.format(
+ "Received serialized object of version %d, which is higher
than "
+ + "the highest supported version %d.",
+ version, getVersion()));
+ }
+ return (T) deserializeObject(serialized);
+ }
+ };
+ }
+}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java
new file mode 100644
index 00000000000..c001b263340
--- /dev/null
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.translation.utils.SerdeUtils;
+import
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded.FlinkBoundedSource;
+import
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.impulse.BeamImpulseSource;
+import
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded.FlinkUnboundedSource;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/**
+ * The base class for {@link FlinkBoundedSource} and {@link
FlinkUnboundedSource}.
+ *
+ * @param <T> The data type of the records emitted by the raw Beam sources.
+ * @param <OutputT> The data type of the records emitted by the Flink Source.
+ */
+public abstract class FlinkSource<T, OutputT>
+ implements Source<OutputT, FlinkSourceSplit<T>, Map<Integer,
List<FlinkSourceSplit<T>>>> {
+ protected final org.apache.beam.sdk.io.Source<T> beamSource;
+ protected final Boundedness boundedness;
+ protected final SerializablePipelineOptions serializablePipelineOptions;
+
+ private final int numSplits;
+
+ // ----------------- public static methods to construct sources
--------------------
+
+ public static <T> FlinkBoundedSource<T> bounded(
+ BoundedSource<T> boundedSource,
+ SerializablePipelineOptions serializablePipelineOptions,
+ int numSplits) {
+ return new FlinkBoundedSource<>(
+ boundedSource, serializablePipelineOptions, Boundedness.BOUNDED,
numSplits);
+ }
+
+ public static <T> FlinkUnboundedSource<T> unbounded(
+ UnboundedSource<T, ?> source,
+ SerializablePipelineOptions serializablePipelineOptions,
+ int numSplits) {
+ return new FlinkUnboundedSource<>(source, serializablePipelineOptions,
numSplits);
+ }
+
+ public static FlinkBoundedSource<byte[]> unboundedImpulse(long
shutdownSourceAfterIdleMs) {
+ FlinkPipelineOptions flinkPipelineOptions =
FlinkPipelineOptions.defaults();
+
flinkPipelineOptions.setShutdownSourcesAfterIdleMs(shutdownSourceAfterIdleMs);
+ // Here we wrap the BeamImpulseSource with a FlinkBoundedSource, but
overriding its
+ // boundedness to CONTINUOUS_UNBOUNDED. By doing so, the Flink engine will
treat this
+ // source as an unbounded source and execute the job in streaming mode.
This also
+ // works well with checkpoint, because the FlinkSourceSplit containing the
+ // BeamImpulseSource will be discarded after the impulse emission. So the
streaming
+ // job won't see another impulse after failover.
+ return new FlinkBoundedSource<>(
+ new BeamImpulseSource(),
+ new SerializablePipelineOptions(flinkPipelineOptions),
+ Boundedness.CONTINUOUS_UNBOUNDED,
+ 1,
+ record -> Watermark.MAX_WATERMARK.getTimestamp());
+ }
+
+ public static FlinkBoundedSource<byte[]> boundedImpulse() {
+ return new FlinkBoundedSource<>(
+ new BeamImpulseSource(),
+ new SerializablePipelineOptions(FlinkPipelineOptions.defaults()),
+ Boundedness.BOUNDED,
+ 1,
+ record -> Watermark.MAX_WATERMARK.getTimestamp());
+ }
+
+ // ------ Common implementations for both bounded and unbounded source
---------
+
+ protected FlinkSource(
+ org.apache.beam.sdk.io.Source<T> beamSource,
+ SerializablePipelineOptions serializablePipelineOptions,
+ Boundedness boundedness,
+ int numSplits) {
+ this.beamSource = beamSource;
+ this.serializablePipelineOptions = serializablePipelineOptions;
+ this.boundedness = boundedness;
+ this.numSplits = numSplits;
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return boundedness;
+ }
+
+ @Override
+ public SplitEnumerator<FlinkSourceSplit<T>, Map<Integer,
List<FlinkSourceSplit<T>>>>
+ createEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>>
enumContext) throws Exception {
+ return new FlinkSourceSplitEnumerator<>(
+ enumContext, beamSource, serializablePipelineOptions.get(), numSplits);
+ }
+
+ @Override
+ public SplitEnumerator<FlinkSourceSplit<T>, Map<Integer,
List<FlinkSourceSplit<T>>>>
+ restoreEnumerator(
+ SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext,
+ Map<Integer, List<FlinkSourceSplit<T>>> checkpoint)
+ throws Exception {
+ FlinkSourceSplitEnumerator<T> enumerator =
+ new FlinkSourceSplitEnumerator<>(
+ enumContext, beamSource, serializablePipelineOptions.get(),
numSplits);
+ checkpoint.forEach(
+ (subtaskId, splitsForSubtask) ->
enumerator.addSplitsBack(splitsForSubtask, subtaskId));
+ return enumerator;
+ }
+
+ @Override
+ public SimpleVersionedSerializer<FlinkSourceSplit<T>> getSplitSerializer() {
+ return FlinkSourceSplit.serializer();
+ }
+
+ @Override
+ public SimpleVersionedSerializer<Map<Integer, List<FlinkSourceSplit<T>>>>
+ getEnumeratorCheckpointSerializer() {
+ return SerdeUtils.getNaiveObjectSerializer();
+ }
+
+ public int getNumSplits() {
+ return numSplits;
+ }
+
+ @FunctionalInterface
+ public interface TimestampExtractor<T> extends Function<T, Long>,
Serializable {}
+}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java
new file mode 100644
index 00000000000..27b84910ac2
--- /dev/null
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.FlinkSourceCompat;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.metrics.Counter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An abstract implementation of {@link SourceReader} which encapsulates
{@link Source Beam Sources}
+ * for data reading.
+ *
+ * <ol>
+ * <li>Idle timeout support.
+ * <li>Splits addition handling.
+ * <li>Split reader creation and management.
+ * <li>checkpoint management
+ * </ol>
+ *
+ * <p>This implementation provides unified logic for both {@link
BoundedSource} and {@link
+ * UnboundedSource}. The subclasses are expected to only implement the {@link
+ * #pollNext(ReaderOutput)} method.
+ *
+ * @param <OutputT> the output element type from the encapsulated {@link
Source Beam sources.}
+ */
+public abstract class FlinkSourceReaderBase<T, OutputT>
+ implements SourceReader<OutputT, FlinkSourceSplit<T>> {
+ private static final Logger LOG =
LoggerFactory.getLogger(FlinkSourceReaderBase.class);
+ protected static final CompletableFuture<Void> AVAILABLE_NOW =
+ CompletableFuture.completedFuture(null);
+ // Some dummy instances to make the annotation checker happy with
AtomicReference.
+ protected static final CompletableFuture<Void> DUMMY_FUTURE = new
CompletableFuture<>();
+ protected static final Exception NO_EXCEPTION = new Exception();
+
+ protected final PipelineOptions pipelineOptions;
+ protected final @Nullable Function<OutputT, Long> timestampExtractor;
+ private final Queue<FlinkSourceSplit<T>> sourceSplits = new ArrayDeque<>();
+ // This needs to be a ConcurrentHashMap because the metric retrieving thread
may access it.
+ private final ConcurrentMap<Integer, ReaderAndOutput> beamSourceReaders;
+ protected final SourceReaderContext context;
+ private final ScheduledExecutorService executor;
+
+ protected final Counter numRecordsInCounter;
+ protected final long idleTimeoutMs;
+ private final CompletableFuture<Void> idleTimeoutFuture;
+ private final AtomicReference<Throwable> exception;
+ private boolean idleTimeoutCountingDown;
+ private CompletableFuture<Void> waitingForSplitChangeFuture;
+ private boolean noMoreSplits;
+
+ protected FlinkSourceReaderBase(
+ SourceReaderContext context,
+ PipelineOptions pipelineOptions,
+ @Nullable Function<OutputT, Long> timestampExtractor) {
+ this(
+ Executors.newSingleThreadScheduledExecutor(
+ r -> new Thread(r, "FlinkSource-Executor-Thread-" +
context.getIndexOfSubtask())),
+ context,
+ pipelineOptions,
+ timestampExtractor);
+ }
+
+ protected FlinkSourceReaderBase(
+ ScheduledExecutorService executor,
+ SourceReaderContext context,
+ PipelineOptions pipelineOptions,
+ @Nullable Function<OutputT, Long> timestampExtractor) {
+ this.context = context;
+ this.pipelineOptions = pipelineOptions;
+ this.timestampExtractor = timestampExtractor;
+ this.beamSourceReaders = new ConcurrentHashMap<>();
+ this.exception = new AtomicReference<>(NO_EXCEPTION);
+ this.executor = executor;
+ this.idleTimeoutMs =
+
pipelineOptions.as(FlinkPipelineOptions.class).getShutdownSourcesAfterIdleMs();
+ this.idleTimeoutFuture = new CompletableFuture<>();
+ this.waitingForSplitChangeFuture = new CompletableFuture<>();
+ this.idleTimeoutCountingDown = false;
+ // TODO: Remove the casting and use SourceReaderMetricGroup after minimum
FLink version is
+ // upgraded to 1.14 and above.
+ this.numRecordsInCounter =
FlinkSourceCompat.getNumRecordsInCounter(context);
+ }
+
+ @Override
+ public void start() {}
+
+ @Override
+ public List<FlinkSourceSplit<T>> snapshotState(long checkpointId) {
+ checkExceptionAndMaybeThrow();
+ // Add all the source splits whose readers haven't been created.
+ List<FlinkSourceSplit<T>> splitsState = new ArrayList<>(sourceSplits);
+
+ // Add all the source splits being actively read.
+ beamSourceReaders.forEach(
+ (splitId, readerAndOutput) -> {
+ Source.Reader<T> reader = readerAndOutput.reader;
+ if (reader instanceof BoundedSource.BoundedReader) {
+ // Sometimes users may decide to run a bounded source in streaming
mode as "finite
+ // stream."
+ // For bounded source, the checkpoint granularity is the entire
source split.
+ // So, in case of failure, all the data from this split will be
consumed again.
+ splitsState.add(new FlinkSourceSplit<>(splitId,
reader.getCurrentSource()));
+ } else if (reader instanceof UnboundedSource.UnboundedReader) {
+ // The checkpoint for unbounded sources is fine granular.
+ byte[] checkpointState =
+
getAndEncodeCheckpointMark((UnboundedSource.UnboundedReader<OutputT>) reader);
+ splitsState.add(
+ new FlinkSourceSplit<>(splitId, reader.getCurrentSource(),
checkpointState));
+ }
+ });
+ return splitsState;
+ }
+
+ @Override
+ public CompletableFuture<Void> isAvailable() {
+ checkExceptionAndMaybeThrow();
+ if (!sourceSplits.isEmpty() || !beamSourceReaders.isEmpty()) {
+ // There are still live readers.
+ CompletableFuture<Void> aliveReaderAvailableFuture =
isAvailableForAliveReaders();
+ // Regardless of whether there is data available from the alive readers,
the
+ // main thread needs to be woken up if there is a split change. Hence, we
+ // need to combine the data available future with the split change
future.
+ if (waitingForSplitChangeFuture.isDone()) {
+ waitingForSplitChangeFuture = new CompletableFuture<>();
+ }
+ return CompletableFuture.anyOf(aliveReaderAvailableFuture,
waitingForSplitChangeFuture)
+ .thenAccept(ignored -> {});
+ } else if (noMoreSplits) {
+ // All the splits have been read, wait for idle timeout.
+ checkIdleTimeoutAndMaybeStartCountdown();
+ return idleTimeoutFuture;
+ } else {
+ // There is no live readers, waiting for new split assignments or no
more splits notification.
+ if (waitingForSplitChangeFuture.isDone()) {
+ waitingForSplitChangeFuture = new CompletableFuture<>();
+ }
+ return waitingForSplitChangeFuture;
+ }
+ }
+
+ @Override
+ public void notifyNoMoreSplits() {
+ checkExceptionAndMaybeThrow();
+ LOG.info("Received NoMoreSplits signal from enumerator.");
+ noMoreSplits = true;
+ waitingForSplitChangeFuture.complete(null);
+ }
+
+ @Override
+ public void addSplits(List<FlinkSourceSplit<T>> splits) {
+ checkExceptionAndMaybeThrow();
+ LOG.info("Adding splits {}", splits);
+ sourceSplits.addAll(splits);
+ waitingForSplitChangeFuture.complete(null);
+ }
+
+ @Override
+ public void close() throws Exception {
+ for (ReaderAndOutput readerAndOutput : beamSourceReaders.values()) {
+ readerAndOutput.reader.close();
+ }
+ executor.shutdown();
+ }
+
+ // ----------------- protected abstract methods ----------------------
+
+ /**
+ * This method needs to be overridden by subclasses to determine if data is
available when there
+ * are alive readers. For example, an unbounded source may not have any
source split ready for
+ * data emission even if all the sources are still alive. Whereas for the
bounded source, data is
+ * always available as long as there are alive readers.
+ */
+ protected abstract CompletableFuture<Void> isAvailableForAliveReaders();
+
+ // ----------------- protected helper methods for subclasses
--------------------
+
+ protected Optional<ReaderAndOutput> createAndTrackNextReader() throws
IOException {
+ FlinkSourceSplit<T> sourceSplit = sourceSplits.poll();
+ if (sourceSplit != null) {
+ Source.Reader<T> reader = createReader(sourceSplit);
+ ReaderAndOutput readerAndOutput = new
ReaderAndOutput(sourceSplit.splitId(), reader, false);
+ beamSourceReaders.put(sourceSplit.splitIndex(), readerAndOutput);
+ return Optional.of(readerAndOutput);
+ }
+ return Optional.empty();
+ }
+
+ protected void finishSplit(int splitIndex) throws IOException {
+ ReaderAndOutput readerAndOutput = beamSourceReaders.remove(splitIndex);
+ if (readerAndOutput != null) {
+ LOG.info("Finished reading from split {}", readerAndOutput.splitId);
+ readerAndOutput.reader.close();
+ } else {
+ throw new IllegalStateException(
+ "SourceReader for split " + splitIndex + " should never be null!");
+ }
+ }
+
+ protected boolean checkIdleTimeoutAndMaybeStartCountdown() {
+ if (idleTimeoutMs <= 0) {
+ idleTimeoutFuture.complete(null);
+ } else if (!idleTimeoutCountingDown) {
+ scheduleTask(() -> idleTimeoutFuture.complete(null), idleTimeoutMs);
+ idleTimeoutCountingDown = true;
+ }
+ return idleTimeoutFuture.isDone();
+ }
+
+ protected boolean noMoreSplits() {
+ return noMoreSplits;
+ }
+
+ protected void scheduleTask(Runnable runnable, long delayMs) {
+ ignoreReturnValue(
+ executor.schedule(new ErrorRecordingRunnable(runnable), delayMs,
TimeUnit.MILLISECONDS));
+ }
+
+ protected void scheduleTaskAtFixedRate(Runnable runnable, long delayMs, long
periodMs) {
+ ignoreReturnValue(
+ executor.scheduleAtFixedRate(
+ new ErrorRecordingRunnable(runnable), delayMs, periodMs,
TimeUnit.MILLISECONDS));
+ }
+
+ protected void execute(Runnable runnable) {
+ executor.execute(new ErrorRecordingRunnable(runnable));
+ }
+
+ protected void recordException(Throwable e) {
+ if (!exception.compareAndSet(NO_EXCEPTION, e)) {
+ exception.get().addSuppressed(e);
+ }
+ }
+
+ protected void checkExceptionAndMaybeThrow() {
+ if (exception.get() != NO_EXCEPTION) {
+ throw new RuntimeException("The source reader received exception.",
exception.get());
+ }
+ }
+
+ protected boolean hasException() {
+ return exception.get() != NO_EXCEPTION;
+ }
+
+ protected Collection<FlinkSourceSplit<T>> sourceSplits() {
+ return Collections.unmodifiableCollection(sourceSplits);
+ }
+
+ protected Map<Integer, ReaderAndOutput> allReaders() {
+ return Collections.unmodifiableMap(beamSourceReaders);
+ }
+
+ protected static void ignoreReturnValue(Object o) {
+ // do nothing.
+ }
+ // ------------------------------ private methods
------------------------------
+
+ @SuppressWarnings("unchecked")
+ private <CheckpointMarkT extends UnboundedSource.CheckpointMark>
+ byte[]
getAndEncodeCheckpointMark(UnboundedSource.UnboundedReader<OutputT> reader) {
+ UnboundedSource<OutputT, CheckpointMarkT> source =
+ (UnboundedSource<OutputT, CheckpointMarkT>) reader.getCurrentSource();
+ CheckpointMarkT checkpointMark = (CheckpointMarkT)
reader.getCheckpointMark();
+ Coder<CheckpointMarkT> coder = source.getCheckpointMarkCoder();
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ coder.encode(checkpointMark, baos);
+ return baos.toByteArray();
+ } catch (IOException ioe) {
+ throw new RuntimeException("Failed to encode checkpoint mark.", ioe);
+ }
+ }
+
+ private Source.Reader<T> createReader(@Nonnull FlinkSourceSplit<T>
sourceSplit)
+ throws IOException {
+ Source<T> beamSource = sourceSplit.getBeamSplitSource();
+ if (beamSource instanceof BoundedSource) {
+ return ((BoundedSource<T>) beamSource).createReader(pipelineOptions);
+ } else if (beamSource instanceof UnboundedSource) {
+ return createUnboundedSourceReader(beamSource,
sourceSplit.getSplitState());
+ } else {
+ throw new IllegalStateException("Unknown source type " +
beamSource.getClass());
+ }
+ }
+
+ private <CheckpointMarkT extends UnboundedSource.CheckpointMark>
+ Source.Reader<T> createUnboundedSourceReader(
+ Source<T> beamSource, @Nullable byte[] splitState) throws
IOException {
+ UnboundedSource<T, CheckpointMarkT> unboundedSource =
+ (UnboundedSource<T, CheckpointMarkT>) beamSource;
+ Coder<CheckpointMarkT> coder = unboundedSource.getCheckpointMarkCoder();
+ if (splitState == null) {
+ return unboundedSource.createReader(pipelineOptions, null);
+ } else {
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(splitState)) {
+ return unboundedSource.createReader(pipelineOptions,
coder.decode(bais));
+ }
+ }
+ }
+
+ // -------------------- protected helper class ---------------------
+
+ /** A wrapper for the reader and its associated information. */
+ protected final class ReaderAndOutput {
+ public final String splitId;
+ public final Source.Reader<T> reader;
+ private boolean started;
+ private @Nullable SourceOutput<OutputT> outputForSplit;
+
+ public ReaderAndOutput(String splitId, Source.Reader<T> reader, boolean
started) {
+ this.splitId = splitId;
+ this.reader = reader;
+ this.started = started;
+ this.outputForSplit = null;
+ }
+
+ public SourceOutput<OutputT>
getAndMaybeCreateSplitOutput(ReaderOutput<OutputT> output) {
+ if (outputForSplit == null) {
+ outputForSplit = output.createOutputForSplit(splitId);
+ }
+ return outputForSplit;
+ }
+
+ public boolean startOrAdvance() throws IOException {
+ if (started) {
+ return reader.advance();
+ } else {
+ started = true;
+ return reader.start();
+ }
+ }
+
+ public @Nullable SourceOutput<OutputT> sourceOutput() {
+ return outputForSplit;
+ }
+ }
+
+ private final class ErrorRecordingRunnable implements Runnable {
+ private final Runnable runnable;
+
+ ErrorRecordingRunnable(Runnable r) {
+ this.runnable = r;
+ }
+
+ @Override
+ public void run() {
+ try {
+ runnable.run();
+ } catch (Throwable t) {
+ recordException(t);
+ }
+ }
+ }
+}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplit.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplit.java
new file mode 100644
index 00000000000..32fcd23344d
--- /dev/null
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplit.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.flink.translation.utils.SerdeUtils;
+import org.apache.beam.sdk.io.Source;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/**
+ * A Flink {@link SourceSplit} implementation that encapsulates a Beam {@link
Source}. This class
+ * also serves as the holder of the checkpoint state of the Beam {@link
+ * org.apache.beam.sdk.io.Source.Reader Reader} created from the encapsulated
source. So, the Source
+ * can recreate the Reader from the checkpointed state upon failure recovery.
+ *
+ * @param <T> The output type of the encapsulated Beam {@link Source}.
+ */
+public class FlinkSourceSplit<T> implements SourceSplit, Serializable {
+ // The index of the split.
+ private final int splitIndex;
+ private final Source<T> beamSplitSource;
+ private final @Nullable byte[] splitState;
+
+ public FlinkSourceSplit(int splitIndex, Source<T> beamSplitSource) {
+ this(splitIndex, beamSplitSource, null);
+ }
+
+ public FlinkSourceSplit(int splitIndex, Source<T> beamSplitSource, @Nullable
byte[] splitState) {
+ this.splitIndex = splitIndex;
+ this.beamSplitSource = beamSplitSource;
+ this.splitState = splitState;
+ }
+
+ public int splitIndex() {
+ return splitIndex;
+ }
+
+ public @Nullable byte[] getSplitState() {
+ return splitState;
+ }
+
+ public Source<T> getBeamSplitSource() {
+ return beamSplitSource;
+ }
+
+ @Override
+ public String splitId() {
+ return Integer.toString(splitIndex);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("[SplitIndex: %d, BeamSource: %s]", splitIndex,
beamSplitSource);
+ }
+
+ public static <T> SimpleVersionedSerializer<FlinkSourceSplit<T>>
serializer() {
+ return SerdeUtils.getNaiveObjectSerializer();
+ }
+}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java
new file mode 100644
index 00000000000..292697479bc
--- /dev/null
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.SplitEnumeratorCompat;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Flink {@link org.apache.flink.api.connector.source.SplitEnumerator
SplitEnumerator}
+ * implementation that holds a Beam {@link Source} and does the following:
+ *
+ * <ul>
+ * <li>Split the Beam {@link Source} to desired number of splits.
+ * <li>Assign the splits to the Flink Source Reader.
+ * </ul>
+ *
+ * <p>Note that at this point, this class has a static round-robin split
assignment strategy.
+ *
+ * @param <T> The output type of the encapsulated Beam {@link Source}.
+ */
+public class FlinkSourceSplitEnumerator<T>
+ implements SplitEnumeratorCompat<FlinkSourceSplit<T>, Map<Integer,
List<FlinkSourceSplit<T>>>> {
+ private static final Logger LOG =
LoggerFactory.getLogger(FlinkSourceSplitEnumerator.class);
+ private final SplitEnumeratorContext<FlinkSourceSplit<T>> context;
+ private final Source<T> beamSource;
+ private final PipelineOptions pipelineOptions;
+ private final int numSplits;
+ private final Map<Integer, List<FlinkSourceSplit<T>>> pendingSplits;
+ private boolean splitsInitialized;
+
+ public FlinkSourceSplitEnumerator(
+ SplitEnumeratorContext<FlinkSourceSplit<T>> context,
+ Source<T> beamSource,
+ PipelineOptions pipelineOptions,
+ int numSplits) {
+ this.context = context;
+ this.beamSource = beamSource;
+ this.pipelineOptions = pipelineOptions;
+ this.numSplits = numSplits;
+ this.pendingSplits = new HashMap<>(numSplits);
+ this.splitsInitialized = false;
+ }
+
+ @Override
+ public void start() {
+ context.callAsync(
+ () -> {
+ try {
+ List<? extends Source<T>> beamSplitSourceList = splitBeamSource();
+ Map<Integer, List<FlinkSourceSplit<T>>> flinkSourceSplitsList =
new HashMap<>();
+ int i = 0;
+ for (Source<T> beamSplitSource : beamSplitSourceList) {
+ int targetSubtask = i % context.currentParallelism();
+ List<FlinkSourceSplit<T>> splitsForTask =
+ flinkSourceSplitsList.computeIfAbsent(
+ targetSubtask, ignored -> new ArrayList<>());
+ splitsForTask.add(new FlinkSourceSplit<>(i, beamSplitSource));
+ i++;
+ }
+ return flinkSourceSplitsList;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ },
+ (sourceSplits, error) -> {
+ if (error != null) {
+ throw new RuntimeException("Failed to start source enumerator.",
error);
+ } else {
+ pendingSplits.putAll(sourceSplits);
+ splitsInitialized = true;
+ sendPendingSplitsToSourceReaders();
+ }
+ });
+ }
+
+ @Override
+ public void handleSplitRequest(int subtaskId, @Nullable String
requesterHostname) {
+ // Not used.
+ }
+
+ @Override
+ public void addSplitsBack(List<FlinkSourceSplit<T>> splits, int subtaskId) {
+ LOG.info("Adding splits {} back from subtask {}", splits, subtaskId);
+ List<FlinkSourceSplit<T>> splitsForSubtask =
+ pendingSplits.computeIfAbsent(subtaskId, ignored -> new ArrayList<>());
+ splitsForSubtask.addAll(splits);
+ }
+
+ @Override
+ public void addReader(int subtaskId) {
+ List<FlinkSourceSplit<T>> splitsForSubtask =
pendingSplits.remove(subtaskId);
+ if (splitsForSubtask != null) {
+ assignSplitsAndLog(splitsForSubtask, subtaskId);
+ pendingSplits.remove(subtaskId);
+ } else {
+ if (splitsInitialized) {
+ LOG.info("There is no split for subtask {}. Signaling no more
splits.", subtaskId);
+ context.signalNoMoreSplits(subtaskId);
+ }
+ }
+ }
+
+ @Override
+ public Map<Integer, List<FlinkSourceSplit<T>>> snapshotState(long
checkpointId) throws Exception {
+ LOG.info("Taking snapshot for checkpoint {}", checkpointId);
+ return snapshotState();
+ }
+
+ @Override
+ public Map<Integer, List<FlinkSourceSplit<T>>> snapshotState() throws
Exception {
+ return pendingSplits;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // NoOp
+ }
+
+ // -------------- Private helper methods ----------------------
+ private List<? extends Source<T>> splitBeamSource() throws Exception {
+ if (beamSource instanceof BoundedSource) {
+ BoundedSource<T> boundedSource = (BoundedSource<T>) beamSource;
+ long desiredSizeBytes =
boundedSource.getEstimatedSizeBytes(pipelineOptions) / numSplits;
+ return boundedSource.split(desiredSizeBytes, pipelineOptions);
+ } else if (beamSource instanceof UnboundedSource) {
+ return ((UnboundedSource<T, ?>) beamSource).split(numSplits,
pipelineOptions);
+ } else {
+ throw new IllegalStateException("Unknown source type " +
beamSource.getClass());
+ }
+ }
+
+ private void sendPendingSplitsToSourceReaders() {
+ Iterator<Map.Entry<Integer, List<FlinkSourceSplit<T>>>> splitIter =
+ pendingSplits.entrySet().iterator();
+ while (splitIter.hasNext()) {
+ Map.Entry<Integer, List<FlinkSourceSplit<T>>> entry = splitIter.next();
+ int readerIndex = entry.getKey();
+ int targetSubtask = readerIndex % context.currentParallelism();
+ if (context.registeredReaders().containsKey(targetSubtask)) {
+ assignSplitsAndLog(entry.getValue(), targetSubtask);
+ splitIter.remove();
+ }
+ }
+ }
+
+ private void assignSplitsAndLog(List<FlinkSourceSplit<T>> splits, int
subtaskId) {
+ context.assignSplits(new
SplitsAssignment<>(Collections.singletonMap(subtaskId, splits)));
+ context.signalNoMoreSplits(subtaskId);
+ LOG.info("Assigned splits {} to subtask {}", splits, subtaskId);
+ }
+}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSource.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSource.java
new file mode 100644
index 00000000000..c2bd904dcc6
--- /dev/null
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSource.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded;
+
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSource;
+import
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+
+/**
+ * A Flink {@link org.apache.flink.api.connector.source.Source Source}
implementation that wraps a
+ * Beam {@link BoundedSource BoundedSource}.
+ *
+ * <p>A {@link FlinkBoundedSource} can run in either batch or streaming mode,
depending on its
+ * {@link Boundedness} setting. For a BoundedSource running in streaming mode,
it is acting like a
+ * "finite stream".
+ *
+ * @param <T> The output type of the wrapped Beam {@link BoundedSource
BoundedSource}.
+ */
+public class FlinkBoundedSource<T> extends FlinkSource<T, WindowedValue<T>> {
+ protected final @Nullable TimestampExtractor<WindowedValue<T>>
timestampExtractor;
+
+ public FlinkBoundedSource(
+ BoundedSource<T> beamSource,
+ SerializablePipelineOptions serializablePipelineOptions,
+ Boundedness boundedness,
+ int numSplits) {
+ this(beamSource, serializablePipelineOptions, boundedness, numSplits,
null);
+ }
+
+ public FlinkBoundedSource(
+ BoundedSource<T> beamSource,
+ SerializablePipelineOptions serializablePipelineOptions,
+ Boundedness boundedness,
+ int numSplits,
+ @Nullable TimestampExtractor<WindowedValue<T>> timestampExtractor) {
+ super(beamSource, serializablePipelineOptions, boundedness, numSplits);
+ this.timestampExtractor = timestampExtractor;
+ }
+
+ @Override
+ public SourceReader<WindowedValue<T>, FlinkSourceSplit<T>> createReader(
+ SourceReaderContext readerContext) throws Exception {
+ return new FlinkBoundedSourceReader<>(
+ readerContext, serializablePipelineOptions.get(), timestampExtractor);
+ }
+}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReader.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReader.java
new file mode 100644
index 00000000000..9cea73f6a4a
--- /dev/null
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReader.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Flink {@link org.apache.flink.api.connector.source.SourceReader
SourceReader} implementation
+ * that reads from the assigned {@link
+ *
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit
+ * FlinkSourceSplits} by using Beam {@link
org.apache.beam.sdk.io.BoundedSource.BoundedReader
+ * BoundedReaders}.
+ *
+ * <p>This reader consumes the source splits one by one sequentially, instead
of concurrently.
+ *
+ * @param <T> the output element type of the encapsulated Beam {@link
+ * org.apache.beam.sdk.io.BoundedSource.BoundedReader BoundedReader.}
+ */
+public class FlinkBoundedSourceReader<T> extends FlinkSourceReaderBase<T,
WindowedValue<T>> {
+ private static final Logger LOG =
LoggerFactory.getLogger(FlinkBoundedSourceReader.class);
+ private @Nullable Source.Reader<T> currentReader;
+ private int currentSplitId;
+
+ public FlinkBoundedSourceReader(
+ SourceReaderContext context,
+ PipelineOptions pipelineOptions,
+ @Nullable Function<WindowedValue<T>, Long> timestampExtractor) {
+ super(context, pipelineOptions, timestampExtractor);
+ currentSplitId = -1;
+ }
+
+ @VisibleForTesting
+ protected FlinkBoundedSourceReader(
+ SourceReaderContext context,
+ PipelineOptions pipelineOptions,
+ ScheduledExecutorService executor,
+ @Nullable Function<WindowedValue<T>, Long> timestampExtractor) {
+ super(executor, context, pipelineOptions, timestampExtractor);
+ currentSplitId = -1;
+ }
+
+ @Override
+ public InputStatus pollNext(ReaderOutput<WindowedValue<T>> output) throws
Exception {
+ checkExceptionAndMaybeThrow();
+ if (currentReader == null && !moveToNextNonEmptyReader()) {
+ // Nothing to read for now.
+ if (noMoreSplits() && checkIdleTimeoutAndMaybeStartCountdown()) {
+ // All the source splits have been read and idle timeout has passed.
+ LOG.info(
+ "All splits have finished reading, and idle time {} ms has
passed.", idleTimeoutMs);
+ return InputStatus.END_OF_INPUT;
+ } else {
+ // This reader either hasn't received NoMoreSplitsEvent yet or it is
waiting for idle
+ // timeout.
+ return InputStatus.NOTHING_AVAILABLE;
+ }
+ }
+ Source.Reader<T> tempCurrentReader = currentReader;
+ if (tempCurrentReader != null) {
+ T record = tempCurrentReader.getCurrent();
+ WindowedValue<T> windowedValue =
+ WindowedValue.of(
+ record,
+ tempCurrentReader.getCurrentTimestamp(),
+ GlobalWindow.INSTANCE,
+ PaneInfo.NO_FIRING);
+ if (timestampExtractor == null) {
+ output.collect(windowedValue);
+ } else {
+ output.collect(windowedValue, timestampExtractor.apply(windowedValue));
+ }
+ numRecordsInCounter.inc();
+ // If the advance() invocation throws exception here, the job will just
fail over and read
+ // everything again from
+ // the beginning. So the failover granularity is the entire Flink job.
+ if (!tempCurrentReader.advance()) {
+ finishSplit(currentSplitId);
+ currentReader = null;
+ currentSplitId = -1;
+ LOG.debug("Finished reading from {}", currentSplitId);
+ }
+ // Always return MORE_AVAILABLE here regardless of the availability of
next record. If there
+ // is no more
+ // records available in the current split, the next invocation of
pollNext() will handle that.
+ return InputStatus.MORE_AVAILABLE;
+ } else {
+ throw new IllegalArgumentException(
+ "If we reach here, the current beam reader should not be null");
+ }
+ }
+
+ @Override
+ protected CompletableFuture<Void> isAvailableForAliveReaders() {
+ // For bounded source, as long as there are active readers, the data is
available.
+ return AVAILABLE_NOW;
+ }
+
+ // ------------------------- private helper methods
--------------------------
+
+ private boolean moveToNextNonEmptyReader() throws IOException {
+ Optional<ReaderAndOutput> readerAndOutput;
+ while ((readerAndOutput = createAndTrackNextReader()).isPresent()) {
+ ReaderAndOutput rao = readerAndOutput.get();
+ if (rao.reader.start()) {
+ currentSplitId = Integer.parseInt(rao.splitId);
+ currentReader = rao.reader;
+ return true;
+ } else {
+ finishSplit(Integer.parseInt(rao.splitId));
+ }
+ }
+ return false;
+ }
+}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/package-info.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/package-info.java
new file mode 100644
index 00000000000..7722241331d
--- /dev/null
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/** Internal implementation of the Beam runner for Apache Flink. */
+package
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded;
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/impulse/BeamImpulseSource.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/impulse/BeamImpulseSource.java
new file mode 100644
index 00000000000..cbf1871dfba
--- /dev/null
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/impulse/BeamImpulseSource.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.impulse;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.joda.time.Instant;
+
+/** A Beam {@link BoundedSource} for Impulse Source. */
+public class BeamImpulseSource extends BoundedSource<byte[]> {
+
+ @Override
+ public List<? extends BoundedSource<byte[]>> split(
+ long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+ // Always return a single split.
+ return Collections.singletonList(this);
+ }
+
+ @Override
+ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public BoundedReader<byte[]> createReader(PipelineOptions options) throws
IOException {
+ return new ImpulseReader(this);
+ }
+
+ private static class ImpulseReader extends
BoundedSource.BoundedReader<byte[]> {
+ private final BeamImpulseSource source;
+ private boolean started;
+ private int index;
+
+ private ImpulseReader(BeamImpulseSource source) {
+ this.source = source;
+ this.started = false;
+ this.index = 0;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ started = true;
+ return true;
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (!started) {
+ throw new IllegalStateException("start() should be called before
calling advance()");
+ }
+ index++;
+ return false;
+ }
+
+ @Override
+ public byte[] getCurrent() throws NoSuchElementException {
+ if (!started) {
+ throw new IllegalStateException("The reader hasn't started.");
+ }
+ if (index == 0) {
+ return new byte[0];
+ } else {
+ throw new NoSuchElementException("No element is available.");
+ }
+ }
+
+ @Override
+ public BoundedSource<byte[]> getCurrentSource() {
+ return source;
+ }
+
+ @Override
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
+ if (!started) {
+ throw new IllegalStateException("The reader hasn't started.");
+ }
+ if (index == 0) {
+ return BoundedWindow.TIMESTAMP_MIN_VALUE;
+ } else {
+ throw new NoSuchElementException("No element is available.");
+ }
+ }
+
+ @Override
+ public void close() throws IOException {}
+ }
+}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/impulse/package-info.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/impulse/package-info.java
new file mode 100644
index 00000000000..3fd38e2259d
--- /dev/null
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/impulse/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/** Internal implementation of the Beam runner for Apache Flink. */
+package
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.impulse;
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/package-info.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/package-info.java
new file mode 100644
index 00000000000..448b3fef6d2
--- /dev/null
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/** Internal implementation of the Beam runner for Apache Flink. */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSource.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSource.java
new file mode 100644
index 00000000000..b4049220170
--- /dev/null
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSource.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded;
+
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSource;
+import
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.ValueWithRecordId;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+
+/**
+ * A Flink {@link org.apache.flink.api.connector.source.Source Source}
implementation that wraps a
+ * Beam {@link org.apache.beam.sdk.io.UnboundedSource UnboundedSource}.
+ *
+ * @param <T> The output type of the wrapped Beam {@link
org.apache.beam.sdk.io.UnboundedSource
+ * UnboundedSource}.
+ */
+public class FlinkUnboundedSource<T> extends FlinkSource<T,
WindowedValue<ValueWithRecordId<T>>> {
+ private final @Nullable
TimestampExtractor<WindowedValue<ValueWithRecordId<T>>>
+ timestampExtractor;
+
+ public FlinkUnboundedSource(
+ UnboundedSource<T, ?> beamSource,
+ SerializablePipelineOptions serializablePipelineOptions,
+ int numSplits) {
+ this(beamSource, serializablePipelineOptions, numSplits, null);
+ }
+
+ public FlinkUnboundedSource(
+ UnboundedSource<T, ?> beamSource,
+ SerializablePipelineOptions serializablePipelineOptions,
+ int numSplits,
+ @Nullable TimestampExtractor<WindowedValue<ValueWithRecordId<T>>>
timestampExtractor) {
+ super(beamSource, serializablePipelineOptions,
Boundedness.CONTINUOUS_UNBOUNDED, numSplits);
+ this.timestampExtractor = timestampExtractor;
+ }
+
+ @Override
+ public SourceReader<WindowedValue<ValueWithRecordId<T>>,
FlinkSourceSplit<T>> createReader(
+ SourceReaderContext readerContext) throws Exception {
+ return new FlinkUnboundedSourceReader<>(
+ readerContext, serializablePipelineOptions.get(), timestampExtractor);
+ }
+}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java
new file mode 100644
index 00000000000..3c596360efd
--- /dev/null
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.ValueWithRecordId;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Flink {@link org.apache.flink.api.connector.source.SourceReader
SourceReader} implementation
+ * that reads from the assigned {@link
+ *
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit
+ * FlinkSourceSplits} by using Beam {@link
org.apache.beam.sdk.io.UnboundedSource.UnboundedReader
+ * UnboundedReaders}.
+ *
+ * <p>This reader consumes all the assigned source splits concurrently.
+ *
+ * @param <T> the output element type of the encapsulated Beam {@link
+ * org.apache.beam.sdk.io.UnboundedSource.UnboundedReader UnboundedReader}.
+ */
+public class FlinkUnboundedSourceReader<T>
+ extends FlinkSourceReaderBase<T, WindowedValue<ValueWithRecordId<T>>> {
+ private static final Logger LOG =
LoggerFactory.getLogger(FlinkUnboundedSourceReader.class);
+ // This name is defined in FLIP-33.
+ @VisibleForTesting protected static final String PENDING_BYTES_METRIC_NAME =
"pendingBytes";
+ private static final long SLEEP_ON_IDLE_MS = 50L;
+ private final AtomicReference<CompletableFuture<Void>>
dataAvailableFutureRef;
+ private final List<ReaderAndOutput> readers;
+ private int currentReaderIndex;
+ private volatile boolean shouldEmitWatermark;
+
+ public FlinkUnboundedSourceReader(
+ SourceReaderContext context,
+ PipelineOptions pipelineOptions,
+ @Nullable Function<WindowedValue<ValueWithRecordId<T>>, Long>
timestampExtractor) {
+ super(context, pipelineOptions, timestampExtractor);
+ this.readers = new ArrayList<>();
+ this.dataAvailableFutureRef = new AtomicReference<>(DUMMY_FUTURE);
+ this.currentReaderIndex = 0;
+ }
+
+ @VisibleForTesting
+ protected FlinkUnboundedSourceReader(
+ SourceReaderContext context,
+ PipelineOptions pipelineOptions,
+ ScheduledExecutorService executor,
+ @Nullable Function<WindowedValue<ValueWithRecordId<T>>, Long>
timestampExtractor) {
+ super(executor, context, pipelineOptions, timestampExtractor);
+ this.readers = new ArrayList<>();
+ this.dataAvailableFutureRef = new AtomicReference<>(DUMMY_FUTURE);
+ this.currentReaderIndex = 0;
+ }
+
+ @Override
+ public void start() {
+ createPendingBytesGauge(context);
+ Long watermarkInterval =
+
pipelineOptions.as(FlinkPipelineOptions.class).getAutoWatermarkInterval();
+ if (watermarkInterval != null) {
+ scheduleTaskAtFixedRate(
+ () -> {
+ // Set the watermark emission flag first.
+ shouldEmitWatermark = true;
+ // Wake up the main thread if necessary.
+ CompletableFuture<Void> f = dataAvailableFutureRef.get();
+ if (f != DUMMY_FUTURE) {
+ f.complete(null);
+ }
+ },
+ watermarkInterval,
+ watermarkInterval);
+ } else {
+ LOG.warn("AutoWatermarkInterval is not set, watermarks won't be
emitted.");
+ }
+ }
+
+ @Override
+ public InputStatus
pollNext(ReaderOutput<WindowedValue<ValueWithRecordId<T>>> output)
+ throws Exception {
+ checkExceptionAndMaybeThrow();
+ maybeEmitWatermark();
+ maybeCreateReaderForNewSplits();
+
+ ReaderAndOutput reader = nextReaderWithData();
+ if (reader != null) {
+ emitRecord(reader, output);
+ return InputStatus.MORE_AVAILABLE;
+ } else {
+ LOG.trace("No data available for now.");
+ return InputStatus.NOTHING_AVAILABLE;
+ }
+ }
+
+ /**
+ * Check whether there are data available from alive readers. If not, set a
future and wait for
+ * the periodically running wake-up task to complete that future when the
check interval passes.
+ * This method is only called by the main thread, which is the only thread
writing to the future
+ * ref. Note that for UnboundedSource, because the splits never finishes,
there are always alive
+ * readers after the first split assigment. Hence, the return value of {@link
+ * FlinkSourceReaderBase#isAvailable()} will effectively be determined by
this method after the
+ * first split assignment.
+ */
+ @Override
+ protected CompletableFuture<Void> isAvailableForAliveReaders() {
+ CompletableFuture<Void> future = dataAvailableFutureRef.get();
+ if (future == DUMMY_FUTURE) {
+ CompletableFuture<Void> newFuture = new CompletableFuture<>();
+ // Need to set the future first to avoid the race condition of missing
the watermark emission
+ // notification.
+ dataAvailableFutureRef.set(newFuture);
+ if (shouldEmitWatermark || hasException()) {
+ // There are exception after we set the new future,
+ // immediately complete the future and return.
+ dataAvailableFutureRef.set(DUMMY_FUTURE);
+ newFuture.complete(null);
+ } else {
+ LOG.debug("There is no data available, scheduling the idle reader
checker.");
+ scheduleTask(
+ () -> {
+ CompletableFuture<Void> f = dataAvailableFutureRef.get();
+ if (f != DUMMY_FUTURE) {
+ f.complete(null);
+ }
+ },
+ SLEEP_ON_IDLE_MS);
+ }
+ return newFuture;
+ } else if (future.isDone()) {
+ // The previous future is completed, just use it and reset the future
ref.
+ dataAvailableFutureRef.getAndSet(DUMMY_FUTURE);
+ return future;
+ } else {
+ // The previous future has not been completed, just use it.
+ return future;
+ }
+ }
+
+ // -------------- private helper methods ----------------
+
+ private void emitRecord(
+ ReaderAndOutput readerAndOutput,
ReaderOutput<WindowedValue<ValueWithRecordId<T>>> output) {
+ UnboundedSource.UnboundedReader<T> reader =
asUnbounded(readerAndOutput.reader);
+ T item = reader.getCurrent();
+ byte[] recordId = reader.getCurrentRecordId();
+ Instant timestamp = reader.getCurrentTimestamp();
+
+ WindowedValue<ValueWithRecordId<T>> windowedValue =
+ WindowedValue.of(
+ new ValueWithRecordId<>(item, recordId),
+ timestamp,
+ GlobalWindow.INSTANCE,
+ PaneInfo.NO_FIRING);
+ LOG.trace("Emitting record: {}", windowedValue);
+ if (timestampExtractor == null) {
+
readerAndOutput.getAndMaybeCreateSplitOutput(output).collect(windowedValue);
+ } else {
+ readerAndOutput
+ .getAndMaybeCreateSplitOutput(output)
+ .collect(windowedValue, timestampExtractor.apply(windowedValue));
+ }
+ numRecordsInCounter.inc();
+ }
+
+ private void maybeEmitWatermark() {
+ // Here we rely on the Flink source watermark multiplexer to combine the
per-split watermark.
+ // The runnable may emit more than one watermark when it runs.
+ if (shouldEmitWatermark) {
+ allReaders()
+ .values()
+ .forEach(
+ readerAndOutput -> {
+ SourceOutput<?> sourceOutput = readerAndOutput.sourceOutput();
+ if (sourceOutput != null) {
+ long watermark =
asUnbounded(readerAndOutput.reader).getWatermark().getMillis();
+ sourceOutput.emitWatermark(new Watermark(watermark));
+ }
+ });
+ shouldEmitWatermark = false;
+ }
+ }
+
+ private void maybeCreateReaderForNewSplits() throws Exception {
+ while (!sourceSplits().isEmpty()) {
+ Optional<ReaderAndOutput> readerAndOutputOpt =
createAndTrackNextReader();
+ if (readerAndOutputOpt.isPresent()) {
+ readers.add(readerAndOutputOpt.get());
+ } else {
+ // Null splitId is only possible when exception occurs, just check
exception to throw it.
+ checkExceptionAndMaybeThrow();
+ }
+ }
+ }
+
+ private @Nullable ReaderAndOutput nextReaderWithData() throws IOException {
+ int numReaders = readers.size();
+ for (int i = 0; i < numReaders; i++) {
+ ReaderAndOutput readerAndOutput = readers.get(currentReaderIndex);
+ currentReaderIndex = (currentReaderIndex + 1) % numReaders;
+ if (readerAndOutput.startOrAdvance()) {
+ return readerAndOutput;
+ }
+ }
+ return null;
+ }
+
+ private static <T> UnboundedSource.UnboundedReader<T>
asUnbounded(Source.Reader<T> reader) {
+ return (UnboundedSource.UnboundedReader<T>) reader;
+ }
+
+ private void createPendingBytesGauge(SourceReaderContext context) {
+ // TODO: Replace with
SourceReaderContest.metricGroup().setPendingBytesGauge() after Flink 1.14
+ // and above.
+ context
+ .metricGroup()
+ .gauge(
+ PENDING_BYTES_METRIC_NAME,
+ () -> {
+ long pendingBytes = -1L;
+ for (FlinkSourceReaderBase<?, ?>.ReaderAndOutput readerAndOutput
:
+ allReaders().values()) {
+ long pendingBytesForReader =
+ asUnbounded(readerAndOutput.reader).getSplitBacklogBytes();
+ if (pendingBytesForReader != -1L) {
+ pendingBytes =
+ pendingBytes == -1L
+ ? pendingBytesForReader
+ : pendingBytes + pendingBytesForReader;
+ }
+ }
+ return pendingBytes;
+ });
+ }
+}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/package-info.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/package-info.java
new file mode 100644
index 00000000000..0a853b66f4d
--- /dev/null
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/** Internal implementation of the Beam runner for Apache Flink. */
+package
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded;
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestBoundedCountingSource.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestBoundedCountingSource.java
new file mode 100644
index 00000000000..68438a70120
--- /dev/null
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestBoundedCountingSource.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.TestSource;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.values.KV;
+
+public class TestBoundedCountingSource extends BoundedSource<KV<Integer,
Integer>>
+ implements TestSource {
+ private final int totalNumRecords;
+ private final TestCountingSource source;
+ private final List<TestReader> createdReaders;
+ private int nextValueForValidating;
+ private long nextTimestampForValidating;
+
+ public TestBoundedCountingSource(int shardNum, int totalNumRecords) {
+ this.totalNumRecords = totalNumRecords;
+ this.source =
+ new
TestCountingSource(totalNumRecords).withShardNumber(shardNum).withFixedNumSplits(1);
+ this.createdReaders = new ArrayList<>();
+ this.nextValueForValidating = 0;
+ this.nextTimestampForValidating = 0;
+ }
+
+ @Override
+ public List<? extends BoundedSource<KV<Integer, Integer>>> split(
+ long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+ List<TestBoundedCountingSource> splits = new ArrayList<>();
+ int numRecordsAssigned = 0;
+ int shardNum = 0;
+ while (numRecordsAssigned < totalNumRecords) {
+ int numRecordsForSplit =
+ (int) Math.min(totalNumRecords - numRecordsAssigned,
desiredBundleSizeBytes);
+ splits.add(new TestBoundedCountingSource(shardNum, numRecordsForSplit));
+ numRecordsAssigned += numRecordsForSplit;
+ shardNum++;
+ }
+ return splits;
+ }
+
+ @Override
+ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+ return totalNumRecords;
+ }
+
+ @Override
+ public BoundedReader<KV<Integer, Integer>> createReader(PipelineOptions
options)
+ throws IOException {
+ BoundedTestCountingSourceReader reader =
+ new BoundedTestCountingSourceReader(source.createReader(options,
null), this);
+ createdReaders.add(reader);
+ return reader;
+ }
+
+ @Override
+ public List<TestReader> createdReaders() {
+ return createdReaders;
+ }
+
+ @Override
+ public boolean validateNextValue(int value) {
+ boolean result = value == nextValueForValidating;
+ nextValueForValidating++;
+ return result;
+ }
+
+ @Override
+ public boolean validateNextTimestamp(long timestamp) {
+ boolean result = timestamp == nextTimestampForValidating;
+ nextTimestampForValidating++;
+ return result;
+ }
+
+ @Override
+ public boolean isConsumptionCompleted() {
+ return nextValueForValidating == totalNumRecords;
+ }
+
+ @Override
+ public boolean allTimestampsReceived() {
+ return nextTimestampForValidating == nextValueForValidating;
+ }
+
+ public static class BoundedTestCountingSourceReader
+ extends BoundedSource.BoundedReader<KV<Integer, Integer>> implements
TestReader {
+
+ private final TestCountingSource.CountingSourceReader reader;
+ private final TestBoundedCountingSource currentSource;
+ private boolean closed;
+
+ private BoundedTestCountingSourceReader(
+ TestCountingSource.CountingSourceReader reader,
TestBoundedCountingSource currentSource) {
+ this.reader = reader;
+ this.currentSource = currentSource;
+ this.closed = false;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ return reader.start();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ return reader.advance();
+ }
+
+ @Override
+ public KV<Integer, Integer> getCurrent() throws NoSuchElementException {
+ return reader.getCurrent();
+ }
+
+ @Override
+ public void close() throws IOException {
+ closed = true;
+ reader.close();
+ }
+
+ @Override
+ public BoundedSource<KV<Integer, Integer>> getCurrentSource() {
+ return currentSource;
+ }
+
+ @Override
+ public boolean isClosed() {
+ return closed;
+ }
+ }
+}
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestCountingSource.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestCountingSource.java
index 41932dd1e49..5c54ce4c44e 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestCountingSource.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestCountingSource.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
+import
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.TestSource;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DelegateCoder;
import org.apache.beam.sdk.coders.KvCoder;
@@ -45,7 +46,8 @@ import org.slf4j.LoggerFactory;
* where not all the data is available immediately.
*/
public class TestCountingSource
- extends UnboundedSource<KV<Integer, Integer>,
TestCountingSource.CounterMark> {
+ extends UnboundedSource<KV<Integer, Integer>,
TestCountingSource.CounterMark>
+ implements TestSource {
private static final Logger LOG =
LoggerFactory.getLogger(TestCountingSource.class);
private static List<Integer> finalizeTracker;
@@ -65,6 +67,12 @@ public class TestCountingSource
*/
private static boolean thrown = false;
+ private final List<TestReader> createdReaders;
+
+ private int nextValueForValidating;
+
+ private long nextTimestampForValidating;
+
public static void setFinalizeTracker(List<Integer> finalizeTracker) {
TestCountingSource.finalizeTracker = finalizeTracker;
}
@@ -77,7 +85,7 @@ public class TestCountingSource
return new TestCountingSource(numMessagesPerShard, shardNumber, true,
throwOnFirstSnapshot, -1);
}
- private TestCountingSource withShardNumber(int shardNumber) {
+ public TestCountingSource withShardNumber(int shardNumber) {
return new TestCountingSource(
numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, -1);
}
@@ -107,6 +115,7 @@ public class TestCountingSource
this.dedup = dedup;
this.throwOnFirstSnapshot = throwOnFirstSnapshot;
this.fixedNumSplits = fixedNumSplits;
+ this.createdReaders = new ArrayList<>();
}
/** Halts emission of elements until {@code continueEmission} is invoked. */
@@ -129,7 +138,7 @@ public class TestCountingSource
return splits;
}
- static class CounterMark implements UnboundedSource.CheckpointMark {
+ public static class CounterMark implements UnboundedSource.CheckpointMark {
int current;
public CounterMark(int current) {
@@ -144,6 +153,35 @@ public class TestCountingSource
}
}
+  /** Returns the readers handed out by {@code createReader}, in creation order. */
+  @Override
+  public List<TestReader> createdReaders() {
+    return this.createdReaders;
+  }
+
+  /** Compares {@code value} with the expected next value, then advances the expectation. */
+  @Override
+  public boolean validateNextValue(int value) {
+    int expected = nextValueForValidating++;
+    return expected == value;
+  }
+
+  /** Compares {@code timestamp} with the expected next timestamp, then advances the expectation. */
+  @Override
+  public boolean validateNextTimestamp(long timestamp) {
+    long expected = nextTimestampForValidating++;
+    return timestamp == expected;
+  }
+
+  /** Returns true once {@code numMessagesPerShard} values have been validated. */
+  @Override
+  public boolean isConsumptionCompleted() {
+    return numMessagesPerShard == nextValueForValidating;
+  }
+
+  /** Returns true when a timestamp has been validated for every validated value. */
+  @Override
+  public boolean allTimestampsReceived() {
+    return nextValueForValidating == nextTimestampForValidating;
+  }
+
@Override
public Coder<CounterMark> getCheckpointMarkCoder() {
return DelegateCoder.of(VarIntCoder.of(), new FromCounterMark(), new
ToCounterMark());
@@ -158,11 +196,14 @@ public class TestCountingSource
* Public only so that the checkpoint can be conveyed from {@link
#getCheckpointMark()} to {@link
* TestCountingSource#createReader(PipelineOptions, CounterMark)} without
cast.
*/
- public class CountingSourceReader extends UnboundedReader<KV<Integer,
Integer>> {
+ public class CountingSourceReader extends UnboundedReader<KV<Integer,
Integer>>
+ implements TestReader {
private int current;
+ private boolean closed;
public CountingSourceReader(int startingPoint) {
this.current = startingPoint;
+ this.closed = false;
}
@Override
@@ -203,7 +244,9 @@ public class TestCountingSource
}
@Override
- public void close() {}
+    public void close() {
+      // Nothing to release; only remember the call so tests can check isClosed().
+      this.closed = true;
+    }
@Override
public TestCountingSource getCurrentSource() {
@@ -238,6 +281,11 @@ public class TestCountingSource
public long getSplitBacklogBytes() {
return 7L;
}
+
+    /** Returns whether {@link #close()} has been called on this reader. */
+    @Override
+    public boolean isClosed() {
+      return this.closed;
+    }
}
@Override
@@ -248,7 +296,10 @@ public class TestCountingSource
} else {
LOG.debug("restoring reader from checkpoint with current = {}",
checkpointMark.current);
}
- return new CountingSourceReader(checkpointMark != null ?
checkpointMark.current : -1);
+ CountingSourceReader reader =
+ new CountingSourceReader(checkpointMark != null ?
checkpointMark.current : -1);
+ createdReaders.add(reader);
+ return reader;
}
@Override
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderTestBase.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderTestBase.java
new file mode 100644
index 00000000000..dcab3aff0f5
--- /dev/null
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderTestBase.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.values.KV;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import
org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * An abstract test base for {@link FlinkSourceReaderBase}.
+ *
+ * <p>Subclasses provide three things:
+ *
+ * <ul>
+ *   <li>the reader under test, via the abstract {@code createReader} method;
+ *   <li>the Beam source backing each split, via {@link #createBeamSource(int, int)};
+ *   <li>the key/value pair carried by an emitted record, via {@link #getKVPairs(Object)}.
+ * </ul>
+ *
+ * <p>{@link RecordsValidatingOutput} routes each record back to its Beam source by using the
+ * record key as a position in the split list. Keys are therefore expected to equal the split
+ * index.
+ */
+public abstract class FlinkSourceReaderTestBase<OutputT> {
+
+  // -------------- test poll --------------
+  @Test(timeout = 30000L)
+  public void testPollBasic() throws Exception {
+    testPoll(5, 10);
+  }
+
+  @Test(timeout = 30000L)
+  public void testPollFromEmptySplit() throws Exception {
+    testPoll(3, 0);
+  }
+
+  @Test(timeout = 30000L)
+  public void testPollWithTimestampExtractor() throws Exception {
+    // The record value doubles as its timestamp, so timestamps are expected to be 0, 1, 2, ...
+    testPoll(5, 10, record -> getKVPairs(record).getValue().longValue());
+  }
+
+  // Bounded by a timeout: if the executor stopped running tasks after the first failure, the
+  // latch below would never be counted down and the test would hang instead of failing.
+  @Test(timeout = 30000L)
+  public void testExceptionInExecutorThread() throws Exception {
+    try (SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> reader = createReader()) {
+      reader.start();
+      ReaderOutput<OutputT> output = Mockito.mock(ReaderOutput.class);
+      // The first poll should not throw any exception.
+      reader.pollNext(output);
+
+      RuntimeException expectedException = new RuntimeException();
+      RuntimeException suppressedException = new RuntimeException();
+      ((FlinkSourceReaderBase) reader)
+          .execute(
+              () -> {
+                throw expectedException;
+              });
+      ((FlinkSourceReaderBase) reader)
+          .execute(
+              () -> {
+                throw suppressedException;
+              });
+      // Presumably the executor runs tasks in submission order, so this waits until both
+      // failing tasks have run.
+      CountDownLatch countDownLatch = new CountDownLatch(1);
+      ((FlinkSourceReaderBase) reader).execute(countDownLatch::countDown);
+      countDownLatch.await();
+
+      try {
+        reader.pollNext(output);
+        fail("Should have thrown exception here.");
+      } catch (Exception e) {
+        // The failure may arrive wrapped; walk the cause chain to find the original exception.
+        Throwable actualException = e;
+        while (actualException != expectedException && actualException.getCause() != null) {
+          actualException = actualException.getCause();
+        }
+        assertEquals(expectedException, actualException);
+        // The second failure is expected to be attached to the first as a suppressed exception.
+        assertEquals(1, actualException.getSuppressed().length);
+        assertEquals(suppressedException, actualException.getSuppressed()[0]);
+      }
+    }
+  }
+
+  private void testPoll(int numSplits, int numRecordsPerSplit) throws Exception {
+    testPoll(numSplits, numRecordsPerSplit, null);
+  }
+
+  /**
+   * Reads {@code numSplits} splits of {@code numRecordsPerSplit} records each and validates every
+   * record. It then checks that each split's Beam reader was closed. Timestamps are validated only
+   * when {@code timestampExtractor} is non-null.
+   */
+  private void testPoll(
+      int numSplits, int numRecordsPerSplit, @Nullable Function<OutputT, Long> timestampExtractor)
+      throws Exception {
+    List<FlinkSourceSplit<KV<Integer, Integer>>> splits =
+        createSplits(numSplits, numRecordsPerSplit, 0);
+    try (SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> reader =
+        createReader(timestampExtractor)) {
+      pollAndValidate(reader, splits, timestampExtractor != null);
+    }
+    verifyBeamReaderClosed(splits);
+  }
+
+  // This test may fail if the subclass of FlinkSourceReaderBase overrides
+  // the isAvailable() method, which should have a good reason.
+  @Test
+  public void testIsAvailableOnNoMoreSplitsNotification() throws Exception {
+    try (SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> reader = createReader()) {
+      reader.start();
+
+      // No splits assigned yet.
+      CompletableFuture<Void> future1 = reader.isAvailable();
+      assertFalse("No split assigned yet, should not be available.", future1.isDone());
+
+      // Signal that no splits will ever be assigned.
+      reader.notifyNoMoreSplits();
+      assertTrue(
+          "Future1 should be completed upon no more splits notification", future1.isDone());
+      assertTrue(
+          "Completed future should be returned so pollNext can be invoked to get updated INPUT_STATUS",
+          reader.isAvailable().isDone());
+    }
+  }
+
+  // This test may fail if the subclass of FlinkSourceReaderBase overrides
+  // the isAvailable() method, which should have a good reason.
+  @Test
+  public void testIsAvailableWithIdleTimeout() throws Exception {
+    ManuallyTriggeredScheduledExecutorService executor =
+        new ManuallyTriggeredScheduledExecutorService();
+    try (SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> reader =
+        createReader(executor, 1L)) {
+      reader.start();
+
+      CompletableFuture<Void> future1 = reader.isAvailable();
+      assertFalse("Future1 should be uncompleted without live split.", future1.isDone());
+
+      reader.notifyNoMoreSplits();
+      assertTrue(
+          "Future1 should be completed upon no more splits notification.", future1.isDone());
+      CompletableFuture<Void> future2 = reader.isAvailable();
+      assertFalse(
+          "Future2 should be uncompleted when waiting for idle timeout", future2.isDone());
+
+      // Fires the scheduled idle-timeout task.
+      executor.triggerScheduledTasks();
+      assertTrue("Future2 should be completed after idle timeout.", future2.isDone());
+      assertTrue(
+          "The future should always be completed after idle timeout.",
+          reader.isAvailable().isDone());
+    }
+  }
+
+  // This test may fail if the subclass of FlinkSourceReaderBase overrides
+  // the isAvailable() method, which should have a good reason.
+  @Test
+  public void testIsAvailableWithoutIdleTimeout() throws Exception {
+    try (SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> reader = createReader()) {
+      reader.start();
+
+      CompletableFuture<Void> future1 = reader.isAvailable();
+      assertFalse("Future1 should be uncompleted without live split.", future1.isDone());
+
+      reader.notifyNoMoreSplits();
+      assertTrue(
+          "Future1 should be completed upon no more splits notification.", future1.isDone());
+      assertTrue(
+          "The future should be completed without idle timeout.", reader.isAvailable().isDone());
+    }
+  }
+
+  // NOTE(review): despite its name, this test asserts on the numRecordsIn counter.
+  @Test
+  public void testNumBytesInMetrics() throws Exception {
+    final int numSplits = 2;
+    final int numRecordsPerSplit = 10;
+    List<FlinkSourceSplit<KV<Integer, Integer>>> splits =
+        createSplits(numSplits, numRecordsPerSplit, 0);
+    SourceTestCompat.TestMetricGroup testMetricGroup = new SourceTestCompat.TestMetricGroup();
+    try (SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> reader =
+        createReader(null, -1L, null, testMetricGroup)) {
+      pollAndValidate(reader, splits, false);
+    }
+    assertEquals(numRecordsPerSplit * numSplits, testMetricGroup.numRecordsInCounter.getCount());
+  }
+
+  // --------------- abstract methods ---------------
+  /** Extracts the key/value pair carried by a record emitted by the reader under test. */
+  protected abstract KV<Integer, Integer> getKVPairs(OutputT record);
+
+  /**
+   * Creates the reader under test.
+   *
+   * @param executor executor handed to the reader; the helper overloads in this class pass {@code
+   *     null} when a test does not supply one.
+   * @param idleTimeoutMs idle timeout in milliseconds; the helper overloads pass {@code -1} when a
+   *     test does not supply one.
+   * @param timestampExtractor optional function deriving a timestamp from an emitted record.
+   * @param testMetricGroup metric group exposed to the reader through its context.
+   */
+  protected abstract SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> createReader(
+      ScheduledExecutorService executor,
+      long idleTimeoutMs,
+      @Nullable Function<OutputT, Long> timestampExtractor,
+      SourceTestCompat.TestMetricGroup testMetricGroup);
+
+  /**
+   * Creates the Beam source for split {@code splitIndex}; it must implement {@link TestSource}.
+   */
+  protected abstract Source<KV<Integer, Integer>> createBeamSource(
+      int splitIndex, int numRecordsPerSplit);
+
+  // ------------------- protected helper methods ----------------------
+  protected SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> createReader() {
+    return createReader(null, -1L, null, new SourceTestCompat.TestMetricGroup());
+  }
+
+  protected SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> createReader(
+      @Nullable Function<OutputT, Long> timestampExtractor) {
+    return createReader(null, -1L, timestampExtractor, new SourceTestCompat.TestMetricGroup());
+  }
+
+  protected SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> createReader(
+      ScheduledExecutorService executor, long idleTimeoutMs) {
+    return createReader(executor, idleTimeoutMs, null, new SourceTestCompat.TestMetricGroup());
+  }
+
+  protected SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> createReader(
+      ScheduledExecutorService executor,
+      long idleTimeoutMs,
+      @Nullable Function<OutputT, Long> timestampExtractor) {
+    return createReader(
+        executor, idleTimeoutMs, timestampExtractor, new SourceTestCompat.TestMetricGroup());
+  }
+
+  /**
+   * Assigns {@code splits} to {@code reader}, polls until every record has been received and
+   * asserts that each record carried the expected value. When {@code validateReceivedTimestamp}
+   * is true, it also asserts that every record carried the expected timestamp.
+   */
+  protected void pollAndValidate(
+      SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> reader,
+      List<FlinkSourceSplit<KV<Integer, Integer>>> splits,
+      boolean validateReceivedTimestamp)
+      throws Exception {
+    RecordsValidatingOutput validatingOutput = new RecordsValidatingOutput(splits);
+    pollAndValidate(reader, splits, validatingOutput, Integer.MAX_VALUE);
+    assertTrue(
+        "Some records were received out of order or with unexpected values",
+        validatingOutput.allValuesAsExpected());
+    if (validateReceivedTimestamp) {
+      assertTrue(validatingOutput.allTimestampReceived());
+      assertTrue(
+          "Some records were received with unexpected timestamps",
+          validatingOutput.allTimestampsAsExpected());
+    }
+  }
+
+  /**
+   * Assigns {@code splits} to {@code reader}, signals that no more splits will follow, and polls
+   * into {@code validatingOutput}. Polling stops once all records are consumed or at least {@code
+   * numRecordsToConsume} records have been collected.
+   */
+  protected void pollAndValidate(
+      SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> reader,
+      List<FlinkSourceSplit<KV<Integer, Integer>>> splits,
+      RecordsValidatingOutput validatingOutput,
+      int numRecordsToConsume)
+      throws Exception {
+    reader.addSplits(splits);
+    reader.notifyNoMoreSplits();
+
+    do {
+      reader.pollNext(validatingOutput);
+    } while (!validatingOutput.allRecordsConsumed()
+        && validatingOutput.numCollectedRecords() < numRecordsToConsume);
+  }
+
+  /**
+   * Creates {@code numSplits} splits with consecutive indices starting at {@code startingIndex}.
+   * Each split is backed by a Beam source of {@code numRecordsPerSplit} records.
+   *
+   * <p>{@link RecordsValidatingOutput} uses the record key as a position in the split list. It
+   * therefore expects {@code startingIndex} to be 0.
+   */
+  protected List<FlinkSourceSplit<KV<Integer, Integer>>> createSplits(
+      int numSplits, int numRecordsPerSplit, int startingIndex) {
+    List<FlinkSourceSplit<KV<Integer, Integer>>> splitList = new ArrayList<>();
+    // The bound is relative to startingIndex so that exactly numSplits splits are created.
+    for (int i = startingIndex; i < startingIndex + numSplits; i++) {
+      Source<KV<Integer, Integer>> testingSource = createBeamSource(i, numRecordsPerSplit);
+      splitList.add(new FlinkSourceSplit<>(i, testingSource));
+    }
+    return splitList;
+  }
+
+  /** Asserts that each split's Beam source created exactly one reader and that it was closed. */
+  protected void verifyBeamReaderClosed(List<FlinkSourceSplit<KV<Integer, Integer>>> splits) {
+    splits.forEach(
+        split -> {
+          TestSource source = (TestSource) split.getBeamSplitSource();
+          assertEquals(
+              "Should have only one Beam reader created", 1, source.createdReaders().size());
+          assertTrue(
+              "The Beam reader should have been closed",
+              source.createdReaders().get(0).isClosed());
+        });
+  }
+
+  /** Creates a mocked {@link SourceReaderContext} that only returns {@code metricGroup}. */
+  protected static SourceReaderContext createSourceReaderContext(
+      SourceTestCompat.TestMetricGroup metricGroup) {
+    SourceReaderContext mockContext = Mockito.mock(SourceReaderContext.class);
+    when(mockContext.metricGroup()).thenReturn(metricGroup);
+    return mockContext;
+  }
+
+  // -------------------- protected helper class for fetch result validation ---------------------
+  /**
+   * A {@link ReaderOutput} that forwards each record to the {@link TestSource} it came from for
+   * validation. Records emitted through per-split outputs arrive here as well. Validation failures
+   * are counted; they do not throw.
+   */
+  protected class RecordsValidatingOutput implements SourceTestCompat.ReaderOutputCompat<OutputT> {
+    private final List<Source<KV<Integer, Integer>>> sources;
+    private final Map<String, TestSourceOutput> sourceOutputs;
+    private int numCollectedRecords = 0;
+    private int numUnexpectedValues = 0;
+    private int numUnexpectedTimestamps = 0;
+
+    public RecordsValidatingOutput(List<FlinkSourceSplit<KV<Integer, Integer>>> splits) {
+      this.sources = new ArrayList<>();
+      this.sourceOutputs = new HashMap<>();
+      splits.forEach(split -> sources.add(split.getBeamSplitSource()));
+    }
+
+    @Override
+    public void collect(OutputT record) {
+      KV<Integer, Integer> kv = getKVPairs(record);
+      if (!testSourceFor(kv).validateNextValue(kv.getValue())) {
+        numUnexpectedValues++;
+      }
+      numCollectedRecords++;
+    }
+
+    @Override
+    public void collect(OutputT record, long timestamp) {
+      KV<Integer, Integer> kv = getKVPairs(record);
+      TestSource testSource = testSourceFor(kv);
+      // Both checks always run so each source's expectations advance exactly as before.
+      if (!testSource.validateNextValue(kv.getValue())) {
+        numUnexpectedValues++;
+      }
+      if (!testSource.validateNextTimestamp(timestamp)) {
+        numUnexpectedTimestamps++;
+      }
+      numCollectedRecords++;
+    }
+
+    @Override
+    public void emitWatermark(Watermark watermark) {}
+
+    @Override
+    public void markIdle() {}
+
+    @Override
+    public void markActive() {}
+
+    @Override
+    public SourceOutput<OutputT> createOutputForSplit(String splitId) {
+      return sourceOutputs.computeIfAbsent(splitId, ignored -> new TestSourceOutput(this));
+    }
+
+    @Override
+    public void releaseOutputForSplit(String splitId) {}
+
+    public int numCollectedRecords() {
+      return numCollectedRecords;
+    }
+
+    /** Returns true once every source has validated all of its records. */
+    public boolean allRecordsConsumed() {
+      for (Source<?> source : sources) {
+        if (!((TestSource) source).isConsumptionCompleted()) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    /** Returns true when every source has validated a timestamp for each validated value. */
+    public boolean allTimestampReceived() {
+      for (Source<?> source : sources) {
+        if (!((TestSource) source).allTimestampsReceived()) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    /** Returns true when every collected record carried the value its source expected next. */
+    public boolean allValuesAsExpected() {
+      return numUnexpectedValues == 0;
+    }
+
+    /** Returns true when every timestamped record carried the timestamp its source expected. */
+    public boolean allTimestampsAsExpected() {
+      return numUnexpectedTimestamps == 0;
+    }
+
+    public Map<String, TestSourceOutput> createdSourceOutputs() {
+      return sourceOutputs;
+    }
+
+    // Uses the record key as a position in the split list passed to the constructor.
+    private TestSource testSourceFor(KV<Integer, Integer> kv) {
+      return (TestSource) sources.get(kv.getKey());
+    }
+  }
+
+  /**
+   * A per-split {@link SourceOutput} that forwards records to its parent output and remembers the
+   * last watermark and idleness it was given.
+   */
+  protected class TestSourceOutput implements SourceTestCompat.SourceOutputCompat<OutputT> {
+    private final ReaderOutput<OutputT> output;
+    private @Nullable Watermark watermark;
+    private boolean isIdle;
+
+    private TestSourceOutput(RecordsValidatingOutput output) {
+      this.output = output;
+      this.watermark = null;
+      this.isIdle = false;
+    }
+
+    @Override
+    public void collect(OutputT record) {
+      output.collect(record);
+    }
+
+    @Override
+    public void collect(OutputT record, long timestamp) {
+      output.collect(record, timestamp);
+    }
+
+    @Override
+    public void emitWatermark(Watermark watermark) {
+      this.watermark = watermark;
+    }
+
+    @Override
+    public void markIdle() {
+      isIdle = true;
+    }
+
+    @Override
+    public void markActive() {
+      isIdle = false;
+    }
+
+    public @Nullable Watermark watermark() {
+      return watermark;
+    }
+
+    public boolean isIdle() {
+      return isIdle;
+    }
+  }
+}
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumeratorTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumeratorTest.java
new file mode 100644
index 00000000000..59097bae47b
--- /dev/null
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumeratorTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestBoundedCountingSource;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestCountingSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.values.KV;
+import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
+import org.junit.Test;
+
+/** Unit tests for {@link FlinkSourceSplitEnumerator}. */
+public class FlinkSourceSplitEnumeratorTest {
+
+ @Test
+ public void testAssignSplitsWithBoundedSource() throws IOException {
+ final int numSubtasks = 2;
+ final int numSplits = 10;
+ final int totalNumRecords = 10;
+ TestingSplitEnumeratorContext<FlinkSourceSplit<KV<Integer, Integer>>>
testContext =
+ new TestingSplitEnumeratorContext<>(numSubtasks);
+ TestBoundedCountingSource testSource =
+ new TestBoundedCountingSource(numSplits, totalNumRecords);
+
+ assignSplits(testContext, testSource, numSplits);
+ assertEquals(numSubtasks, testContext.getSplitAssignments().size());
+
+ testContext
+ .getSplitAssignments()
+ .forEach(
+ (subtaskId, state) -> {
+ int expectedNumSplitsPerSubtask = numSplits / numSubtasks;
+ assertEquals(
+ "Each subtask should have " + expectedNumSplitsPerSubtask +
" assigned splits",
+ expectedNumSplitsPerSubtask,
+ state.getAssignedSplits().size());
+ assertTrue(
+ "Each subtask should have received NoMoreSplits",
+ state.hasReceivedNoMoreSplitsSignal());
+ state
+ .getAssignedSplits()
+ .forEach(
+ split -> {
+ TestBoundedCountingSource source =
+ (TestBoundedCountingSource)
split.getBeamSplitSource();
+ try {
+ int expectedSplitSize = totalNumRecords / numSplits;
+ assertEquals(
+ expectedSplitSize,
+
source.getEstimatedSizeBytes(FlinkPipelineOptions.defaults()));
+ } catch (Exception e) {
+ fail("Received exception" + e);
+ }
+ });
+ });
+ }
+
+ @Test
+ public void testAssignSplitsWithUnboundedSource() throws IOException {
+ final int numSplits = 10;
+ final int numSubtasks = 5;
+ final int numRecordsPerSplit = 10;
+ TestingSplitEnumeratorContext<FlinkSourceSplit<KV<Integer, Integer>>>
testContext =
+ new TestingSplitEnumeratorContext<>(numSubtasks);
+ TestCountingSource testSource = new TestCountingSource(numRecordsPerSplit);
+
+ assignSplits(testContext, testSource, numSplits);
+
+ testContext
+ .getSplitAssignments()
+ .forEach(
+ (subtaskId, state) -> {
+ int expectedNumSplitsPerSubtask = numSplits / numSubtasks;
+ assertEquals(
+ "Each subtask should have " + expectedNumSplitsPerSubtask +
" assigned splits",
+ expectedNumSplitsPerSubtask,
+ state.getAssignedSplits().size());
+ assertTrue(
+ "Each subtask should have received NoMoreSplits",
+ state.hasReceivedNoMoreSplitsSignal());
+ });
+ }
+
+ @Test
+ public void testAddSplitsBack() throws IOException {
+ final int numSubtasks = 2;
+ final int numSplits = 10;
+ final int totalNumRecords = 10;
+ TestingSplitEnumeratorContext<FlinkSourceSplit<KV<Integer, Integer>>>
testContext =
+ new TestingSplitEnumeratorContext<>(numSubtasks);
+ TestBoundedCountingSource testSource =
+ new TestBoundedCountingSource(numSplits, totalNumRecords);
+ try (FlinkSourceSplitEnumerator<KV<Integer, Integer>> splitEnumerator =
+ new FlinkSourceSplitEnumerator<>(
+ testContext, testSource, FlinkPipelineOptions.defaults(),
numSplits)) {
+ splitEnumerator.start();
+ testContext.registerReader(0, "0");
+ splitEnumerator.addReader(0);
+ testContext.getExecutorService().triggerAll();
+
+ List<FlinkSourceSplit<KV<Integer, Integer>>> splitsForReader =
+ testContext.getSplitAssignments().get(0).getAssignedSplits();
+ assertEquals(numSplits / numSubtasks, splitsForReader.size());
+
+ splitEnumerator.addSplitsBack(splitsForReader, 0);
+ splitEnumerator.addReader(0);
+ assertEquals(2 * numSplits / numSubtasks, splitsForReader.size());
+ }
+ }
+
+ private void assignSplits(
+ TestingSplitEnumeratorContext<FlinkSourceSplit<KV<Integer, Integer>>>
context,
+ Source<KV<Integer, Integer>> source,
+ int numSplits)
+ throws IOException {
+ try (FlinkSourceSplitEnumerator<KV<Integer, Integer>> splitEnumerator =
+ new FlinkSourceSplitEnumerator<>(
+ context, source, FlinkPipelineOptions.defaults(), numSplits)) {
+ splitEnumerator.start();
+ // Add a reader before splitting the beam source.
+ context.registerReader(0, "0");
+ splitEnumerator.addReader(0);
+ context.getExecutorService().triggerAll();
+ context.registerReader(1, "1");
+ // Add another reader after splitting the beam source.
+ splitEnumerator.addReader(1);
+ }
+ }
+}
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/TestSource.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/TestSource.java
new file mode 100644
index 00000000000..5b8b734a68a
--- /dev/null
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/TestSource.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Test-only interface implemented by the Beam sources used in these Flink source tests, exposing
+ * hooks to inspect the readers a source created and to validate what was read.
+ *
+ * <p>NOTE(review): the member descriptions below are inferred from the method names; confirm them
+ * against the implementing classes.
+ */
+public interface TestSource extends Serializable {
+
+  /** A reader created by a {@link TestSource}. */
+  interface TestReader extends Serializable {
+    /** Presumably whether this reader has been closed. */
+    boolean isClosed();
+  }
+
+  /** Presumably the readers this source has created so far. */
+  List<TestReader> createdReaders();
+
+  /** Presumably checks {@code value} against the next expected value. */
+  boolean validateNextValue(int value);
+
+  /** Presumably checks {@code timestamp} against the next expected timestamp. */
+  boolean validateNextTimestamp(long timestamp);
+
+  /** Presumably whether every record of this source has been consumed. */
+  boolean isConsumptionCompleted();
+
+  /** Presumably whether every expected timestamp has been received. */
+  boolean allTimestampsReceived();
+}
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java
new file mode 100644
index 00000000000..6303a729652
--- /dev/null
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import
org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestBoundedCountingSource;
+import
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderTestBase;
+import
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit;
+import
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.SourceTestCompat.TestMetricGroup;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+import
org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/** Unit tests for {@link FlinkBoundedSourceReader}. */
+public class FlinkBoundedSourceReaderTest
+    extends FlinkSourceReaderTestBase<WindowedValue<KV<Integer, Integer>>> {
+
+  @Test
+  public void testPollWithIdleTimeout() throws Exception {
+    ManuallyTriggeredScheduledExecutorService executor =
+        new ManuallyTriggeredScheduledExecutorService();
+    @SuppressWarnings("unchecked") // Mockito cannot create a parameterized mock directly.
+    ReaderOutput<WindowedValue<KV<Integer, Integer>>> mockReaderOutput =
+        Mockito.mock(ReaderOutput.class);
+    try (FlinkBoundedSourceReader<KV<Integer, Integer>> reader =
+        (FlinkBoundedSourceReader<KV<Integer, Integer>>) createReader(executor, 1)) {
+      reader.notifyNoMoreSplits();
+      // With an idle timeout configured, end of input is only reported once the executor's
+      // scheduled tasks have run.
+      assertEquals(InputStatus.NOTHING_AVAILABLE, reader.pollNext(mockReaderOutput));
+
+      executor.triggerScheduledTasks();
+      assertEquals(InputStatus.END_OF_INPUT, reader.pollNext(mockReaderOutput));
+    }
+  }
+
+  @Test
+  public void testPollWithoutIdleTimeout() throws Exception {
+    @SuppressWarnings("unchecked") // Mockito cannot create a parameterized mock directly.
+    ReaderOutput<WindowedValue<KV<Integer, Integer>>> mockReaderOutput =
+        Mockito.mock(ReaderOutput.class);
+    try (SourceReader<WindowedValue<KV<Integer, Integer>>, FlinkSourceSplit<KV<Integer, Integer>>>
+        reader = createReader()) {
+      reader.notifyNoMoreSplits();
+      assertEquals(InputStatus.END_OF_INPUT, reader.pollNext(mockReaderOutput));
+    }
+  }
+
+  @Test
+  public void testIsAvailableOnSplitsAssignment() throws Exception {
+    try (SourceReader<WindowedValue<KV<Integer, Integer>>, FlinkSourceSplit<KV<Integer, Integer>>>
+        reader = createReader()) {
+      reader.start();
+
+      CompletableFuture<Void> future1 = reader.isAvailable();
+      assertFalse("No split assigned yet, should not be available.", future1.isDone());
+
+      // Data available on split assigned.
+      reader.addSplits(createSplits(1, 1, 0));
+      assertTrue("Adding a split should complete future1", future1.isDone());
+      assertTrue("Data should be available with a live split.", reader.isAvailable().isDone());
+    }
+  }
+
+  @Test
+  public void testSnapshotStateAndRestore() throws Exception {
+    final int numSplits = 2;
+    final int numRecordsPerSplit = 10;
+
+    List<FlinkSourceSplit<KV<Integer, Integer>>> splits =
+        createSplits(numSplits, numRecordsPerSplit, 0);
+    RecordsValidatingOutput validatingOutput = new RecordsValidatingOutput(splits);
+    List<FlinkSourceSplit<KV<Integer, Integer>>> snapshot;
+
+    // Create a reader, take a snapshot.
+    try (SourceReader<WindowedValue<KV<Integer, Integer>>, FlinkSourceSplit<KV<Integer, Integer>>>
+        reader = createReader()) {
+      // Only poll half of the records in the first split.
+      pollAndValidate(reader, splits, validatingOutput, numRecordsPerSplit / 2);
+      snapshot = reader.snapshotState(0L);
+    }
+
+    // Create a new validating output because the first split will be consumed from the very
+    // beginning.
+    validatingOutput = new RecordsValidatingOutput(splits);
+    // Create another reader, add the snapshot splits back.
+    try (SourceReader<WindowedValue<KV<Integer, Integer>>, FlinkSourceSplit<KV<Integer, Integer>>>
+        reader = createReader()) {
+      pollAndValidate(reader, snapshot, validatingOutput, Integer.MAX_VALUE);
+    }
+  }
+
+  // --------------- abstract methods impl ----------------
+  @Override
+  protected KV<Integer, Integer> getKVPairs(WindowedValue<KV<Integer, Integer>> record) {
+    return record.getValue();
+  }
+
+  @Override
+  protected Source<KV<Integer, Integer>> createBeamSource(int splitIndex, int numRecordsPerSplit) {
+    return new TestBoundedCountingSource(splitIndex, numRecordsPerSplit);
+  }
+
+  /**
+   * Creates a {@link FlinkBoundedSourceReader} configured with {@code idleTimeoutMs} as the
+   * shutdown-sources-after-idle setting.
+   *
+   * @param executor the executor to hand to the reader, or {@code null} to use the reader
+   *     constructor that takes no executor
+   * @param idleTimeoutMs value for {@code FlinkPipelineOptions#setShutdownSourcesAfterIdleMs}
+   * @param timestampExtractor optional extractor passed through to the reader
+   * @param testMetricGroup metric group backing the mocked {@link SourceReaderContext}
+   */
+  @Override
+  protected FlinkBoundedSourceReader<KV<Integer, Integer>> createReader(
+      @Nullable ScheduledExecutorService executor,
+      long idleTimeoutMs,
+      @Nullable Function<WindowedValue<KV<Integer, Integer>>, Long> timestampExtractor,
+      TestMetricGroup testMetricGroup) {
+    FlinkPipelineOptions pipelineOptions = FlinkPipelineOptions.defaults();
+    pipelineOptions.setShutdownSourcesAfterIdleMs(idleTimeoutMs);
+    SourceReaderContext mockContext = createSourceReaderContext(testMetricGroup);
+    if (executor != null) {
+      return new FlinkBoundedSourceReader<>(
+          mockContext, pipelineOptions, executor, timestampExtractor);
+    } else {
+      return new FlinkBoundedSourceReader<>(mockContext, pipelineOptions, timestampExtractor);
+    }
+  }
+}
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java
new file mode 100644
index 00000000000..f420bd8900f
--- /dev/null
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded;
+
+import static
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded.FlinkUnboundedSourceReader.PENDING_BYTES_METRIC_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import
org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestCountingSource;
+import
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderTestBase;
+import
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit;
+import
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.SourceTestCompat.TestMetricGroup;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.ValueWithRecordId;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+import
org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
+import org.apache.flink.metrics.Gauge;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.junit.Test;
+
+/** Unit tests for {@link FlinkUnboundedSourceReader}. */
+public class FlinkUnboundedSourceReaderTest
+    extends FlinkSourceReaderTestBase<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>> {
+
+  @Test
+  public void testSnapshotStateAndRestore() throws Exception {
+    final int numSplits = 2;
+    final int numRecordsPerSplit = 10;
+
+    List<FlinkSourceSplit<KV<Integer, Integer>>> splits =
+        createSplits(numSplits, numRecordsPerSplit, 0);
+    RecordsValidatingOutput validatingOutput = new RecordsValidatingOutput(splits);
+    List<FlinkSourceSplit<KV<Integer, Integer>>> snapshot;
+
+    // Create a reader, take a snapshot.
+    try (SourceReader<
+            WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
+            FlinkSourceSplit<KV<Integer, Integer>>>
+        reader = createReader()) {
+      pollAndValidate(reader, splits, validatingOutput, numSplits * numRecordsPerSplit / 2);
+      snapshot = reader.snapshotState(0L);
+    }
+
+    // Create another reader, add the snapshot splits back.
+    try (SourceReader<
+            WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
+            FlinkSourceSplit<KV<Integer, Integer>>>
+        reader = createReader()) {
+      pollAndValidate(reader, snapshot, validatingOutput, Integer.MAX_VALUE);
+    }
+  }
+
+  /**
+   * This is a concurrency correctness test. It verifies that the main thread is always woken up by
+   * the alarm runner executed in the executor thread.
+   */
+  @Test(timeout = 30000L)
+  public void testIsAvailableAlwaysWakenUp() throws Exception {
+    final int numFuturesRequired = 1_000_000;
+    AtomicReference<Exception> exceptionRef = new AtomicReference<>();
+
+    List<FlinkSourceSplit<KV<Integer, Integer>>> splits = new ArrayList<>();
+    splits.add(new FlinkSourceSplit<>(0, new DummySource(Integer.MAX_VALUE)));
+    RecordsValidatingOutput validatingOutput = new RecordsValidatingOutput(splits);
+    ManuallyTriggeredScheduledExecutorService executor =
+        new ManuallyTriggeredScheduledExecutorService();
+
+    try (SourceReader<
+            WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
+            FlinkSourceSplit<KV<Integer, Integer>>>
+        reader = createReader(executor, Long.MAX_VALUE)) {
+      reader.start();
+      reader.addSplits(splits);
+
+      Thread mainThread =
+          new Thread(
+              () -> {
+                try {
+                  // Thread-confined counter: only this thread reads or writes it.
+                  int numFuturesCompleted = 0;
+                  while (numFuturesCompleted < numFuturesRequired) {
+                    // This poll will return NOTHING_AVAILABLE after each record emission.
+                    if (reader.pollNext(validatingOutput) == InputStatus.NOTHING_AVAILABLE) {
+                      // Blocks until the executor thread wakes the reader up; a missed wake-up
+                      // surfaces as a test timeout.
+                      reader.isAvailable().get();
+                      numFuturesCompleted++;
+                    }
+                  }
+                } catch (Exception e) {
+                  exceptionRef.set(e);
+                }
+              },
+              "MainThread");
+
+      // Keep firing scheduled tasks for as long as the main thread runs. Polling its liveness
+      // (instead of a shared, unsynchronized counter) also stops this thread if the main thread
+      // fails early.
+      Thread executorThread =
+          new Thread(
+              () -> {
+                while (mainThread.isAlive()) {
+                  executor.triggerScheduledTasks();
+                }
+              },
+              "ExecutorThread");
+
+      mainThread.start();
+      executorThread.start();
+      // Join both threads so the reader is not closed while still in use, and so the failure
+      // recorded by the main thread is visible here.
+      mainThread.join();
+      executorThread.join();
+
+      Exception failure = exceptionRef.get();
+      if (failure != null) {
+        throw failure;
+      }
+    }
+  }
+
+  @Test
+  public void testIsAvailableOnSplitChangeWhenNoDataAvailableForAliveReaders() throws Exception {
+    List<FlinkSourceSplit<KV<Integer, Integer>>> splits1 = new ArrayList<>();
+    List<FlinkSourceSplit<KV<Integer, Integer>>> splits2 = new ArrayList<>();
+    splits1.add(new FlinkSourceSplit<>(0, new DummySource(0)));
+    splits2.add(new FlinkSourceSplit<>(1, new DummySource(0)));
+    RecordsValidatingOutput validatingOutput = new RecordsValidatingOutput(splits1);
+    ManuallyTriggeredScheduledExecutorService executor =
+        new ManuallyTriggeredScheduledExecutorService();
+
+    try (SourceReader<
+            WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
+            FlinkSourceSplit<KV<Integer, Integer>>>
+        reader = createReader(executor, Long.MAX_VALUE)) {
+      reader.start();
+      reader.addSplits(splits1);
+
+      assertEquals(
+          "The reader should have nothing available",
+          InputStatus.NOTHING_AVAILABLE,
+          reader.pollNext(validatingOutput));
+
+      CompletableFuture<Void> future1 = reader.isAvailable();
+      assertFalse("Future1 should be uncompleted without live split.", future1.isDone());
+
+      reader.addSplits(splits2);
+      assertTrue("Future1 should be completed upon addition of new splits.", future1.isDone());
+
+      CompletableFuture<Void> future2 = reader.isAvailable();
+      assertFalse("Future2 should be uncompleted without live split.", future2.isDone());
+
+      reader.notifyNoMoreSplits();
+      assertTrue("Future2 should be completed upon NoMoreSplitsNotification.", future2.isDone());
+    }
+  }
+
+  @Test
+  public void testWatermark() throws Exception {
+    ManuallyTriggeredScheduledExecutorService executor =
+        new ManuallyTriggeredScheduledExecutorService();
+    try (FlinkUnboundedSourceReader<KV<Integer, Integer>> reader =
+        (FlinkUnboundedSourceReader<KV<Integer, Integer>>) createReader(executor, -1L)) {
+      List<FlinkSourceSplit<KV<Integer, Integer>>> splits = createSplits(2, 10, 0);
+      RecordsValidatingOutput validatingOutput = new RecordsValidatingOutput(splits);
+
+      reader.start();
+      reader.addSplits(splits);
+
+      // Poll 3 records from split 0 and 2 records from split 1.
+      for (int i = 0; i < 5; i++) {
+        reader.pollNext(validatingOutput);
+      }
+
+      Map<String, TestSourceOutput> sourceOutputs = validatingOutput.createdSourceOutputs();
+      assertEquals("There should be 2 source outputs created.", 2, sourceOutputs.size());
+      assertNull(sourceOutputs.get("0").watermark());
+      assertNull(sourceOutputs.get("1").watermark());
+
+      // Trigger the periodic task marking the watermark emission flag.
+      executor.triggerScheduledTasks();
+      // Poll one more time to actually emit the watermark. Getting record value 2 from split_1.
+      reader.pollNext(validatingOutput);
+
+      assertEquals(3, sourceOutputs.get("0").watermark().getTimestamp());
+      assertEquals(2, sourceOutputs.get("1").watermark().getTimestamp());
+
+      // Poll one more time to ensure no additional watermark is emitted. Getting record value 3
+      // from split_0.
+      reader.pollNext(validatingOutput);
+      assertEquals(3, sourceOutputs.get("0").watermark().getTimestamp());
+      assertEquals(2, sourceOutputs.get("1").watermark().getTimestamp());
+
+      // Trigger the task to mark the watermark emission flag again.
+      executor.triggerScheduledTasks();
+      // Poll to actually emit the watermark. Getting (split_1 -> 3).
+      reader.pollNext(validatingOutput);
+
+      assertEquals(4, sourceOutputs.get("0").watermark().getTimestamp());
+      assertEquals(3, sourceOutputs.get("1").watermark().getTimestamp());
+    }
+  }
+
+  @Test
+  public void testPendingBytesMetric() throws Exception {
+    ManuallyTriggeredScheduledExecutorService executor =
+        new ManuallyTriggeredScheduledExecutorService();
+    TestMetricGroup testMetricGroup = new TestMetricGroup();
+    try (SourceReader<
+            WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
+            FlinkSourceSplit<KV<Integer, Integer>>>
+        reader = createReader(executor, 0L, null, testMetricGroup)) {
+      reader.start();
+
+      List<FlinkSourceSplit<KV<Integer, Integer>>> splits = createSplits(2, 10, 0);
+      reader.addSplits(splits);
+      RecordsValidatingOutput validatingOutput = new RecordsValidatingOutput(splits);
+
+      // Need to poll once to create all the readers.
+      reader.pollNext(validatingOutput);
+      @SuppressWarnings("unchecked") // The registry stores gauges without their value type.
+      Gauge<Long> pendingBytesGauge =
+          (Gauge<Long>) testMetricGroup.registeredGauge.get(PENDING_BYTES_METRIC_NAME);
+      assertNotNull(pendingBytesGauge);
+      // TestCountingSource.CountingSourceReader always returns 7L as its backlog bytes, so with
+      // 2 splits the expected value is 14.
+      assertEquals(14L, pendingBytesGauge.getValue().longValue());
+    }
+  }
+
+  // --------------- private helper classes -----------------
+  /**
+   * A {@link TestCountingSource} whose readers' {@code advance()} delegates to the counting reader
+   * on roughly one in three calls (chosen at random) and returns {@code false} otherwise.
+   */
+  private static class DummySource extends TestCountingSource {
+
+    public DummySource(int numMessagesPerShard) {
+      super(numMessagesPerShard);
+    }
+
+    @Override
+    public CountingSourceReader createReader(
+        PipelineOptions options, @Nullable CounterMark checkpointMark) {
+      CountingSourceReader reader = new DummySourceReader();
+      createdReaders().add(reader);
+      return reader;
+    }
+
+    private class DummySourceReader extends TestCountingSource.CountingSourceReader {
+      private final Random random = new Random();
+
+      public DummySourceReader() {
+        super(0);
+      }
+
+      @Override
+      public boolean advance() {
+        // Return true once every three times advance is invoked.
+        if (random.nextInt(3) == 0) {
+          return super.advance();
+        } else {
+          return false;
+        }
+      }
+    }
+  }
+
+  // --------------- abstract methods impl ------------------
+  @Override
+  protected KV<Integer, Integer> getKVPairs(
+      WindowedValue<ValueWithRecordId<KV<Integer, Integer>>> record) {
+    return record.getValue().getValue();
+  }
+
+  /**
+   * Creates a {@link FlinkUnboundedSourceReader} configured with {@code idleTimeoutMs} as the
+   * shutdown-sources-after-idle setting and a 10 ms auto-watermark interval.
+   *
+   * @param executor the executor to hand to the reader, or {@code null} to use the reader
+   *     constructor that takes no executor
+   * @param idleTimeoutMs value for {@code FlinkPipelineOptions#setShutdownSourcesAfterIdleMs}
+   * @param timestampExtractor optional extractor passed through to the reader; may be {@code null}
+   * @param metricGroup metric group backing the mocked {@link SourceReaderContext}
+   */
+  @Override
+  protected FlinkUnboundedSourceReader<KV<Integer, Integer>> createReader(
+      @Nullable ScheduledExecutorService executor,
+      long idleTimeoutMs,
+      @Nullable
+          Function<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>, Long>
+              timestampExtractor,
+      TestMetricGroup metricGroup) {
+    FlinkPipelineOptions pipelineOptions = FlinkPipelineOptions.defaults();
+    pipelineOptions.setShutdownSourcesAfterIdleMs(idleTimeoutMs);
+    pipelineOptions.setAutoWatermarkInterval(10L);
+    SourceReaderContext mockContext = createSourceReaderContext(metricGroup);
+    if (executor != null) {
+      return new FlinkUnboundedSourceReader<>(
+          mockContext, pipelineOptions, executor, timestampExtractor);
+    } else {
+      return new FlinkUnboundedSourceReader<>(mockContext, pipelineOptions, timestampExtractor);
+    }
+  }
+
+  @Override
+  protected Source<KV<Integer, Integer>> createBeamSource(int splitIndex, int numRecordsPerSplit) {
+    return new TestCountingSource(numRecordsPerSplit)
+        .withShardNumber(splitIndex)
+        .withFixedNumSplits(1);
+  }
+}