This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c72aa4b97848625f23bac331b8e916c6a3e2acd3
Author: Thomas Weise <[email protected]>
AuthorDate: Fri Feb 26 19:32:32 2021 -0800

    [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
---
 .../connector/base/source/hybrid/HybridSource.java | 257 ++++++++++++++
 .../source/hybrid/HybridSourceEnumeratorState.java |  38 ++
 .../HybridSourceEnumeratorStateSerializer.java     | 106 ++++++
 .../base/source/hybrid/HybridSourceReader.java     | 254 ++++++++++++++
 .../base/source/hybrid/HybridSourceSplit.java      |  94 +++++
 .../source/hybrid/HybridSourceSplitEnumerator.java | 390 +++++++++++++++++++++
 .../source/hybrid/HybridSourceSplitSerializer.java |  98 ++++++
 .../source/hybrid/SourceReaderFinishedEvent.java   |  49 +++
 .../base/source/hybrid/SwitchSourceEvent.java      |  62 ++++
 .../base/source/hybrid/HybridSourceITCase.java     | 246 +++++++++++++
 .../base/source/hybrid/HybridSourceReaderTest.java | 181 ++++++++++
 .../hybrid/HybridSourceSplitEnumeratorTest.java    | 252 +++++++++++++
 .../hybrid/HybridSourceSplitSerializerTest.java    |  52 +++
 .../base/source/hybrid/HybridSourceTest.java       | 115 ++++++
 14 files changed, 2194 insertions(+)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
new file mode 100644
index 0000000..e3d66de
--- /dev/null
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Hybrid source that switches underlying sources based on configured source 
chain.
+ *
+ * <p>A simple example with FileSource and KafkaSource with fixed Kafka start 
position:
+ *
+ * <pre>{@code
+ * FileSource<String> fileSource =
+ *   FileSource.forRecordStreamFormat(new TextLineFormat(), 
Path.fromLocalFile(testDir)).build();
+ * KafkaSource<String> kafkaSource =
+ *           KafkaSource.<String>builder()
+ *                   .setBootstrapServers("localhost:9092")
+ *                   .setGroupId("MyGroup")
+ *                   .setTopics(Arrays.asList("quickstart-events"))
+ *                   .setDeserializer(
+ *                           
KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
+ *                   .setStartingOffsets(OffsetsInitializer.earliest())
+ *                   .build();
+ * HybridSource<String> hybridSource =
+ *           HybridSource.builder(fileSource)
+ *                   .addSource(kafkaSource)
+ *                   .build();
+ * }</pre>
+ *
+ * <p>A more complex example with Kafka start position derived from previous 
source:
+ *
+ * <pre>{@code
+ * HybridSource<String> hybridSource =
+ *     HybridSource.<String, StaticFileSplitEnumerator>builder(fileSource)
+ *         .addSource(
+ *             switchContext -> {
+ *               StaticFileSplitEnumerator previousEnumerator =
+ *                   switchContext.getPreviousEnumerator();
+ *               // how to get timestamp depends on specific enumerator
+ *               long timestamp = previousEnumerator.getEndTimestamp();
+ *               OffsetsInitializer offsets =
+ *                   OffsetsInitializer.timestamp(timestamp);
+ *               KafkaSource<String> kafkaSource =
+ *                   KafkaSource.<String>builder()
+ *                       .setBootstrapServers("localhost:9092")
+ *                       .setGroupId("MyGroup")
+ *                       .setTopics(Arrays.asList("quickstart-events"))
+ *                       .setDeserializer(
+ *                           
KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
+ *                       .setStartingOffsets(offsets)
+ *                       .build();
+ *               return kafkaSource;
+ *             },
+ *             Boundedness.CONTINUOUS_UNBOUNDED)
+ *         .build();
+ * }</pre>
+ */
+@PublicEvolving
+public class HybridSource<T> implements Source<T, HybridSourceSplit, 
HybridSourceEnumeratorState> {
+
+    private final List<SourceListEntry> sources;
+    // sources are populated per subtask at switch time
+    private final Map<Integer, Source> switchedSources;
+
+    /** Protected for subclass, use {@link #builder(Source)} to construct 
source. */
+    protected HybridSource(List<SourceListEntry> sources) {
+        Preconditions.checkArgument(!sources.isEmpty());
+        this.sources = sources;
+        this.switchedSources = new HashMap<>(sources.size());
+    }
+
+    /** Builder for {@link HybridSource}. */
+    public static <T, EnumT extends SplitEnumerator> HybridSourceBuilder<T, 
EnumT> builder(
+            Source<T, ?, ?> firstSource) {
+        HybridSourceBuilder<T, EnumT> builder = new HybridSourceBuilder<>();
+        return builder.addSource(firstSource);
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return sources.get(sources.size() - 1).boundedness;
+    }
+
+    @Override
+    public SourceReader<T, HybridSourceSplit> createReader(SourceReaderContext 
readerContext)
+            throws Exception {
+        return new HybridSourceReader(readerContext, switchedSources);
+    }
+
+    @Override
+    public SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> 
createEnumerator(
+            SplitEnumeratorContext<HybridSourceSplit> enumContext) {
+        return new HybridSourceSplitEnumerator(enumContext, sources, 0, 
switchedSources, null);
+    }
+
+    @Override
+    public SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> 
restoreEnumerator(
+            SplitEnumeratorContext<HybridSourceSplit> enumContext,
+            HybridSourceEnumeratorState checkpoint)
+            throws Exception {
+        return new HybridSourceSplitEnumerator(
+                enumContext,
+                sources,
+                checkpoint.getCurrentSourceIndex(),
+                switchedSources,
+                checkpoint.getWrappedState());
+    }
+
+    @Override
+    public SimpleVersionedSerializer<HybridSourceSplit> getSplitSerializer() {
+        return new HybridSourceSplitSerializer(switchedSources);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<HybridSourceEnumeratorState>
+            getEnumeratorCheckpointSerializer() {
+        return new HybridSourceEnumeratorStateSerializer(switchedSources);
+    }
+
+    /**
+     * Context provided to source factory.
+     *
+     * <p>To derive a start position at switch time, the source can be 
initialized from context of
+     * the previous enumerator. A specific enumerator implementation may carry 
state such as an end
+     * timestamp, that can be used to derive the start position of the next 
source.
+     *
+     * <p>Currently only the previous enumerator is exposed. The context 
interface allows for
+     * backward compatible extension, i.e. additional information about the 
previous source can be
+     * supplied in the future.
+     */
+    public interface SourceSwitchContext<EnumT> {
+        EnumT getPreviousEnumerator();
+    }
+
+    /**
+     * Factory for underlying sources of {@link HybridSource}.
+     *
+     * <p>This factory permits building of a source at graph construction time 
or deferred at switch
+     * time. Provides the ability to set a start position in any way a 
specific source allows.
+     * Future convenience could be built on top of it, for example a default 
implementation that
+     * recognizes optional interfaces to transfer position in a universal 
format.
+     *
+     * <p>Called when the current enumerator has finished. The previous 
source's final state can
+     * thus be used to construct the next source, as required for dynamic 
position transfer at time
+     * of switching.
+     *
+     * <p>If start position is known at job submission time, the source can be 
constructed in the
+     * entry point and simply wrapped into the factory, providing the benefit 
of validation during
+     * submission.
+     */
+    @FunctionalInterface
+    public interface SourceFactory<
+                    T, SourceT extends Source<T, ?, ?>, FromEnumT extends 
SplitEnumerator>
+            extends Serializable {
+        SourceT create(SourceSwitchContext<FromEnumT> context);
+    }
+
+    private static class PassthroughSourceFactory<
+                    T, SourceT extends Source<T, ?, ?>, FromEnumT extends 
SplitEnumerator>
+            implements SourceFactory<T, SourceT, FromEnumT> {
+
+        private final SourceT source;
+
+        private PassthroughSourceFactory(SourceT source) {
+            this.source = source;
+        }
+
+        @Override
+        public SourceT create(SourceSwitchContext<FromEnumT> context) {
+            return source;
+        }
+    }
+
+    /** Entry for list of underlying sources. */
+    static class SourceListEntry implements Serializable {
+        protected final SourceFactory factory;
+        protected final Boundedness boundedness;
+
+        private SourceListEntry(SourceFactory factory, Boundedness 
boundedness) {
+            this.factory = Preconditions.checkNotNull(factory);
+            this.boundedness = Preconditions.checkNotNull(boundedness);
+        }
+
+        static SourceListEntry of(SourceFactory configurer, Boundedness 
boundedness) {
+            return new SourceListEntry(configurer, boundedness);
+        }
+    }
+
+    /** Builder for HybridSource. */
+    public static class HybridSourceBuilder<T, EnumT extends SplitEnumerator>
+            implements Serializable {
+        private final List<SourceListEntry> sources;
+
+        public HybridSourceBuilder() {
+            sources = new ArrayList<>();
+        }
+
+        /** Add pre-configured source (without switch time modification). */
+        public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, 
?, ?>>
+                HybridSourceBuilder<T, ToEnumT> addSource(NextSourceT source) {
+            return addSource(new PassthroughSourceFactory<>(source), 
source.getBoundedness());
+        }
+
+        /** Add source with deferred instantiation based on previous 
enumerator. */
+        public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, 
?, ?>>
+                HybridSourceBuilder<T, ToEnumT> addSource(
+                        SourceFactory<T, NextSourceT, ? super EnumT> 
sourceFactory,
+                        Boundedness boundedness) {
+            if (!sources.isEmpty()) {
+                Preconditions.checkArgument(
+                        Boundedness.BOUNDED.equals(sources.get(sources.size() 
- 1).boundedness),
+                        "All sources except the final source need to be 
bounded.");
+            }
+            ClosureCleaner.clean(
+                    sourceFactory, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+            sources.add(SourceListEntry.of(sourceFactory, boundedness));
+            return (HybridSourceBuilder) this;
+        }
+
+        /** Build the source. */
+        public HybridSource<T> build() {
+            return new HybridSource(sources);
+        }
+    }
+}
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java
new file mode 100644
index 0000000..2da99ee
--- /dev/null
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
/** The state of hybrid source enumerator. */
public class HybridSourceEnumeratorState {

    // index of the underlying source that was active at checkpoint time
    private final int currentSourceIndex;
    // checkpoint state of that source's own enumerator, opaque at this level
    private final Object wrappedState;

    HybridSourceEnumeratorState(int currentSourceIndex, Object wrappedState) {
        this.currentSourceIndex = currentSourceIndex;
        this.wrappedState = wrappedState;
    }

    public int getCurrentSourceIndex() {
        return currentSourceIndex;
    }

    public Object getWrappedState() {
        return this.wrappedState;
    }
}
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorStateSerializer.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorStateSerializer.java
new file mode 100644
index 0000000..721e010
--- /dev/null
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorStateSerializer.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
/** The {@link SimpleVersionedSerializer Serializer} for the enumerator state. */
public class HybridSourceEnumeratorStateSerializer
        implements SimpleVersionedSerializer<HybridSourceEnumeratorState> {

    private static final int CURRENT_VERSION = 0;

    // per-source-index serializers, resolved lazily and cached (see serializerOf)
    private final Map<Integer, SimpleVersionedSerializer<Object>> cachedSerializers;
    // NOTE(review): assumes the entry for a given index is present by the time it is
    // (de)serialized -- the map is populated at switch time by other components; verify callers
    private final Map<Integer, Source> switchedSources;

    public HybridSourceEnumeratorStateSerializer(Map<Integer, Source> switchedSources) {
        this.switchedSources = switchedSources;
        this.cachedSerializers = new HashMap<>();
    }

    @Override
    public int getVersion() {
        return CURRENT_VERSION;
    }

    // Wire format (version 0): sourceIndex (int), nested serializer version (int),
    // nested byte length (int), nested bytes.
    @Override
    public byte[] serialize(HybridSourceEnumeratorState enumState) throws IOException {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputStream out = new DataOutputStream(baos)) {
            out.writeInt(enumState.getCurrentSourceIndex());
            SimpleVersionedSerializer<Object> serializer =
                    serializerOf(enumState.getCurrentSourceIndex());
            // nested version is written so the wrapped state can be deserialized
            // by the matching version of the underlying serializer
            out.writeInt(serializer.getVersion());
            byte[] enumStateBytes = serializer.serialize(enumState.getWrappedState());
            out.writeInt(enumStateBytes.length);
            out.write(enumStateBytes);
            out.flush();
            return baos.toByteArray();
        }
    }

    @Override
    public HybridSourceEnumeratorState deserialize(int version, byte[] serialized)
            throws IOException {
        // dispatch on the outer format version; only v0 exists so far
        if (version == 0) {
            return deserializeV0(serialized);
        }
        throw new IOException(
                String.format(
                        "The bytes are serialized with version %d, "
                                + "while this deserializer only supports version up to %d",
                        version, CURRENT_VERSION));
    }

    // Reads the version-0 wire format produced by serialize().
    private HybridSourceEnumeratorState deserializeV0(byte[] serialized) throws IOException {
        try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
                DataInputStream in = new DataInputStream(bais)) {
            int sourceIndex = in.readInt();
            int nestedVersion = in.readInt();
            int length = in.readInt();
            byte[] nestedBytes = new byte[length];
            in.readFully(nestedBytes);
            Object nested = serializerOf(sourceIndex).deserialize(nestedVersion, nestedBytes);
            return new HybridSourceEnumeratorState(sourceIndex, nested);
        }
    }

    // Resolves (and caches) the checkpoint serializer of the source registered
    // under the given index; fails if that source has not been registered yet.
    private SimpleVersionedSerializer<Object> serializerOf(int sourceIndex) {
        return cachedSerializers.computeIfAbsent(
                sourceIndex,
                (k -> {
                    Source source =
                            Preconditions.checkNotNull(
                                    switchedSources.get(k),
                                    "Source for index=%s not available",
                                    sourceIndex);
                    return source.getEnumeratorCheckpointSerializer();
                }));
    }
}
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
new file mode 100644
index 0000000..cdd2e8b
--- /dev/null
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Hybrid source reader that delegates to the actual source reader.
+ *
+ * <p>This reader processes splits from a sequence of sources as determined by 
the enumerator. The
+ * current source is provided with {@link SwitchSourceEvent} and the reader 
does not require upfront
+ * knowledge of the number and order of sources. At a given point in time one 
underlying reader is
+ * active.
+ *
+ * <p>When the underlying reader has consumed all input for a source, {@link 
HybridSourceReader}
+ * sends {@link SourceReaderFinishedEvent} to the coordinator.
+ *
+ * <p>This reader does not make assumptions about the order in which sources 
are activated. When
+ * recovering from a checkpoint it may start processing splits for a previous 
source, which is
+ * indicated via {@link SwitchSourceEvent}.
+ */
+public class HybridSourceReader<T> implements SourceReader<T, 
HybridSourceSplit> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceReader.class);
+    private final SourceReaderContext readerContext;
+    private final Map<Integer, Source> switchedSources;
+    private int currentSourceIndex = -1;
+    private boolean isFinalSource;
+    private SourceReader<T, ? extends SourceSplit> currentReader;
+    private CompletableFuture<Void> availabilityFuture = new 
CompletableFuture<>();
+    private List<HybridSourceSplit> restoredSplits = new ArrayList<>();
+
+    public HybridSourceReader(
+            SourceReaderContext readerContext, Map<Integer, Source> 
switchedSources) {
+        this.readerContext = readerContext;
+        this.switchedSources = switchedSources;
+    }
+
+    @Override
+    public void start() {
+        // underlying reader starts on demand with split assignment
+        int initialSourceIndex = currentSourceIndex;
+        if (!restoredSplits.isEmpty()) {
+            initialSourceIndex = restoredSplits.get(0).sourceIndex() - 1;
+        }
+        readerContext.sendSourceEventToCoordinator(
+                new SourceReaderFinishedEvent(initialSourceIndex));
+    }
+
+    @Override
+    public InputStatus pollNext(ReaderOutput output) throws Exception {
+        if (currentReader == null) {
+            return InputStatus.NOTHING_AVAILABLE;
+        }
+
+        InputStatus status = currentReader.pollNext(output);
+        if (status == InputStatus.END_OF_INPUT) {
+            // trap END_OF_INPUT unless all sources have finished
+            LOG.info(
+                    "End of input subtask={} sourceIndex={} {}",
+                    readerContext.getIndexOfSubtask(),
+                    currentSourceIndex,
+                    currentReader);
+            // Signal the coordinator that this reader has consumed all input 
and the
+            // next source can potentially be activated.
+            readerContext.sendSourceEventToCoordinator(
+                    new SourceReaderFinishedEvent(currentSourceIndex));
+            if (!isFinalSource) {
+                // More splits may arrive for a subsequent reader.
+                // InputStatus.NOTHING_AVAILABLE suspends poll, requires 
completion of the
+                // availability future after receiving more splits to resume.
+                if (availabilityFuture.isDone()) {
+                    // reset to avoid continued polling
+                    availabilityFuture = new CompletableFuture();
+                }
+                return InputStatus.NOTHING_AVAILABLE;
+            }
+        }
+        return status;
+    }
+
+    @Override
+    public List<HybridSourceSplit> snapshotState(long checkpointId) {
+        List<? extends SourceSplit> state =
+                currentReader != null
+                        ? currentReader.snapshotState(checkpointId)
+                        : Collections.emptyList();
+        return HybridSourceSplit.wrapSplits(currentSourceIndex, state);
+    }
+
+    @Override
+    public CompletableFuture<Void> isAvailable() {
+        return availabilityFuture;
+    }
+
+    @Override
+    public void addSplits(List<HybridSourceSplit> splits) {
+        LOG.info(
+                "Adding splits subtask={} sourceIndex={} currentReader={} {}",
+                readerContext.getIndexOfSubtask(),
+                currentSourceIndex,
+                currentReader,
+                splits);
+        if (currentSourceIndex < 0) {
+            // splits added back during reader recovery
+            restoredSplits.addAll(splits);
+        } else {
+            List<SourceSplit> realSplits = new ArrayList<>(splits.size());
+            for (HybridSourceSplit split : splits) {
+                Preconditions.checkState(
+                        split.sourceIndex() == currentSourceIndex,
+                        "Split %s while current source is %s",
+                        split,
+                        currentSourceIndex);
+                realSplits.add(split.getWrappedSplit());
+            }
+            currentReader.addSplits((List) realSplits);
+        }
+    }
+
+    @Override
+    public void notifyNoMoreSplits() {
+        if (currentReader != null) {
+            currentReader.notifyNoMoreSplits();
+        }
+        LOG.debug(
+                "No more splits for subtask={} sourceIndex={} 
currentReader={}",
+                readerContext.getIndexOfSubtask(),
+                currentSourceIndex,
+                currentReader);
+    }
+
+    @Override
+    public void handleSourceEvents(SourceEvent sourceEvent) {
+        if (sourceEvent instanceof SwitchSourceEvent) {
+            SwitchSourceEvent sse = (SwitchSourceEvent) sourceEvent;
+            LOG.info(
+                    "Switch source event: subtask={} sourceIndex={} source={}",
+                    readerContext.getIndexOfSubtask(),
+                    sse.sourceIndex(),
+                    sse.source());
+            switchedSources.put(sse.sourceIndex(), sse.source());
+            setCurrentReader(sse.sourceIndex());
+            isFinalSource = sse.isFinalSource();
+            if (!availabilityFuture.isDone()) {
+                // continue polling
+                availabilityFuture.complete(null);
+            }
+        } else {
+            currentReader.handleSourceEvents(sourceEvent);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (currentReader != null) {
+            currentReader.close();
+        }
+        LOG.debug(
+                "Reader closed: subtask={} sourceIndex={} currentReader={}",
+                readerContext.getIndexOfSubtask(),
+                currentSourceIndex,
+                currentReader);
+    }
+
+    private void setCurrentReader(int index) {
+        Preconditions.checkArgument(index != currentSourceIndex);
+        if (currentReader != null) {
+            try {
+                currentReader.close();
+            } catch (Exception e) {
+                throw new RuntimeException("Failed to close current reader", 
e);
+            }
+            LOG.debug(
+                    "Reader closed: subtask={} sourceIndex={} 
currentReader={}",
+                    readerContext.getIndexOfSubtask(),
+                    currentSourceIndex,
+                    currentReader);
+        }
+        // TODO: track previous readers splits till checkpoint
+        Source source =
+                Preconditions.checkNotNull(
+                        switchedSources.get(index), "Source for index=%s not 
available", index);
+        SourceReader<T, ?> reader;
+        try {
+            reader = source.createReader(readerContext);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed tp create reader", e);
+        }
+        reader.start();
+        currentSourceIndex = index;
+        currentReader = reader;
+        currentReader
+                .isAvailable()
+                .whenComplete(
+                        (result, ex) -> {
+                            if (ex == null) {
+                                availabilityFuture.complete(result);
+                            } else {
+                                availabilityFuture.completeExceptionally(ex);
+                            }
+                        });
+        LOG.debug(
+                "Reader started: subtask={} sourceIndex={} {}",
+                readerContext.getIndexOfSubtask(),
+                currentSourceIndex,
+                reader);
+        // add restored splits
+        if (!restoredSplits.isEmpty()) {
+            List<HybridSourceSplit> splits = new 
ArrayList<>(restoredSplits.size());
+            Iterator<HybridSourceSplit> it = restoredSplits.iterator();
+            while (it.hasNext()) {
+                HybridSourceSplit hybridSplit = it.next();
+                if (hybridSplit.sourceIndex() == index) {
+                    splits.add(hybridSplit);
+                    it.remove();
+                }
+            }
+            addSplits(splits);
+        }
+    }
+}
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplit.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplit.java
new file mode 100644
index 0000000..9057aa6
--- /dev/null
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplit.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/** Source split that wraps the actual split type. */
+public class HybridSourceSplit implements SourceSplit {
+
+    private final SourceSplit wrappedSplit;
+    private final int sourceIndex;
+
+    public HybridSourceSplit(int sourceIndex, SourceSplit wrappedSplit) {
+        this.sourceIndex = sourceIndex;
+        this.wrappedSplit = wrappedSplit;
+    }
+
+    public int sourceIndex() {
+        return this.sourceIndex;
+    }
+
+    public SourceSplit getWrappedSplit() {
+        return wrappedSplit;
+    }
+
+    @Override
+    public String splitId() {
+        return wrappedSplit.splitId();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        HybridSourceSplit that = (HybridSourceSplit) o;
+        return sourceIndex == that.sourceIndex && 
wrappedSplit.equals(that.wrappedSplit);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(wrappedSplit, sourceIndex);
+    }
+
+    @Override
+    public String toString() {
+        return "HybridSourceSplit{"
+                + "realSplit="
+                + wrappedSplit
+                + ", sourceIndex="
+                + sourceIndex
+                + '}';
+    }
+
+    public static List<HybridSourceSplit> wrapSplits(
+            int readerIndex, List<? extends SourceSplit> state) {
+        List<HybridSourceSplit> wrappedSplits = new ArrayList<>(state.size());
+        for (SourceSplit split : state) {
+            wrappedSplits.add(new HybridSourceSplit(readerIndex, split));
+        }
+        return wrappedSplits;
+    }
+
+    public static List<SourceSplit> unwrapSplits(List<HybridSourceSplit> 
splits) {
+        List<SourceSplit> unwrappedSplits = new ArrayList<>(splits.size());
+        for (HybridSourceSplit split : splits) {
+            unwrappedSplits.add(split.getWrappedSplit());
+        }
+        return unwrappedSplits;
+    }
+}
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
new file mode 100644
index 0000000..f213492
--- /dev/null
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+
+/**
+ * Wraps the actual split enumerators and facilitates source switching. 
Enumerators are created
+ * lazily when source switch occurs to support runtime position conversion.
+ *
+ * <p>This enumerator delegates to the current underlying split enumerator and 
transitions to the
+ * next source once all readers have indicated via {@link 
SourceReaderFinishedEvent} that all input
+ * was consumed.
+ *
+ * <p>Switching between enumerators occurs by creating the new enumerator via 
{@link
+ * Source#createEnumerator(SplitEnumeratorContext)}. The start position can be 
fixed at pipeline
+ * construction time through the source or supplied at switch time through a 
converter function by
+ * using the end state of the previous enumerator.
+ *
+ * <p>During subtask recovery, splits that have been assigned since the last 
checkpoint will be
+ * added back by the source coordinator. These splits may originate from a 
previous enumerator that
+ * is no longer active. In that case {@link HybridSourceSplitEnumerator} will 
suspend forwarding to
+ * the current enumerator and replay the returned splits by activating the 
previous readers. After
+ * returned splits were processed, delegation to the current underlying 
enumerator resumes.
+ */
+public class HybridSourceSplitEnumerator
+        implements SplitEnumerator<HybridSourceSplit, 
HybridSourceEnumeratorState> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceSplitEnumerator.class);
+
+    private final SplitEnumeratorContext<HybridSourceSplit> context;
+    private final List<HybridSource.SourceListEntry> sources;
+    private final Map<Integer, Source> switchedSources;
+    // Splits that have been returned due to subtask reset
+    private final Map<Integer, TreeMap<Integer, List<HybridSourceSplit>>> 
pendingSplits;
+    private final Set<Integer> finishedReaders;
+    private final Map<Integer, Integer> readerSourceIndex;
+    private int currentSourceIndex;
+    private Object restoredEnumeratorState;
+    private SplitEnumerator<SourceSplit, Object> currentEnumerator;
+
+    public HybridSourceSplitEnumerator(
+            SplitEnumeratorContext<HybridSourceSplit> context,
+            List<HybridSource.SourceListEntry> sources,
+            int initialSourceIndex,
+            Map<Integer, Source> switchedSources,
+            Object restoredEnumeratorState) {
+        Preconditions.checkArgument(initialSourceIndex < sources.size());
+        this.context = context;
+        this.sources = sources;
+        this.currentSourceIndex = initialSourceIndex;
+        this.pendingSplits = new HashMap<>();
+        this.finishedReaders = new HashSet<>();
+        this.readerSourceIndex = new HashMap<>();
+        this.switchedSources = switchedSources;
+        this.restoredEnumeratorState = restoredEnumeratorState;
+    }
+
+    @Override
+    public void start() {
+        switchEnumerator();
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId, String requesterHostname) {
+        LOG.debug(
+                "handleSplitRequest subtask={} sourceIndex={} 
pendingSplits={}",
+                subtaskId,
+                currentSourceIndex,
+                pendingSplits);
+        Preconditions.checkState(pendingSplits.isEmpty() || 
!pendingSplits.containsKey(subtaskId));
+        currentEnumerator.handleSplitRequest(subtaskId, requesterHostname);
+    }
+
+    @Override
+    public void addSplitsBack(List<HybridSourceSplit> splits, int subtaskId) {
+        LOG.debug("Adding splits back for subtask={} splits={}", subtaskId, 
splits);
+
+        // Splits returned can belong to multiple sources, after switching 
since last checkpoint
+        TreeMap<Integer, List<HybridSourceSplit>> splitsBySourceIndex = new 
TreeMap<>();
+
+        for (HybridSourceSplit split : splits) {
+            splitsBySourceIndex
+                    .computeIfAbsent(split.sourceIndex(), k -> new 
ArrayList<>())
+                    .add(split);
+        }
+
+        splitsBySourceIndex.forEach(
+                (k, splitsPerSource) -> {
+                    if (k == currentSourceIndex) {
+                        currentEnumerator.addSplitsBack(
+                                
HybridSourceSplit.unwrapSplits(splitsPerSource), subtaskId);
+                    } else {
+                        pendingSplits
+                                .computeIfAbsent(subtaskId, sourceIndex -> new 
TreeMap<>())
+                                .put(k, splitsPerSource);
+                    }
+                });
+    }
+
+    @Override
+    public void addReader(int subtaskId) {
+        LOG.debug("addReader subtaskId={}", subtaskId);
+        readerSourceIndex.remove(subtaskId);
+    }
+
+    private void sendSwitchSourceEvent(int subtaskId, int sourceIndex) {
+        readerSourceIndex.put(subtaskId, sourceIndex);
+        Source source = 
Preconditions.checkNotNull(switchedSources.get(sourceIndex));
+        context.sendEventToSourceReader(
+                subtaskId,
+                new SwitchSourceEvent(sourceIndex, source, sourceIndex >= 
(sources.size() - 1)));
+        // send pending splits, if any
+        TreeMap<Integer, List<HybridSourceSplit>> splitsBySource = 
pendingSplits.get(subtaskId);
+        if (splitsBySource != null) {
+            List<HybridSourceSplit> splits = 
splitsBySource.remove(sourceIndex);
+            if (splits != null && !splits.isEmpty()) {
+                LOG.debug("Restoring splits to subtask={} {}", subtaskId, 
splits);
+                context.assignSplits(
+                        new 
SplitsAssignment<>(Collections.singletonMap(subtaskId, splits)));
+                context.signalNoMoreSplits(subtaskId);
+            }
+            if (splitsBySource.isEmpty()) {
+                pendingSplits.remove(subtaskId);
+            }
+        }
+
+        if (sourceIndex == currentSourceIndex) {
+            LOG.debug("adding reader subtask={} sourceIndex={}", subtaskId, 
currentSourceIndex);
+            currentEnumerator.addReader(subtaskId);
+        }
+    }
+
+    @Override
+    public HybridSourceEnumeratorState snapshotState(long checkpointId) throws 
Exception {
+        Object enumState = currentEnumerator.snapshotState(checkpointId);
+        return new HybridSourceEnumeratorState(currentSourceIndex, enumState);
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        LOG.debug(
+                "handleSourceEvent {} subtask={} pendingSplits={}",
+                sourceEvent,
+                subtaskId,
+                pendingSplits);
+        if (sourceEvent instanceof SourceReaderFinishedEvent) {
+            SourceReaderFinishedEvent srfe = (SourceReaderFinishedEvent) 
sourceEvent;
+
+            int subtaskSourceIndex =
+                    readerSourceIndex.computeIfAbsent(
+                            subtaskId,
+                            k -> {
+                                // first time we see reader after cold start 
or recovery
+                                LOG.debug(
+                                        "New reader subtask={} sourceIndex={}",
+                                        subtaskId,
+                                        srfe.sourceIndex());
+                                return srfe.sourceIndex();
+                            });
+
+            if (srfe.sourceIndex() < subtaskSourceIndex) {
+                // duplicate event
+                return;
+            }
+
+            if (subtaskSourceIndex < currentSourceIndex) {
+                subtaskSourceIndex++;
+                sendSwitchSourceEvent(subtaskId, subtaskSourceIndex);
+                return;
+            }
+
+            // track readers that have finished processing for current 
enumerator
+            finishedReaders.add(subtaskId);
+            if (finishedReaders.size() == context.currentParallelism()) {
+                LOG.debug("All readers finished, ready to switch enumerator!");
+                if (currentSourceIndex + 1 < sources.size()) {
+                    switchEnumerator();
+                    // switch all readers prior to sending split assignments
+                    for (int i = 0; i < context.currentParallelism(); i++) {
+                        sendSwitchSourceEvent(i, currentSourceIndex);
+                    }
+                }
+            }
+        } else {
+            currentEnumerator.handleSourceEvent(subtaskId, sourceEvent);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        currentEnumerator.close();
+    }
+
+    private void switchEnumerator() {
+
+        SplitEnumerator<SourceSplit, Object> previousEnumerator = 
currentEnumerator;
+        if (currentEnumerator != null) {
+            try {
+                currentEnumerator.close();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            currentEnumerator = null;
+            currentSourceIndex++;
+        }
+
+        HybridSource.SourceSwitchContext<?> switchContext =
+                new HybridSource.SourceSwitchContext<Object>() {
+                    @Override
+                    public Object getPreviousEnumerator() {
+                        return previousEnumerator;
+                    }
+                };
+
+        Source<?, ? extends SourceSplit, Object> source =
+                switchedSources.computeIfAbsent(
+                        currentSourceIndex,
+                        k -> {
+                            return 
sources.get(currentSourceIndex).factory.create(switchContext);
+                        });
+        switchedSources.put(currentSourceIndex, source);
+        SplitEnumeratorContextProxy delegatingContext =
+                new SplitEnumeratorContextProxy(currentSourceIndex, context, 
readerSourceIndex);
+        try {
+            if (restoredEnumeratorState == null) {
+                currentEnumerator = source.createEnumerator(delegatingContext);
+            } else {
+                LOG.info("Restoring enumerator for sourceIndex={}", 
currentSourceIndex);
+                currentEnumerator =
+                        source.restoreEnumerator(delegatingContext, 
restoredEnumeratorState);
+                restoredEnumeratorState = null;
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Failed to create enumerator for sourceIndex=" + 
currentSourceIndex, e);
+        }
+        LOG.info("Starting enumerator for sourceIndex={}", currentSourceIndex);
+        currentEnumerator.start();
+    }
+
+    /**
+     * The {@link SplitEnumeratorContext} that is provided to the currently 
active enumerator.
+     *
+     * <p>This context is used to wrap the splits into {@link 
HybridSourceSplit} and track
+     * assignment to readers.
+     */
+    private static class SplitEnumeratorContextProxy<SplitT extends 
SourceSplit>
+            implements SplitEnumeratorContext<SplitT> {
+        private static final Logger LOG =
+                LoggerFactory.getLogger(SplitEnumeratorContextProxy.class);
+
+        private final SplitEnumeratorContext<HybridSourceSplit> realContext;
+        private final int sourceIndex;
+        private final Map<Integer, Integer> readerSourceIndex;
+
+        private SplitEnumeratorContextProxy(
+                int sourceIndex,
+                SplitEnumeratorContext<HybridSourceSplit> realContext,
+                Map<Integer, Integer> readerSourceIndex) {
+            this.realContext = realContext;
+            this.sourceIndex = sourceIndex;
+            this.readerSourceIndex = readerSourceIndex;
+        }
+
+        @Override
+        public MetricGroup metricGroup() {
+            return realContext.metricGroup();
+        }
+
+        @Override
+        public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
+            realContext.sendEventToSourceReader(subtaskId, event);
+        }
+
+        @Override
+        public int currentParallelism() {
+            return realContext.currentParallelism();
+        }
+
+        @Override
+        public Map<Integer, ReaderInfo> registeredReaders() {
+            // TODO: not start enumerator until readers are ready?
+            Map<Integer, ReaderInfo> readers = realContext.registeredReaders();
+            if (readers.size() != readerSourceIndex.size()) {
+                return filterRegisteredReaders(readers);
+            }
+            Integer lastIndex = null;
+            for (Integer sourceIndex : readerSourceIndex.values()) {
+                if (lastIndex != null && lastIndex != sourceIndex) {
+                    return filterRegisteredReaders(readers);
+                }
+                lastIndex = sourceIndex;
+            }
+            return readers;
+        }
+
+        private Map<Integer, ReaderInfo> filterRegisteredReaders(Map<Integer, 
ReaderInfo> readers) {
+            Map<Integer, ReaderInfo> readersForSource = new 
HashMap<>(readers.size());
+            for (Map.Entry<Integer, ReaderInfo> e : readers.entrySet()) {
+                if (readerSourceIndex.get(e.getKey()) == (Integer) 
sourceIndex) {
+                    readersForSource.put(e.getKey(), e.getValue());
+                }
+            }
+            return readersForSource;
+        }
+
+        @Override
+        public void assignSplits(SplitsAssignment<SplitT> newSplitAssignments) 
{
+            Map<Integer, List<HybridSourceSplit>> wrappedAssignmentMap = new 
HashMap<>();
+            for (Map.Entry<Integer, List<SplitT>> e : 
newSplitAssignments.assignment().entrySet()) {
+                List<HybridSourceSplit> splits =
+                        HybridSourceSplit.wrapSplits(sourceIndex, 
e.getValue());
+                wrappedAssignmentMap.put(e.getKey(), splits);
+            }
+            SplitsAssignment<HybridSourceSplit> wrappedAssignments =
+                    new SplitsAssignment<>(wrappedAssignmentMap);
+            LOG.debug("Assigning splits sourceIndex={} {}", sourceIndex, 
wrappedAssignments);
+            realContext.assignSplits(wrappedAssignments);
+        }
+
+        @Override
+        public void assignSplit(SplitT split, int subtask) {
+            HybridSourceSplit wrappedSplit = new 
HybridSourceSplit(sourceIndex, split);
+            realContext.assignSplit(wrappedSplit, subtask);
+        }
+
+        @Override
+        public void signalNoMoreSplits(int subtask) {
+            realContext.signalNoMoreSplits(subtask);
+        }
+
+        @Override
+        public <T> void callAsync(Callable<T> callable, BiConsumer<T, 
Throwable> handler) {
+            realContext.callAsync(callable, handler);
+        }
+
+        @Override
+        public <T> void callAsync(
+                Callable<T> callable,
+                BiConsumer<T, Throwable> handler,
+                long initialDelay,
+                long period) {
+            realContext.callAsync(callable, handler, initialDelay, period);
+        }
+
+        @Override
+        public void runInCoordinatorThread(Runnable runnable) {
+            realContext.runInCoordinatorThread(runnable);
+        }
+    }
+}
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializer.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializer.java
new file mode 100644
index 0000000..025733c
--- /dev/null
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Serializes splits by delegating to the source-indexed underlying split 
serializer. */
+public class HybridSourceSplitSerializer implements 
SimpleVersionedSerializer<HybridSourceSplit> {
+
+    final Map<Integer, SimpleVersionedSerializer<SourceSplit>> 
cachedSerializers;
+    final Map<Integer, Source> switchedSources;
+
+    public HybridSourceSplitSerializer(Map<Integer, Source> switchedSources) {
+        this.cachedSerializers = new HashMap<>();
+        this.switchedSources = switchedSources;
+    }
+
+    @Override
+    public int getVersion() {
+        return 0;
+    }
+
+    @Override
+    public byte[] serialize(HybridSourceSplit split) throws IOException {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                DataOutputStream out = new DataOutputStream(baos)) {
+            out.writeInt(split.sourceIndex());
+            out.writeInt(serializerOf(split.sourceIndex()).getVersion());
+            byte[] serializedSplit =
+                    
serializerOf(split.sourceIndex()).serialize(split.getWrappedSplit());
+            out.writeInt(serializedSplit.length);
+            out.write(serializedSplit);
+            out.flush();
+            return baos.toByteArray();
+        }
+    }
+
+    @Override
+    public HybridSourceSplit deserialize(int version, byte[] serialized) 
throws IOException {
+        if (version == 0) {
+            return deserializeV0(serialized);
+        }
+        throw new IOException(String.format("Invalid version %d", version));
+    }
+
+    private HybridSourceSplit deserializeV0(byte[] serialized) throws 
IOException {
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+                DataInputStream in = new DataInputStream(bais)) {
+            int sourceIndex = in.readInt();
+            int nestedVersion = in.readInt();
+            int length = in.readInt();
+            byte[] splitBytes = new byte[length];
+            in.readFully(splitBytes);
+            SourceSplit split = 
serializerOf(sourceIndex).deserialize(nestedVersion, splitBytes);
+            return new HybridSourceSplit(sourceIndex, split);
+        }
+    }
+
+    private SimpleVersionedSerializer<SourceSplit> serializerOf(int 
sourceIndex) {
+        return cachedSerializers.computeIfAbsent(
+                sourceIndex,
+                (k -> {
+                    Source source =
+                            Preconditions.checkNotNull(
+                                    switchedSources.get(k),
+                                    "Source for index=%s not available",
+                                    sourceIndex);
+                    return source.getSplitSerializer();
+                }));
+    }
+}
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SourceReaderFinishedEvent.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SourceReaderFinishedEvent.java
new file mode 100644
index 0000000..df88d41
--- /dev/null
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SourceReaderFinishedEvent.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+
+/**
+ * A source event sent from the HybridSourceReader to the enumerator to 
indicate that the current
+ * reader has finished and splits for the next reader can be sent.
+ */
+public class SourceReaderFinishedEvent implements SourceEvent {
+
+    private static final long serialVersionUID = 1L;
+    private final int sourceIndex;
+
+    /**
+     * Constructor.
+     *
+     * @param sourceIndex
+     */
+    public SourceReaderFinishedEvent(int sourceIndex) {
+        this.sourceIndex = sourceIndex;
+    }
+
+    public int sourceIndex() {
+        return sourceIndex;
+    }
+
+    @Override
+    public String toString() {
+        return "SourceReaderFinishedEvent{" + "sourceIndex=" + sourceIndex + 
'}';
+    }
+}
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchSourceEvent.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchSourceEvent.java
new file mode 100644
index 0000000..e24a5ef
--- /dev/null
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchSourceEvent.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+
+/**
+ * Event sent from {@link HybridSourceSplitEnumerator} to {@link 
HybridSourceReader} to switch to
+ * the indicated reader.
+ */
+public class SwitchSourceEvent implements SourceEvent {
+
+    private static final long serialVersionUID = 1L;
+    private final int sourceIndex;
+    private final Source source;
+    private final boolean finalSource;
+
+    /**
+     * Constructor.
+     *
+     * @param sourceIndex
+     */
+    public SwitchSourceEvent(int sourceIndex, Source source, boolean 
finalSource) {
+        this.sourceIndex = sourceIndex;
+        this.source = source;
+        this.finalSource = finalSource;
+    }
+
+    public int sourceIndex() {
+        return sourceIndex;
+    }
+
+    public Source source() {
+        return source;
+    }
+
+    public boolean isFinalSource() {
+        return finalSource;
+    }
+
+    @Override
+    public String toString() {
+        return this.getClass().getSimpleName() + '{' + "sourceIndex=" + 
sourceIndex + '}';
+    }
+}
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceITCase.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceITCase.java
new file mode 100644
index 0000000..d510514
--- /dev/null
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceITCase.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.ClientAndIterator;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
/** MiniCluster-based integration test for the {@link HybridSource}. */
public class HybridSourceITCase extends TestLogger {

    // Parallelism cannot exceed number of splits, otherwise test may fail intermittently with:
    // Caused by: org.apache.flink.util.FlinkException: An OperatorEvent from an
    // OperatorCoordinator to a task was lost. Triggering task failover to ensure consistency.
    // Event: '[NoMoreSplitEvent]', targetTask: Source: hybrid-source -> Map (1/4) - execution
    // #3
    private static final int PARALLELISM = 2;

    // Dedicated RPC service and HA leadership control are required so the JM failover
    // scenario (triggerJobManagerFailover) can revoke/grant leadership on this cluster.
    @Rule
    public final MiniClusterWithClientResource miniClusterResource =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(PARALLELISM)
                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
                            .withHaLeadershipControl()
                            .build());

    // ------------------------------------------------------------------------
    //  test cases
    // ------------------------------------------------------------------------

    /** Test the source in the happy path. */
    @Test
    public void testHybridSource() throws Exception {
        testHybridSource(FailoverType.NONE, sourceWithFixedSwitchPosition());
    }

    /** Test the source in the happy path with runtime position transfer. */
    @Test
    public void testHybridSourceWithDynamicSwitchPosition() throws Exception {
        testHybridSource(FailoverType.NONE, sourceWithDynamicSwitchPosition());
    }

    /** Test the source with TaskManager restart. */
    @Test
    public void testHybridSourceWithTaskManagerFailover() throws Exception {
        testHybridSource(FailoverType.TM, sourceWithFixedSwitchPosition());
    }

    /** Test the source with JobManager failover. */
    @Test
    public void testHybridSourceWithJobManagerFailover() throws Exception {
        testHybridSource(FailoverType.JM, sourceWithFixedSwitchPosition());
    }

    /**
     * Hybrid source of two bounded mock sources chained with sources known up front.
     *
     * <p>Each source contributes half of EXPECTED_RESULT (2 splits x 10 records); the second
     * source's records start at 20 so the union covers the full expected range.
     */
    private Source sourceWithFixedSwitchPosition() {
        int numSplits = 2;
        int numRecordsPerSplit = EXPECTED_RESULT.size() / 4;
        return HybridSource.builder(
                        new MockBaseSource(numSplits, numRecordsPerSplit, Boundedness.BOUNDED))
                .addSource(
                        new MockBaseSource(numSplits, numRecordsPerSplit, 20, Boundedness.BOUNDED))
                .build();
    }

    /**
     * Hybrid source where the second source is constructed lazily via factory, exercising the
     * runtime switch-position transfer path.
     */
    private Source sourceWithDynamicSwitchPosition() {
        return HybridSource.builder(new MockBaseSource(2, 10, Boundedness.BOUNDED))
                .addSource(
                        (enumerator) -> {
                            // lazily create source here
                            return new MockBaseSource(2, 10, 20, Boundedness.BOUNDED);
                        },
                        Boundedness.BOUNDED)
                .build();
    }

    /**
     * Runs the given source to completion, optionally injecting a TM/JM failure halfway through
     * the input, and verifies all expected records were produced exactly once overall.
     *
     * @param failoverType failure to inject once half the records have been read
     * @param source the hybrid source under test
     */
    private void testHybridSource(FailoverType failoverType, Source source) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);
        // a single restart attempt is sufficient because exactly one failure is injected
        env.setRestartStrategy(
                FailoverType.NONE == failoverType
                        ? RestartStrategies.noRestart()
                        : RestartStrategies.fixedDelayRestart(1, 0));

        final DataStream<Integer> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "hybrid-source")
                        .returns(Integer.class);

        // pause the pipeline after half the input so the failover happens mid-read
        final DataStream<Integer> streamFailingInTheMiddleOfReading =
                RecordCounterToFail.wrapWithFailureAfter(stream, EXPECTED_RESULT.size() / 2);

        final ClientAndIterator<Integer> client =
                DataStreamUtils.collectWithClient(
                        streamFailingInTheMiddleOfReading,
                        HybridSourceITCase.class.getSimpleName() + '-' + failoverType.name());
        final JobID jobId = client.client.getJobID();

        // block until half the records were read, then fail over and unblock processing
        RecordCounterToFail.waitToFail();
        triggerFailover(
                failoverType,
                jobId,
                RecordCounterToFail::continueProcessing,
                miniClusterResource.getMiniCluster());

        // drain the collected output; stop once enough records arrived or the job finished
        final List<Integer> result = new ArrayList<>();
        while (result.size() < EXPECTED_RESULT.size() && client.iterator.hasNext()) {
            result.add(client.iterator.next());
        }

        verifyResult(result);
    }

    // ------------------------------------------------------------------------
    //  test utilities
    // ------------------------------------------------------------------------

    /** Failure to inject during the test run. */
    private enum FailoverType {
        NONE,
        TM,
        JM
    }

    /**
     * Dispatches the failure injection; {@code afterFailAction} is run at the point where the
     * pipeline may resume (immediately for NONE, between kill and restart for TM/JM).
     */
    private static void triggerFailover(
            FailoverType type, JobID jobId, Runnable afterFailAction, MiniCluster miniCluster)
            throws Exception {
        switch (type) {
            case NONE:
                afterFailAction.run();
                break;
            case TM:
                restartTaskManager(afterFailAction, miniCluster);
                break;
            case JM:
                triggerJobManagerFailover(jobId, afterFailAction, miniCluster);
                break;
        }
    }

    /** Revokes and re-grants JobMaster leadership to force a JobManager failover. */
    private static void triggerJobManagerFailover(
            JobID jobId, Runnable afterFailAction, MiniCluster miniCluster) throws Exception {
        final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get();
        haLeadershipControl.revokeJobMasterLeadership(jobId).get();
        afterFailAction.run();
        haLeadershipControl.grantJobMasterLeadership(jobId).get();
    }

    /** Terminates the single TaskManager and starts a new one to force a task failover. */
    private static void restartTaskManager(Runnable afterFailAction, MiniCluster miniCluster)
            throws Exception {
        miniCluster.terminateTaskManager(0).get();
        afterFailAction.run();
        miniCluster.startTaskManager();
    }

    // ------------------------------------------------------------------------
    //  verification
    // ------------------------------------------------------------------------
    // 0..39: the two chained sources together emit these 40 records (order not guaranteed).
    private static final List<Integer> EXPECTED_RESULT =
            IntStream.rangeClosed(0, 39).boxed().collect(Collectors.toList());

    /** Order-insensitive comparison: sorts the collected records before asserting equality. */
    private static void verifyResult(List<Integer> result) {
        Collections.sort(result);
        assertThat(result, equalTo(EXPECTED_RESULT));
    }

    // ------------------------------------------------------------------------
    //  mini cluster failover utilities
    // ------------------------------------------------------------------------

    /**
     * Map-function wrapper that blocks the pipeline after a given number of records until the
     * test allows it to continue, giving the test a deterministic point to inject a failure.
     *
     * <p>NOTE: state is static (the map lambda must be serializable and runs in another task),
     * so only one wrapped stream per JVM/test can be active at a time.
     */
    private static class RecordCounterToFail {

        private static AtomicInteger records;
        private static CompletableFuture<Void> fail;
        private static CompletableFuture<Void> continueProcessing;

        private static <T> DataStream<T> wrapWithFailureAfter(DataStream<T> stream, int failAfter) {

            records = new AtomicInteger();
            fail = new CompletableFuture<>();
            continueProcessing = new CompletableFuture<>();
            return stream.map(
                    record -> {
                        final boolean halfOfInputIsRead = records.incrementAndGet() > failAfter;
                        final boolean notFailedYet = !fail.isDone();
                        if (notFailedYet && halfOfInputIsRead) {
                            // signal the test and block until continueProcessing() is called
                            fail.complete(null);
                            continueProcessing.get();
                        }
                        return record;
                    });
        }

        /** Blocks the test until the wrapped stream has read {@code failAfter} records. */
        private static void waitToFail() throws ExecutionException, InterruptedException {
            fail.get();
        }

        /** Unblocks the wrapped stream so processing can resume after failure injection. */
        private static void continueProcessing() {
            continueProcessing.complete(null);
        }
    }
}
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
new file mode 100644
index 0000000..3bf77f3
--- /dev/null
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.mocks.MockSource;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.mock.Whitebox;
+
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Tests for {@link HybridSourceReader}. */
+public class HybridSourceReaderTest {
+
+    @Test
+    public void testReader() throws Exception {
+        TestingReaderContext readerContext = new TestingReaderContext();
+        TestingReaderOutput<Integer> readerOutput = new 
TestingReaderOutput<>();
+        MockBaseSource source = new MockBaseSource(1, 1, Boundedness.BOUNDED);
+
+        // 2 underlying readers to exercise switch
+        SourceReader<Integer, MockSourceSplit> mockSplitReader1 =
+                source.createReader(readerContext);
+        SourceReader<Integer, MockSourceSplit> mockSplitReader2 =
+                source.createReader(readerContext);
+
+        Map<Integer, Source> switchedSources = new HashMap<>();
+
+        HybridSourceReader<Integer> reader =
+                new HybridSourceReader<>(readerContext, switchedSources);
+
+        Assert.assertThat(readerContext.getSentEvents(), 
Matchers.emptyIterable());
+        reader.start();
+        assertAndClearSourceReaderFinishedEvent(readerContext, -1);
+        Assert.assertNull(currentReader(reader));
+        Assert.assertEquals(InputStatus.NOTHING_AVAILABLE, 
reader.pollNext(readerOutput));
+
+        Source source1 =
+                new MockSource(null, 0) {
+                    @Override
+                    public SourceReader<Integer, MockSourceSplit> createReader(
+                            SourceReaderContext readerContext) {
+                        return mockSplitReader1;
+                    }
+                };
+        reader.handleSourceEvents(new SwitchSourceEvent(0, source1, false));
+        Assert.assertEquals(source1, switchedSources.get(0));
+        MockSourceSplit mockSplit = new MockSourceSplit(0, 0, 1);
+        mockSplit.addRecord(0);
+        HybridSourceSplit hybridSplit = new HybridSourceSplit(0, mockSplit);
+        reader.addSplits(Collections.singletonList(hybridSplit));
+
+        // drain splits
+        InputStatus status = reader.pollNext(readerOutput);
+        while (readerOutput.getEmittedRecords().isEmpty() || status == 
InputStatus.MORE_AVAILABLE) {
+            status = reader.pollNext(readerOutput);
+            Thread.sleep(10);
+        }
+        Assert.assertThat(readerOutput.getEmittedRecords(), 
Matchers.contains(0));
+        reader.pollNext(readerOutput);
+        Assert.assertEquals(
+                "before notifyNoMoreSplits",
+                InputStatus.NOTHING_AVAILABLE,
+                reader.pollNext(readerOutput));
+
+        reader.notifyNoMoreSplits();
+        reader.pollNext(readerOutput);
+        assertAndClearSourceReaderFinishedEvent(readerContext, 0);
+
+        Assert.assertEquals(
+                "reader before switch source event", mockSplitReader1, 
currentReader(reader));
+
+        Source source2 =
+                new MockSource(null, 0) {
+                    @Override
+                    public SourceReader<Integer, MockSourceSplit> createReader(
+                            SourceReaderContext readerContext) {
+                        return mockSplitReader2;
+                    }
+                };
+        reader.handleSourceEvents(new SwitchSourceEvent(1, source2, true));
+        Assert.assertEquals(
+                "reader after switch source event", mockSplitReader2, 
currentReader(reader));
+
+        reader.notifyNoMoreSplits();
+        Assert.assertEquals(
+                "reader 1 after notifyNoMoreSplits",
+                InputStatus.END_OF_INPUT,
+                reader.pollNext(readerOutput));
+
+        reader.close();
+    }
+
+    @Test
+    public void testReaderRecovery() throws Exception {
+        TestingReaderContext readerContext = new TestingReaderContext();
+        TestingReaderOutput<Integer> readerOutput = new 
TestingReaderOutput<>();
+        MockBaseSource source = new MockBaseSource(1, 1, Boundedness.BOUNDED);
+
+        Map<Integer, Source> switchedSources = new HashMap<>();
+
+        HybridSourceReader<Integer> reader =
+                new HybridSourceReader<>(readerContext, switchedSources);
+
+        reader.start();
+        assertAndClearSourceReaderFinishedEvent(readerContext, -1);
+        reader.handleSourceEvents(new SwitchSourceEvent(0, source, false));
+        Assert.assertEquals(source, switchedSources.get(0));
+
+        MockSourceSplit mockSplit = new MockSourceSplit(0, 0, 2147483647);
+        // mockSplit.addRecord(0);
+        HybridSourceSplit hybridSplit = new HybridSourceSplit(0, mockSplit);
+        reader.addSplits(Collections.singletonList(hybridSplit));
+
+        List<HybridSourceSplit> snapshot = reader.snapshotState(0);
+        Assert.assertThat(snapshot, Matchers.contains(hybridSplit));
+
+        // reader recovery
+        readerContext.clearSentEvents();
+        switchedSources = new HashMap<>();
+        reader = new HybridSourceReader<>(readerContext, switchedSources);
+
+        reader.addSplits(snapshot);
+        Assert.assertNull(currentReader(reader));
+
+        reader.start();
+        Assert.assertNull(currentReader(reader));
+
+        assertAndClearSourceReaderFinishedEvent(readerContext, -1);
+        reader.handleSourceEvents(new SwitchSourceEvent(0, source, false));
+        Assert.assertNotNull(currentReader(reader));
+        Assert.assertEquals(source, switchedSources.get(0));
+        Assert.assertThat(reader.snapshotState(1), 
Matchers.contains(hybridSplit));
+
+        reader.close();
+    }
+
+    private static SourceReader<Integer, MockSourceSplit> currentReader(
+            HybridSourceReader<?> reader) {
+        return (SourceReader) Whitebox.getInternalState(reader, 
"currentReader");
+    }
+
+    private static void assertAndClearSourceReaderFinishedEvent(
+            TestingReaderContext context, int sourceIndex) {
+        Assert.assertThat(context.getSentEvents(), 
Matchers.iterableWithSize(1));
+        Assert.assertEquals(
+                sourceIndex,
+                ((SourceReaderFinishedEvent) 
context.getSentEvents().get(0)).sourceIndex());
+        context.clearSentEvents();
+    }
+}
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
new file mode 100644
index 0000000..2a8fd57
--- /dev/null
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
+import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
+import org.apache.flink.connector.base.source.reader.mocks.MockSplitEnumerator;
+import org.apache.flink.mock.Whitebox;
+
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
/** Tests for {@link HybridSourceSplitEnumerator}. */
public class HybridSourceSplitEnumeratorTest {

