tweise commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r680576567



##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Hybrid source reader that delegates to the actual source reader.
+ *
+ * <p>This reader processes splits from a sequence of sources as determined by the enumerator. The
+ * current source is provided with {@link SwitchSourceEvent} and the reader does not require upfront
+ * knowledge of the number and order of sources. At a given point in time one underlying reader is
+ * active.
+ *
+ * <p>When the underlying reader has consumed all input for a source, {@link HybridSourceReader}
+ * sends {@link SourceReaderFinishedEvent} to the coordinator.
+ *
+ * <p>This reader does not make assumptions about the order in which sources are activated. When
+ * recovering from a checkpoint it may start processing splits for a previous source, which is
+ * indicated via {@link SwitchSourceEvent}.
+ */
+public class HybridSourceReader<T> implements SourceReader<T, 
HybridSourceSplit> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceReader.class);
+    private final SourceReaderContext readerContext;
+    private final Map<Integer, Source> switchedSources;
+    private int currentSourceIndex = -1;
+    private boolean isFinalSource;
+    private SourceReader<T, ? extends SourceSplit> currentReader;
+    private CompletableFuture<Void> availabilityFuture;
+    private List<HybridSourceSplit> restoredSplits = new ArrayList<>();
+
+    /**
+     * Creates a hybrid reader. Only the two arguments are stored; no underlying reader is
+     * created here.
+     *
+     * @param readerContext context used by this reader to send events to the coordinator and to
+     *     look up the subtask index
+     * @param switchedSources sources keyed by an integer (presumably the source index -- confirm
+     *     against the code that populates and reads this map)
+     */
+    public HybridSourceReader(
+            SourceReaderContext readerContext, Map<Integer, Source> switchedSources) {
+        this.switchedSources = switchedSources;
+        this.readerContext = readerContext;
+    }
+
+    /**
+     * Announces this reader to the coordinator by sending a {@link SourceReaderFinishedEvent}.
+     *
+     * <p>The event carries the current source index, or, when restored splits are present, the
+     * source index of the first restored split minus one.
+     */
+    @Override
+    public void start() {
+        // underlying reader starts on demand with split assignment
+        final int finishedSourceIndex =
+                restoredSplits.isEmpty()
+                        ? currentSourceIndex
+                        : restoredSplits.get(0).sourceIndex() - 1;
+        readerContext.sendSourceEventToCoordinator(
+                new SourceReaderFinishedEvent(finishedSourceIndex));
+    }
+
+    /**
+     * Polls the current underlying reader and traps {@link InputStatus#END_OF_INPUT} unless the
+     * current source is the final one.
+     *
+     * <p>When the underlying reader reports end of input, a {@link SourceReaderFinishedEvent} for
+     * the current source index is sent to the coordinator.
+     *
+     * @param output the output handed through to the underlying reader
+     * @return {@link InputStatus#NOTHING_AVAILABLE} when there is no underlying reader, or when a
+     *     non-final source reached its end; otherwise the status returned by the underlying
+     *     reader
+     * @throws Exception if the underlying reader's {@code pollNext} throws
+     */
+    @Override
+    public InputStatus pollNext(ReaderOutput output) throws Exception {
+        if (currentReader == null) {
+            return InputStatus.NOTHING_AVAILABLE;
+        }
+
+        InputStatus status = currentReader.pollNext(output);
+        if (status != InputStatus.END_OF_INPUT) {
+            return status;
+        }
+
+        // trap END_OF_INPUT unless all sources have finished
+        LOG.info(
+                "End of input subtask={} sourceIndex={} {}",
+                readerContext.getIndexOfSubtask(),
+                currentSourceIndex,
+                currentReader);
+        // Signal the coordinator that this reader has consumed all input and the
+        // next source can potentially be activated.
+        readerContext.sendSourceEventToCoordinator(
+                new SourceReaderFinishedEvent(currentSourceIndex));
+        if (isFinalSource) {
+            return status;
+        }
+
+        // More splits may arrive for a subsequent reader.
+        // InputStatus.NOTHING_AVAILABLE suspends poll, requires completion of the
+        // availability future after receiving more splits to resume.
+        if (availabilityFuture != null && availabilityFuture.isDone()) {
+            // reset to avoid continued polling; the diamond operator avoids the raw-type
+            // (unchecked) assignment to CompletableFuture<Void>
+            availabilityFuture = new CompletableFuture<>();
+        }
+        return InputStatus.NOTHING_AVAILABLE;
+    }
+
+    /**
+     * Snapshots the state of the current underlying reader, wrapped as hybrid splits.
+     *
+     * @param checkpointId checkpoint id passed through to the underlying reader
+     * @return the underlying reader's snapshot (an empty list when there is no underlying
+     *     reader) wrapped via {@code HybridSourceSplit.wrapSplits} with the current source index
+     */
+    @Override
+    public List<HybridSourceSplit> snapshotState(long checkpointId) {
+        final List<? extends SourceSplit> readerState;
+        if (currentReader == null) {
+            readerState = Collections.emptyList();
+        } else {
+            readerState = currentReader.snapshotState(checkpointId);
+        }
+        return HybridSourceSplit.wrapSplits(currentSourceIndex, readerState);
+    }
+
+    @Override
+    public CompletableFuture<Void> isAvailable() {
+        // track future to resume reader after source switch
+        if (currentReader != null) {
+            return availabilityFuture = currentReader.isAvailable();
+        } else {
+            LOG.debug("Suspending pollNext due to no underlying reader");
+            return availabilityFuture = new CompletableFuture<>();

Review comment:
       Done, it is now created during initialization.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to