StephanEwen commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r666900347



##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Hybrid source that switches underlying sources based on configured source 
chain.
+ *
+ * <pre>{@code
+ * FileSource<String> fileSource = null;
+ * HybridSource<String> hybridSource =
+ *     new HybridSourceBuilder<String, ContinuousFileSplitEnumerator>()
+ *         .addSource(fileSource) // fixed start position
+ *         .addSource(
+ *             (enumerator) -> {
+ *               // instantiate Kafka source based on enumerator

Review comment:
       Which enumerator is that? The FileSource's enumerator?
   Could the comment clarify that?

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorStateSerializer.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/** The {@link SimpleVersionedSerializer Serializer} for the enumerator state. 
*/
+public class HybridSourceEnumeratorStateSerializer
+        implements SimpleVersionedSerializer<HybridSourceEnumeratorState> {
+
+    private static final int CURRENT_VERSION = 0;
+
+    private final Map<Integer, SimpleVersionedSerializer<Object>> 
cachedSerializers;
+    private final Map<Integer, Source> switchedSources;
+
+    public HybridSourceEnumeratorStateSerializer(Map<Integer, Source> 
switchedSources) {
+        this.switchedSources = switchedSources;
+        this.cachedSerializers = new HashMap<>();
+    }
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(HybridSourceEnumeratorState enumState) throws 
IOException {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                DataOutputStream out = new DataOutputStream(baos)) {
+            out.writeInt(enumState.getCurrentSourceIndex());
+            SimpleVersionedSerializer<Object> serializer =
+                    serializerOf(enumState.getCurrentSourceIndex());
+            out.writeInt(serializer.getVersion());
+            byte[] enumStateBytes = 
serializer.serialize(enumState.getWrappedState());
+            out.writeInt(enumStateBytes.length);
+            out.write(enumStateBytes);
+            return baos.toByteArray();
+        }
+    }
+
+    @Override
+    public HybridSourceEnumeratorState deserialize(int version, byte[] serialized)
+            throws IOException {
+        if (version == 0) {
+            return deserializeV0(serialized);
+        }
+        throw new AssertionError(

Review comment:
       I get where this comes from, but it differs from most other places in the Flink code. Usually, we throw an `IOException` there. The reason is that this isn't so much a violated static code assertion as incompatible or corrupt input that was read.
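
A minimal sketch of what this could look like, using only the JDK. `VersionedStateReader` and its single-int payload are hypothetical stand-ins, not the PR's serializer; the only point is that an unknown version surfaces as an `IOException` instead of an `AssertionError`.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

/** Hypothetical stand-in for a versioned deserializer; not the PR's class. */
final class VersionedStateReader {

    private static final int CURRENT_VERSION = 0;

    /** Reads a single int payload and rejects versions this code does not know. */
    static int deserialize(int version, byte[] serialized) throws IOException {
        if (version == CURRENT_VERSION) {
            return deserializeV0(serialized);
        }
        // An unknown version means incompatible or corrupt input, not a programming error.
        throw new IOException("Unrecognized version or corrupt state: " + version);
    }

    private static int deserializeV0(byte[] serialized) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            return in.readInt();
        }
    }
}
```

Callers that already handle `IOException` from the read path then treat a bad version like any other unreadable input.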

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorStateSerializer.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/** The {@link SimpleVersionedSerializer Serializer} for the enumerator state. 
*/
+public class HybridSourceEnumeratorStateSerializer
+        implements SimpleVersionedSerializer<HybridSourceEnumeratorState> {
+
+    private static final int CURRENT_VERSION = 0;
+
+    private final Map<Integer, SimpleVersionedSerializer<Object>> 
cachedSerializers;
+    private final Map<Integer, Source> switchedSources;
+
+    public HybridSourceEnumeratorStateSerializer(Map<Integer, Source> 
switchedSources) {
+        this.switchedSources = switchedSources;
+        this.cachedSerializers = new HashMap<>();
+    }
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(HybridSourceEnumeratorState enumState) throws 
IOException {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                DataOutputStream out = new DataOutputStream(baos)) {
+            out.writeInt(enumState.getCurrentSourceIndex());
+            SimpleVersionedSerializer<Object> serializer =
+                    serializerOf(enumState.getCurrentSourceIndex());
+            out.writeInt(serializer.getVersion());
+            byte[] enumStateBytes = serializer.serialize(enumState.getWrappedState());
+            out.writeInt(enumStateBytes.length);
+            out.write(enumStateBytes);

Review comment:
       Nit: We usually `flush()` in such places, so that we don't implicitly rely on the fact that the `DataOutputStream` doesn't buffer.
   Alternatively, you can use Flink's `DataOutputSerializer`, which is even slightly more efficient.
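
A minimal sketch of the `flush()` variant, using only the JDK. `FlushingStateWriter` is a hypothetical helper that mirrors the layout written above (source index, serializer version, payload length, payload); it is not the PR's code.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical helper mirroring the layout above: index, version, length, payload. */
final class FlushingStateWriter {

    static byte[] write(int sourceIndex, int version, byte[] payload) throws IOException {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputStream out = new DataOutputStream(baos)) {
            out.writeInt(sourceIndex);
            out.writeInt(version);
            out.writeInt(payload.length);
            out.write(payload);
            // Flush explicitly so correctness does not depend on the wrapper being unbuffered.
            out.flush();
            return baos.toByteArray();
        }
    }
}
```

The explicit `flush()` costs nothing here and keeps the method correct if a buffering stream is ever inserted between the two.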

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Hybrid source reader that delegates to the actual source reader.
+ *
+ * <p>This reader processes splits from a sequence of sources as determined by 
the enumerator. The
+ * current source is provided with {@link SwitchSourceEvent} and the reader 
does not require upfront
+ * knowledge of the number and order of sources. At a given point in time one 
underlying reader is
+ * active.
+ *
+ * <p>When the underlying reader has consumed all input for a source, {@link 
HybridSourceReader}
+ * sends {@link SourceReaderFinishedEvent} to the coordinator.
+ *
+ * <p>This reader does not make assumptions about the order in which sources 
are activated. When
+ * recovering from a checkpoint it may start processing splits for a previous 
source, which is
+ * indicated via {@link SwitchSourceEvent}.
+ */
+public class HybridSourceReader<T> implements SourceReader<T, 
HybridSourceSplit> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceReader.class);
+    private final SourceReaderContext readerContext;
+    private final Map<Integer, Source> switchedSources;
+    private int currentSourceIndex = -1;
+    private boolean isFinalSource;
+    private SourceReader<T, ? extends SourceSplit> currentReader;
+    private CompletableFuture<Void> availabilityFuture;
+    private List<HybridSourceSplit> restoredSplits = new ArrayList<>();
+
+    public HybridSourceReader(
+            SourceReaderContext readerContext, Map<Integer, Source> 
switchedSources) {
+        this.readerContext = readerContext;
+        this.switchedSources = switchedSources;
+    }
+
+    @Override
+    public void start() {
+        // underlying reader starts on demand with split assignment
+        int initialSourceIndex = currentSourceIndex;
+        if (!restoredSplits.isEmpty()) {
+            initialSourceIndex = restoredSplits.get(0).sourceIndex() - 1;
+        }
+        readerContext.sendSourceEventToCoordinator(
+                new SourceReaderFinishedEvent(initialSourceIndex));
+    }
+
+    @Override
+    public InputStatus pollNext(ReaderOutput output) throws Exception {
+        if (currentReader == null) {
+            return InputStatus.NOTHING_AVAILABLE;
+        }
+
+        InputStatus status = currentReader.pollNext(output);
+        if (status == InputStatus.END_OF_INPUT) {
+            // trap END_OF_INPUT unless all sources have finished
+            LOG.info(
+                    "End of input subtask={} sourceIndex={} {}",
+                    readerContext.getIndexOfSubtask(),
+                    currentSourceIndex,
+                    currentReader);
+            // Signal the coordinator that this reader has consumed all input and the
+            // next source can potentially be activated.
+            readerContext.sendSourceEventToCoordinator(
+                    new SourceReaderFinishedEvent(currentSourceIndex));
+            if (!isFinalSource) {
+                // More splits may arrive for a subsequent reader.
+                // InputStatus.NOTHING_AVAILABLE suspends poll, requires completion of the
+                // availability future after receiving more splits to resume.
+                return InputStatus.NOTHING_AVAILABLE;

Review comment:
       I think you are missing something here. The availability future needs to be reset, because if it is already complete, the mailbox will keep polling again and again.
   
   ```java
   if (availabilityFuture.isDone()) {
       availabilityFuture = new CompletableFuture<>();
   }
   ```
   or
   ```java
   // ensure anyone that previously grabbed this future will be woken up
   availabilityFuture.complete(null);
   availabilityFuture = new CompletableFuture<>();
   ```
   
   I am wondering if we should even add two utility methods like this to the `SourceReaderBase`: `moveToAvailable()` and `moveToNothingAvailable()` which update the `availabilityFuture`.
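
A rough sketch of how the two helper methods could behave, using only the JDK. `AvailabilityHelper` is a hypothetical holder class, not an existing Flink type, and the sketch assumes all calls come from a single thread (as with the mailbox), so it does no synchronization.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical holder for the two proposed helpers; not an existing Flink class. */
final class AvailabilityHelper {

    private CompletableFuture<Void> availabilityFuture = new CompletableFuture<>();

    /** Future that callers wait on until data may be available. */
    CompletableFuture<Void> isAvailable() {
        return availabilityFuture;
    }

    /** Wakes up everyone waiting on the current future. */
    void moveToAvailable() {
        availabilityFuture.complete(null);
    }

    /** Replaces a completed future so that callers wait again instead of polling in a loop. */
    void moveToNothingAvailable() {
        if (availabilityFuture.isDone()) {
            availabilityFuture = new CompletableFuture<>();
        }
    }
}
```

With helpers like these, the reader would call `moveToNothingAvailable()` before returning `NOTHING_AVAILABLE` and `moveToAvailable()` once new splits arrive.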

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Hybrid source that switches underlying sources based on configured source 
chain.
+ *
+ * <pre>{@code
+ * FileSource<String> fileSource = null;
+ * HybridSource<String> hybridSource =
+ *     new HybridSourceBuilder<String, ContinuousFileSplitEnumerator>()
+ *         .addSource(fileSource) // fixed start position
+ *         .addSource(
+ *             (enumerator) -> {
+ *               // instantiate Kafka source based on enumerator
+ *               KafkaSource<String> kafkaSource = 
createKafkaSource(enumerator);
+ *               return kafkaSource;
+ *             }, Boundedness.CONTINUOUS_UNBOUNDED)
+ *         .build();
+ * }</pre>
+ */
+@PublicEvolving
+public class HybridSource<T> implements Source<T, HybridSourceSplit, 
HybridSourceEnumeratorState> {
+
+    private final List<SourceListEntry> sources;
+    // sources are populated per subtask at switch time
+    private final Map<Integer, Source> switchedSources;
+
+    /** Protected for subclass, use {@link #builder(Source)} to construct 
source. */
+    protected HybridSource(List<SourceListEntry> sources) {
+        Preconditions.checkArgument(!sources.isEmpty());
+        for (int i = 0; i < sources.size() - 1; i++) {
+            Preconditions.checkArgument(
+                    Boundedness.BOUNDED.equals(sources.get(i).boundedness),
+                    "All sources except the final source need to be bounded.");
+        }
+        this.sources = sources;
+        this.switchedSources = new HashMap<>(sources.size());
+    }
+
+    /** Builder for {@link HybridSource}. */
+    public static <T, EnumT extends SplitEnumerator> HybridSourceBuilder<T, 
EnumT> builder(
+            Source<T, ?, ?> firstSource) {
+        HybridSourceBuilder<T, EnumT> builder = new HybridSourceBuilder<>();
+        return builder.addSource(firstSource);
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return sources.get(sources.size() - 1).boundedness;
+    }
+
+    @Override
+    public SourceReader<T, HybridSourceSplit> createReader(SourceReaderContext 
readerContext)
+            throws Exception {
+        return new HybridSourceReader(readerContext, switchedSources);
+    }
+
+    @Override
+    public SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> 
createEnumerator(
+            SplitEnumeratorContext<HybridSourceSplit> enumContext) {
+        return new HybridSourceSplitEnumerator(enumContext, sources, 0, 
switchedSources);
+    }
+
+    @Override
+    public SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> 
restoreEnumerator(
+            SplitEnumeratorContext<HybridSourceSplit> enumContext,
+            HybridSourceEnumeratorState checkpoint)
+            throws Exception {
+        // TODO: restore underlying enumerator
+        return new HybridSourceSplitEnumerator(
+                enumContext, sources, checkpoint.getCurrentSourceIndex(), 
switchedSources);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<HybridSourceSplit> getSplitSerializer() {
+        return new HybridSourceSplitSerializer(switchedSources);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<HybridSourceEnumeratorState>
+            getEnumeratorCheckpointSerializer() {
+        return new HybridSourceEnumeratorStateSerializer(switchedSources);
+    }
+
+    /**
+     * Factory for underlying sources of {@link HybridSource}.
+     *
+     * <p>This factory permits building of a source at graph construction time 
or deferred at switch
+     * time. Provides the ability to set a start position in any way a 
specific source allows.
+     * Future convenience could be built on top of it, for example a default 
implementation that
+     * recognizes optional interfaces to transfer position in a universal 
format.
+     *
+     * <p>Called when the current enumerator has finished and before the next 
enumerator is created.
+     * The enumerator end state can thus be used to set the next source's start position. Only
+     * required for dynamic position transfer at time of switching.
+     *
+     * <p>If start position is known at job submission time, the source can be 
constructed in the
+     * entry point and simply wrapped into the factory, providing the benefit 
of validation during
+     * submission.
+     */
+    public interface SourceFactory<T, SourceT extends Source, FromEnumT extends SplitEnumerator>

Review comment:
       Annotate with `@FunctionalInterface`?
   (If we want to guarantee lambdas work for this).
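
A small illustration of what the annotation buys, using only the JDK. `SimpleSourceFactory` is a hypothetical, simplified interface with made-up type parameters; the PR's `SourceFactory` has a different signature.

```java
/** Hypothetical, simplified factory; the PR's SourceFactory has a different signature. */
@FunctionalInterface
interface SimpleSourceFactory<SourceT, FromEnumT> {
    // Exactly one abstract method: the compiler rejects the interface if a second is added.
    SourceT create(FromEnumT previousEnumerator);
}

final class FunctionalInterfaceDemo {

    /** Invokes the factory the way a switch from one source to the next might. */
    static String describe(SimpleSourceFactory<String, Integer> factory, Integer enumerator) {
        return factory.create(enumerator);
    }
}
```

A call such as `FunctionalInterfaceDemo.describe(e -> "kafka-from-" + e, 3)` compiles only while the interface keeps a single abstract method, which is the guarantee the annotation enforces at compile time.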