    // two reader subtasks participate in every scenario
    private static final int SUBTASK0 = 0;
    private static final int SUBTASK1 = 1;

    private HybridSource<Integer> source;
    private MockSplitEnumeratorContext<HybridSourceSplit> context;
    private HybridSourceSplitEnumerator enumerator;
    // split assigned from the first underlying source (sourceIndex 0)
    private HybridSourceSplit splitFromSource0;
    // split assigned from the second underlying source (sourceIndex 1)
    private HybridSourceSplit splitFromSource1;

    /**
     * Shared fixture: builds a two-source hybrid source, registers both readers, drives the
     * reader-finished protocol until the enumerator has switched from source 0 to source 1,
     * and captures one assigned split from each source for the individual tests.
     */
    private void setupEnumeratorAndTriggerSourceSwitch() {
        context = new MockSplitEnumeratorContext<>(2);
        source =
                HybridSource.builder(new MockBaseSource(1, 1, Boundedness.BOUNDED))
                        .addSource(new MockBaseSource(1, 1, Boundedness.BOUNDED))
                        .build();

        enumerator = (HybridSourceSplitEnumerator) source.createEnumerator(context);
        enumerator.start();
        // mock enumerator assigns splits once all readers are registered
        registerReader(context, enumerator, SUBTASK0);
        assertThat(context.getSplitsAssignmentSequence(), Matchers.emptyIterable());
        registerReader(context, enumerator, SUBTASK1);
        assertThat(context.getSplitsAssignmentSequence(), Matchers.emptyIterable());
        // index -1 = reader announces readiness without having finished any source
        enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
        assertThat(context.getSplitsAssignmentSequence(), Matchers.iterableWithSize(0));
        enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(-1));
        assertThat(context.getSplitsAssignmentSequence(), Matchers.iterableWithSize(1));
        splitFromSource0 =
                context.getSplitsAssignmentSequence().get(0).assignment().get(SUBTASK0).get(0);
        assertEquals(0, splitFromSource0.sourceIndex());
        assertEquals(0, getCurrentSourceIndex(enumerator));

