becketqin commented on code in PR #25525:
URL: https://github.com/apache/beam/pull/25525#discussion_r1117833265


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.ValueWithRecordId;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Flink {@link org.apache.flink.api.connector.source.SourceReader 
SourceReader} implementation
+ * that reads from the assigned {@link
+ * 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit
+ * FlinkSourceSplits} by using Beam {@link 
org.apache.beam.sdk.io.UnboundedSource.UnboundedReader
+ * UnboundedReaders}.
+ *
+ * <p>This reader consumes all the assigned source splits concurrently.
+ *
+ * @param <T> the output element type of the encapsulated Beam {@link
+ *     org.apache.beam.sdk.io.UnboundedSource.UnboundedReader UnboundedReader}.
+ */
+public class FlinkUnboundedSourceReader<T>
+    extends FlinkSourceReaderBase<T, WindowedValue<ValueWithRecordId<T>>> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkUnboundedSourceReader.class);
+  // This name is defined in FLIP-33.
+  @VisibleForTesting protected static final String PENDING_BYTES_METRIC_NAME = 
"pendingBytes";
+  private static final long SLEEP_ON_IDLE_MS = 50L;
+  private final AtomicReference<CompletableFuture<Void>> 
dataAvailableFutureRef;
+  private final List<ReaderAndOutput> readers;
+  private int currentReaderIndex;
+  private volatile boolean shouldEmitWatermark;
+
+  public FlinkUnboundedSourceReader(
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      @Nullable Function<WindowedValue<ValueWithRecordId<T>>, Long> 
timestampExtractor) {
+    super(context, pipelineOptions, timestampExtractor);
+    this.readers = new ArrayList<>();
+    this.dataAvailableFutureRef = new AtomicReference<>(DUMMY_FUTURE);
+    this.currentReaderIndex = 0;
+  }
+
+  @VisibleForTesting
+  protected FlinkUnboundedSourceReader(
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      ScheduledExecutorService executor,
+      @Nullable Function<WindowedValue<ValueWithRecordId<T>>, Long> 
timestampExtractor) {
+    super(executor, context, pipelineOptions, timestampExtractor);
+    this.readers = new ArrayList<>();
+    this.dataAvailableFutureRef = new AtomicReference<>(DUMMY_FUTURE);
+    this.currentReaderIndex = 0;
+  }
+
+  @Override
+  public void start() {
+    createPendingBytesGauge(context);
+    Long watermarkInterval =
+        
pipelineOptions.as(FlinkPipelineOptions.class).getAutoWatermarkInterval();
+    if (watermarkInterval != null) {
+      scheduleTaskAtFixedRate(
+          () -> {
+            // Set the watermark emission flag first.
+            shouldEmitWatermark = true;
+            // Wake up the main thread if necessary.
+            CompletableFuture<Void> f = dataAvailableFutureRef.get();
+            if (f != DUMMY_FUTURE) {
+              f.complete(null);
+            }
+          },
+          watermarkInterval,
+          watermarkInterval);
+    } else {
+      LOG.warn("AutoWatermarkInterval is not set, watermarks won't be 
emitted.");
+    }
+  }
+
+  @Override
+  public InputStatus 
pollNext(ReaderOutput<WindowedValue<ValueWithRecordId<T>>> output)
+      throws Exception {
+    checkExceptionAndMaybeThrow();
+    maybeEmitWatermark();
+    maybeCreateReaderForNewSplits();
+
+    ReaderAndOutput reader = nextReaderWithData();
+    if (reader != null) {
+      emitRecord(reader, output);
+      return InputStatus.MORE_AVAILABLE;
+    } else {
+      LOG.trace("No data available for now.");
+      return InputStatus.NOTHING_AVAILABLE;
+    }
+  }
+
+  /**
+   * Check whether there are data available from alive readers. If not, set a 
future and wait for
+   * the periodically running wake-up task to complete that future when the 
check interval passes.
+   * This method is only called by the main thread, which is the only thread 
writing to the future
+   * ref. Note that for UnboundedSource, because the splits never finishes, 
there are always alive
+   * readers after the first split assigment. Hence, the return value of {@link
+   * FlinkSourceReaderBase#isAvailable()} will effectively be determined by 
this method after the
+   * first split assignment.
+   */
+  @Override
+  protected CompletableFuture<Void> isAvailableForAliveReaders() {
+    CompletableFuture<Void> future = dataAvailableFutureRef.get();
+    if (future == DUMMY_FUTURE) {
+      CompletableFuture<Void> newFuture = new CompletableFuture<>();
+      // Need to set the future first to avoid the race condition of missing 
the watermark emission
+      // notification.
+      dataAvailableFutureRef.set(newFuture);
+      if (shouldEmitWatermark || hasException()) {
+        // There are exception after we set the new future,
+        // immediately complete the future and return.
+        dataAvailableFutureRef.set(DUMMY_FUTURE);
+        newFuture.complete(null);
+      } else {
+        LOG.debug("There is no data available, scheduling the idle reader 
checker.");
+        scheduleTask(

Review Comment:
   `start()` is called only once, right after the reader object is
instantiated. After that, the reader main thread blocks on the future returned
from `isAvailable()` whenever there is no data available to read. Before the
main thread blocks on that future, it schedules a task on the checker thread
that completes the future after `SLEEP_ON_IDLE_MS`, waking the main thread up.
The reader main thread then calls `pollNext()` again to check whether more data
has become available.
   
   Logically speaking this is as if the reader main thread calls 
`Thread.sleep(SLEEP_ON_IDLE_MS)`, except that we don't want to hijack the 
reader main thread in this case, because the reader main thread is also the 
task main thread and may need to do other things (e.g. checkpointing) even if 
there is no data available for reading.



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.FlinkSourceCompat;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.metrics.Counter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An abstract implementation of {@link SourceReader} which encapsulates 
{@link Source Beam Sources}
+ * for data reading.
+ *
+ * <ol>
+ *   <li>Idle timeout support.
+ *   <li>Splits addition handling.
+ *   <li>Split reader creation and management.
+ *   <li>checkpoint management
+ * </ol>
+ *
+ * <p>This implementation provides unified logic for both {@link 
BoundedSource} and {@link
+ * UnboundedSource}. The subclasses are expected to only implement the {@link
+ * #pollNext(ReaderOutput)} method.
+ *
+ * @param <OutputT> the output element type from the encapsulated {@link 
Source Beam sources.}
+ */
+public abstract class FlinkSourceReaderBase<T, OutputT>
+    implements SourceReader<OutputT, FlinkSourceSplit<T>> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkSourceReaderBase.class);
+  protected static final CompletableFuture<Void> AVAILABLE_NOW =
+      CompletableFuture.completedFuture(null);
+  // Some dummy instances to make the annotation checker happy with 
AtomicReference.
+  protected static final CompletableFuture<Void> DUMMY_FUTURE = new 
CompletableFuture<>();
+  protected static final Exception NO_EXCEPTION = new Exception();
+
+  protected final PipelineOptions pipelineOptions;
+  protected final @Nullable Function<OutputT, Long> timestampExtractor;
+  private final Queue<FlinkSourceSplit<T>> sourceSplits = new ArrayDeque<>();
+  // This needs to be a ConcurrentHashMap because the metric retrieving thread 
may access it.
+  private final ConcurrentMap<Integer, ReaderAndOutput> beamSourceReaders;
+  protected final SourceReaderContext context;
+  private final ScheduledExecutorService executor;
+
+  protected final Counter numRecordsInCounter;
+  protected final long idleTimeoutMs;
+  private final CompletableFuture<Void> idleTimeoutFuture;
+  private final AtomicReference<Throwable> exception;
+  private boolean idleTimeoutCountingDown;
+  private CompletableFuture<Void> waitingForSplitChangeFuture;
+  private boolean noMoreSplits;
+
+  protected FlinkSourceReaderBase(
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      @Nullable Function<OutputT, Long> timestampExtractor) {
+    this(
+        Executors.newSingleThreadScheduledExecutor(
+            r -> new Thread(r, "FlinkSource-Executor-Thread-" + 
context.getIndexOfSubtask())),
+        context,
+        pipelineOptions,
+        timestampExtractor);
+  }
+
+  protected FlinkSourceReaderBase(
+      ScheduledExecutorService executor,
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      @Nullable Function<OutputT, Long> timestampExtractor) {
+    this.context = context;
+    this.pipelineOptions = pipelineOptions;
+    this.timestampExtractor = timestampExtractor;
+    this.beamSourceReaders = new ConcurrentHashMap<>();
+    this.exception = new AtomicReference<>(NO_EXCEPTION);
+    this.executor = executor;
+    this.idleTimeoutMs =
+        
pipelineOptions.as(FlinkPipelineOptions.class).getShutdownSourcesAfterIdleMs();
+    this.idleTimeoutFuture = new CompletableFuture<>();
+    this.waitingForSplitChangeFuture = new CompletableFuture<>();
+    this.idleTimeoutCountingDown = false;
+    // TODO: Remove the casting and use SourceReaderMetricGroup after minimum 
FLink version is
+    // upgraded to 1.14 and above.
+    this.numRecordsInCounter = 
FlinkSourceCompat.getNumRecordsInCounter(context);
+  }
+
+  @Override
+  public void start() {}
+
+  @Override
+  public List<FlinkSourceSplit<T>> snapshotState(long checkpointId) {
+    checkExceptionAndMaybeThrow();
+    // Add all the source splits whose readers haven't been created.
+    List<FlinkSourceSplit<T>> splitsState = new ArrayList<>(sourceSplits);
+
+    // Add all the source splits being actively read.
+    beamSourceReaders.forEach(
+        (splitId, readerAndOutput) -> {
+          Source.Reader<T> reader = readerAndOutput.reader;
+          if (reader instanceof BoundedSource.BoundedReader) {
+            // Sometimes users may decide to run a bounded source in streaming 
mode as "finite
+            // stream."
+            // For bounded source, the checkpoint granularity is the entire 
source split.
+            // So, in case of failure, all the data from this split will be 
consumed again.
+            splitsState.add(new FlinkSourceSplit<>(splitId, 
reader.getCurrentSource()));
+          } else if (reader instanceof UnboundedSource.UnboundedReader) {
+            // The checkpoint for unbounded sources is fine granular.
+            byte[] checkpointState =
+                
getAndEncodeCheckpointMark((UnboundedSource.UnboundedReader<OutputT>) reader);
+            splitsState.add(
+                new FlinkSourceSplit<>(splitId, reader.getCurrentSource(), 
checkpointState));
+          }
+        });
+    return splitsState;
+  }
+
+  @Override
+  public CompletableFuture<Void> isAvailable() {
+    checkExceptionAndMaybeThrow();
+    if (!sourceSplits.isEmpty() || !beamSourceReaders.isEmpty()) {
+      // There are still live readers.
+      return isAvailableForAliveReaders();
+    } else if (noMoreSplits) {
+      // All the splits have been read, wait for idle timeout.
+      checkIdleTimeoutAndMaybeStartCountdown();
+      return idleTimeoutFuture;
+    } else {
+      // There is no live readers, waiting for new split assignments or no 
more splits notification.
+      if (waitingForSplitChangeFuture.isDone()) {
+        waitingForSplitChangeFuture = new CompletableFuture<>();
+      }
+      return waitingForSplitChangeFuture;
+    }
+  }
+
+  @Override
+  public void notifyNoMoreSplits() {
+    checkExceptionAndMaybeThrow();
+    LOG.info("Received NoMoreSplits signal from enumerator.");
+    noMoreSplits = true;
+    waitingForSplitChangeFuture.complete(null);
+  }
+
+  @Override
+  public void addSplits(List<FlinkSourceSplit<T>> splits) {
+    checkExceptionAndMaybeThrow();
+    LOG.info("Adding splits {}", splits);
+    sourceSplits.addAll(splits);
+    waitingForSplitChangeFuture.complete(null);

Review Comment:
   In general, there is no guarantee that `notifyNoMoreSplits` is always going 
to be invoked, or when it will be invoked. 
   
   I think we need to complete the future in both `addSplits()` and
`notifyNoMoreSplits()`. Basically, if a reader goes to sleep because it has
exhausted all of its splits, it needs to be woken up either when a new split is
assigned or when the NoMoreSplits notification is received.
   
   Our current `SplitEnumerator` implementation follows a static split
assignment approach, so it sends all the splits to a subtask at once and then
sends the `NoMoreSplits` notification immediately. So, timing-wise, it seems
that the reader could wait for the `NoMoreSplits` notification and then act.
However, that won't work for the dynamic assignment case. For example, if a
reader only gets one split at a time and requests another split from the
`SplitEnumerator` after it finishes reading the current one, it has to wake up
and poll as soon as a split is assigned. So `waitingForSplitChangeFuture` has
to be completed in `addSplits()`. Also, if the reader has exhausted all the
splits and gone to sleep, it has to be woken up upon receiving the
`NoMoreSplits` notification so that it can exit normally.
   
   Orthogonally, it seems the code does have a bug here. If there are live 
readers, the future returned by `isAvailableForAliveReaders()` should also be 
completed when there is a split change, either from `addSplits()` or 
`notifyNoMoreSplits()`. I'll update the patch to fix that. 



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.SplitEnumeratorCompat;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Flink {@link org.apache.flink.api.connector.source.SplitEnumerator 
SplitEnumerator}
+ * implementation that holds a Beam {@link Source} and does the following:
+ *
+ * <ul>
+ *   <li>Split the Beam {@link Source} to desired number of splits.
+ *   <li>Assign the splits to the Flink Source Reader.
+ * </ul>
+ *
+ * <p>Note that at this point, this class has a static round-robin split 
assignment strategy.
+ *
+ * @param <T> The output type of the encapsulated Beam {@link Source}.
+ */
+public class FlinkSourceSplitEnumerator<T>
+    implements SplitEnumeratorCompat<FlinkSourceSplit<T>, Map<Integer, 
List<FlinkSourceSplit<T>>>> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkSourceSplitEnumerator.class);
+  private final SplitEnumeratorContext<FlinkSourceSplit<T>> context;
+  private final Source<T> beamSource;
+  private final PipelineOptions pipelineOptions;
+  private final int numSplits;
+  private final Map<Integer, List<FlinkSourceSplit<T>>> pendingSplits;
+  private boolean splitsInitialized;
+
+  public FlinkSourceSplitEnumerator(
+      SplitEnumeratorContext<FlinkSourceSplit<T>> context,
+      Source<T> beamSource,
+      PipelineOptions pipelineOptions,
+      int numSplits) {
+    this.context = context;
+    this.beamSource = beamSource;
+    this.pipelineOptions = pipelineOptions;
+    this.numSplits = numSplits;
+    this.pendingSplits = new HashMap<>(numSplits);
+    this.splitsInitialized = false;
+  }
+
+  @Override
+  public void start() {
+    context.callAsync(
+        () -> {
+          try {
+            List<? extends Source<T>> beamSplitSourceList = splitBeamSource();
+            Map<Integer, List<FlinkSourceSplit<T>>> flinkSourceSplitsList = 
new HashMap<>();
+            int i = 0;
+            for (Source<T> beamSplitSource : beamSplitSourceList) {
+              int targetSubtask = i % context.currentParallelism();
+              List<FlinkSourceSplit<T>> splitsForTask =
+                  flinkSourceSplitsList.computeIfAbsent(
+                      targetSubtask, ignored -> new ArrayList<>());
+              splitsForTask.add(new FlinkSourceSplit<>(i, beamSplitSource));
+              i++;
+            }
+            return flinkSourceSplitsList;
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        },
+        (sourceSplits, error) -> {
+          if (error != null) {
+            throw new RuntimeException("Failed to start source enumerator.", 
error);
+          } else {
+            pendingSplits.putAll(sourceSplits);
+            splitsInitialized = true;
+            sendPendingSplitsToSourceReaders();
+          }
+        });
+  }
+
+  @Override
+  public void handleSplitRequest(int subtaskId, @Nullable String 
requesterHostname) {
+    // Not used.
+  }
+
+  @Override
+  public void addSplitsBack(List<FlinkSourceSplit<T>> splits, int subtaskId) {
+    LOG.info("Adding splits {} back from subtask {}", splits, subtaskId);
+    List<FlinkSourceSplit<T>> splitsForSubtask =
+        pendingSplits.computeIfAbsent(subtaskId, ignored -> new ArrayList<>());
+    splitsForSubtask.addAll(splits);
+  }
+
+  @Override
+  public void addReader(int subtaskId) {
+    List<FlinkSourceSplit<T>> splitsForSubtask = 
pendingSplits.remove(subtaskId);
+    if (splitsForSubtask != null) {
+      assignSplitsAndLog(splitsForSubtask, subtaskId);
+      pendingSplits.remove(subtaskId);
+    } else {
+      if (splitsInitialized) {
+        LOG.info("There is no split for subtask {}. Signaling no more 
splits.", subtaskId);
+        context.signalNoMoreSplits(subtaskId);
+      }
+    }
+  }
+
+  @Override
+  public Map<Integer, List<FlinkSourceSplit<T>>> snapshotState(long 
checkpointId) throws Exception {
+    LOG.info("Taking snapshot for checkpoint {}", checkpointId);
+    return snapshotState();
+  }
+
+  @Override
+  public Map<Integer, List<FlinkSourceSplit<T>>> snapshotState() throws 
Exception {
+    return pendingSplits;
+  }
+
+  @Override
+  public void close() throws IOException {
+    // NoOp
+  }
+
+  // -------------- Private helper methods ----------------------
+  private List<? extends Source<T>> splitBeamSource() throws Exception {
+    if (beamSource instanceof BoundedSource) {
+      BoundedSource<T> boundedSource = (BoundedSource<T>) beamSource;
+      long desiredSizeBytes = 
boundedSource.getEstimatedSizeBytes(pipelineOptions) / numSplits;
+      return boundedSource.split(desiredSizeBytes, pipelineOptions);
+    } else if (beamSource instanceof UnboundedSource) {
+      return ((UnboundedSource<T, ?>) beamSource).split(numSplits, 
pipelineOptions);
+    } else {
+      throw new IllegalStateException("Unknown source type " + 
beamSource.getClass());
+    }
+  }
+
+  private void sendPendingSplitsToSourceReaders() {
+    Iterator<Map.Entry<Integer, List<FlinkSourceSplit<T>>>> splitIter =
+        pendingSplits.entrySet().iterator();
+    while (splitIter.hasNext()) {
+      Map.Entry<Integer, List<FlinkSourceSplit<T>>> entry = splitIter.next();
+      int readerIndex = entry.getKey();
+      int targetSubtask = readerIndex % context.currentParallelism();

Review Comment:
   Not sure if I follow the comment. The key will be in the `pendingSplits` map
because that map holds the "intended assignments", regardless of whether the
target reader has actually registered with the `SplitEnumerator` or not. But we
can only send the "intended assignments" to a reader once that reader has
registered with the `SplitEnumerator`. That is what the check does.



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.FlinkSourceCompat;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.metrics.Counter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An abstract implementation of {@link SourceReader} which encapsulates 
{@link Source Beam Sources}
+ * for data reading.
+ *
+ * <ol>
+ *   <li>Idle timeout support.
+ *   <li>Splits addition handling.
+ *   <li>Split reader creation and management.
+ *   <li>checkpoint management
+ * </ol>
+ *
+ * <p>This implementation provides unified logic for both {@link 
BoundedSource} and {@link
+ * UnboundedSource}. The subclasses are expected to only implement the {@link
+ * #pollNext(ReaderOutput)} method.
+ *
+ * @param <OutputT> the output element type from the encapsulated {@link 
Source Beam sources.}
+ */
+public abstract class FlinkSourceReaderBase<T, OutputT>
+    implements SourceReader<OutputT, FlinkSourceSplit<T>> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkSourceReaderBase.class);
+  protected static final CompletableFuture<Void> AVAILABLE_NOW =
+      CompletableFuture.completedFuture(null);
+  // Some dummy instances to make the annotation checker happy with 
AtomicReference.
+  protected static final CompletableFuture<Void> DUMMY_FUTURE = new 
CompletableFuture<>();
+  protected static final Exception NO_EXCEPTION = new Exception();
+
+  protected final PipelineOptions pipelineOptions;
+  protected final @Nullable Function<OutputT, Long> timestampExtractor;
+  private final Queue<FlinkSourceSplit<T>> sourceSplits = new ArrayDeque<>();
+  // This needs to be a ConcurrentHashMap because the metric retrieving thread 
may access it.
+  private final ConcurrentMap<Integer, ReaderAndOutput> beamSourceReaders;
+  protected final SourceReaderContext context;
+  private final ScheduledExecutorService executor;
+
+  protected final Counter numRecordsInCounter;
+  protected final long idleTimeoutMs;
+  private final CompletableFuture<Void> idleTimeoutFuture;
+  private final AtomicReference<Throwable> exception;
+  private boolean idleTimeoutCountingDown;
+  private CompletableFuture<Void> waitingForSplitChangeFuture;
+  private boolean noMoreSplits;
+
+  protected FlinkSourceReaderBase(
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      @Nullable Function<OutputT, Long> timestampExtractor) {
+    this(
+        Executors.newSingleThreadScheduledExecutor(
+            r -> new Thread(r, "FlinkSource-Executor-Thread-" + 
context.getIndexOfSubtask())),
+        context,
+        pipelineOptions,
+        timestampExtractor);
+  }
+
+  protected FlinkSourceReaderBase(
+      ScheduledExecutorService executor,
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      @Nullable Function<OutputT, Long> timestampExtractor) {
+    this.context = context;
+    this.pipelineOptions = pipelineOptions;
+    this.timestampExtractor = timestampExtractor;
+    this.beamSourceReaders = new ConcurrentHashMap<>();
+    this.exception = new AtomicReference<>(NO_EXCEPTION);
+    this.executor = executor;
+    this.idleTimeoutMs =
+        
pipelineOptions.as(FlinkPipelineOptions.class).getShutdownSourcesAfterIdleMs();
+    this.idleTimeoutFuture = new CompletableFuture<>();
+    this.waitingForSplitChangeFuture = new CompletableFuture<>();
+    this.idleTimeoutCountingDown = false;
+    // TODO: Remove the casting and use SourceReaderMetricGroup after minimum 
FLink version is
+    // upgraded to 1.14 and above.
+    this.numRecordsInCounter = 
FlinkSourceCompat.getNumRecordsInCounter(context);
+  }
+
+  @Override
+  public void start() {}
+
+  @Override
+  public List<FlinkSourceSplit<T>> snapshotState(long checkpointId) {
+    checkExceptionAndMaybeThrow();
+    // Add all the source splits whose readers haven't been created.
+    List<FlinkSourceSplit<T>> splitsState = new ArrayList<>(sourceSplits);
+
+    // Add all the source splits being actively read.
+    beamSourceReaders.forEach(
+        (splitId, readerAndOutput) -> {
+          Source.Reader<T> reader = readerAndOutput.reader;
+          if (reader instanceof BoundedSource.BoundedReader) {
+            // Sometimes users may decide to run a bounded source in streaming 
mode as "finite
+            // stream."
+            // For bounded source, the checkpoint granularity is the entire 
source split.
+            // So, in case of failure, all the data from this split will be 
consumed again.
+            splitsState.add(new FlinkSourceSplit<>(splitId, 
reader.getCurrentSource()));
+          } else if (reader instanceof UnboundedSource.UnboundedReader) {
+            // The checkpoint for unbounded sources is fine granular.
+            byte[] checkpointState =
+                
getAndEncodeCheckpointMark((UnboundedSource.UnboundedReader<OutputT>) reader);
+            splitsState.add(
+                new FlinkSourceSplit<>(splitId, reader.getCurrentSource(), 
checkpointState));
+          }
+        });
+    return splitsState;
+  }
+
+  @Override
+  public CompletableFuture<Void> isAvailable() {
+    checkExceptionAndMaybeThrow();
+    if (!sourceSplits.isEmpty() || !beamSourceReaders.isEmpty()) {
+      // There are still live readers.
+      return isAvailableForAliveReaders();
+    } else if (noMoreSplits) {
+      // All the splits have been read, wait for idle timeout.
+      checkIdleTimeoutAndMaybeStartCountdown();
+      return idleTimeoutFuture;
+    } else {
+      // There is no live readers, waiting for new split assignments or no 
more splits notification.
+      if (waitingForSplitChangeFuture.isDone()) {
+        waitingForSplitChangeFuture = new CompletableFuture<>();
+      }
+      return waitingForSplitChangeFuture;
+    }
+  }
+
+  @Override
+  public void notifyNoMoreSplits() {
+    checkExceptionAndMaybeThrow();
+    LOG.info("Received NoMoreSplits signal from enumerator.");
+    noMoreSplits = true;
+    waitingForSplitChangeFuture.complete(null);
+  }
+
+  @Override
+  public void addSplits(List<FlinkSourceSplit<T>> splits) {
+    checkExceptionAndMaybeThrow();
+    LOG.info("Adding splits {}", splits);
+    sourceSplits.addAll(splits);
+    waitingForSplitChangeFuture.complete(null);
+  }
+
+  @Override
+  public void close() throws Exception {
+    for (ReaderAndOutput readerAndOutput : beamSourceReaders.values()) {
+      readerAndOutput.reader.close();
+    }
+    executor.shutdown();
+  }
+
+  // ----------------- protected abstract methods ----------------------
+
+  /**
+   * This method needs to be overridden by subclasses to determine if data is 
available when there
+   * are alive readers. For example, an unbounded source may not have any 
source split ready for
+   * data emission even if all the sources are still alive. Whereas for the 
bounded source, data is
+   * always available as long as there are alive readers.
+   */
+  protected abstract CompletableFuture<Void> isAvailableForAliveReaders();
+
+  // ----------------- protected helper methods for subclasses 
--------------------
+
+  protected Optional<ReaderAndOutput> createAndTrackNextReader() throws 
IOException {
+    FlinkSourceSplit<T> sourceSplit = sourceSplits.poll();
+    if (sourceSplit != null) {
+      Source.Reader<T> reader = createReader(sourceSplit);
+      ReaderAndOutput readerAndOutput = new 
ReaderAndOutput(sourceSplit.splitId(), reader, false);
+      beamSourceReaders.put(sourceSplit.splitIndex(), readerAndOutput);
+      return Optional.of(readerAndOutput);
+    }
+    return Optional.empty();
+  }
+
+  protected void finishSplit(int splitIndex) throws IOException {
+    ReaderAndOutput readerAndOutput = beamSourceReaders.remove(splitIndex);
+    if (readerAndOutput != null) {
+      LOG.info("Finished reading from split {}", readerAndOutput.splitId);
+      readerAndOutput.reader.close();
+    } else {
+      throw new IllegalStateException(
+          "SourceReader for split " + splitIndex + " should never be null!");
+    }
+  }
+
+  protected boolean checkIdleTimeoutAndMaybeStartCountdown() {
+    if (idleTimeoutMs <= 0) {
+      idleTimeoutFuture.complete(null);
+    } else if (!idleTimeoutCountingDown) {
+      scheduleTask(() -> idleTimeoutFuture.complete(null), idleTimeoutMs);
+      idleTimeoutCountingDown = true;
+    }
+    return idleTimeoutFuture.isDone();
+  }
+
+  protected boolean noMoreSplits() {
+    return noMoreSplits;
+  }
+
+  protected void scheduleTask(Runnable runnable, long delayMs) {
+    ignoreReturnValue(
+        executor.schedule(new ErrorRecordingRunnable(runnable), delayMs, 
TimeUnit.MILLISECONDS));
+  }
+
+  protected void scheduleTaskAtFixedRate(Runnable runnable, long delayMs, long 
periodMs) {
+    ignoreReturnValue(
+        executor.scheduleAtFixedRate(
+            new ErrorRecordingRunnable(runnable), delayMs, periodMs, 
TimeUnit.MILLISECONDS));
+  }
+
+  protected void execute(Runnable runnable) {
+    executor.execute(new ErrorRecordingRunnable(runnable));
+  }
+
+  protected void recordException(Throwable e) {
+    if (!exception.compareAndSet(NO_EXCEPTION, e)) {
+      exception.get().addSuppressed(e);
+    }
+  }
+
+  protected void checkExceptionAndMaybeThrow() {
+    if (exception.get() != NO_EXCEPTION) {
+      throw new RuntimeException("The source reader received exception.", 
exception.get());
+    }
+  }
+
+  protected boolean hasException() {
+    return exception.get() != NO_EXCEPTION;
+  }
+
+  protected Collection<FlinkSourceSplit<T>> sourceSplits() {
+    return Collections.unmodifiableCollection(sourceSplits);
+  }
+
+  protected Map<Integer, ReaderAndOutput> allReaders() {
+    return Collections.unmodifiableMap(beamSourceReaders);
+  }
+
+  protected static void ignoreReturnValue(Object o) {
+    // do nothing.
+  }
+  // ------------------------------ private methods 
------------------------------
+
+  @SuppressWarnings("unchecked")
+  private <CheckpointMarkT extends UnboundedSource.CheckpointMark>
+      byte[] 
getAndEncodeCheckpointMark(UnboundedSource.UnboundedReader<OutputT> reader) {
+    UnboundedSource<OutputT, CheckpointMarkT> source =
+        (UnboundedSource<OutputT, CheckpointMarkT>) reader.getCurrentSource();
+    CheckpointMarkT checkpointMark = (CheckpointMarkT) 
reader.getCheckpointMark();
+    Coder<CheckpointMarkT> coder = source.getCheckpointMarkCoder();
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      coder.encode(checkpointMark, baos);
+      return baos.toByteArray();
+    } catch (IOException ioe) {
+      throw new RuntimeException("Failed to encode checkpoint mark.", ioe);
+    }
+  }
+
+  private Source.Reader<T> createReader(@Nonnull FlinkSourceSplit<T> 
sourceSplit)
+      throws IOException {
+    Source<T> beamSource = sourceSplit.getBeamSplitSource();
+    if (beamSource instanceof BoundedSource) {
+      return ((BoundedSource<T>) beamSource).createReader(pipelineOptions);
+    } else if (beamSource instanceof UnboundedSource) {
+      return createUnboundedSourceReader(beamSource, 
sourceSplit.getSplitState());
+    } else {
+      throw new IllegalStateException("Unknown source type " + 
beamSource.getClass());
+    }
+  }
+
+  private <CheckpointMarkT extends UnboundedSource.CheckpointMark>

Review Comment:
   Same as above.



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.FlinkSourceCompat;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.metrics.Counter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An abstract implementation of {@link SourceReader} which encapsulates 
{@link Source Beam Sources}
+ * for data reading.
+ *
+ * <ol>
+ *   <li>Idle timeout support.
+ *   <li>Splits addition handling.
+ *   <li>Split reader creation and management.
+ *   <li>checkpoint management
+ * </ol>
+ *
+ * <p>This implementation provides unified logic for both {@link 
BoundedSource} and {@link
+ * UnboundedSource}. The subclasses are expected to only implement the {@link
+ * #pollNext(ReaderOutput)} method.
+ *
+ * @param <OutputT> the output element type from the encapsulated {@link 
Source Beam sources.}
+ */
+public abstract class FlinkSourceReaderBase<T, OutputT>
+    implements SourceReader<OutputT, FlinkSourceSplit<T>> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkSourceReaderBase.class);
+  protected static final CompletableFuture<Void> AVAILABLE_NOW =
+      CompletableFuture.completedFuture(null);
+  // Some dummy instances to make the annotation checker happy with 
AtomicReference.
+  protected static final CompletableFuture<Void> DUMMY_FUTURE = new 
CompletableFuture<>();
+  protected static final Exception NO_EXCEPTION = new Exception();
+
+  protected final PipelineOptions pipelineOptions;
+  protected final @Nullable Function<OutputT, Long> timestampExtractor;
+  private final Queue<FlinkSourceSplit<T>> sourceSplits = new ArrayDeque<>();
+  // This needs to be a ConcurrentHashMap because the metric retrieving thread 
may access it.
+  private final ConcurrentMap<Integer, ReaderAndOutput> beamSourceReaders;
+  protected final SourceReaderContext context;
+  private final ScheduledExecutorService executor;
+
+  protected final Counter numRecordsInCounter;
+  protected final long idleTimeoutMs;
+  private final CompletableFuture<Void> idleTimeoutFuture;
+  private final AtomicReference<Throwable> exception;
+  private boolean idleTimeoutCountingDown;
+  private CompletableFuture<Void> waitingForSplitChangeFuture;
+  private boolean noMoreSplits;
+
+  protected FlinkSourceReaderBase(
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      @Nullable Function<OutputT, Long> timestampExtractor) {
+    this(
+        Executors.newSingleThreadScheduledExecutor(
+            r -> new Thread(r, "FlinkSource-Executor-Thread-" + 
context.getIndexOfSubtask())),
+        context,
+        pipelineOptions,
+        timestampExtractor);
+  }
+
+  protected FlinkSourceReaderBase(
+      ScheduledExecutorService executor,
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      @Nullable Function<OutputT, Long> timestampExtractor) {
+    this.context = context;
+    this.pipelineOptions = pipelineOptions;
+    this.timestampExtractor = timestampExtractor;
+    this.beamSourceReaders = new ConcurrentHashMap<>();
+    this.exception = new AtomicReference<>(NO_EXCEPTION);
+    this.executor = executor;
+    this.idleTimeoutMs =
+        
pipelineOptions.as(FlinkPipelineOptions.class).getShutdownSourcesAfterIdleMs();
+    this.idleTimeoutFuture = new CompletableFuture<>();
+    this.waitingForSplitChangeFuture = new CompletableFuture<>();
+    this.idleTimeoutCountingDown = false;
+    // TODO: Remove the casting and use SourceReaderMetricGroup after minimum 
FLink version is
+    // upgraded to 1.14 and above.
+    this.numRecordsInCounter = 
FlinkSourceCompat.getNumRecordsInCounter(context);
+  }
+
+  @Override
+  public void start() {}
+
+  @Override
+  public List<FlinkSourceSplit<T>> snapshotState(long checkpointId) {
+    checkExceptionAndMaybeThrow();
+    // Add all the source splits whose readers haven't been created.
+    List<FlinkSourceSplit<T>> splitsState = new ArrayList<>(sourceSplits);
+
+    // Add all the source splits being actively read.
+    beamSourceReaders.forEach(
+        (splitId, readerAndOutput) -> {
+          Source.Reader<T> reader = readerAndOutput.reader;
+          if (reader instanceof BoundedSource.BoundedReader) {
+            // Sometimes users may decide to run a bounded source in streaming 
mode as "finite
+            // stream."
+            // For bounded source, the checkpoint granularity is the entire 
source split.
+            // So, in case of failure, all the data from this split will be 
consumed again.
+            splitsState.add(new FlinkSourceSplit<>(splitId, 
reader.getCurrentSource()));
+          } else if (reader instanceof UnboundedSource.UnboundedReader) {
+            // The checkpoint for unbounded sources is fine granular.
+            byte[] checkpointState =
+                
getAndEncodeCheckpointMark((UnboundedSource.UnboundedReader<OutputT>) reader);
+            splitsState.add(
+                new FlinkSourceSplit<>(splitId, reader.getCurrentSource(), 
checkpointState));
+          }
+        });
+    return splitsState;
+  }
+
+  @Override
+  public CompletableFuture<Void> isAvailable() {
+    checkExceptionAndMaybeThrow();
+    if (!sourceSplits.isEmpty() || !beamSourceReaders.isEmpty()) {
+      // There are still live readers.
+      return isAvailableForAliveReaders();
+    } else if (noMoreSplits) {
+      // All the splits have been read, wait for idle timeout.
+      checkIdleTimeoutAndMaybeStartCountdown();
+      return idleTimeoutFuture;
+    } else {
+      // There is no live readers, waiting for new split assignments or no 
more splits notification.
+      if (waitingForSplitChangeFuture.isDone()) {
+        waitingForSplitChangeFuture = new CompletableFuture<>();
+      }
+      return waitingForSplitChangeFuture;
+    }
+  }
+
+  @Override
+  public void notifyNoMoreSplits() {
+    checkExceptionAndMaybeThrow();
+    LOG.info("Received NoMoreSplits signal from enumerator.");
+    noMoreSplits = true;
+    waitingForSplitChangeFuture.complete(null);
+  }
+
+  @Override
+  public void addSplits(List<FlinkSourceSplit<T>> splits) {
+    checkExceptionAndMaybeThrow();
+    LOG.info("Adding splits {}", splits);
+    sourceSplits.addAll(splits);
+    waitingForSplitChangeFuture.complete(null);
+  }
+
+  @Override
+  public void close() throws Exception {
+    for (ReaderAndOutput readerAndOutput : beamSourceReaders.values()) {
+      readerAndOutput.reader.close();
+    }
+    executor.shutdown();
+  }
+
+  // ----------------- protected abstract methods ----------------------
+
+  /**
+   * This method needs to be overridden by subclasses to determine if data is 
available when there
+   * are alive readers. For example, an unbounded source may not have any 
source split ready for
+   * data emission even if all the sources are still alive. Whereas for the 
bounded source, data is
+   * always available as long as there are alive readers.
+   */
+  protected abstract CompletableFuture<Void> isAvailableForAliveReaders();
+
+  // ----------------- protected helper methods for subclasses 
--------------------
+
+  protected Optional<ReaderAndOutput> createAndTrackNextReader() throws 
IOException {
+    FlinkSourceSplit<T> sourceSplit = sourceSplits.poll();
+    if (sourceSplit != null) {
+      Source.Reader<T> reader = createReader(sourceSplit);
+      ReaderAndOutput readerAndOutput = new 
ReaderAndOutput(sourceSplit.splitId(), reader, false);
+      beamSourceReaders.put(sourceSplit.splitIndex(), readerAndOutput);
+      return Optional.of(readerAndOutput);
+    }
+    return Optional.empty();
+  }
+
+  protected void finishSplit(int splitIndex) throws IOException {
+    ReaderAndOutput readerAndOutput = beamSourceReaders.remove(splitIndex);
+    if (readerAndOutput != null) {
+      LOG.info("Finished reading from split {}", readerAndOutput.splitId);
+      readerAndOutput.reader.close();
+    } else {
+      throw new IllegalStateException(
+          "SourceReader for split " + splitIndex + " should never be null!");
+    }
+  }
+
+  protected boolean checkIdleTimeoutAndMaybeStartCountdown() {
+    if (idleTimeoutMs <= 0) {
+      idleTimeoutFuture.complete(null);
+    } else if (!idleTimeoutCountingDown) {
+      scheduleTask(() -> idleTimeoutFuture.complete(null), idleTimeoutMs);
+      idleTimeoutCountingDown = true;
+    }
+    return idleTimeoutFuture.isDone();
+  }
+
+  protected boolean noMoreSplits() {
+    return noMoreSplits;
+  }
+
+  protected void scheduleTask(Runnable runnable, long delayMs) {
+    ignoreReturnValue(
+        executor.schedule(new ErrorRecordingRunnable(runnable), delayMs, 
TimeUnit.MILLISECONDS));
+  }
+
+  protected void scheduleTaskAtFixedRate(Runnable runnable, long delayMs, long 
periodMs) {
+    ignoreReturnValue(
+        executor.scheduleAtFixedRate(
+            new ErrorRecordingRunnable(runnable), delayMs, periodMs, 
TimeUnit.MILLISECONDS));
+  }
+
+  protected void execute(Runnable runnable) {
+    executor.execute(new ErrorRecordingRunnable(runnable));
+  }
+
+  protected void recordException(Throwable e) {
+    if (!exception.compareAndSet(NO_EXCEPTION, e)) {
+      exception.get().addSuppressed(e);
+    }
+  }
+
+  protected void checkExceptionAndMaybeThrow() {
+    if (exception.get() != NO_EXCEPTION) {
+      throw new RuntimeException("The source reader received exception.", 
exception.get());
+    }
+  }
+
+  protected boolean hasException() {
+    return exception.get() != NO_EXCEPTION;
+  }
+
+  protected Collection<FlinkSourceSplit<T>> sourceSplits() {
+    return Collections.unmodifiableCollection(sourceSplits);
+  }
+
+  protected Map<Integer, ReaderAndOutput> allReaders() {
+    return Collections.unmodifiableMap(beamSourceReaders);
+  }
+
+  protected static void ignoreReturnValue(Object o) {
+    // do nothing.
+  }
+  // ------------------------------ private methods 
------------------------------
+
+  @SuppressWarnings("unchecked")
+  private <CheckpointMarkT extends UnboundedSource.CheckpointMark>

Review Comment:
   This method uses the `OutputT` type parameter from the enclosing class, so 
it is not static. We could make it static, but the benefit would be limited 
given that we don't expect tons of reader instances, and doing so would also 
make the code somewhat less readable.



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.FlinkSourceCompat;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.metrics.Counter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An abstract implementation of {@link SourceReader} which encapsulates 
{@link Source Beam Sources}
+ * for data reading.
+ *
+ * <ol>
+ *   <li>Idle timeout support.
+ *   <li>Splits addition handling.
+ *   <li>Split reader creation and management.
+ *   <li>Checkpoint management.
+ * </ol>
+ *
+ * <p>This implementation provides unified logic for both {@link 
BoundedSource} and {@link
+ * UnboundedSource}. The subclasses are expected to only implement the {@link
+ * #pollNext(ReaderOutput)} method.
+ *
+ * @param <OutputT> the output element type from the encapsulated {@link Source Beam sources}.
+ */
+public abstract class FlinkSourceReaderBase<T, OutputT>
+    implements SourceReader<OutputT, FlinkSourceSplit<T>> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkSourceReaderBase.class);
+  protected static final CompletableFuture<Void> AVAILABLE_NOW =
+      CompletableFuture.completedFuture(null);
+  // Some dummy instances to make the annotation checker happy with 
AtomicReference.
+  protected static final CompletableFuture<Void> DUMMY_FUTURE = new 
CompletableFuture<>();
+  protected static final Exception NO_EXCEPTION = new Exception();
+
+  protected final PipelineOptions pipelineOptions;
+  protected final @Nullable Function<OutputT, Long> timestampExtractor;
+  private final Queue<FlinkSourceSplit<T>> sourceSplits = new ArrayDeque<>();
+  // This needs to be a ConcurrentHashMap because the metric retrieving thread 
may access it.
+  private final ConcurrentMap<Integer, ReaderAndOutput> beamSourceReaders;
+  protected final SourceReaderContext context;
+  private final ScheduledExecutorService executor;
+
+  protected final Counter numRecordsInCounter;
+  protected final long idleTimeoutMs;
+  private final CompletableFuture<Void> idleTimeoutFuture;
+  private final AtomicReference<Throwable> exception;
+  private boolean idleTimeoutCountingDown;
+  private CompletableFuture<Void> waitingForSplitChangeFuture;
+  private boolean noMoreSplits;
+
+  protected FlinkSourceReaderBase(
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      @Nullable Function<OutputT, Long> timestampExtractor) {
+    this(
+        Executors.newSingleThreadScheduledExecutor(
+            r -> new Thread(r, "FlinkSource-Executor-Thread-" + 
context.getIndexOfSubtask())),
+        context,
+        pipelineOptions,
+        timestampExtractor);
+  }
+
+  protected FlinkSourceReaderBase(
+      ScheduledExecutorService executor,
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      @Nullable Function<OutputT, Long> timestampExtractor) {
+    this.context = context;
+    this.pipelineOptions = pipelineOptions;
+    this.timestampExtractor = timestampExtractor;
+    this.beamSourceReaders = new ConcurrentHashMap<>();
+    this.exception = new AtomicReference<>(NO_EXCEPTION);
+    this.executor = executor;
+    this.idleTimeoutMs =
+        
pipelineOptions.as(FlinkPipelineOptions.class).getShutdownSourcesAfterIdleMs();
+    this.idleTimeoutFuture = new CompletableFuture<>();
+    this.waitingForSplitChangeFuture = new CompletableFuture<>();
+    this.idleTimeoutCountingDown = false;
+    // TODO: Remove the casting and use SourceReaderMetricGroup after the minimum Flink version
+    // is upgraded to 1.14 or above.
+    this.numRecordsInCounter = 
FlinkSourceCompat.getNumRecordsInCounter(context);
+  }
+
+  @Override
+  public void start() {}
+
+  @Override
+  public List<FlinkSourceSplit<T>> snapshotState(long checkpointId) {
+    checkExceptionAndMaybeThrow();
+    // Add all the source splits whose readers haven't been created.
+    List<FlinkSourceSplit<T>> splitsState = new ArrayList<>(sourceSplits);
+
+    // Add all the source splits being actively read.
+    beamSourceReaders.forEach(
+        (splitId, readerAndOutput) -> {
+          Source.Reader<T> reader = readerAndOutput.reader;
+          if (reader instanceof BoundedSource.BoundedReader) {
+            // Sometimes users may decide to run a bounded source in streaming mode as a
+            // "finite stream." For a bounded source, the checkpoint granularity is the
+            // entire source split. So, in case of failure, all the data from this split
+            // will be consumed again.
+            splitsState.add(new FlinkSourceSplit<>(splitId, 
reader.getCurrentSource()));
+          } else if (reader instanceof UnboundedSource.UnboundedReader) {
+            // The checkpoint for unbounded sources is fine-grained.
+            byte[] checkpointState =
+                
getAndEncodeCheckpointMark((UnboundedSource.UnboundedReader<OutputT>) reader);
+            splitsState.add(
+                new FlinkSourceSplit<>(splitId, reader.getCurrentSource(), 
checkpointState));
+          }
+        });
+    return splitsState;
+  }
+
+  @Override
+  public CompletableFuture<Void> isAvailable() {
+    checkExceptionAndMaybeThrow();
+    if (!sourceSplits.isEmpty() || !beamSourceReaders.isEmpty()) {
+      // There are still live readers.
+      return isAvailableForAliveReaders();
+    } else if (noMoreSplits) {
+      // All the splits have been read, wait for idle timeout.
+      checkIdleTimeoutAndMaybeStartCountdown();
+      return idleTimeoutFuture;
+    } else {
+      // There are no live readers; wait for new split assignments or the
+      // no-more-splits notification.
+      if (waitingForSplitChangeFuture.isDone()) {
+        waitingForSplitChangeFuture = new CompletableFuture<>();
+      }
+      return waitingForSplitChangeFuture;
+    }
+  }
+
+  @Override
+  public void notifyNoMoreSplits() {
+    checkExceptionAndMaybeThrow();
+    LOG.info("Received NoMoreSplits signal from enumerator.");
+    noMoreSplits = true;
+    waitingForSplitChangeFuture.complete(null);
+  }
+
+  @Override
+  public void addSplits(List<FlinkSourceSplit<T>> splits) {
+    checkExceptionAndMaybeThrow();
+    LOG.info("Adding splits {}", splits);
+    sourceSplits.addAll(splits);
+    waitingForSplitChangeFuture.complete(null);
+  }
+
+  @Override
+  public void close() throws Exception {
+    for (ReaderAndOutput readerAndOutput : beamSourceReaders.values()) {
+      readerAndOutput.reader.close();
+    }
+    executor.shutdown();
+  }
+
+  // ----------------- protected abstract methods ----------------------
+
+  /**
+   * This method needs to be overridden by subclasses to determine if data is 
available when there
+   * are alive readers. For example, an unbounded source may not have any 
source split ready for
+   * data emission even if all the sources are still alive. Whereas for the 
bounded source, data is
+   * always available as long as there are alive readers.
+   */
+  protected abstract CompletableFuture<Void> isAvailableForAliveReaders();
+
+  // ----------------- protected helper methods for subclasses 
--------------------
+
+  protected Optional<ReaderAndOutput> createAndTrackNextReader() throws 
IOException {
+    FlinkSourceSplit<T> sourceSplit = sourceSplits.poll();
+    if (sourceSplit != null) {
+      Source.Reader<T> reader = createReader(sourceSplit);
+      ReaderAndOutput readerAndOutput = new 
ReaderAndOutput(sourceSplit.splitId(), reader, false);
+      beamSourceReaders.put(sourceSplit.splitIndex(), readerAndOutput);
+      return Optional.of(readerAndOutput);
+    }
+    return Optional.empty();
+  }
+
+  protected void finishSplit(int splitIndex) throws IOException {
+    ReaderAndOutput readerAndOutput = beamSourceReaders.remove(splitIndex);
+    if (readerAndOutput != null) {
+      LOG.info("Finished reading from split {}", readerAndOutput.splitId);
+      readerAndOutput.reader.close();
+    } else {
+      throw new IllegalStateException(
+          "SourceReader for split " + splitIndex + " should never be null!");
+    }
+  }
+
+  protected boolean checkIdleTimeoutAndMaybeStartCountdown() {
+    if (idleTimeoutMs <= 0) {
+      idleTimeoutFuture.complete(null);
+    } else if (!idleTimeoutCountingDown) {
+      scheduleTask(() -> idleTimeoutFuture.complete(null), idleTimeoutMs);
+      idleTimeoutCountingDown = true;
+    }
+    return idleTimeoutFuture.isDone();
+  }
+
+  protected boolean noMoreSplits() {
+    return noMoreSplits;
+  }
+
+  protected void scheduleTask(Runnable runnable, long delayMs) {
+    ignoreReturnValue(
+        executor.schedule(new ErrorRecordingRunnable(runnable), delayMs, 
TimeUnit.MILLISECONDS));
+  }
+
+  protected void scheduleTaskAtFixedRate(Runnable runnable, long delayMs, long 
periodMs) {
+    ignoreReturnValue(
+        executor.scheduleAtFixedRate(
+            new ErrorRecordingRunnable(runnable), delayMs, periodMs, 
TimeUnit.MILLISECONDS));
+  }
+
+  protected void execute(Runnable runnable) {
+    executor.execute(new ErrorRecordingRunnable(runnable));
+  }
+
+  protected void recordException(Throwable e) {
+    if (!exception.compareAndSet(NO_EXCEPTION, e)) {
+      exception.get().addSuppressed(e);
+    }
+  }
+
+  protected void checkExceptionAndMaybeThrow() {
+    if (exception.get() != NO_EXCEPTION) {
+      throw new RuntimeException("The source reader received exception.", 
exception.get());
+    }
+  }
+
+  protected boolean hasException() {
+    return exception.get() != NO_EXCEPTION;
+  }
+
+  protected Collection<FlinkSourceSplit<T>> sourceSplits() {
+    return Collections.unmodifiableCollection(sourceSplits);
+  }
+
+  protected Map<Integer, ReaderAndOutput> allReaders() {
+    return Collections.unmodifiableMap(beamSourceReaders);
+  }
+
+  protected static void ignoreReturnValue(Object o) {
+    // do nothing.
+  }
+  // ------------------------------ private methods 
------------------------------
+
+  @SuppressWarnings("unchecked")
+  private <CheckpointMarkT extends UnboundedSource.CheckpointMark>
+      byte[] 
getAndEncodeCheckpointMark(UnboundedSource.UnboundedReader<OutputT> reader) {
+    UnboundedSource<OutputT, CheckpointMarkT> source =
+        (UnboundedSource<OutputT, CheckpointMarkT>) reader.getCurrentSource();
+    CheckpointMarkT checkpointMark = (CheckpointMarkT) 
reader.getCheckpointMark();
+    Coder<CheckpointMarkT> coder = source.getCheckpointMarkCoder();
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      coder.encode(checkpointMark, baos);
+      return baos.toByteArray();
+    } catch (IOException ioe) {
+      throw new RuntimeException("Failed to encode checkpoint mark.", ioe);
+    }
+  }
+
+  private Source.Reader<T> createReader(@Nonnull FlinkSourceSplit<T> 
sourceSplit)

Review Comment:
   Similar to the comment above, this method uses the type parameter `T` of the enclosing class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to