xinyuiscool commented on code in PR #25525:
URL: https://github.com/apache/beam/pull/25525#discussion_r1126819727


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.ValueWithRecordId;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Flink {@link org.apache.flink.api.connector.source.SourceReader 
SourceReader} implementation
+ * that reads from the assigned {@link
+ * 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit
+ * FlinkSourceSplits} by using Beam {@link 
org.apache.beam.sdk.io.UnboundedSource.UnboundedReader
+ * UnboundedReaders}.
+ *
+ * <p>This reader consumes all the assigned source splits concurrently.
+ *
+ * @param <T> the output element type of the encapsulated Beam {@link
+ *     org.apache.beam.sdk.io.UnboundedSource.UnboundedReader UnboundedReader}.
+ */
+public class FlinkUnboundedSourceReader<T>
+    extends FlinkSourceReaderBase<T, WindowedValue<ValueWithRecordId<T>>> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkUnboundedSourceReader.class);
+  // This name is defined in FLIP-33.
+  @VisibleForTesting protected static final String PENDING_BYTES_METRIC_NAME = 
"pendingBytes";
+  private static final long SLEEP_ON_IDLE_MS = 50L;
+  private final AtomicReference<CompletableFuture<Void>> 
dataAvailableFutureRef;
+  private final List<ReaderAndOutput> readers;
+  private int currentReaderIndex;
+  private volatile boolean shouldEmitWatermark;
+
+  public FlinkUnboundedSourceReader(
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      @Nullable Function<WindowedValue<ValueWithRecordId<T>>, Long> 
timestampExtractor) {
+    super(context, pipelineOptions, timestampExtractor);
+    this.readers = new ArrayList<>();
+    this.dataAvailableFutureRef = new AtomicReference<>(DUMMY_FUTURE);
+    this.currentReaderIndex = 0;
+  }
+
+  @VisibleForTesting
+  protected FlinkUnboundedSourceReader(
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      ScheduledExecutorService executor,
+      @Nullable Function<WindowedValue<ValueWithRecordId<T>>, Long> 
timestampExtractor) {
+    super(executor, context, pipelineOptions, timestampExtractor);
+    this.readers = new ArrayList<>();
+    this.dataAvailableFutureRef = new AtomicReference<>(DUMMY_FUTURE);
+    this.currentReaderIndex = 0;
+  }
+
+  @Override
+  public void start() {
+    createPendingBytesGauge(context);
+    Long watermarkInterval =
+        
pipelineOptions.as(FlinkPipelineOptions.class).getAutoWatermarkInterval();
+    if (watermarkInterval != null) {
+      scheduleTaskAtFixedRate(
+          () -> {
+            // Set the watermark emission flag first.
+            shouldEmitWatermark = true;
+            // Wake up the main thread if necessary.
+            CompletableFuture<Void> f = dataAvailableFutureRef.get();
+            if (f != DUMMY_FUTURE) {
+              f.complete(null);
+            }
+          },
+          watermarkInterval,
+          watermarkInterval);
+    } else {
+      LOG.warn("AutoWatermarkInterval is not set, watermarks won't be 
emitted.");
+    }
+  }
+
+  @Override
+  public InputStatus 
pollNext(ReaderOutput<WindowedValue<ValueWithRecordId<T>>> output)
+      throws Exception {
+    checkExceptionAndMaybeThrow();
+    maybeEmitWatermark();
+    maybeCreateReaderForNewSplits();
+
+    ReaderAndOutput reader = nextReaderWithData();
+    if (reader != null) {
+      emitRecord(reader, output);
+      return InputStatus.MORE_AVAILABLE;
+    } else {
+      LOG.trace("No data available for now.");
+      return InputStatus.NOTHING_AVAILABLE;
+    }
+  }
+
+  /**
+   * Check whether there are data available from alive readers. If not, set a 
future and wait for
+   * the periodically running wake-up task to complete that future when the 
check interval passes.
+   * This method is only called by the main thread, which is the only thread 
writing to the future
+   * ref. Note that for UnboundedSource, because the splits never finishes, 
there are always alive
+   * readers after the first split assigment. Hence, the return value of {@link
+   * FlinkSourceReaderBase#isAvailable()} will effectively be determined by 
this method after the
+   * first split assignment.
+   */
+  @Override
+  protected CompletableFuture<Void> isAvailableForAliveReaders() {
+    CompletableFuture<Void> future = dataAvailableFutureRef.get();
+    if (future == DUMMY_FUTURE) {
+      CompletableFuture<Void> newFuture = new CompletableFuture<>();
+      // Need to set the future first to avoid the race condition of missing 
the watermark emission
+      // notification.
+      dataAvailableFutureRef.set(newFuture);
+      if (shouldEmitWatermark || hasException()) {
+        // There are exception after we set the new future,
+        // immediately complete the future and return.
+        dataAvailableFutureRef.set(DUMMY_FUTURE);
+        newFuture.complete(null);
+      } else {
+        LOG.debug("There is no data available, scheduling the idle reader 
checker.");
+        scheduleTask(

Review Comment:
   Thanks for the explanation. So here is the intention is to put a wait for 
the next poll. I was thinking somewhere dataAvailableFuture will be set once 
the data is available. Looks like it's not used that way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to