        // trigger source switch: the enumerator advances only after ALL readers finished source 0
        enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(0));
        assertEquals("one reader finished", 0, getCurrentSourceIndex(enumerator));
        enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(0));
        assertEquals("both readers finished", 1, getCurrentSourceIndex(enumerator));
        assertThat(
                "switch triggers split assignment",
                context.getSplitsAssignmentSequence(),
                Matchers.iterableWithSize(2));
        splitFromSource1 =
                context.getSplitsAssignmentSequence().get(1).assignment().get(SUBTASK0).get(0);
        assertEquals(1, splitFromSource1.sourceIndex());
        // a stale/duplicate finished event must not advance the source index again
        enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(SUBTASK1));
        assertEquals("reader without assignment", 1, getCurrentSourceIndex(enumerator));
    }

    /**
     * A reader reset during recovery may return splits of a previous source; the enumerator
     * must re-assign them once the reader has (re-)registered and announced readiness.
     */
    @Test
    public void testRegisterReaderAfterSwitchAndReaderReset() {
        setupEnumeratorAndTriggerSourceSwitch();

        // add split of previous source back (simulates reader reset during recovery)
        context.getSplitsAssignmentSequence().clear();
        enumerator.addReader(SUBTASK0);
        enumerator.addSplitsBack(Collections.singletonList(splitFromSource0), SUBTASK0);
        assertThat(context.getSplitsAssignmentSequence(), Matchers.iterableWithSize(0));
        enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
        assertSplitAssignment(
                "addSplitsBack triggers assignment when reader registered",
                context,
                1,
                splitFromSource0,
                SUBTASK0);

        // remove reader from context: returned splits must be held back until re-registration
        context.getSplitsAssignmentSequence().clear();
        context.unregisterReader(SUBTASK0);
        enumerator.addSplitsBack(Collections.singletonList(splitFromSource0), SUBTASK0);
        assertThat(
                "addSplitsBack doesn't trigger assignment when reader not registered",
                context.getSplitsAssignmentSequence(),
                Matchers.emptyIterable());
        registerReader(context, enumerator, SUBTASK0);
        assertThat(context.getSplitsAssignmentSequence(), Matchers.iterableWithSize(0));
        enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
        assertSplitAssignment(
                "registerReader triggers assignment", context, 1, splitFromSource0, SUBTASK0);
    }

    /**
     * handleSplitRequest must be forwarded to the current underlying enumerator, and must be
     * rejected while a reader reset (addSplitsBack of an earlier source's split) is pending.
     */
    @Test
    public void testHandleSplitRequestAfterSwitchAndReaderReset() {
        setupEnumeratorAndTriggerSourceSwitch();

        // wrap the current underlying enumerator to observe forwarded split requests
        UnderlyingEnumeratorWrapper underlyingEnumeratorWrapper =
                new UnderlyingEnumeratorWrapper(
                        (MockSplitEnumerator)
                                Whitebox.getInternalState(enumerator, "currentEnumerator"));
        Whitebox.setInternalState(enumerator, "currentEnumerator", underlyingEnumeratorWrapper);

        List<MockSourceSplit> mockSourceSplits =
                (List<MockSourceSplit>)
                        Whitebox.getInternalState(underlyingEnumeratorWrapper.enumerator, "splits");
        assertThat(mockSourceSplits, Matchers.emptyIterable());

        // simulate reader reset to before switch by adding split of previous source back
        context.getSplitsAssignmentSequence().clear();
        assertEquals("current enumerator", 1, getCurrentSourceIndex(enumerator));

        assertThat(underlyingEnumeratorWrapper.handleSplitRequests, Matchers.emptyIterable());
        enumerator.handleSplitRequest(SUBTASK0, "fakehostname");

        assertSplitAssignment(
                "handleSplitRequest triggers assignment of split by underlying enumerator",
                context,
                1,
                new HybridSourceSplit(1, UnderlyingEnumeratorWrapper.SPLIT_1),
                SUBTASK0);

        // handleSplitRequest invalid during reset
        enumerator.addSplitsBack(Collections.singletonList(splitFromSource0), SUBTASK0);
        try {
            enumerator.handleSplitRequest(SUBTASK0, "fakehostname");
            Assert.fail("expected exception");
        } catch (IllegalStateException ex) {
            // expected: split requests are not permitted while the reader is being reset
        }
    }

    /**
     * Snapshot/restore round trip: the restored enumerator must produce a state equivalent to
     * the one it was restored from.
     */
    @Test
    public void testRestoreEnumerator() throws Exception {
        setupEnumeratorAndTriggerSourceSwitch();
        enumerator = (HybridSourceSplitEnumerator) source.createEnumerator(context);
        enumerator.start();
        HybridSourceEnumeratorState enumeratorState = enumerator.snapshotState(0);
        Assert.assertEquals(1, ((List) enumeratorState.getWrappedState()).size());
        enumerator =
                (HybridSourceSplitEnumerator) source.restoreEnumerator(context, enumeratorState);
        enumerator.start();
        enumeratorState = enumerator.snapshotState(0);
        Assert.assertEquals(1, ((List) enumeratorState.getWrappedState()).size());
    }

    /**
     * Delegating wrapper around the underlying MockSplitEnumerator that records split requests
     * and answers each one by assigning the fixed SPLIT_1 through the enumerator context.
     */
    private static class UnderlyingEnumeratorWrapper
            implements SplitEnumerator<MockSourceSplit, Object> {
        private static final MockSourceSplit SPLIT_1 = new MockSourceSplit(0, 0, 1);
        private final List<Integer> handleSplitRequests = new ArrayList<>();
        private final MockSplitEnumerator enumerator;
        private final SplitEnumeratorContext context;

        private UnderlyingEnumeratorWrapper(MockSplitEnumerator enumerator) {
            this.enumerator = enumerator;
            // reuse the wrapped enumerator's context so assignments reach the same mock context
            this.context =
                    (SplitEnumeratorContext) Whitebox.getInternalState(enumerator, "context");
        }

        @Override
        public void handleSplitRequest(int subtaskId, String requesterHostname) {
            handleSplitRequests.add(subtaskId);
            context.assignSplits(new SplitsAssignment(SPLIT_1, subtaskId));
        }

        @Override
        public void start() {
            // the wrapper replaces an already-started enumerator; start must not be called again
            throw new UnsupportedOperationException();
        }

        @Override
        public void addSplitsBack(List splits, int subtaskId) {
            enumerator.addSplitsBack(splits, subtaskId);
        }

        @Override
        public void addReader(int subtaskId) {
            enumerator.addReader(subtaskId);
        }

        @Override
        public Object snapshotState(long checkpointId) throws Exception {
            return enumerator.snapshotState(checkpointId);
        }

        @Override
        public void close() throws IOException {
            enumerator.close();
        }
    }

    /**
     * Asserts the assignment sequence has the expected size and that its last entry assigned
     * the given split to the given subtask.
     */
    private static void assertSplitAssignment(
            String reason,
            MockSplitEnumeratorContext<HybridSourceSplit> context,
            int size,
            HybridSourceSplit split,
            int subtask) {
        assertThat(reason, context.getSplitsAssignmentSequence(), Matchers.iterableWithSize(size));
        assertEquals(
                reason,
                split,
                context.getSplitsAssignmentSequence()
                        .get(size - 1)
                        .assignment()
                        .get(subtask)
                        .get(0));
    }

    /** Registers the reader with both the mock context and the enumerator under test. */
    private static void registerReader(
            MockSplitEnumeratorContext<HybridSourceSplit> context,
            HybridSourceSplitEnumerator enumerator,
            int reader) {
        context.registerReader(new ReaderInfo(reader, "location 0"));
        enumerator.addReader(reader);
    }

    /** Reads the enumerator's private "currentSourceIndex" field via reflection. */
    private static int getCurrentSourceIndex(HybridSourceSplitEnumerator enumerator) {
        return (int) Whitebox.getInternalState(enumerator, "currentSourceIndex");
    }
}
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializerTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializerTest.java
new file mode 100644
index 0000000..d43275f
--- /dev/null
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializerTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.mocks.MockSource;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Tests for {@link HybridSourceSplitSerializer}. */
+public class HybridSourceSplitSerializerTest {
+
+    @Test
+    public void testSerialization() throws Exception {
+        Map<Integer, Source> switchedSources = new HashMap<>();
+        switchedSources.put(0, new MockSource(null, 0));
+        HybridSourceSplitSerializer serializer = new 
HybridSourceSplitSerializer(switchedSources);
+        HybridSourceSplit split = new HybridSourceSplit(0, new 
MockSourceSplit(1));
+        byte[] serialized = serializer.serialize(split);
+        HybridSourceSplit clonedSplit = serializer.deserialize(0, serialized);
+        Assert.assertEquals(split, clonedSplit);
+
+        try {
+            serializer.deserialize(1, serialized);
+            Assert.fail();
+        } catch (IOException e) {
+            // expected invalid version
+        }
+    }
+}
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceTest.java
new file mode 100644
index 0000000..786677b
--- /dev/null
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
+import org.apache.flink.connector.base.source.reader.mocks.MockSplitEnumerator;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
/** Tests for {@link HybridSource}. */
public class HybridSourceTest {

