This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6452dc79822 [Flink Runner] Add new Source classes that are based on FLIP-27 Source API. (#25525)
6452dc79822 is described below

commit 6452dc7982240819a763aaf9ff3efc4a01fc1d2b
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Tue Mar 7 02:12:58 2023 +0800

    [Flink Runner] Add new Source classes that are based on FLIP-27 Source API. (#25525)
---
 .../io/source/compat/FlinkSourceCompat.java        |  31 ++
 .../io/source/compat/SplitEnumeratorCompat.java    |  27 ++
 .../streaming/io/source/compat/package-info.java   |  20 +
 .../streaming/io/source/SourceTestCompat.java      |  62 ++++
 .../io/source/compat/SplitEnumeratorCompat.java    |  27 ++
 .../io/source/compat/FlinkSourceCompat.java        |  28 ++
 .../streaming/io/source/SourceTestCompat.java      |  93 +++++
 runners/flink/flink_runner.gradle                  |   2 +
 .../flink/translation/utils/SerdeUtils.java        |  85 +++++
 .../wrappers/streaming/io/source/FlinkSource.java  | 152 ++++++++
 .../streaming/io/source/FlinkSourceReaderBase.java | 399 ++++++++++++++++++++
 .../streaming/io/source/FlinkSourceSplit.java      |  76 ++++
 .../io/source/FlinkSourceSplitEnumerator.java      | 181 +++++++++
 .../io/source/bounded/FlinkBoundedSource.java      |  67 ++++
 .../source/bounded/FlinkBoundedSourceReader.java   | 146 ++++++++
 .../streaming/io/source/bounded/package-info.java  |  21 ++
 .../io/source/impulse/BeamImpulseSource.java       | 107 ++++++
 .../streaming/io/source/impulse/package-info.java  |  21 ++
 .../wrappers/streaming/io/source/package-info.java |  21 ++
 .../io/source/unbounded/FlinkUnboundedSource.java  |  64 ++++
 .../unbounded/FlinkUnboundedSourceReader.java      | 272 ++++++++++++++
 .../io/source/unbounded/package-info.java          |  21 ++
 .../streaming/io/TestBoundedCountingSource.java    | 150 ++++++++
 .../wrappers/streaming/io/TestCountingSource.java  |  63 +++-
 .../io/source/FlinkSourceReaderTestBase.java       | 403 +++++++++++++++++++++
 .../io/source/FlinkSourceSplitEnumeratorTest.java  | 151 ++++++++
 .../wrappers/streaming/io/source/TestSource.java   |  38 ++
 .../bounded/FlinkBoundedSourceReaderTest.java      | 146 ++++++++
 .../unbounded/FlinkUnboundedSourceReaderTest.java  | 318 ++++++++++++++++
 29 files changed, 3186 insertions(+), 6 deletions(-)

diff --git a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java
new file mode 100644
index 00000000000..2f9e69fe4f7
--- /dev/null
+++ b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+
+public class FlinkSourceCompat {
+
+  public static Counter getNumRecordsInCounter(SourceReaderContext context) {
+    return ((OperatorMetricGroup) context.metricGroup())
+        .getIOMetricGroup()
+        .getNumRecordsInCounter();
+  }
+}
diff --git a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java
new file mode 100644
index 00000000000..d6bed940470
--- /dev/null
+++ b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+
+public interface SplitEnumeratorCompat<SplitT extends SourceSplit, CheckpointT>
+    extends SplitEnumerator<SplitT, CheckpointT> {
+
+  CheckpointT snapshotState(long checkpointId) throws Exception;
+}
diff --git a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/package-info.java b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/package-info.java
new file mode 100644
index 00000000000..08bba20e576
--- /dev/null
+++ b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Classes helping maintain backwards compatibility across Flink versions. */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat;
diff --git a/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java b/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java
new file mode 100644
index 00000000000..1ddc2a957b7
--- /dev/null
+++ b/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+
+public class SourceTestCompat {
+
+  /** A MetricGroup implementation which records the registered gauge. */
+  public static class TestMetricGroup
+      extends UnregisteredMetricGroups.UnregisteredOperatorMetricGroup {
+    public final Map<String, Gauge<?>> registeredGauge = new HashMap<>();
+    public final Counter numRecordsInCounter = new SimpleCounter();
+
+    @Override
+    public <T, GaugeT extends Gauge<T>> GaugeT gauge(String name, GaugeT gauge) {
+      registeredGauge.put(name, gauge);
+      return gauge;
+    }
+
+    @Override
+    public OperatorIOMetricGroup getIOMetricGroup() {
+      return new OperatorIOMetricGroup(this) {
+        @Override
+        public Counter getNumRecordsInCounter() {
+          return numRecordsInCounter;
+        }
+      };
+    }
+  }
+
+  public interface ReaderOutputCompat<T> extends ReaderOutput<T> {
+    void markActive();
+  }
+
+  public interface SourceOutputCompat<T> extends SourceOutput<T> {
+    void markActive();
+  }
+}
diff --git a/runners/flink/1.13/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java b/runners/flink/1.13/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java
new file mode 100644
index 00000000000..06fdd781fc5
--- /dev/null
+++ b/runners/flink/1.13/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+
+public interface SplitEnumeratorCompat<SplitT extends SourceSplit, CheckpointT>
+    extends SplitEnumerator<SplitT, CheckpointT> {
+
+  CheckpointT snapshotState() throws Exception;
+}
diff --git a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java
new file mode 100644
index 00000000000..f68ae75d38e
--- /dev/null
+++ b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.metrics.Counter;
+
+public class FlinkSourceCompat {
+
+  public static Counter getNumRecordsInCounter(SourceReaderContext context) {
+    return context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
+  }
+}
diff --git a/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java b/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java
new file mode 100644
index 00000000000..62b16eedca0
--- /dev/null
+++ b/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+public class SourceTestCompat {
+
+  /** A MetricGroup implementation which records the registered gauge. */
+  public static class TestMetricGroup extends UnregisteredMetricsGroup
+      implements SourceReaderMetricGroup {
+    public final Map<String, Gauge<?>> registeredGauge = new HashMap<>();
+    public final Counter numRecordsInCounter = new SimpleCounter();
+
+    @Override
+    public OperatorIOMetricGroup getIOMetricGroup() {
+      return new UnregisteredOperatorIOMetricGroup() {
+        @Override
+        public Counter getNumRecordsInCounter() {
+          return numRecordsInCounter;
+        }
+      };
+    }
+
+    @Override
+    public <T, GaugeT extends Gauge<T>> GaugeT gauge(String name, GaugeT gauge) {
+      registeredGauge.put(name, gauge);
+      return gauge;
+    }
+
+    @Override
+    public Counter getNumRecordsInErrorsCounter() {
+      return new SimpleCounter();
+    }
+
+    @Override
+    public void setPendingBytesGauge(Gauge<Long> pendingBytesGauge) {}
+
+    @Override
+    public void setPendingRecordsGauge(Gauge<Long> pendingRecordsGauge) {}
+  }
+
+  private static class UnregisteredOperatorIOMetricGroup extends UnregisteredMetricsGroup
+      implements OperatorIOMetricGroup {
+    @Override
+    public Counter getNumRecordsInCounter() {
+      return new SimpleCounter();
+    }
+
+    @Override
+    public Counter getNumRecordsOutCounter() {
+      return new SimpleCounter();
+    }
+
+    @Override
+    public Counter getNumBytesInCounter() {
+      return new SimpleCounter();
+    }
+
+    @Override
+    public Counter getNumBytesOutCounter() {
+      return new SimpleCounter();
+    }
+  }
+
+  public interface ReaderOutputCompat<T> extends ReaderOutput<T> {}
+
+  public interface SourceOutputCompat<T> extends SourceOutput<T> {}
+}
diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle
index ccd4f75d3b7..cdfe921c1b2 100644
--- a/runners/flink/flink_runner.gradle
+++ b/runners/flink/flink_runner.gradle
@@ -179,6 +179,7 @@ dependencies {
   if (flink_version.compareTo("1.14") >= 0) {
     implementation "org.apache.flink:flink-runtime:$flink_version"
     implementation "org.apache.flink:flink-optimizer:$flink_version"
+    implementation "org.apache.flink:flink-metrics-core:$flink_version"
     testImplementation "org.apache.flink:flink-runtime:$flink_version:tests"
     testImplementation "org.apache.flink:flink-rpc-akka:$flink_version"
   } else {
@@ -197,6 +198,7 @@ dependencies {
   testImplementation project(":sdks:java:io:google-cloud-platform")
   testImplementation library.java.jackson_dataformat_yaml
   testImplementation "org.apache.flink:flink-core:$flink_version:tests"
+  testImplementation "org.apache.flink:flink-connector-test-utils:$flink_version"
   testImplementation project(":sdks:java:harness")
   testRuntimeOnly library.java.slf4j_simple
   validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerdeUtils.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerdeUtils.java
new file mode 100644
index 00000000000..c502faeb7a6
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerdeUtils.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/** Util methods to help with serialization / deserialization. */
+public class SerdeUtils {
+
+  // Private constructor for a util class.
+  private SerdeUtils() {}
+
+  public static @Nonnull byte[] serializeObject(@Nullable Object obj) throws IOException {
+    if (obj == null) {
+      return new byte[0];
+    }
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    ObjectOutputStream oos = new ObjectOutputStream(baos);
+    oos.writeObject(obj);
+    oos.close();
+    return baos.toByteArray();
+  }
+
+  @SuppressWarnings("unchecked")
+  public static @Nullable Object deserializeObject(byte[] serialized) throws IOException {
+    if (serialized == null || serialized.length == 0) {
+      return null;
+    }
+    try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+        ObjectInputStream ois = new ObjectInputStream(bais)) {
+      return ois.readObject();
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public static <T> SimpleVersionedSerializer<T> getNaiveObjectSerializer() {
+    return new SimpleVersionedSerializer<T>() {
+      @Override
+      public int getVersion() {
+        return 0;
+      }
+
+      @Override
+      public byte[] serialize(T obj) throws IOException {
+        return serializeObject(obj);
+      }
+
+      @Override
+      @SuppressWarnings("unchecked")
+      public T deserialize(int version, byte[] serialized) throws IOException {
+        if (version > getVersion()) {
+          throw new IOException(
+              String.format(
+                  "Received serialized object of version %d, which is higher than "
+                      + "the highest supported version %d.",
+                  version, getVersion()));
+        }
+        return (T) deserializeObject(serialized);
+      }
+    };
+  }
+}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java
new file mode 100644
index 00000000000..c001b263340
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.translation.utils.SerdeUtils;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded.FlinkBoundedSource;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.impulse.BeamImpulseSource;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded.FlinkUnboundedSource;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/**
+ * The base class for {@link FlinkBoundedSource} and {@link FlinkUnboundedSource}.
+ *
+ * @param <T> The data type of the records emitted by the raw Beam sources.
+ * @param <OutputT> The data type of the records emitted by the Flink Source.
+ */
+public abstract class FlinkSource<T, OutputT>
+    implements Source<OutputT, FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> {
+  protected final org.apache.beam.sdk.io.Source<T> beamSource;
+  protected final Boundedness boundedness;
+  protected final SerializablePipelineOptions serializablePipelineOptions;
+
+  private final int numSplits;
+
+  // ----------------- public static methods to construct sources --------------------
+
+  public static <T> FlinkBoundedSource<T> bounded(
+      BoundedSource<T> boundedSource,
+      SerializablePipelineOptions serializablePipelineOptions,
+      int numSplits) {
+    return new FlinkBoundedSource<>(
+        boundedSource, serializablePipelineOptions, Boundedness.BOUNDED, numSplits);
+  }
+
+  public static <T> FlinkUnboundedSource<T> unbounded(
+      UnboundedSource<T, ?> source,
+      SerializablePipelineOptions serializablePipelineOptions,
+      int numSplits) {
+    return new FlinkUnboundedSource<>(source, serializablePipelineOptions, numSplits);
+  }
+
+  public static FlinkBoundedSource<byte[]> unboundedImpulse(long shutdownSourceAfterIdleMs) {
+    FlinkPipelineOptions flinkPipelineOptions = FlinkPipelineOptions.defaults();
+    flinkPipelineOptions.setShutdownSourcesAfterIdleMs(shutdownSourceAfterIdleMs);
+    // Here we wrap the BeamImpulseSource with a FlinkBoundedSource, but overriding its
+    // boundedness to CONTINUOUS_UNBOUNDED. By doing so, the Flink engine will treat this
+    // source as an unbounded source and execute the job in streaming mode. This also
+    // works well with checkpoint, because the FlinkSourceSplit containing the
+    // BeamImpulseSource will be discarded after the impulse emission. So the streaming
+    // job won't see another impulse after failover.
+    return new FlinkBoundedSource<>(
+        new BeamImpulseSource(),
+        new SerializablePipelineOptions(flinkPipelineOptions),
+        Boundedness.CONTINUOUS_UNBOUNDED,
+        1,
+        record -> Watermark.MAX_WATERMARK.getTimestamp());
+  }
+
+  public static FlinkBoundedSource<byte[]> boundedImpulse() {
+    return new FlinkBoundedSource<>(
+        new BeamImpulseSource(),
+        new SerializablePipelineOptions(FlinkPipelineOptions.defaults()),
+        Boundedness.BOUNDED,
+        1,
+        record -> Watermark.MAX_WATERMARK.getTimestamp());
+  }
+
+  // ------ Common implementations for both bounded and unbounded source ---------
+
+  protected FlinkSource(
+      org.apache.beam.sdk.io.Source<T> beamSource,
+      SerializablePipelineOptions serializablePipelineOptions,
+      Boundedness boundedness,
+      int numSplits) {
+    this.beamSource = beamSource;
+    this.serializablePipelineOptions = serializablePipelineOptions;
+    this.boundedness = boundedness;
+    this.numSplits = numSplits;
+  }
+
+  @Override
+  public Boundedness getBoundedness() {
+    return boundedness;
+  }
+
+  @Override
+  public SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>
+      createEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext) throws Exception {
+    return new FlinkSourceSplitEnumerator<>(
+        enumContext, beamSource, serializablePipelineOptions.get(), numSplits);
+  }
+
+  @Override
+  public SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>
+      restoreEnumerator(
+          SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext,
+          Map<Integer, List<FlinkSourceSplit<T>>> checkpoint)
+          throws Exception {
+    FlinkSourceSplitEnumerator<T> enumerator =
+        new FlinkSourceSplitEnumerator<>(
+            enumContext, beamSource, serializablePipelineOptions.get(), numSplits);
+    checkpoint.forEach(
+        (subtaskId, splitsForSubtask) -> enumerator.addSplitsBack(splitsForSubtask, subtaskId));
+    return enumerator;
+  }
+
+  @Override
+  public SimpleVersionedSerializer<FlinkSourceSplit<T>> getSplitSerializer() {
+    return FlinkSourceSplit.serializer();
+  }
+
+  @Override
+  public SimpleVersionedSerializer<Map<Integer, List<FlinkSourceSplit<T>>>>
+      getEnumeratorCheckpointSerializer() {
+    return SerdeUtils.getNaiveObjectSerializer();
+  }
+
+  public int getNumSplits() {
+    return numSplits;
+  }
+
+  @FunctionalInterface
+  public interface TimestampExtractor<T> extends Function<T, Long>, Serializable {}
+}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java
new file mode 100644
index 00000000000..27b84910ac2
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.FlinkSourceCompat;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.metrics.Counter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An abstract implementation of {@link SourceReader} which encapsulates {@link Source Beam Sources}
+ * for data reading. It takes care of:
+ *
+ * <ol>
+ *   <li>Idle timeout support.
+ *   <li>Splits addition handling.
+ *   <li>Split reader creation and management.
+ *   <li>Checkpoint management.
+ * </ol>
+ *
+ * <p>This implementation provides unified logic for both {@link BoundedSource} and {@link
+ * UnboundedSource}. The subclasses are expected to implement the {@link #pollNext(ReaderOutput)}
+ * method and {@link #isAvailableForAliveReaders()}.
+ *
+ * @param <T> the element type of the {@link Source Beam sources} wrapped by the assigned splits.
+ * @param <OutputT> the element type emitted by this {@link SourceReader}.
+ */
+public abstract class FlinkSourceReaderBase<T, OutputT>
+    implements SourceReader<OutputT, FlinkSourceSplit<T>> {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkSourceReaderBase.class);
+  /** An already completed future. */
+  protected static final CompletableFuture<Void> AVAILABLE_NOW =
+      CompletableFuture.completedFuture(null);
+  // Some dummy instances to make the annotation checker happy with AtomicReference.
+  protected static final CompletableFuture<Void> DUMMY_FUTURE = new CompletableFuture<>();
+  /** Sentinel stored in {@link #exception} while no failure has been recorded. */
+  protected static final Exception NO_EXCEPTION = new Exception();
+
+  protected final PipelineOptions pipelineOptions;
+  protected final @Nullable Function<OutputT, Long> timestampExtractor;
+  // Splits that have been assigned but for which no reader has been created yet.
+  private final Queue<FlinkSourceSplit<T>> sourceSplits = new ArrayDeque<>();
+  // This needs to be a ConcurrentHashMap because the metric retrieving thread may access it.
+  // Keyed by split index.
+  private final ConcurrentMap<Integer, ReaderAndOutput> beamSourceReaders;
+  protected final SourceReaderContext context;
+  private final ScheduledExecutorService executor;
+
+  protected final Counter numRecordsInCounter;
+  protected final long idleTimeoutMs;
+  // Completed once the idle timeout has elapsed (or immediately when the timeout is <= 0).
+  private final CompletableFuture<Void> idleTimeoutFuture;
+  // Holds NO_EXCEPTION until the first failure is recorded via recordException().
+  private final AtomicReference<Throwable> exception;
+  private boolean idleTimeoutCountingDown;
+  // Completed whenever splits are added or the no-more-splits signal is received.
+  private CompletableFuture<Void> waitingForSplitChangeFuture;
+  private boolean noMoreSplits;
+
+  /**
+   * Creates a reader backed by a new single-threaded scheduled executor whose thread is named
+   * after the subtask index.
+   *
+   * @param context the Flink source reader context.
+   * @param pipelineOptions the Beam pipeline options.
+   * @param timestampExtractor an optional function mapping an output element to a {@code Long}.
+   */
+  protected FlinkSourceReaderBase(
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      @Nullable Function<OutputT, Long> timestampExtractor) {
+    this(
+        Executors.newSingleThreadScheduledExecutor(
+            r -> new Thread(r, "FlinkSource-Executor-Thread-" + context.getIndexOfSubtask())),
+        context,
+        pipelineOptions,
+        timestampExtractor);
+  }
+
+  /**
+   * Creates a reader that runs its background tasks on the given executor.
+   *
+   * @param executor the executor used by {@link #scheduleTask}, {@link #scheduleTaskAtFixedRate}
+   *     and {@link #execute}; it is shut down in {@link #close()}.
+   * @param context the Flink source reader context.
+   * @param pipelineOptions the Beam pipeline options; the idle timeout is read from {@link
+   *     FlinkPipelineOptions#getShutdownSourcesAfterIdleMs()}.
+   * @param timestampExtractor an optional function mapping an output element to a {@code Long}.
+   */
+  protected FlinkSourceReaderBase(
+      ScheduledExecutorService executor,
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      @Nullable Function<OutputT, Long> timestampExtractor) {
+    this.context = context;
+    this.pipelineOptions = pipelineOptions;
+    this.timestampExtractor = timestampExtractor;
+    this.beamSourceReaders = new ConcurrentHashMap<>();
+    this.exception = new AtomicReference<>(NO_EXCEPTION);
+    this.executor = executor;
+    this.idleTimeoutMs =
+        pipelineOptions.as(FlinkPipelineOptions.class).getShutdownSourcesAfterIdleMs();
+    this.idleTimeoutFuture = new CompletableFuture<>();
+    this.waitingForSplitChangeFuture = new CompletableFuture<>();
+    this.idleTimeoutCountingDown = false;
+    // TODO: Remove the casting and use SourceReaderMetricGroup after minimum Flink version is
+    // upgraded to 1.14 and above.
+    this.numRecordsInCounter = FlinkSourceCompat.getNumRecordsInCounter(context);
+  }
+
+  @Override
+  public void start() {}
+
+  /**
+   * Returns the state of all splits owned by this reader: the splits still waiting for a reader
+   * plus one split per active reader.
+   *
+   * @throws RuntimeException if an exception has been recorded or a checkpoint mark cannot be
+   *     encoded.
+   */
+  @Override
+  public List<FlinkSourceSplit<T>> snapshotState(long checkpointId) {
+    checkExceptionAndMaybeThrow();
+    // Add all the source splits whose readers haven't been created.
+    List<FlinkSourceSplit<T>> splitsState = new ArrayList<>(sourceSplits);
+
+    // Add all the source splits being actively read.
+    beamSourceReaders.forEach(
+        (splitId, readerAndOutput) -> {
+          Source.Reader<T> reader = readerAndOutput.reader;
+          if (reader instanceof BoundedSource.BoundedReader) {
+            // Sometimes users may decide to run a bounded source in streaming mode as "finite
+            // stream."
+            // For bounded source, the checkpoint granularity is the entire source split.
+            // So, in case of failure, all the data from this split will be consumed again.
+            splitsState.add(new FlinkSourceSplit<>(splitId, reader.getCurrentSource()));
+          } else if (reader instanceof UnboundedSource.UnboundedReader) {
+            // The checkpoint for unbounded sources is fine granular.
+            byte[] checkpointState =
+                getAndEncodeCheckpointMark((UnboundedSource.UnboundedReader<OutputT>) reader);
+            splitsState.add(
+                new FlinkSourceSplit<>(splitId, reader.getCurrentSource(), checkpointState));
+          }
+        });
+    return splitsState;
+  }
+
+  /**
+   * Returns a future that completes when this reader should be polled again. Depending on the
+   * state this is either the readers' availability combined with a split change, the idle timeout,
+   * or the next split change.
+   */
+  @Override
+  public CompletableFuture<Void> isAvailable() {
+    checkExceptionAndMaybeThrow();
+    if (!sourceSplits.isEmpty() || !beamSourceReaders.isEmpty()) {
+      // There are still live readers.
+      CompletableFuture<Void> aliveReaderAvailableFuture = isAvailableForAliveReaders();
+      // Regardless of whether there is data available from the alive readers, the
+      // main thread needs to be woken up if there is a split change. Hence, we
+      // need to combine the data available future with the split change future.
+      if (waitingForSplitChangeFuture.isDone()) {
+        waitingForSplitChangeFuture = new CompletableFuture<>();
+      }
+      return CompletableFuture.anyOf(aliveReaderAvailableFuture, waitingForSplitChangeFuture)
+          .thenAccept(ignored -> {});
+    } else if (noMoreSplits) {
+      // All the splits have been read, wait for idle timeout.
+      checkIdleTimeoutAndMaybeStartCountdown();
+      return idleTimeoutFuture;
+    } else {
+      // There are no live readers, waiting for new split assignments or no more splits
+      // notification.
+      if (waitingForSplitChangeFuture.isDone()) {
+        waitingForSplitChangeFuture = new CompletableFuture<>();
+      }
+      return waitingForSplitChangeFuture;
+    }
+  }
+
+  @Override
+  public void notifyNoMoreSplits() {
+    checkExceptionAndMaybeThrow();
+    LOG.info("Received NoMoreSplits signal from enumerator.");
+    noMoreSplits = true;
+    waitingForSplitChangeFuture.complete(null);
+  }
+
+  @Override
+  public void addSplits(List<FlinkSourceSplit<T>> splits) {
+    checkExceptionAndMaybeThrow();
+    LOG.info("Adding splits {}", splits);
+    sourceSplits.addAll(splits);
+    waitingForSplitChangeFuture.complete(null);
+  }
+
+  /**
+   * Closes every tracked Beam reader and shuts down the executor.
+   *
+   * <p>All readers are closed even if some of them fail, and the executor is always shut down. The
+   * first failure is rethrown afterwards with any later failures attached as suppressed
+   * exceptions.
+   *
+   * @throws Exception the first exception raised while closing a reader.
+   */
+  @Override
+  public void close() throws Exception {
+    Exception firstFailure = null;
+    try {
+      for (ReaderAndOutput readerAndOutput : beamSourceReaders.values()) {
+        try {
+          readerAndOutput.reader.close();
+        } catch (Exception e) {
+          // Keep going so that one broken reader does not leak the others.
+          if (firstFailure == null) {
+            firstFailure = e;
+          } else {
+            firstFailure.addSuppressed(e);
+          }
+        }
+      }
+    } finally {
+      executor.shutdown();
+    }
+    if (firstFailure != null) {
+      throw firstFailure;
+    }
+  }
+
+  // ----------------- protected abstract methods ----------------------
+
+  /**
+   * This method needs to be overridden by subclasses to determine if data is available when there
+   * are alive readers. For example, an unbounded source may not have any source split ready for
+   * data emission even if all the sources are still alive. Whereas for the bounded source, data is
+   * always available as long as there are alive readers.
+   */
+  protected abstract CompletableFuture<Void> isAvailableForAliveReaders();
+
+  // ----------------- protected helper methods for subclasses --------------------
+
+  /**
+   * Takes the next pending split, if any, creates a Beam reader for it and registers the reader
+   * under the split index.
+   *
+   * @return the newly tracked reader, or {@link Optional#empty()} when no split is pending.
+   * @throws IOException if the reader cannot be created.
+   */
+  protected Optional<ReaderAndOutput> createAndTrackNextReader() throws IOException {
+    FlinkSourceSplit<T> sourceSplit = sourceSplits.poll();
+    if (sourceSplit != null) {
+      Source.Reader<T> reader = createReader(sourceSplit);
+      ReaderAndOutput readerAndOutput = new ReaderAndOutput(sourceSplit.splitId(), reader, false);
+      beamSourceReaders.put(sourceSplit.splitIndex(), readerAndOutput);
+      return Optional.of(readerAndOutput);
+    }
+    return Optional.empty();
+  }
+
+  /**
+   * Stops tracking the reader of the given split and closes it.
+   *
+   * @throws IOException if closing the reader fails.
+   * @throws IllegalStateException if no reader is tracked for {@code splitIndex}.
+   */
+  protected void finishSplit(int splitIndex) throws IOException {
+    ReaderAndOutput readerAndOutput = beamSourceReaders.remove(splitIndex);
+    if (readerAndOutput != null) {
+      LOG.info("Finished reading from split {}", readerAndOutput.splitId);
+      readerAndOutput.reader.close();
+    } else {
+      throw new IllegalStateException(
+          "SourceReader for split " + splitIndex + " should never be null!");
+    }
+  }
+
+  /**
+   * Completes the idle timeout future right away when the timeout is not positive; otherwise
+   * schedules its completion once, {@code idleTimeoutMs} milliseconds from the first call.
+   *
+   * @return whether the idle timeout future is already done.
+   */
+  protected boolean checkIdleTimeoutAndMaybeStartCountdown() {
+    if (idleTimeoutMs <= 0) {
+      idleTimeoutFuture.complete(null);
+    } else if (!idleTimeoutCountingDown) {
+      scheduleTask(() -> idleTimeoutFuture.complete(null), idleTimeoutMs);
+      idleTimeoutCountingDown = true;
+    }
+    return idleTimeoutFuture.isDone();
+  }
+
+  /** Returns whether the no-more-splits notification has been received. */
+  protected boolean noMoreSplits() {
+    return noMoreSplits;
+  }
+
+  /** Runs the task once on the executor after {@code delayMs}; failures are recorded. */
+  protected void scheduleTask(Runnable runnable, long delayMs) {
+    ignoreReturnValue(
+        executor.schedule(new ErrorRecordingRunnable(runnable), delayMs, TimeUnit.MILLISECONDS));
+  }
+
+  /** Runs the task periodically on the executor; failures are recorded. */
+  protected void scheduleTaskAtFixedRate(Runnable runnable, long delayMs, long periodMs) {
+    ignoreReturnValue(
+        executor.scheduleAtFixedRate(
+            new ErrorRecordingRunnable(runnable), delayMs, periodMs, TimeUnit.MILLISECONDS));
+  }
+
+  /** Runs the task on the executor; failures are recorded. */
+  protected void execute(Runnable runnable) {
+    executor.execute(new ErrorRecordingRunnable(runnable));
+  }
+
+  /**
+   * Records a failure. The first recorded throwable is kept; later ones are attached to it as
+   * suppressed exceptions.
+   */
+  protected void recordException(Throwable e) {
+    if (!exception.compareAndSet(NO_EXCEPTION, e)) {
+      Throwable first = exception.get();
+      // Throwable#addSuppressed rejects self-suppression, so skip a repeated report of the same
+      // instance.
+      if (first != e) {
+        first.addSuppressed(e);
+      }
+    }
+  }
+
+  /** Throws a {@link RuntimeException} wrapping the recorded failure, if there is one. */
+  protected void checkExceptionAndMaybeThrow() {
+    if (exception.get() != NO_EXCEPTION) {
+      throw new RuntimeException("The source reader received exception.", exception.get());
+    }
+  }
+
+  /** Returns whether a failure has been recorded. */
+  protected boolean hasException() {
+    return exception.get() != NO_EXCEPTION;
+  }
+
+  /** Returns a read-only view of the splits for which no reader has been created yet. */
+  protected Collection<FlinkSourceSplit<T>> sourceSplits() {
+    return Collections.unmodifiableCollection(sourceSplits);
+  }
+
+  /** Returns a read-only view of the tracked readers, keyed by split index. */
+  protected Map<Integer, ReaderAndOutput> allReaders() {
+    return Collections.unmodifiableMap(beamSourceReaders);
+  }
+
+  protected static void ignoreReturnValue(Object o) {
+    // do nothing.
+  }
+  // ------------------------------ private methods ------------------------------
+
+  /** Encodes the reader's current checkpoint mark with the source's checkpoint mark coder. */
+  @SuppressWarnings("unchecked")
+  private <CheckpointMarkT extends UnboundedSource.CheckpointMark>
+      byte[] getAndEncodeCheckpointMark(UnboundedSource.UnboundedReader<OutputT> reader) {
+    UnboundedSource<OutputT, CheckpointMarkT> source =
+        (UnboundedSource<OutputT, CheckpointMarkT>) reader.getCurrentSource();
+    CheckpointMarkT checkpointMark = (CheckpointMarkT) reader.getCheckpointMark();
+    Coder<CheckpointMarkT> coder = source.getCheckpointMarkCoder();
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      coder.encode(checkpointMark, baos);
+      return baos.toByteArray();
+    } catch (IOException ioe) {
+      throw new RuntimeException("Failed to encode checkpoint mark.", ioe);
+    }
+  }
+
+  /** Creates the Beam reader matching the kind of source held by the split. */
+  private Source.Reader<T> createReader(@Nonnull FlinkSourceSplit<T> sourceSplit)
+      throws IOException {
+    Source<T> beamSource = sourceSplit.getBeamSplitSource();
+    if (beamSource instanceof BoundedSource) {
+      return ((BoundedSource<T>) beamSource).createReader(pipelineOptions);
+    } else if (beamSource instanceof UnboundedSource) {
+      return createUnboundedSourceReader(beamSource, sourceSplit.getSplitState());
+    } else {
+      throw new IllegalStateException("Unknown source type " + beamSource.getClass());
+    }
+  }
+
+  /**
+   * Creates an unbounded reader, decoding the checkpoint mark from {@code splitState} when it is
+   * present and passing {@code null} as the checkpoint mark otherwise.
+   */
+  private <CheckpointMarkT extends UnboundedSource.CheckpointMark>
+      Source.Reader<T> createUnboundedSourceReader(
+          Source<T> beamSource, @Nullable byte[] splitState) throws IOException {
+    UnboundedSource<T, CheckpointMarkT> unboundedSource =
+        (UnboundedSource<T, CheckpointMarkT>) beamSource;
+    Coder<CheckpointMarkT> coder = unboundedSource.getCheckpointMarkCoder();
+    if (splitState == null) {
+      return unboundedSource.createReader(pipelineOptions, null);
+    } else {
+      try (ByteArrayInputStream bais = new ByteArrayInputStream(splitState)) {
+        return unboundedSource.createReader(pipelineOptions, coder.decode(bais));
+      }
+    }
+  }
+
+  // -------------------- protected helper class ---------------------
+
+  /** A wrapper for the reader and its associated information. */
+  protected final class ReaderAndOutput {
+    public final String splitId;
+    public final Source.Reader<T> reader;
+    private boolean started;
+    private @Nullable SourceOutput<OutputT> outputForSplit;
+
+    public ReaderAndOutput(String splitId, Source.Reader<T> reader, boolean started) {
+      this.splitId = splitId;
+      this.reader = reader;
+      this.started = started;
+      this.outputForSplit = null;
+    }
+
+    /** Returns the per-split output, creating it from {@code output} on first use. */
+    public SourceOutput<OutputT> getAndMaybeCreateSplitOutput(ReaderOutput<OutputT> output) {
+      if (outputForSplit == null) {
+        outputForSplit = output.createOutputForSplit(splitId);
+      }
+      return outputForSplit;
+    }
+
+    /** Calls {@code start()} on the first invocation and {@code advance()} afterwards. */
+    public boolean startOrAdvance() throws IOException {
+      if (started) {
+        return reader.advance();
+      } else {
+        started = true;
+        return reader.start();
+      }
+    }
+
+    /** Returns the per-split output, or {@code null} if it has not been created yet. */
+    public @Nullable SourceOutput<OutputT> sourceOutput() {
+      return outputForSplit;
+    }
+  }
+
+  /** A {@link Runnable} decorator that records anything thrown by the wrapped task. */
+  private final class ErrorRecordingRunnable implements Runnable {
+    private final Runnable runnable;
+
+    ErrorRecordingRunnable(Runnable r) {
+      this.runnable = r;
+    }
+
+    @Override
+    public void run() {
+      try {
+        runnable.run();
+      } catch (Throwable t) {
+        recordException(t);
+      }
+    }
+  }
+}
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplit.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplit.java
new file mode 100644
index 00000000000..32fcd23344d
--- /dev/null
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplit.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.flink.translation.utils.SerdeUtils;
+import org.apache.beam.sdk.io.Source;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/**
+ * A Flink {@link SourceSplit} that carries a Beam {@link Source} together with an optional,
+ * already-encoded reader state. Keeping the state next to the source lets a Beam {@link
+ * org.apache.beam.sdk.io.Source.Reader Reader} be recreated from the checkpointed state upon
+ * failure recovery.
+ *
+ * @param <T> The output type of the encapsulated Beam {@link Source}.
+ */
+public class FlinkSourceSplit<T> implements SourceSplit, Serializable {
+  // The Beam source this split reads from.
+  private final Source<T> beamSplitSource;
+  // The index of the split.
+  private final int splitIndex;
+  // Encoded reader state; null when the split carries no state.
+  private final @Nullable byte[] splitState;
+
+  /** Returns the serializer used for splits of this type. */
+  public static <T> SimpleVersionedSerializer<FlinkSourceSplit<T>> serializer() {
+    return SerdeUtils.getNaiveObjectSerializer();
+  }
+
+  /** Creates a split without any reader state. */
+  public FlinkSourceSplit(int splitIndex, Source<T> beamSplitSource) {
+    this(splitIndex, beamSplitSource, null);
+  }
+
+  /** Creates a split with the given (possibly {@code null}) reader state. */
+  public FlinkSourceSplit(int splitIndex, Source<T> beamSplitSource, @Nullable byte[] splitState) {
+    this.beamSplitSource = beamSplitSource;
+    this.splitState = splitState;
+    this.splitIndex = splitIndex;
+  }
+
+  /** The split id is the decimal representation of the split index. */
+  @Override
+  public String splitId() {
+    return String.valueOf(splitIndex);
+  }
+
+  public int splitIndex() {
+    return splitIndex;
+  }
+
+  public Source<T> getBeamSplitSource() {
+    return beamSplitSource;
+  }
+
+  public @Nullable byte[] getSplitState() {
+    return splitState;
+  }
+
+  @Override
+  public String toString() {
+    final String pattern = "[SplitIndex: %d, BeamSource: %s]";
+    return String.format(pattern, splitIndex, beamSplitSource);
+  }
+}
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java
new file mode 100644
index 00000000000..292697479bc
--- /dev/null
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.SplitEnumeratorCompat;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Flink {@link org.apache.flink.api.connector.source.SplitEnumerator 
SplitEnumerator}
+ * implementation that holds a Beam {@link Source} and does the following:
+ *
+ * <ul>
+ *   <li>Split the Beam {@link Source} to desired number of splits.
+ *   <li>Assign the splits to the Flink Source Reader.
+ * </ul>
+ *
+ * <p>Note that at this point, this class has a static round-robin split 
assignment strategy.
+ *
+ * @param <T> The output type of the encapsulated Beam {@link Source}.
+ */
+public class FlinkSourceSplitEnumerator<T>
+    implements SplitEnumeratorCompat<FlinkSourceSplit<T>, Map<Integer, 
List<FlinkSourceSplit<T>>>> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkSourceSplitEnumerator.class);
+  private final SplitEnumeratorContext<FlinkSourceSplit<T>> context;
+  private final Source<T> beamSource;
+  private final PipelineOptions pipelineOptions;
+  private final int numSplits;
+  private final Map<Integer, List<FlinkSourceSplit<T>>> pendingSplits;
+  private boolean splitsInitialized;
+
+  public FlinkSourceSplitEnumerator(
+      SplitEnumeratorContext<FlinkSourceSplit<T>> context,
+      Source<T> beamSource,
+      PipelineOptions pipelineOptions,
+      int numSplits) {
+    this.context = context;
+    this.beamSource = beamSource;
+    this.pipelineOptions = pipelineOptions;
+    this.numSplits = numSplits;
+    this.pendingSplits = new HashMap<>(numSplits);
+    this.splitsInitialized = false;
+  }
+
+  @Override
+  public void start() {
+    context.callAsync(
+        () -> {
+          try {
+            List<? extends Source<T>> beamSplitSourceList = splitBeamSource();
+            Map<Integer, List<FlinkSourceSplit<T>>> flinkSourceSplitsList = 
new HashMap<>();
+            int i = 0;
+            for (Source<T> beamSplitSource : beamSplitSourceList) {
+              int targetSubtask = i % context.currentParallelism();
+              List<FlinkSourceSplit<T>> splitsForTask =
+                  flinkSourceSplitsList.computeIfAbsent(
+                      targetSubtask, ignored -> new ArrayList<>());
+              splitsForTask.add(new FlinkSourceSplit<>(i, beamSplitSource));
+              i++;
+            }
+            return flinkSourceSplitsList;
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        },
+        (sourceSplits, error) -> {
+          if (error != null) {
+            throw new RuntimeException("Failed to start source enumerator.", 
error);
+          } else {
+            pendingSplits.putAll(sourceSplits);
+            splitsInitialized = true;
+            sendPendingSplitsToSourceReaders();
+          }
+        });
+  }
+
+  @Override
+  public void handleSplitRequest(int subtaskId, @Nullable String 
requesterHostname) {
+    // Not used.
+  }
+
+  @Override
+  public void addSplitsBack(List<FlinkSourceSplit<T>> splits, int subtaskId) {
+    LOG.info("Adding splits {} back from subtask {}", splits, subtaskId);
+    List<FlinkSourceSplit<T>> splitsForSubtask =
+        pendingSplits.computeIfAbsent(subtaskId, ignored -> new ArrayList<>());
+    splitsForSubtask.addAll(splits);
+  }
+
+  @Override
+  public void addReader(int subtaskId) {
+    List<FlinkSourceSplit<T>> splitsForSubtask = 
pendingSplits.remove(subtaskId);
+    if (splitsForSubtask != null) {
+      assignSplitsAndLog(splitsForSubtask, subtaskId);
+      pendingSplits.remove(subtaskId);
+    } else {
+      if (splitsInitialized) {
+        LOG.info("There is no split for subtask {}. Signaling no more 
splits.", subtaskId);
+        context.signalNoMoreSplits(subtaskId);
+      }
+    }
+  }
+
+  @Override
+  public Map<Integer, List<FlinkSourceSplit<T>>> snapshotState(long 
checkpointId) throws Exception {
+    LOG.info("Taking snapshot for checkpoint {}", checkpointId);
+    return snapshotState();
+  }
+
+  @Override
+  public Map<Integer, List<FlinkSourceSplit<T>>> snapshotState() throws 
Exception {
+    return pendingSplits;
+  }
+
+  @Override
+  public void close() throws IOException {
+    // NoOp
+  }
+
+  // -------------- Private helper methods ----------------------
+  private List<? extends Source<T>> splitBeamSource() throws Exception {
+    if (beamSource instanceof BoundedSource) {
+      BoundedSource<T> boundedSource = (BoundedSource<T>) beamSource;
+      long desiredSizeBytes = 
boundedSource.getEstimatedSizeBytes(pipelineOptions) / numSplits;
+      return boundedSource.split(desiredSizeBytes, pipelineOptions);
+    } else if (beamSource instanceof UnboundedSource) {
+      return ((UnboundedSource<T, ?>) beamSource).split(numSplits, 
pipelineOptions);
+    } else {
+      throw new IllegalStateException("Unknown source type " + 
beamSource.getClass());
+    }
+  }
+
+  private void sendPendingSplitsToSourceReaders() {
+    Iterator<Map.Entry<Integer, List<FlinkSourceSplit<T>>>> splitIter =
+        pendingSplits.entrySet().iterator();
+    while (splitIter.hasNext()) {
+      Map.Entry<Integer, List<FlinkSourceSplit<T>>> entry = splitIter.next();
+      int readerIndex = entry.getKey();
+      int targetSubtask = readerIndex % context.currentParallelism();
+      if (context.registeredReaders().containsKey(targetSubtask)) {
+        assignSplitsAndLog(entry.getValue(), targetSubtask);
+        splitIter.remove();
+      }
+    }
+  }
+
+  private void assignSplitsAndLog(List<FlinkSourceSplit<T>> splits, int 
subtaskId) {
+    context.assignSplits(new 
SplitsAssignment<>(Collections.singletonMap(subtaskId, splits)));
+    context.signalNoMoreSplits(subtaskId);
+    LOG.info("Assigned splits {} to subtask {}", splits, subtaskId);
+  }
+}
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSource.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSource.java
new file mode 100644
index 00000000000..c2bd904dcc6
--- /dev/null
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSource.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded;
+
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSource;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+
+/**
+ * A Flink {@link org.apache.flink.api.connector.source.Source Source} implementation wrapping a
+ * Beam {@link BoundedSource BoundedSource}.
+ *
+ * <p>Depending on its {@link Boundedness} setting, a {@link FlinkBoundedSource} can run in either
+ * batch or streaming mode. When running in streaming mode the BoundedSource acts like a "finite
+ * stream".
+ *
+ * @param <T> The output type of the wrapped Beam {@link BoundedSource BoundedSource}.
+ */
+public class FlinkBoundedSource<T> extends FlinkSource<T, WindowedValue<T>> {
+  // Optional extractor handed to every reader created by this source; may be null.
+  protected final @Nullable TimestampExtractor<WindowedValue<T>> timestampExtractor;
+
+  /** Creates a source without a timestamp extractor. */
+  public FlinkBoundedSource(
+      BoundedSource<T> beamSource,
+      SerializablePipelineOptions serializablePipelineOptions,
+      Boundedness boundedness,
+      int numSplits) {
+    this(beamSource, serializablePipelineOptions, boundedness, numSplits, null);
+  }
+
+  /** Creates a source with an optional timestamp extractor. */
+  public FlinkBoundedSource(
+      BoundedSource<T> beamSource,
+      SerializablePipelineOptions serializablePipelineOptions,
+      Boundedness boundedness,
+      int numSplits,
+      @Nullable TimestampExtractor<WindowedValue<T>> timestampExtractor) {
+    super(beamSource, serializablePipelineOptions, boundedness, numSplits);
+    this.timestampExtractor = timestampExtractor;
+  }
+
+  /**
+   * Creates a {@link FlinkBoundedSourceReader} from the reader context, the pipeline options
+   * obtained from {@code serializablePipelineOptions} and the timestamp extractor.
+   */
+  @Override
+  public SourceReader<WindowedValue<T>, FlinkSourceSplit<T>> createReader(
+      SourceReaderContext readerContext) throws Exception {
+    SourceReader<WindowedValue<T>, FlinkSourceSplit<T>> boundedReader =
+        new FlinkBoundedSourceReader<>(
+            readerContext, serializablePipelineOptions.get(), timestampExtractor);
+    return boundedReader;
+  }
+}
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReader.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReader.java
new file mode 100644
index 00000000000..9cea73f6a4a
--- /dev/null
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReader.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Flink {@link org.apache.flink.api.connector.source.SourceReader 
SourceReader} implementation
+ * that reads from the assigned {@link
+ * 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit
+ * FlinkSourceSplits} by using Beam {@link 
org.apache.beam.sdk.io.BoundedSource.BoundedReader
+ * BoundedReaders}.
+ *
+ * <p>This reader consumes the source splits one by one sequentially, instead 
of concurrently.
+ *
+ * @param <T> the output element type of the encapsulated Beam {@link
+ *     org.apache.beam.sdk.io.BoundedSource.BoundedReader BoundedReader.}
+ */
+public class FlinkBoundedSourceReader<T> extends FlinkSourceReaderBase<T, WindowedValue<T>> {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkBoundedSourceReader.class);
+  /** The Beam reader currently being drained, or {@code null} if no split is being read. */
+  private @Nullable Source.Reader<T> currentReader;
+  /** The id of the split behind {@link #currentReader}, or -1 if no split is being read. */
+  private int currentSplitId;
+
+  public FlinkBoundedSourceReader(
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      @Nullable Function<WindowedValue<T>, Long> timestampExtractor) {
+    super(context, pipelineOptions, timestampExtractor);
+    currentSplitId = -1;
+  }
+
+  @VisibleForTesting
+  protected FlinkBoundedSourceReader(
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      ScheduledExecutorService executor,
+      @Nullable Function<WindowedValue<T>, Long> timestampExtractor) {
+    super(executor, context, pipelineOptions, timestampExtractor);
+    currentSplitId = -1;
+  }
+
+  /**
+   * Emits at most one record from the current split.
+   *
+   * <p>If no split is currently being read, the next split that has at least one record is
+   * opened first. When there is none, {@link InputStatus#END_OF_INPUT} is returned once no more
+   * splits will arrive and the idle timeout has passed; otherwise {@link
+   * InputStatus#NOTHING_AVAILABLE} is returned.
+   *
+   * @param output the Flink output that receives the record.
+   * @return the input status as described above, {@link InputStatus#MORE_AVAILABLE} after a
+   *     record has been emitted.
+   */
+  @Override
+  public InputStatus pollNext(ReaderOutput<WindowedValue<T>> output) throws Exception {
+    checkExceptionAndMaybeThrow();
+    if (currentReader == null && !moveToNextNonEmptyReader()) {
+      // Nothing to read for now.
+      if (noMoreSplits() && checkIdleTimeoutAndMaybeStartCountdown()) {
+        // All the source splits have been read and idle timeout has passed.
+        LOG.info(
+            "All splits have finished reading, and idle time {} ms has passed.", idleTimeoutMs);
+        return InputStatus.END_OF_INPUT;
+      } else {
+        // This reader either hasn't received NoMoreSplitsEvent yet or it is waiting for idle
+        // timeout.
+        return InputStatus.NOTHING_AVAILABLE;
+      }
+    }
+    // Copy the nullable field into a local so the null check below holds for the whole block.
+    Source.Reader<T> tempCurrentReader = currentReader;
+    if (tempCurrentReader != null) {
+      T record = tempCurrentReader.getCurrent();
+      WindowedValue<T> windowedValue =
+          WindowedValue.of(
+              record,
+              tempCurrentReader.getCurrentTimestamp(),
+              GlobalWindow.INSTANCE,
+              PaneInfo.NO_FIRING);
+      if (timestampExtractor == null) {
+        output.collect(windowedValue);
+      } else {
+        output.collect(windowedValue, timestampExtractor.apply(windowedValue));
+      }
+      numRecordsInCounter.inc();
+      // If the advance() invocation throws exception here, the job will just fail over and read
+      // everything again from the beginning. So the failover granularity is the entire Flink job.
+      if (!tempCurrentReader.advance()) {
+        finishSplit(currentSplitId);
+        // Log before resetting currentSplitId, otherwise the message always reports -1.
+        LOG.debug("Finished reading from {}", currentSplitId);
+        currentReader = null;
+        currentSplitId = -1;
+      }
+      // Always return MORE_AVAILABLE here regardless of the availability of the next record. If
+      // there are no more records available in the current split, the next invocation of
+      // pollNext() will handle that.
+      return InputStatus.MORE_AVAILABLE;
+    } else {
+      throw new IllegalArgumentException(
+          "If we reach here, the current beam reader should not be null");
+    }
+  }
+
+  @Override
+  protected CompletableFuture<Void> isAvailableForAliveReaders() {
+    // For bounded source, as long as there are active readers, the data is available.
+    return AVAILABLE_NOW;
+  }
+
+  // ------------------------- private helper methods --------------------------
+
+  /**
+   * Opens readers for the pending splits one by one until one of them reports a first record.
+   * Splits whose reader reports no record on {@code start()} are finished immediately.
+   *
+   * @return true if a reader with data became the current reader, false if no reader could be
+   *     created for now.
+   */
+  private boolean moveToNextNonEmptyReader() throws IOException {
+    Optional<ReaderAndOutput> readerAndOutput;
+    while ((readerAndOutput = createAndTrackNextReader()).isPresent()) {
+      ReaderAndOutput rao = readerAndOutput.get();
+      boolean hasData = rao.reader.start();
+      int splitId = Integer.parseInt(rao.splitId);
+      if (hasData) {
+        currentSplitId = splitId;
+        currentReader = rao.reader;
+        return true;
+      }
+      finishSplit(splitId);
+    }
+    return false;
+  }
+}
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/package-info.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/package-info.java
new file mode 100644
index 00000000000..7722241331d
--- /dev/null
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/** Internal implementation of the Beam runner for Apache Flink. */
+package 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded;
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/impulse/BeamImpulseSource.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/impulse/BeamImpulseSource.java
new file mode 100644
index 00000000000..cbf1871dfba
--- /dev/null
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/impulse/BeamImpulseSource.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.impulse;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.joda.time.Instant;
+
+/** A Beam {@link BoundedSource} for Impulse Source. */
+public class BeamImpulseSource extends BoundedSource<byte[]> {
+
+  /** Returns this source as the one and only split, whatever bundle size is requested. */
+  @Override
+  public List<? extends BoundedSource<byte[]>> split(
+      long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+    return Collections.singletonList(this);
+  }
+
+  /** Always reports an estimated size of zero bytes. */
+  @Override
+  public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+    return 0;
+  }
+
+  /** Creates a reader that yields exactly one empty byte array. */
+  @Override
+  public BoundedReader<byte[]> createReader(PipelineOptions options) throws IOException {
+    return new ImpulseReader(this);
+  }
+
+  /**
+   * The reader of the impulse: after {@link #start()} it exposes a single empty byte array with
+   * the timestamp {@link BoundedWindow#TIMESTAMP_MIN_VALUE}; {@link #advance()} never finds
+   * another element.
+   */
+  private static class ImpulseReader extends BoundedSource.BoundedReader<byte[]> {
+    private final BeamImpulseSource source;
+    // Whether start() has been invoked.
+    private boolean started = false;
+    // Number of advance() invocations so far; the element is only available while this is 0.
+    private int index = 0;
+
+    private ImpulseReader(BeamImpulseSource source) {
+      this.source = source;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      started = true;
+      return true;
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      if (!started) {
+        throw new IllegalStateException("start() should be called before calling advance()");
+      }
+      index++;
+      return false;
+    }
+
+    @Override
+    public byte[] getCurrent() throws NoSuchElementException {
+      ensureElementAvailable();
+      return new byte[0];
+    }
+
+    @Override
+    public BoundedSource<byte[]> getCurrentSource() {
+      return source;
+    }
+
+    @Override
+    public Instant getCurrentTimestamp() throws NoSuchElementException {
+      ensureElementAvailable();
+      return BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+
+    @Override
+    public void close() throws IOException {}
+
+    /**
+     * Verifies that the reader currently points at the impulse element.
+     *
+     * @throws IllegalStateException if the reader has not been started.
+     * @throws NoSuchElementException if the reader has already been advanced.
+     */
+    private void ensureElementAvailable() {
+      if (!started) {
+        throw new IllegalStateException("The reader hasn't started.");
+      }
+      if (index != 0) {
+        throw new NoSuchElementException("No element is available.");
+      }
+    }
+  }
+}
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/impulse/package-info.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/impulse/package-info.java
new file mode 100644
index 00000000000..3fd38e2259d
--- /dev/null
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/impulse/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/** Internal implementation of the Beam runner for Apache Flink. */
+package 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.impulse;
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/package-info.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/package-info.java
new file mode 100644
index 00000000000..448b3fef6d2
--- /dev/null
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/** Internal implementation of the Beam runner for Apache Flink. */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSource.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSource.java
new file mode 100644
index 00000000000..b4049220170
--- /dev/null
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSource.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded;
+
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSource;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.ValueWithRecordId;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+
+/**
+ * A Flink {@link org.apache.flink.api.connector.source.Source Source} 
implementation that wraps a
+ * Beam {@link org.apache.beam.sdk.io.UnboundedSource UnboundedSource}.
+ *
+ * @param <T> The output type of the wrapped Beam {@link 
org.apache.beam.sdk.io.UnboundedSource
+ *     UnboundedSource}.
+ */
+public class FlinkUnboundedSource<T> extends FlinkSource<T, WindowedValue<ValueWithRecordId<T>>> {
+  /** Optional extractor that is handed to every reader created by this source. */
+  private final @Nullable TimestampExtractor<WindowedValue<ValueWithRecordId<T>>>
+      timestampExtractor;
+
+  /**
+   * Creates a source that passes the given timestamp extractor to the readers it creates. The
+   * boundedness forwarded to {@link FlinkSource} is always {@link
+   * Boundedness#CONTINUOUS_UNBOUNDED}.
+   *
+   * @param beamSource the Beam unbounded source to wrap.
+   * @param serializablePipelineOptions the pipeline options, in serializable form.
+   * @param numSplits the number of splits forwarded to {@link FlinkSource}.
+   * @param timestampExtractor the extractor given to each created reader, may be {@code null}.
+   */
+  public FlinkUnboundedSource(
+      UnboundedSource<T, ?> beamSource,
+      SerializablePipelineOptions serializablePipelineOptions,
+      int numSplits,
+      @Nullable TimestampExtractor<WindowedValue<ValueWithRecordId<T>>> timestampExtractor) {
+    super(beamSource, serializablePipelineOptions, Boundedness.CONTINUOUS_UNBOUNDED, numSplits);
+    this.timestampExtractor = timestampExtractor;
+  }
+
+  /** Creates a source without a timestamp extractor, i.e. with a {@code null} extractor. */
+  public FlinkUnboundedSource(
+      UnboundedSource<T, ?> beamSource,
+      SerializablePipelineOptions serializablePipelineOptions,
+      int numSplits) {
+    this(beamSource, serializablePipelineOptions, numSplits, null);
+  }
+
+  /**
+   * Creates a {@link FlinkUnboundedSourceReader} from the reader context, the deserialized
+   * pipeline options and the (possibly {@code null}) timestamp extractor of this source.
+   */
+  @Override
+  public SourceReader<WindowedValue<ValueWithRecordId<T>>, FlinkSourceSplit<T>> createReader(
+      SourceReaderContext readerContext) throws Exception {
+    return new FlinkUnboundedSourceReader<>(
+        readerContext, serializablePipelineOptions.get(), timestampExtractor);
+  }
+}
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java
new file mode 100644
index 00000000000..3c596360efd
--- /dev/null
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.ValueWithRecordId;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Flink {@link org.apache.flink.api.connector.source.SourceReader 
SourceReader} implementation
+ * that reads from the assigned {@link
+ * 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit
+ * FlinkSourceSplits} by using Beam {@link 
org.apache.beam.sdk.io.UnboundedSource.UnboundedReader
+ * UnboundedReaders}.
+ *
+ * <p>This reader consumes all the assigned source splits concurrently.
+ *
+ * @param <T> the output element type of the encapsulated Beam {@link
+ *     org.apache.beam.sdk.io.UnboundedSource.UnboundedReader UnboundedReader}.
+ */
+public class FlinkUnboundedSourceReader<T>
+    extends FlinkSourceReaderBase<T, WindowedValue<ValueWithRecordId<T>>> {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkUnboundedSourceReader.class);
+  // This name is defined in FLIP-33.
+  @VisibleForTesting protected static final String PENDING_BYTES_METRIC_NAME = "pendingBytes";
+  // Delay before the idle checker completes a pending availability future.
+  private static final long SLEEP_ON_IDLE_MS = 50L;
+  // Holds DUMMY_FUTURE while no availability future is outstanding; see
+  // isAvailableForAliveReaders().
+  private final AtomicReference<CompletableFuture<Void>> dataAvailableFutureRef =
+      new AtomicReference<>(DUMMY_FUTURE);
+  // The readers polled in round-robin order by nextReaderWithData().
+  private final List<ReaderAndOutput> readers = new ArrayList<>();
+  // Index into readers of the reader to poll next.
+  private int currentReaderIndex = 0;
+  // Set by the periodic watermark task, cleared by maybeEmitWatermark().
+  private volatile boolean shouldEmitWatermark;
+
+  public FlinkUnboundedSourceReader(
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      @Nullable Function<WindowedValue<ValueWithRecordId<T>>, Long> timestampExtractor) {
+    super(context, pipelineOptions, timestampExtractor);
+  }
+
+  @VisibleForTesting
+  protected FlinkUnboundedSourceReader(
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      ScheduledExecutorService executor,
+      @Nullable Function<WindowedValue<ValueWithRecordId<T>>, Long> timestampExtractor) {
+    super(executor, context, pipelineOptions, timestampExtractor);
+  }
+
+  /**
+   * Registers the pending bytes gauge and, if an auto watermark interval is configured, schedules
+   * the periodic task that requests a watermark emission and completes a pending availability
+   * future.
+   */
+  @Override
+  public void start() {
+    createPendingBytesGauge(context);
+    Long watermarkInterval =
+        pipelineOptions.as(FlinkPipelineOptions.class).getAutoWatermarkInterval();
+    if (watermarkInterval == null) {
+      LOG.warn("AutoWatermarkInterval is not set, watermarks won't be emitted.");
+      return;
+    }
+    scheduleTaskAtFixedRate(
+        () -> {
+          // Set the watermark emission flag first, then wake up the main thread if necessary.
+          shouldEmitWatermark = true;
+          completePendingDataAvailableFuture();
+        },
+        watermarkInterval,
+        watermarkInterval);
+  }
+
+  /**
+   * Emits the watermarks if requested, creates readers for newly assigned splits and then emits
+   * at most one record from the next reader that has data.
+   *
+   * @return {@link InputStatus#MORE_AVAILABLE} if a record was emitted, otherwise {@link
+   *     InputStatus#NOTHING_AVAILABLE}.
+   */
+  @Override
+  public InputStatus pollNext(ReaderOutput<WindowedValue<ValueWithRecordId<T>>> output)
+      throws Exception {
+    checkExceptionAndMaybeThrow();
+    maybeEmitWatermark();
+    maybeCreateReaderForNewSplits();
+
+    ReaderAndOutput readerWithData = nextReaderWithData();
+    if (readerWithData == null) {
+      LOG.trace("No data available for now.");
+      return InputStatus.NOTHING_AVAILABLE;
+    }
+    emitRecord(readerWithData, output);
+    return InputStatus.MORE_AVAILABLE;
+  }
+
+  /**
+   * Returns the future that the caller waits on before polling again.
+   *
+   * <p>If no future is outstanding, a new one is created and published. It is completed right
+   * away when a watermark emission is pending or an exception has been recorded; otherwise a
+   * task is scheduled to complete it after {@link #SLEEP_ON_IDLE_MS} ms. An outstanding future is
+   * returned as is, and the reference is reset once that future has completed.
+   *
+   * <p>This method is only called by the main thread, which is the only thread writing to the
+   * future reference. Note that for UnboundedSource, because the splits never finish, there are
+   * always alive readers after the first split assignment. Hence, the return value of {@link
+   * FlinkSourceReaderBase#isAvailable()} will effectively be determined by this method after the
+   * first split assignment.
+   */
+  @Override
+  protected CompletableFuture<Void> isAvailableForAliveReaders() {
+    CompletableFuture<Void> existingFuture = dataAvailableFutureRef.get();
+    if (existingFuture != DUMMY_FUTURE) {
+      if (existingFuture.isDone()) {
+        // The previous future is completed, hand it out one last time and reset the future ref.
+        dataAvailableFutureRef.set(DUMMY_FUTURE);
+      }
+      // Otherwise the previous future has not been completed yet, so it is simply reused.
+      return existingFuture;
+    }
+
+    CompletableFuture<Void> newFuture = new CompletableFuture<>();
+    // The future has to be published before the flags are checked, to avoid the race condition
+    // of missing the watermark emission notification.
+    dataAvailableFutureRef.set(newFuture);
+    if (shouldEmitWatermark || hasException()) {
+      // A watermark emission was requested or an exception was recorded, so there is work for
+      // the main thread: complete the future immediately and reset the future ref.
+      dataAvailableFutureRef.set(DUMMY_FUTURE);
+      newFuture.complete(null);
+    } else {
+      LOG.debug("There is no data available, scheduling the idle reader checker.");
+      scheduleTask(() -> completePendingDataAvailableFuture(), SLEEP_ON_IDLE_MS);
+    }
+    return newFuture;
+  }
+
+  // -------------- private helper methods ----------------
+
+  /** Completes the availability future currently held by the future ref, if there is one. */
+  private void completePendingDataAvailableFuture() {
+    CompletableFuture<Void> pendingFuture = dataAvailableFutureRef.get();
+    if (pendingFuture != DUMMY_FUTURE) {
+      pendingFuture.complete(null);
+    }
+  }
+
+  /**
+   * Wraps the current element of the given reader, together with its record id, into a {@link
+   * WindowedValue} in the global window and emits it to the output of the reader's split. The
+   * records-in counter is incremented afterwards.
+   */
+  private void emitRecord(
+      ReaderAndOutput readerAndOutput, ReaderOutput<WindowedValue<ValueWithRecordId<T>>> output) {
+    UnboundedSource.UnboundedReader<T> unboundedReader = asUnbounded(readerAndOutput.reader);
+    WindowedValue<ValueWithRecordId<T>> windowedValue =
+        WindowedValue.of(
+            new ValueWithRecordId<>(
+                unboundedReader.getCurrent(), unboundedReader.getCurrentRecordId()),
+            unboundedReader.getCurrentTimestamp(),
+            GlobalWindow.INSTANCE,
+            PaneInfo.NO_FIRING);
+    LOG.trace("Emitting record: {}", windowedValue);
+    if (timestampExtractor != null) {
+      readerAndOutput
+          .getAndMaybeCreateSplitOutput(output)
+          .collect(windowedValue, timestampExtractor.apply(windowedValue));
+    } else {
+      readerAndOutput.getAndMaybeCreateSplitOutput(output).collect(windowedValue);
+    }
+    numRecordsInCounter.inc();
+  }
+
+  /**
+   * Emits the current watermark of every reader that already has a source output, provided that
+   * an emission has been requested, and clears the request afterwards.
+   */
+  private void maybeEmitWatermark() {
+    if (!shouldEmitWatermark) {
+      return;
+    }
+    // Here we rely on the Flink source watermark multiplexer to combine the per-split watermark,
+    // so more than one watermark may be emitted in a single invocation.
+    allReaders()
+        .values()
+        .forEach(
+            readerAndOutput -> {
+              SourceOutput<?> sourceOutput = readerAndOutput.sourceOutput();
+              if (sourceOutput != null) {
+                long watermark = asUnbounded(readerAndOutput.reader).getWatermark().getMillis();
+                sourceOutput.emitWatermark(new Watermark(watermark));
+              }
+            });
+    shouldEmitWatermark = false;
+  }
+
+  /** Creates a reader for every pending source split and adds it to the polled readers. */
+  private void maybeCreateReaderForNewSplits() throws Exception {
+    while (!sourceSplits().isEmpty()) {
+      Optional<ReaderAndOutput> createdReader = createAndTrackNextReader();
+      if (!createdReader.isPresent()) {
+        // No reader is only expected when an exception occurred, so check for one and throw it.
+        checkExceptionAndMaybeThrow();
+        continue;
+      }
+      readers.add(createdReader.get());
+    }
+  }
+
+  /**
+   * Polls the readers in round-robin order, starting after the reader polled last, and visits
+   * each reader at most once.
+   *
+   * @return the first reader whose {@code startOrAdvance()} reports a record, or {@code null} if
+   *     none of them does.
+   */
+  private @Nullable ReaderAndOutput nextReaderWithData() throws IOException {
+    final int numReaders = readers.size();
+    int remaining = numReaders;
+    while (remaining-- > 0) {
+      ReaderAndOutput candidate = readers.get(currentReaderIndex);
+      currentReaderIndex = (currentReaderIndex + 1) % numReaders;
+      if (candidate.startOrAdvance()) {
+        return candidate;
+      }
+    }
+    return null;
+  }
+
+  /** Casts the given reader to an {@link UnboundedSource.UnboundedReader}. */
+  private static <T> UnboundedSource.UnboundedReader<T> asUnbounded(Source.Reader<T> reader) {
+    return (UnboundedSource.UnboundedReader<T>) reader;
+  }
+
+  /**
+   * Registers a gauge that reports the sum of the split backlog bytes over all readers. Readers
+   * reporting -1 are left out of the sum, and -1 is reported if no reader contributed a value.
+   */
+  private void createPendingBytesGauge(SourceReaderContext context) {
+    // TODO: Replace with SourceReaderContext.metricGroup().setPendingBytesGauge() after Flink
+    // 1.14 and above.
+    context
+        .metricGroup()
+        .gauge(
+            PENDING_BYTES_METRIC_NAME,
+            () -> {
+              long pendingBytes = -1L;
+              for (FlinkSourceReaderBase<?, ?>.ReaderAndOutput readerAndOutput :
+                  allReaders().values()) {
+                long pendingBytesForReader =
+                    asUnbounded(readerAndOutput.reader).getSplitBacklogBytes();
+                if (pendingBytesForReader == -1L) {
+                  continue;
+                }
+                pendingBytes =
+                    pendingBytes == -1L
+                        ? pendingBytesForReader
+                        : pendingBytes + pendingBytesForReader;
+              }
+              return pendingBytes;
+            });
+  }
+}
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/package-info.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/package-info.java
new file mode 100644
index 00000000000..0a853b66f4d
--- /dev/null
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/** Internal implementation of the Beam runner for Apache Flink. */
+package 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded;
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestBoundedCountingSource.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestBoundedCountingSource.java
new file mode 100644
index 00000000000..68438a70120
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestBoundedCountingSource.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.TestSource;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * A {@link BoundedSource} used in tests that reads from an underlying {@link TestCountingSource}
+ * and implements the {@link TestSource} bookkeeping used to validate what a reader emitted.
+ *
+ * <p>Sizes are counted in records rather than bytes: {@link #getEstimatedSizeBytes} returns the
+ * number of records, and {@link #split} treats the desired bundle size as records per split.
+ */
+public class TestBoundedCountingSource extends BoundedSource<KV<Integer, Integer>>
+    implements TestSource {
+  private final int totalNumRecords;
+  private final TestCountingSource source;
+  // Every reader handed out by createReader(), in creation order.
+  private final List<TestReader> createdReaders = new ArrayList<>();
+  // Expectations for the next validateNextValue() / validateNextTimestamp() call.
+  private int nextValueForValidating = 0;
+  private long nextTimestampForValidating = 0;
+
+  /**
+   * Creates a source expected to produce {@code totalNumRecords} records.
+   *
+   * @param shardNum shard number given to the underlying {@link TestCountingSource}
+   * @param totalNumRecords record count, also passed to the underlying source's constructor
+   */
+  public TestBoundedCountingSource(int shardNum, int totalNumRecords) {
+    this.totalNumRecords = totalNumRecords;
+    this.source =
+        new TestCountingSource(totalNumRecords).withShardNumber(shardNum).withFixedNumSplits(1);
+  }
+
+  /**
+   * Splits this source into consecutive shards holding at most {@code desiredBundleSizeBytes}
+   * records each; shard numbers start at 0.
+   *
+   * <p>A non-positive {@code desiredBundleSizeBytes} is treated as 1 so that every split makes
+   * progress. An empty list is returned when this source has no records.
+   */
+  @Override
+  public List<? extends BoundedSource<KV<Integer, Integer>>> split(
+      long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+    // Without this clamp a bundle size of 0 (or less) would never advance numRecordsAssigned and
+    // the loop below would not terminate.
+    long maxRecordsPerSplit = Math.max(1L, desiredBundleSizeBytes);
+    List<TestBoundedCountingSource> splits = new ArrayList<>();
+    int numRecordsAssigned = 0;
+    for (int shardNum = 0; numRecordsAssigned < totalNumRecords; shardNum++) {
+      int numRecordsForSplit =
+          (int) Math.min(totalNumRecords - numRecordsAssigned, maxRecordsPerSplit);
+      splits.add(new TestBoundedCountingSource(shardNum, numRecordsForSplit));
+      numRecordsAssigned += numRecordsForSplit;
+    }
+    return splits;
+  }
+
+  /** Returns the number of records of this source (the test uses records as the size unit). */
+  @Override
+  public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+    return totalNumRecords;
+  }
+
+  /** Creates a reader over the underlying counting source and remembers it for later checks. */
+  @Override
+  public BoundedReader<KV<Integer, Integer>> createReader(PipelineOptions options)
+      throws IOException {
+    BoundedTestCountingSourceReader reader =
+        new BoundedTestCountingSourceReader(source.createReader(options, null), this);
+    createdReaders.add(reader);
+    return reader;
+  }
+
+  @Override
+  public List<TestReader> createdReaders() {
+    return createdReaders;
+  }
+
+  /** Compares {@code value} with the next expected value, then advances the expectation. */
+  @Override
+  public boolean validateNextValue(int value) {
+    // The expectation advances even when the comparison fails.
+    return value == nextValueForValidating++;
+  }
+
+  /** Compares {@code timestamp} with the next expected one, then advances the expectation. */
+  @Override
+  public boolean validateNextTimestamp(long timestamp) {
+    return timestamp == nextTimestampForValidating++;
+  }
+
+  /** Returns true once as many values have been validated as this source has records. */
+  @Override
+  public boolean isConsumptionCompleted() {
+    return nextValueForValidating == totalNumRecords;
+  }
+
+  /** Returns true when as many timestamps as values have been validated. */
+  @Override
+  public boolean allTimestampsReceived() {
+    return nextTimestampForValidating == nextValueForValidating;
+  }
+
+  /**
+   * A {@link BoundedSource.BoundedReader} that delegates reading to a {@link
+   * TestCountingSource.CountingSourceReader} and records whether it has been closed.
+   */
+  public static class BoundedTestCountingSourceReader
+      extends BoundedSource.BoundedReader<KV<Integer, Integer>> implements TestReader {
+
+    private final TestCountingSource.CountingSourceReader reader;
+    private final TestBoundedCountingSource currentSource;
+    private boolean closed = false;
+
+    private BoundedTestCountingSourceReader(
+        TestCountingSource.CountingSourceReader reader, TestBoundedCountingSource currentSource) {
+      this.reader = reader;
+      this.currentSource = currentSource;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      return reader.start();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      return reader.advance();
+    }
+
+    @Override
+    public KV<Integer, Integer> getCurrent() throws NoSuchElementException {
+      return reader.getCurrent();
+    }
+
+    /** Marks this reader as closed before closing the delegate reader. */
+    @Override
+    public void close() throws IOException {
+      closed = true;
+      reader.close();
+    }
+
+    @Override
+    public BoundedSource<KV<Integer, Integer>> getCurrentSource() {
+      return currentSource;
+    }
+
+    @Override
+    public boolean isClosed() {
+      return closed;
+    }
+  }
+}
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestCountingSource.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestCountingSource.java
index 41932dd1e49..5c54ce4c44e 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestCountingSource.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestCountingSource.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.TestSource;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.DelegateCoder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -45,7 +46,8 @@ import org.slf4j.LoggerFactory;
  * where not all the data is available immediately.
  */
 public class TestCountingSource
-    extends UnboundedSource<KV<Integer, Integer>, 
TestCountingSource.CounterMark> {
+    extends UnboundedSource<KV<Integer, Integer>, 
TestCountingSource.CounterMark>
+    implements TestSource {
   private static final Logger LOG = 
LoggerFactory.getLogger(TestCountingSource.class);
 
   private static List<Integer> finalizeTracker;
@@ -65,6 +67,12 @@ public class TestCountingSource
    */
   private static boolean thrown = false;
 
+  private final List<TestReader> createdReaders;
+
+  private int nextValueForValidating;
+
+  private long nextTimestampForValidating;
+
   public static void setFinalizeTracker(List<Integer> finalizeTracker) {
     TestCountingSource.finalizeTracker = finalizeTracker;
   }
@@ -77,7 +85,7 @@ public class TestCountingSource
     return new TestCountingSource(numMessagesPerShard, shardNumber, true, 
throwOnFirstSnapshot, -1);
   }
 
-  private TestCountingSource withShardNumber(int shardNumber) {
+  public TestCountingSource withShardNumber(int shardNumber) {
     return new TestCountingSource(
         numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, -1);
   }
@@ -107,6 +115,7 @@ public class TestCountingSource
     this.dedup = dedup;
     this.throwOnFirstSnapshot = throwOnFirstSnapshot;
     this.fixedNumSplits = fixedNumSplits;
+    this.createdReaders = new ArrayList<>();
   }
 
   /** Halts emission of elements until {@code continueEmission} is invoked. */
@@ -129,7 +138,7 @@ public class TestCountingSource
     return splits;
   }
 
-  static class CounterMark implements UnboundedSource.CheckpointMark {
+  public static class CounterMark implements UnboundedSource.CheckpointMark {
     int current;
 
     public CounterMark(int current) {
@@ -144,6 +153,35 @@ public class TestCountingSource
     }
   }
 
+  /** Returns the list to which {@code createReader} adds every reader it creates. */
+  @Override
+  public List<TestReader> createdReaders() {
+    return this.createdReaders;
+  }
+
+  /** Compares {@code value} with the next expected value, then advances the expectation. */
+  @Override
+  public boolean validateNextValue(int value) {
+    // The expectation advances even when the comparison fails.
+    return value == nextValueForValidating++;
+  }
+
+  /** Compares {@code timestamp} with the next expected one, then advances the expectation. */
+  @Override
+  public boolean validateNextTimestamp(long timestamp) {
+    // The expectation advances even when the comparison fails.
+    return timestamp == nextTimestampForValidating++;
+  }
+
+  /** Returns true once {@code numMessagesPerShard} values have been passed to validation. */
+  @Override
+  public boolean isConsumptionCompleted() {
+    final int numValidatedValues = nextValueForValidating;
+    return numValidatedValues == numMessagesPerShard;
+  }
+
+  /** Returns true when as many timestamps as values have been passed to validation. */
+  @Override
+  public boolean allTimestampsReceived() {
+    final long numValidatedValues = nextValueForValidating;
+    return nextTimestampForValidating == numValidatedValues;
+  }
+
   @Override
   public Coder<CounterMark> getCheckpointMarkCoder() {
     return DelegateCoder.of(VarIntCoder.of(), new FromCounterMark(), new 
ToCounterMark());
@@ -158,11 +196,14 @@ public class TestCountingSource
    * Public only so that the checkpoint can be conveyed from {@link 
#getCheckpointMark()} to {@link
    * TestCountingSource#createReader(PipelineOptions, CounterMark)} without 
cast.
    */
-  public class CountingSourceReader extends UnboundedReader<KV<Integer, 
Integer>> {
+  public class CountingSourceReader extends UnboundedReader<KV<Integer, 
Integer>>
+      implements TestReader {
     private int current;
+    private boolean closed;
 
     public CountingSourceReader(int startingPoint) {
       this.current = startingPoint;
+      this.closed = false;
     }
 
     @Override
@@ -203,7 +244,9 @@ public class TestCountingSource
     }
 
     @Override
-    public void close() {}
+    public void close() {
+      closed = true;
+    }
 
     @Override
     public TestCountingSource getCurrentSource() {
@@ -238,6 +281,11 @@ public class TestCountingSource
     public long getSplitBacklogBytes() {
       return 7L;
     }
+
+    /** Returns the {@code closed} flag of this reader. */
+    @Override
+    public boolean isClosed() {
+      return this.closed;
+    }
   }
 
   @Override
@@ -248,7 +296,10 @@ public class TestCountingSource
     } else {
       LOG.debug("restoring reader from checkpoint with current = {}", 
checkpointMark.current);
     }
-    return new CountingSourceReader(checkpointMark != null ? 
checkpointMark.current : -1);
+    CountingSourceReader reader =
+        new CountingSourceReader(checkpointMark != null ? 
checkpointMark.current : -1);
+    createdReaders.add(reader);
+    return reader;
   }
 
   @Override
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderTestBase.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderTestBase.java
new file mode 100644
index 00000000000..dcab3aff0f5
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderTestBase.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.values.KV;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import 
org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/** An abstract test base for {@link FlinkSourceReaderBase}. */
+public abstract class FlinkSourceReaderTestBase<OutputT> {
+
+  // -------------- test poll --------------
+  @Test(timeout = 30000L)
+  public void testPollBasic() throws Exception {
+    testPoll(5, 10);
+  }
+
+  @Test(timeout = 30000L)
+  public void testPollFromEmptySplit() throws Exception {
+    testPoll(3, 0);
+  }
+
+  @Test
+  public void testPollWithTimestampExtractor() throws Exception {
+    testPoll(5, 10, record -> getKVPairs(record).getValue().longValue());
+  }
+
+  @Test
+  public void testExceptionInExecutorThread() throws Exception {
+    try (SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> reader 
= createReader()) {
+      reader.start();
+      ReaderOutput<OutputT> output = Mockito.mock(ReaderOutput.class);
+      // The first poll should not throw any exception.
+      reader.pollNext(output);
+
+      RuntimeException expectedException = new RuntimeException();
+      RuntimeException suppressedException = new RuntimeException();
+      ((FlinkSourceReaderBase) reader)
+          .execute(
+              () -> {
+                throw expectedException;
+              });
+      ((FlinkSourceReaderBase) reader)
+          .execute(
+              () -> {
+                throw suppressedException;
+              });
+      CountDownLatch countDownLatch = new CountDownLatch(1);
+      ((FlinkSourceReaderBase) reader).execute(countDownLatch::countDown);
+      countDownLatch.await();
+
+      try {
+        reader.pollNext(output);
+        fail("Should have thrown exception here.");
+      } catch (Exception e) {
+        Throwable actualException = e;
+        while (actualException != expectedException && 
actualException.getCause() != null) {
+          actualException = actualException.getCause();
+        }
+        assertEquals(expectedException, actualException);
+        assertEquals(1, actualException.getSuppressed().length);
+        assertEquals(suppressedException, actualException.getSuppressed()[0]);
+      }
+    }
+  }
+
+  private void testPoll(int numSplits, int numRecordsPerSplit) throws 
Exception {
+    testPoll(numSplits, numRecordsPerSplit, null);
+  }
+
+  private void testPoll(
+      int numSplits, int numRecordsPerSplit, @Nullable Function<OutputT, Long> 
timestampExtractor)
+      throws Exception {
+    List<FlinkSourceSplit<KV<Integer, Integer>>> splits =
+        createSplits(numSplits, numRecordsPerSplit, 0);
+    try (SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> reader =
+        createReader(timestampExtractor)) {
+      pollAndValidate(reader, splits, timestampExtractor != null);
+    }
+    verifyBeamReaderClosed(splits);
+  }
+
+  // This test may fail if the subclass of FlinkSourceReaderBase overrides
+  // the isAvailable() method, which should have a good reason.
+  @Test
+  public void testIsAvailableOnNoMoreSplitsNotification() throws Exception {
+    try (SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> reader 
= createReader()) {
+      reader.start();
+
+      // No splits assigned yet.
+      CompletableFuture<Void> future1 = reader.isAvailable();
+      assertFalse("No split assigned yet, should not be available.", 
future1.isDone());
+
+      // Data available on split assigned.
+      reader.notifyNoMoreSplits();
+      assertTrue("Future1 should be completed upon no more splits 
notification", future1.isDone());
+      assertTrue(
+          "Completed future should be returned so pollNext can be invoked to 
get updated INPUT_STATUS",
+          reader.isAvailable().isDone());
+    }
+  }
+
+  // This test may fail if the subclass of FlinkSourceReaderBase overrides
+  // the isAvailable() method, which should have a good reason.
+  @Test
+  public void testIsAvailableWithIdleTimeout() throws Exception {
+    ManuallyTriggeredScheduledExecutorService executor =
+        new ManuallyTriggeredScheduledExecutorService();
+    try (SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> reader =
+        createReader(executor, 1L)) {
+      reader.start();
+
+      CompletableFuture<Void> future1 = reader.isAvailable();
+      assertFalse("Future1 should be uncompleted without live split.", 
future1.isDone());
+
+      reader.notifyNoMoreSplits();
+      assertTrue("Future1 should be completed upon no more splits 
notification.", future1.isDone());
+      CompletableFuture<Void> future2 = reader.isAvailable();
+      assertFalse("Future2 should be uncompleted when waiting for idle 
timeout", future2.isDone());
+
+      executor.triggerScheduledTasks();
+      assertTrue("Future2 should be completed after idle timeout.", 
future2.isDone());
+      assertTrue(
+          "The future should always be completed after idle timeout.",
+          reader.isAvailable().isDone());
+    }
+  }
+
+  // This test may fail if the subclass of FlinkSourceReaderBase overrides
+  // the isAvailable() method, which should have a good reason.
+  @Test
+  public void testIsAvailableWithoutIdleTimeout() throws Exception {
+    try (SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> reader 
= createReader()) {
+      reader.start();
+
+      CompletableFuture<Void> future1 = reader.isAvailable();
+      assertFalse("Future1 should be uncompleted without live split.", 
future1.isDone());
+
+      reader.notifyNoMoreSplits();
+      assertTrue("Future1 should be completed upon no more splits 
notification.", future1.isDone());
+      assertTrue(
+          "The future should be completed without idle timeout.", 
reader.isAvailable().isDone());
+    }
+  }
+
+  @Test
+  public void testNumBytesInMetrics() throws Exception {
+    final int numSplits = 2;
+    final int numRecordsPerSplit = 10;
+    List<FlinkSourceSplit<KV<Integer, Integer>>> splits =
+        createSplits(numSplits, numRecordsPerSplit, 0);
+    SourceTestCompat.TestMetricGroup testMetricGroup = new 
SourceTestCompat.TestMetricGroup();
+    try (SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> reader =
+        createReader(null, -1L, null, testMetricGroup)) {
+      pollAndValidate(reader, splits, false);
+    }
+    assertEquals(numRecordsPerSplit * numSplits, 
testMetricGroup.numRecordsInCounter.getCount());
+  }
+
+  // --------------- abstract methods ---------------
+  protected abstract KV<Integer, Integer> getKVPairs(OutputT record);
+
+  protected abstract SourceReader<OutputT, FlinkSourceSplit<KV<Integer, 
Integer>>> createReader(
+      ScheduledExecutorService executor,
+      long idleTimeoutMs,
+      @Nullable Function<OutputT, Long> timestampExtractor,
+      SourceTestCompat.TestMetricGroup testMetricGroup);
+
+  protected abstract Source<KV<Integer, Integer>> createBeamSource(
+      int splitIndex, int numRecordsPerSplit);
+
+  // ------------------- protected helper methods ----------------------
+  protected SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> 
createReader() {
+    return createReader(null, -1L, null, new 
SourceTestCompat.TestMetricGroup());
+  }
+
+  protected SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> 
createReader(
+      Function<OutputT, Long> timestampExtractor) {
+    return createReader(null, -1L, timestampExtractor, new 
SourceTestCompat.TestMetricGroup());
+  }
+
+  protected SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> 
createReader(
+      ScheduledExecutorService executor, long idleTimeoutMs) {
+    return createReader(executor, idleTimeoutMs, null, new 
SourceTestCompat.TestMetricGroup());
+  }
+
+  protected SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> 
createReader(
+      ScheduledExecutorService executor,
+      long idleTimeoutMs,
+      Function<OutputT, Long> timestampExtractor) {
+    return createReader(
+        executor, idleTimeoutMs, timestampExtractor, new 
SourceTestCompat.TestMetricGroup());
+  }
+
+  protected void pollAndValidate(
+      SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> reader,
+      List<FlinkSourceSplit<KV<Integer, Integer>>> splits,
+      boolean validateReceivedTimestamp)
+      throws Exception {
+    RecordsValidatingOutput validatingOutput = new 
RecordsValidatingOutput(splits);
+    pollAndValidate(reader, splits, validatingOutput, Integer.MAX_VALUE);
+    if (validateReceivedTimestamp) {
+      assertTrue(validatingOutput.allTimestampReceived());
+    }
+  }
+
+  protected void pollAndValidate(
+      SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> reader,
+      List<FlinkSourceSplit<KV<Integer, Integer>>> splits,
+      RecordsValidatingOutput validatingOutput,
+      int numRecordsToConsume)
+      throws Exception {
+    reader.addSplits(splits);
+    reader.notifyNoMoreSplits();
+
+    do {
+      reader.pollNext(validatingOutput);
+    } while (!validatingOutput.allRecordsConsumed()
+        && validatingOutput.numCollectedRecords() < numRecordsToConsume);
+  }
+
+  protected List<FlinkSourceSplit<KV<Integer, Integer>>> createSplits(
+      int numSplits, int numRecordsPerSplit, int startingIndex) {
+    List<FlinkSourceSplit<KV<Integer, Integer>>> splitList = new ArrayList<>();
+    for (int i = startingIndex; i < numSplits; i++) {
+      Source<KV<Integer, Integer>> testingSource = createBeamSource(i, 
numRecordsPerSplit);
+      splitList.add(new FlinkSourceSplit<>(i, testingSource));
+    }
+    return splitList;
+  }
+
+  protected void verifyBeamReaderClosed(List<FlinkSourceSplit<KV<Integer, 
Integer>>> splits) {
+    splits.forEach(
+        split -> {
+          TestSource source = (TestSource) split.getBeamSplitSource();
+          assertEquals(
+              "Should have only one beam BoundedReader created", 1, 
source.createdReaders().size());
+          assertTrue(
+              "The beam BoundedReader should have been closed",
+              source.createdReaders().get(0).isClosed());
+        });
+  }
+
+  protected static SourceReaderContext createSourceReaderContext(
+      SourceTestCompat.TestMetricGroup metricGroup) {
+    SourceReaderContext mockContext = Mockito.mock(SourceReaderContext.class);
+    when(mockContext.metricGroup()).thenReturn(metricGroup);
+    return mockContext;
+  }
+
+  // -------------------- protected helper class for fetch result validation 
---------------------
+  protected class RecordsValidatingOutput implements 
SourceTestCompat.ReaderOutputCompat<OutputT> {
+    private final List<Source<KV<Integer, Integer>>> sources;
+    private final Map<String, TestSourceOutput> sourceOutputs;
+    private int numCollectedRecords = 0;
+
+    public RecordsValidatingOutput(List<FlinkSourceSplit<KV<Integer, 
Integer>>> splits) {
+      this.sources = new ArrayList<>();
+      this.sourceOutputs = new HashMap<>();
+      splits.forEach(split -> sources.add(split.getBeamSplitSource()));
+    }
+
+    @Override
+    public void collect(OutputT record) {
+      KV<Integer, Integer> kv = getKVPairs(record);
+      ((TestSource) sources.get(kv.getKey())).validateNextValue(kv.getValue());
+      numCollectedRecords++;
+    }
+
+    @Override
+    public void collect(OutputT record, long timestamp) {
+      KV<Integer, Integer> kv = getKVPairs(record);
+      TestSource testSource = ((TestSource) sources.get(kv.getKey()));
+      testSource.validateNextValue(kv.getValue());
+      testSource.validateNextTimestamp(timestamp);
+      numCollectedRecords++;
+    }
+
+    @Override
+    public void emitWatermark(Watermark watermark) {}
+
+    @Override
+    public void markIdle() {}
+
+    @Override
+    public void markActive() {}
+
+    @Override
+    public SourceOutput<OutputT> createOutputForSplit(String splitId) {
+      return sourceOutputs.computeIfAbsent(splitId, ignored -> new 
TestSourceOutput(this));
+    }
+
+    @Override
+    public void releaseOutputForSplit(String splitId) {}
+
+    public int numCollectedRecords() {
+      return numCollectedRecords;
+    }
+
+    public boolean allRecordsConsumed() {
+      boolean allRecordsConsumed = true;
+      for (Source<?> source : sources) {
+        allRecordsConsumed = allRecordsConsumed && ((TestSource) 
source).isConsumptionCompleted();
+      }
+      return allRecordsConsumed;
+    }
+
+    public boolean allTimestampReceived() {
+      boolean allTimestampReceived = true;
+      for (Source<?> source : sources) {
+        allTimestampReceived =
+            allTimestampReceived && ((TestSource) 
source).allTimestampsReceived();
+      }
+      return allTimestampReceived;
+    }
+
+    public Map<String, TestSourceOutput> createdSourceOutputs() {
+      return sourceOutputs;
+    }
+  }
+
+  protected class TestSourceOutput implements 
SourceTestCompat.SourceOutputCompat<OutputT> {
+    private final ReaderOutput<OutputT> output;
+    private @Nullable Watermark watermark;
+    private boolean isIdle;
+
+    private TestSourceOutput(RecordsValidatingOutput output) {
+      this.output = output;
+      this.watermark = null;
+      this.isIdle = false;
+    }
+
+    @Override
+    public void collect(OutputT record) {
+      output.collect(record);
+    }
+
+    @Override
+    public void collect(OutputT record, long timestamp) {
+      output.collect(record, timestamp);
+    }
+
+    @Override
+    public void emitWatermark(Watermark watermark) {
+      this.watermark = watermark;
+    }
+
+    @Override
+    public void markIdle() {
+      isIdle = true;
+    }
+
+    @Override
+    public void markActive() {
+      isIdle = false;
+    }
+
+    public @Nullable Watermark watermark() {
+      return watermark;
+    }
+
+    public boolean isIdle() {
+      return isIdle;
+    }
+  }
+}
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumeratorTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumeratorTest.java
new file mode 100644
index 00000000000..59097bae47b
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumeratorTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestBoundedCountingSource;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestCountingSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.values.KV;
+import 
org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
+import org.junit.Test;
+
+/** Unit tests for {@link FlinkSourceSplitEnumerator}. */
+public class FlinkSourceSplitEnumeratorTest {
+
+  /**
+   * Splits a bounded source of 10 records into 10 splits and checks that both registered readers
+   * get an equal share of one-record splits plus the NoMoreSplits signal.
+   */
+  @Test
+  public void testAssignSplitsWithBoundedSource() throws IOException {
+    final int numSubtasks = 2;
+    final int numSplits = 10;
+    final int totalNumRecords = 10;
+    final int expectedNumSplitsPerSubtask = numSplits / numSubtasks;
+    final int expectedSplitSize = totalNumRecords / numSplits;
+    TestingSplitEnumeratorContext<FlinkSourceSplit<KV<Integer, Integer>>> testContext =
+        new TestingSplitEnumeratorContext<>(numSubtasks);
+    // The first constructor argument is the shard number of the un-split source, not a split
+    // count; the split count is given to the enumerator instead.
+    TestBoundedCountingSource testSource = new TestBoundedCountingSource(0, totalNumRecords);
+
+    assignSplits(testContext, testSource, numSplits);
+    assertEquals(numSubtasks, testContext.getSplitAssignments().size());
+
+    testContext
+        .getSplitAssignments()
+        .forEach(
+            (subtaskId, state) -> {
+              assertEquals(
+                  "Each subtask should have " + expectedNumSplitsPerSubtask + " assigned splits",
+                  expectedNumSplitsPerSubtask,
+                  state.getAssignedSplits().size());
+              assertTrue(
+                  "Each subtask should have received NoMoreSplits",
+                  state.hasReceivedNoMoreSplitsSignal());
+              state
+                  .getAssignedSplits()
+                  .forEach(
+                      split -> {
+                        TestBoundedCountingSource source =
+                            (TestBoundedCountingSource) split.getBeamSplitSource();
+                        try {
+                          assertEquals(
+                              expectedSplitSize,
+                              source.getEstimatedSizeBytes(FlinkPipelineOptions.defaults()));
+                        } catch (Exception e) {
+                          // getEstimatedSizeBytes declares a checked exception, which cannot
+                          // escape the lambda; turn it into a test failure.
+                          fail("Received exception: " + e);
+                        }
+                      });
+            });
+  }
+
+  /**
+   * Assigns the splits of an unbounded source and checks the number of splits and the
+   * NoMoreSplits signal for every subtask that received an assignment.
+   */
+  @Test
+  public void testAssignSplitsWithUnboundedSource() throws IOException {
+    final int numSplits = 10;
+    final int numSubtasks = 5;
+    final int numRecordsPerSplit = 10;
+    final int expectedNumSplitsPerSubtask = numSplits / numSubtasks;
+    TestingSplitEnumeratorContext<FlinkSourceSplit<KV<Integer, Integer>>> testContext =
+        new TestingSplitEnumeratorContext<>(numSubtasks);
+    TestCountingSource testSource = new TestCountingSource(numRecordsPerSplit);
+
+    assignSplits(testContext, testSource, numSplits);
+
+    testContext
+        .getSplitAssignments()
+        .forEach(
+            (subtaskId, state) -> {
+              assertEquals(
+                  "Each subtask should have " + expectedNumSplitsPerSubtask + " assigned splits",
+                  expectedNumSplitsPerSubtask,
+                  state.getAssignedSplits().size());
+              assertTrue(
+                  "Each subtask should have received NoMoreSplits",
+                  state.hasReceivedNoMoreSplitsSignal());
+            });
+  }
+
+  /** Checks that splits handed back to the enumerator are assigned again to the same reader. */
+  @Test
+  public void testAddSplitsBack() throws IOException {
+    final int numSubtasks = 2;
+    final int numSplits = 10;
+    final int totalNumRecords = 10;
+    TestingSplitEnumeratorContext<FlinkSourceSplit<KV<Integer, Integer>>> testContext =
+        new TestingSplitEnumeratorContext<>(numSubtasks);
+    // The first constructor argument is the shard number of the un-split source.
+    TestBoundedCountingSource testSource = new TestBoundedCountingSource(0, totalNumRecords);
+    try (FlinkSourceSplitEnumerator<KV<Integer, Integer>> splitEnumerator =
+        new FlinkSourceSplitEnumerator<>(
+            testContext, testSource, FlinkPipelineOptions.defaults(), numSplits)) {
+      splitEnumerator.start();
+      testContext.registerReader(0, "0");
+      splitEnumerator.addReader(0);
+      testContext.getExecutorService().triggerAll();
+
+      List<FlinkSourceSplit<KV<Integer, Integer>>> splitsForReader =
+          testContext.getSplitAssignments().get(0).getAssignedSplits();
+      assertEquals(numSplits / numSubtasks, splitsForReader.size());
+
+      splitEnumerator.addSplitsBack(splitsForReader, 0);
+      splitEnumerator.addReader(0);
+      // The same list is checked again, so this presumably relies on getAssignedSplits()
+      // returning a live list that also records the second assignment.
+      assertEquals(2 * numSplits / numSubtasks, splitsForReader.size());
+    }
+  }
+
+  /**
+   * Runs a {@link FlinkSourceSplitEnumerator} over {@code source} with two readers: reader 0 is
+   * added before the pending executor tasks are triggered, reader 1 afterwards.
+   */
+  private void assignSplits(
+      TestingSplitEnumeratorContext<FlinkSourceSplit<KV<Integer, Integer>>> context,
+      Source<KV<Integer, Integer>> source,
+      int numSplits)
+      throws IOException {
+    try (FlinkSourceSplitEnumerator<KV<Integer, Integer>> splitEnumerator =
+        new FlinkSourceSplitEnumerator<>(
+            context, source, FlinkPipelineOptions.defaults(), numSplits)) {
+      splitEnumerator.start();
+      // Add a reader before splitting the beam source.
+      context.registerReader(0, "0");
+      splitEnumerator.addReader(0);
+      context.getExecutorService().triggerAll();
+      context.registerReader(1, "1");
+      // Add another reader after splitting the beam source.
+      splitEnumerator.addReader(1);
+    }
+  }
+}
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/TestSource.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/TestSource.java
new file mode 100644
index 00000000000..5b8b734a68a
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/TestSource.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import java.io.Serializable;
+import java.util.List;
+
+public interface TestSource extends Serializable {
+
+  List<TestReader> createdReaders();
+
+  boolean validateNextValue(int value);
+
+  boolean validateNextTimestamp(long timestamp);
+
+  boolean isConsumptionCompleted();
+
+  boolean allTimestampsReceived();
+
+  interface TestReader extends Serializable {
+    boolean isClosed();
+  }
+}
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java
new file mode 100644
index 00000000000..6303a729652
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestBoundedCountingSource;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderTestBase;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.SourceTestCompat.TestMetricGroup;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+import 
org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/** Unit tests for {@link FlinkBoundedSourceReader}. */
+public class FlinkBoundedSourceReaderTest
+    extends FlinkSourceReaderTestBase<WindowedValue<KV<Integer, Integer>>> {
+
+  @Test
+  public void testPollWithIdleTimeout() throws Exception {
+    ManuallyTriggeredScheduledExecutorService executor =
+        new ManuallyTriggeredScheduledExecutorService();
+    ReaderOutput<WindowedValue<KV<Integer, Integer>>> mockReaderOutput =
+        Mockito.mock(ReaderOutput.class);
+    try (FlinkBoundedSourceReader<KV<Integer, Integer>> reader =
+        (FlinkBoundedSourceReader<KV<Integer, Integer>>) 
createReader(executor, 1)) {
+      reader.notifyNoMoreSplits();
+      assertEquals(InputStatus.NOTHING_AVAILABLE, 
reader.pollNext(mockReaderOutput));
+
+      executor.triggerScheduledTasks();
+      assertEquals(InputStatus.END_OF_INPUT, 
reader.pollNext(mockReaderOutput));
+    }
+  }
+
+  @Test
+  public void testPollWithoutIdleTimeout() throws Exception {
+    ReaderOutput<WindowedValue<KV<Integer, Integer>>> mockReaderOutput =
+        Mockito.mock(ReaderOutput.class);
+    try (SourceReader<WindowedValue<KV<Integer, Integer>>, 
FlinkSourceSplit<KV<Integer, Integer>>>
+        reader = createReader()) {
+      reader.notifyNoMoreSplits();
+      assertEquals(InputStatus.END_OF_INPUT, 
reader.pollNext(mockReaderOutput));
+    }
+  }
+
+  @Test
+  public void testIsAvailableOnSplitsAssignment() throws Exception {
+    try (SourceReader<WindowedValue<KV<Integer, Integer>>, 
FlinkSourceSplit<KV<Integer, Integer>>>
+        reader = createReader()) {
+      reader.start();
+
+      CompletableFuture<Void> future1 = reader.isAvailable();
+      assertFalse("No split assigned yet, should not be available.", 
future1.isDone());
+
+      // Data available on split assigned.
+      reader.addSplits(createSplits(1, 1, 0));
+      assertTrue("Adding a split should complete future1", future1.isDone());
+      assertTrue("Data should be available with a live split.", 
reader.isAvailable().isDone());
+    }
+  }
+
+  @Test
+  public void testSnapshotStateAndRestore() throws Exception {
+    final int numSplits = 2;
+    final int numRecordsPerSplit = 10;
+
+    List<FlinkSourceSplit<KV<Integer, Integer>>> splits =
+        createSplits(numSplits, numRecordsPerSplit, 0);
+    RecordsValidatingOutput validatingOutput = new 
RecordsValidatingOutput(splits);
+    List<FlinkSourceSplit<KV<Integer, Integer>>> snapshot;
+
+    // Create a reader, take a snapshot.
+    try (SourceReader<WindowedValue<KV<Integer, Integer>>, 
FlinkSourceSplit<KV<Integer, Integer>>>
+        reader = createReader()) {
+      // Only poll half of the records in the first split.
+      pollAndValidate(reader, splits, validatingOutput, numRecordsPerSplit / 
2);
+      snapshot = reader.snapshotState(0L);
+    }
+
+    // Create a new validating output because the first split will be consumed 
from the very beginning.
+    validatingOutput = new RecordsValidatingOutput(splits);
+    // Create another reader, add the snapshot splits back.
+    try (SourceReader<WindowedValue<KV<Integer, Integer>>, 
FlinkSourceSplit<KV<Integer, Integer>>>
+        reader = createReader()) {
+      pollAndValidate(reader, snapshot, validatingOutput, Integer.MAX_VALUE);
+    }
+  }
+
+  // --------------- abstract methods impl ----------------
+  @Override
+  protected KV<Integer, Integer> getKVPairs(WindowedValue<KV<Integer, 
Integer>> record) {
+    return record.getValue();
+  }
+
+  @Override
+  protected Source<KV<Integer, Integer>> createBeamSource(int splitIndex, int 
numRecordsPerSplit) {
+    return new TestBoundedCountingSource(splitIndex, numRecordsPerSplit);
+  }
+
+  @Override
+  protected FlinkBoundedSourceReader<KV<Integer, Integer>> createReader(
+      ScheduledExecutorService executor,
+      long idleTimeoutMs,
+      @Nullable Function<WindowedValue<KV<Integer, Integer>>, Long> 
timestampExtractor,
+      TestMetricGroup testMetricGroup) {
+    FlinkPipelineOptions pipelineOptions = FlinkPipelineOptions.defaults();
+    pipelineOptions.setShutdownSourcesAfterIdleMs(idleTimeoutMs);
+    SourceReaderContext mockContext = 
createSourceReaderContext(testMetricGroup);
+    if (executor != null) {
+      return new FlinkBoundedSourceReader<>(
+          mockContext, pipelineOptions, executor, timestampExtractor);
+    } else {
+      return new FlinkBoundedSourceReader<>(mockContext, pipelineOptions, 
timestampExtractor);
+    }
+  }
+}
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java
new file mode 100644
index 00000000000..f420bd8900f
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded;
+
+import static 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded.FlinkUnboundedSourceReader.PENDING_BYTES_METRIC_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestCountingSource;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderTestBase;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.SourceTestCompat.TestMetricGroup;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.ValueWithRecordId;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+import 
org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
+import org.apache.flink.metrics.Gauge;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.junit.Test;
+
+/** Unit tests for {@link FlinkUnboundedSourceReader}. */
+public class FlinkUnboundedSourceReaderTest
+    extends 
FlinkSourceReaderTestBase<WindowedValue<ValueWithRecordId<KV<Integer, 
Integer>>>> {
+
+  @Test
+  public void testSnapshotStateAndRestore() throws Exception {
+    final int numSplits = 2;
+    final int numRecordsPerSplit = 10;
+
+    List<FlinkSourceSplit<KV<Integer, Integer>>> splits =
+        createSplits(numSplits, numRecordsPerSplit, 0);
+    RecordsValidatingOutput validatingOutput = new 
RecordsValidatingOutput(splits);
+    List<FlinkSourceSplit<KV<Integer, Integer>>> snapshot;
+
+    // Create a reader, take a snapshot.
+    try (SourceReader<
+            WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
+            FlinkSourceSplit<KV<Integer, Integer>>>
+        reader = createReader()) {
+      pollAndValidate(reader, splits, validatingOutput, numSplits * 
numRecordsPerSplit / 2);
+      snapshot = reader.snapshotState(0L);
+    }
+
+    // Create another reader, add the snapshot splits back.
+    try (SourceReader<
+            WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
+            FlinkSourceSplit<KV<Integer, Integer>>>
+        reader = createReader()) {
+      pollAndValidate(reader, snapshot, validatingOutput, Integer.MAX_VALUE);
+    }
+  }
+
+  /**
+   * This is a concurrency correctness test. It verifies that the main thread 
is always woken up by
+   * the alarm runner executed in the executor thread.
+   */
+  @Test(timeout = 30000L)
+  public void testIsAvailableAlwaysWakenUp() throws Exception {
+    final int numFuturesRequired = 1_000_000;
+    List<CompletableFuture<Void>> futures = new ArrayList<>();
+    AtomicReference<Exception> exceptionRef = new AtomicReference<>();
+
+    List<FlinkSourceSplit<KV<Integer, Integer>>> splits = new ArrayList<>();
+    splits.add(new FlinkSourceSplit<>(0, new DummySource(Integer.MAX_VALUE)));
+    RecordsValidatingOutput validatingOutput = new 
RecordsValidatingOutput(splits);
+    ManuallyTriggeredScheduledExecutorService executor =
+        new ManuallyTriggeredScheduledExecutorService();
+
+    try (SourceReader<
+            WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
+            FlinkSourceSplit<KV<Integer, Integer>>>
+        reader = createReader(executor, Long.MAX_VALUE)) {
+      reader.start();
+      reader.addSplits(splits);
+
+      Thread mainThread =
+          new Thread(
+              () -> {
+                try {
+                  while (futures.size() < numFuturesRequired) {
+                    // This poll will return NOTHING_AVAILABLE after each 
record emission.
+                    if (reader.pollNext(validatingOutput) == 
InputStatus.NOTHING_AVAILABLE) {
+                      CompletableFuture<Void> future = reader.isAvailable();
+                      future.get();
+                      futures.add(future);
+                    }
+                  }
+                } catch (Exception e) {
+                  if (!exceptionRef.compareAndSet(null, e)) {
+                    exceptionRef.get().addSuppressed(e);
+                  }
+                }
+              },
+              "MainThread");
+
+      Thread executorThread =
+          new Thread(
+              () -> {
+                while (futures.size() < numFuturesRequired) {
+                  executor.triggerScheduledTasks();
+                }
+              },
+              "ExecutorThread");
+
+      mainThread.start();
+      executorThread.start();
+      executorThread.join();
+    }
+  }
+
+  @Test
+  public void testIsAvailableOnSplitChangeWhenNoDataAvailableForAliveReaders() 
throws Exception {
+    List<FlinkSourceSplit<KV<Integer, Integer>>> splits1 = new ArrayList<>();
+    List<FlinkSourceSplit<KV<Integer, Integer>>> splits2 = new ArrayList<>();
+    splits1.add(new FlinkSourceSplit<>(0, new DummySource(0)));
+    splits2.add(new FlinkSourceSplit<>(1, new DummySource(0)));
+    RecordsValidatingOutput validatingOutput = new 
RecordsValidatingOutput(splits1);
+    ManuallyTriggeredScheduledExecutorService executor =
+        new ManuallyTriggeredScheduledExecutorService();
+
+    try (SourceReader<
+            WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
+            FlinkSourceSplit<KV<Integer, Integer>>>
+        reader = createReader(executor, Long.MAX_VALUE)) {
+      reader.start();
+      reader.addSplits(splits1);
+
+      assertEquals(
+          "The reader should have nothing available",
+          InputStatus.NOTHING_AVAILABLE,
+          reader.pollNext(validatingOutput));
+
+      CompletableFuture<Void> future1 = reader.isAvailable();
+      assertFalse("Future1 should be uncompleted without live split.", 
future1.isDone());
+
+      reader.addSplits(splits2);
+      assertTrue("Future1 should be completed upon addition of new splits.", 
future1.isDone());
+
+      CompletableFuture<Void> future2 = reader.isAvailable();
+      assertFalse("Future2 should be uncompleted without live split.", 
future2.isDone());
+
+      reader.notifyNoMoreSplits();
+      assertTrue("Future2 should be completed upon NoMoreSplitsNotification.", 
future2.isDone());
+    }
+  }
+
+  @Test
+  public void testWatermark() throws Exception {
+    ManuallyTriggeredScheduledExecutorService executor =
+        new ManuallyTriggeredScheduledExecutorService();
+    try (FlinkUnboundedSourceReader<KV<Integer, Integer>> reader =
+        (FlinkUnboundedSourceReader<KV<Integer, Integer>>) 
createReader(executor, -1L)) {
+      List<FlinkSourceSplit<KV<Integer, Integer>>> splits = createSplits(2, 
10, 0);
+      RecordsValidatingOutput validatingOutput = new 
RecordsValidatingOutput(splits);
+
+      reader.start();
+      reader.addSplits(splits);
+
+      // Poll 3 records from split 0 and 2 records from split 1.
+      for (int i = 0; i < 5; i++) {
+        reader.pollNext(validatingOutput);
+      }
+
+      Map<String, TestSourceOutput> sourceOutputs = 
validatingOutput.createdSourceOutputs();
+      assertEquals("There should be 2 source outputs created.", 2, 
sourceOutputs.size());
+      assertNull(sourceOutputs.get("0").watermark());
+      assertNull(sourceOutputs.get("1").watermark());
+
+      // Trigger the periodic task marking the watermark emission flag.
+      executor.triggerScheduledTasks();
+      // Poll one more time to actually emit the watermark. Getting record 
value 2 from split_1.
+      reader.pollNext(validatingOutput);
+
+      assertEquals(3, sourceOutputs.get("0").watermark().getTimestamp());
+      assertEquals(2, sourceOutputs.get("1").watermark().getTimestamp());
+
+      // Poll one more time to ensure no additional watermark is emitted. 
Getting record value 3
+      // from split_0.
+      reader.pollNext(validatingOutput);
+      assertEquals(3, sourceOutputs.get("0").watermark().getTimestamp());
+      assertEquals(2, sourceOutputs.get("1").watermark().getTimestamp());
+
+      // Trigger the task to mark the watermark emission flag again.
+      executor.triggerScheduledTasks();
+      // Poll to actually emit the watermark. Getting (split_1 -> 3).
+      reader.pollNext(validatingOutput);
+
+      assertEquals(4, sourceOutputs.get("0").watermark().getTimestamp());
+      assertEquals(3, sourceOutputs.get("1").watermark().getTimestamp());
+    }
+  }
+
+  @Test
+  public void testPendingBytesMetric() throws Exception {
+    ManuallyTriggeredScheduledExecutorService executor =
+        new ManuallyTriggeredScheduledExecutorService();
+    TestMetricGroup testMetricGroup = new TestMetricGroup();
+    try (SourceReader<
+            WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
+            FlinkSourceSplit<KV<Integer, Integer>>>
+        reader = createReader(executor, 0L, null, testMetricGroup)) {
+      reader.start();
+
+      List<FlinkSourceSplit<KV<Integer, Integer>>> splits = createSplits(2, 
10, 0);
+      reader.addSplits(splits);
+      RecordsValidatingOutput validatingOutput = new 
RecordsValidatingOutput(splits);
+
+      // Need to poll once to create all the readers.
+      reader.pollNext(validatingOutput);
+      Gauge<Long> pendingBytesGauge =
+          (Gauge<Long>) 
testMetricGroup.registeredGauge.get(PENDING_BYTES_METRIC_NAME);
+      assertNotNull(pendingBytesGauge);
+      // The TestCountingSource.CountingSourceReader always returns 7L as 
backlog bytes. Because we
+      // have 2 splits,
+      // the expected value is the magic number 14 here.
+      assertEquals(14L, pendingBytesGauge.getValue().longValue());
+    }
+  }
+
+  // --------------- private helper classes -----------------
+  /** A source whose advance() method only returns true occasionally. */
+  private static class DummySource extends TestCountingSource {
+
+    public DummySource(int numMessagesPerShard) {
+      super(numMessagesPerShard);
+    }
+
+    @Override
+    public CountingSourceReader createReader(
+        PipelineOptions options, @Nullable CounterMark checkpointMark) {
+      CountingSourceReader reader = new DummySourceReader();
+      createdReaders().add(reader);
+      return reader;
+    }
+
+    private class DummySourceReader extends 
TestCountingSource.CountingSourceReader {
+      private final Random random = new Random();
+
+      public DummySourceReader() {
+        super(0);
+      }
+
+      @Override
+      public boolean advance() {
+        // Return true with probability 1/3 on each call (random, not strictly periodic).
+        if (random.nextInt(3) == 0) {
+          return super.advance();
+        } else {
+          return false;
+        }
+      }
+    }
+  }
+
+  // --------------- abstract methods impl ------------------
+  @Override
+  protected KV<Integer, Integer> getKVPairs(
+      WindowedValue<ValueWithRecordId<KV<Integer, Integer>>> record) {
+    return record.getValue().getValue();
+  }
+
+  @Override
+  protected FlinkUnboundedSourceReader<KV<Integer, Integer>> createReader(
+      ScheduledExecutorService executor,
+      long idleTimeoutMs,
+      Function<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>, Long> 
timestampExtractor,
+      TestMetricGroup metricGroup) {
+    FlinkPipelineOptions pipelineOptions = FlinkPipelineOptions.defaults();
+    pipelineOptions.setShutdownSourcesAfterIdleMs(idleTimeoutMs);
+    pipelineOptions.setAutoWatermarkInterval(10L);
+    SourceReaderContext mockContext = createSourceReaderContext(metricGroup);
+    if (executor != null) {
+      return new FlinkUnboundedSourceReader<>(
+          mockContext, pipelineOptions, executor, timestampExtractor);
+    } else {
+      return new FlinkUnboundedSourceReader<>(mockContext, pipelineOptions, 
timestampExtractor);
+    }
+  }
+
+  @Override
+  protected Source<KV<Integer, Integer>> createBeamSource(int splitIndex, int 
numRecordsPerSplit) {
+    return new TestCountingSource(numRecordsPerSplit)
+        .withShardNumber(splitIndex)
+        .withFixedNumSplits(1);
+  }
+}

Reply via email to