    /** The hybrid source's boundedness is derived from the last (terminal) source. */
    @Test
    public void testBoundedness() {
        HybridSource<Integer> source;

        // All sources bounded -> hybrid source reports BOUNDED.
        source =
                HybridSource.builder(new MockBaseSource(1, 1, Boundedness.BOUNDED))
                        .addSource(new MockBaseSource(1, 1, Boundedness.BOUNDED))
                        .build();
        assertEquals(Boundedness.BOUNDED, source.getBoundedness());

        // Unbounded terminal source -> hybrid source reports CONTINUOUS_UNBOUNDED.
        source =
                HybridSource.builder(new MockBaseSource(1, 1, Boundedness.BOUNDED))
                        .addSource(new MockBaseSource(1, 1, Boundedness.CONTINUOUS_UNBOUNDED))
                        .build();
        assertEquals(Boundedness.CONTINUOUS_UNBOUNDED, source.getBoundedness());

        // Adding a source after an unbounded one must be rejected at build time.
        try {
            HybridSource.builder(new MockBaseSource(1, 1, Boundedness.CONTINUOUS_UNBOUNDED))
                    .addSource(new MockBaseSource(1, 1, Boundedness.CONTINUOUS_UNBOUNDED))
                    .build();
            fail("expected exception");
        } catch (IllegalArgumentException e) {
            // expected: boundedness check fails for a non-terminal unbounded source
        }
    }

    /** A deferred source can be created from the previous enumerator via {@code SourceFactory}. */
    @Test
    public void testBuilderWithSourceFactory() {
        HybridSource.SourceFactory<Integer, Source<Integer, ?, ?>, MockSplitEnumerator>
                sourceFactory =
                        new HybridSource.SourceFactory<
                                Integer, Source<Integer, ?, ?>, MockSplitEnumerator>() {
                            @Override
                            public Source<Integer, ?, ?> create(
                                    HybridSource.SourceSwitchContext<MockSplitEnumerator> context) {
                                // NOTE(review): local is unused at runtime; presumably kept to
                                // compile-check getPreviousEnumerator's return type — confirm.
                                MockSplitEnumerator enumerator = context.getPreviousEnumerator();
                                return new MockBaseSource(1, 1, Boundedness.BOUNDED);
                            }
                        };

        HybridSource<Integer> source =
                new HybridSource.HybridSourceBuilder<Integer, MockSplitEnumerator>()
                        .<MockSplitEnumerator, Source<Integer, ?, ?>>addSource(
                                new MockBaseSource(1, 1, Boundedness.BOUNDED))
                        .addSource(sourceFactory, Boundedness.BOUNDED)
                        .build();
        assertNotNull(source);
    }

    // Enumerator subclass used to exercise the factory's supertype compatibility below.
    private static class ExtendedMockSplitEnumerator extends MockSplitEnumerator {
        public ExtendedMockSplitEnumerator(
                List<MockSourceSplit> splits, SplitEnumeratorContext<MockSourceSplit> context) {
            super(splits, context);
        }
    }

    /**
     * A factory typed to a superclass of the actual enumerator ({@code MockSplitEnumerator} vs.
     * {@code ExtendedMockSplitEnumerator}) still type-checks and builds.
     */
    @Test
    public void testBuilderWithEnumeratorSuperclass() {
        HybridSource.SourceFactory<Integer, Source<Integer, ?, ?>, MockSplitEnumerator>
                sourceFactory =
                        (HybridSource.SourceFactory<
                                        Integer, Source<Integer, ?, ?>, MockSplitEnumerator>)
                                context -> {
                                    MockSplitEnumerator enumerator =
                                            context.getPreviousEnumerator();
                                    return new MockBaseSource(1, 1, Boundedness.BOUNDED);
                                };

        HybridSource<Integer> source =
                new HybridSource.HybridSourceBuilder<Integer, MockSplitEnumerator>()
                        .<ExtendedMockSplitEnumerator, Source<Integer, ?, ?>>addSource(
                                new MockBaseSource(1, 1, Boundedness.BOUNDED))
                        .addSource(sourceFactory, Boundedness.BOUNDED)
                        .build();
        assertNotNull(source);
    }
}

Reply via email to