This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git
commit c72aa4b97848625f23bac331b8e916c6a3e2acd3 Author: Thomas Weise <[email protected]> AuthorDate: Fri Feb 26 19:32:32 2021 -0800 [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline --- .../connector/base/source/hybrid/HybridSource.java | 257 ++++++++++++++ .../source/hybrid/HybridSourceEnumeratorState.java | 38 ++ .../HybridSourceEnumeratorStateSerializer.java | 106 ++++++ .../base/source/hybrid/HybridSourceReader.java | 254 ++++++++++++++ .../base/source/hybrid/HybridSourceSplit.java | 94 +++++ .../source/hybrid/HybridSourceSplitEnumerator.java | 390 +++++++++++++++++++++ .../source/hybrid/HybridSourceSplitSerializer.java | 98 ++++++ .../source/hybrid/SourceReaderFinishedEvent.java | 49 +++ .../base/source/hybrid/SwitchSourceEvent.java | 62 ++++ .../base/source/hybrid/HybridSourceITCase.java | 246 +++++++++++++ .../base/source/hybrid/HybridSourceReaderTest.java | 181 ++++++++++ .../hybrid/HybridSourceSplitEnumeratorTest.java | 252 +++++++++++++ .../hybrid/HybridSourceSplitSerializerTest.java | 52 +++ .../base/source/hybrid/HybridSourceTest.java | 115 ++++++ 14 files changed, 2194 insertions(+) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java new file mode 100644 index 0000000..e3d66de --- /dev/null +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Hybrid source that switches underlying sources based on configured source chain. 
+ * + * <p>A simple example with FileSource and KafkaSource with fixed Kafka start position: + * + * <pre>{@code + * FileSource<String> fileSource = + * FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build(); + * KafkaSource<String> kafkaSource = + * KafkaSource.<String>builder() + * .setBootstrapServers("localhost:9092") + * .setGroupId("MyGroup") + * .setTopics(Arrays.asList("quickstart-events")) + * .setDeserializer( + * KafkaRecordDeserializer.valueOnly(StringDeserializer.class)) + * .setStartingOffsets(OffsetsInitializer.earliest()) + * .build(); + * HybridSource<String> hybridSource = + * HybridSource.builder(fileSource) + * .addSource(kafkaSource) + * .build(); + * }</pre> + * + * <p>A more complex example with Kafka start position derived from previous source: + * + * <pre>{@code + * HybridSource<String> hybridSource = + * HybridSource.<String, StaticFileSplitEnumerator>builder(fileSource) + * .addSource( + * switchContext -> { + * StaticFileSplitEnumerator previousEnumerator = + * switchContext.getPreviousEnumerator(); + * // how to get timestamp depends on specific enumerator + * long timestamp = previousEnumerator.getEndTimestamp(); + * OffsetsInitializer offsets = + * OffsetsInitializer.timestamp(timestamp); + * KafkaSource<String> kafkaSource = + * KafkaSource.<String>builder() + * .setBootstrapServers("localhost:9092") + * .setGroupId("MyGroup") + * .setTopics(Arrays.asList("quickstart-events")) + * .setDeserializer( + * KafkaRecordDeserializer.valueOnly(StringDeserializer.class)) + * .setStartingOffsets(offsets) + * .build(); + * return kafkaSource; + * }, + * Boundedness.CONTINUOUS_UNBOUNDED) + * .build(); + * }</pre> + */ +@PublicEvolving +public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> { + + private final List<SourceListEntry> sources; + // sources are populated per subtask at switch time + private final Map<Integer, Source> switchedSources; + + /** Protected 
for subclass, use {@link #builder(Source)} to construct source. */ + protected HybridSource(List<SourceListEntry> sources) { + Preconditions.checkArgument(!sources.isEmpty()); + this.sources = sources; + this.switchedSources = new HashMap<>(sources.size()); + } + + /** Builder for {@link HybridSource}. */ + public static <T, EnumT extends SplitEnumerator> HybridSourceBuilder<T, EnumT> builder( + Source<T, ?, ?> firstSource) { + HybridSourceBuilder<T, EnumT> builder = new HybridSourceBuilder<>(); + return builder.addSource(firstSource); + } + + @Override + public Boundedness getBoundedness() { + return sources.get(sources.size() - 1).boundedness; + } + + @Override + public SourceReader<T, HybridSourceSplit> createReader(SourceReaderContext readerContext) + throws Exception { + return new HybridSourceReader(readerContext, switchedSources); + } + + @Override + public SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> createEnumerator( + SplitEnumeratorContext<HybridSourceSplit> enumContext) { + return new HybridSourceSplitEnumerator(enumContext, sources, 0, switchedSources, null); + } + + @Override + public SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> restoreEnumerator( + SplitEnumeratorContext<HybridSourceSplit> enumContext, + HybridSourceEnumeratorState checkpoint) + throws Exception { + return new HybridSourceSplitEnumerator( + enumContext, + sources, + checkpoint.getCurrentSourceIndex(), + switchedSources, + checkpoint.getWrappedState()); + } + + @Override + public SimpleVersionedSerializer<HybridSourceSplit> getSplitSerializer() { + return new HybridSourceSplitSerializer(switchedSources); + } + + @Override + public SimpleVersionedSerializer<HybridSourceEnumeratorState> + getEnumeratorCheckpointSerializer() { + return new HybridSourceEnumeratorStateSerializer(switchedSources); + } + + /** + * Context provided to source factory. 
+ * + * <p>To derive a start position at switch time, the source can be initialized from context of + * the previous enumerator. A specific enumerator implementation may carry state such as an end + * timestamp, that can be used to derive the start position of the next source. + * + * <p>Currently only the previous enumerator is exposed. The context interface allows for + * backward compatible extension, i.e. additional information about the previous source can be + * supplied in the future. + */ + public interface SourceSwitchContext<EnumT> { + EnumT getPreviousEnumerator(); + } + + /** + * Factory for underlying sources of {@link HybridSource}. + * + * <p>This factory permits building of a source at graph construction time or deferred at switch + * time. Provides the ability to set a start position in any way a specific source allows. + * Future convenience could be built on top of it, for example a default implementation that + * recognizes optional interfaces to transfer position in a universal format. + * + * <p>Called when the current enumerator has finished. The previous source's final state can + * thus be used to construct the next source, as required for dynamic position transfer at time + * of switching. + * + * <p>If start position is known at job submission time, the source can be constructed in the + * entry point and simply wrapped into the factory, providing the benefit of validation during + * submission. 
+ */ + @FunctionalInterface + public interface SourceFactory< + T, SourceT extends Source<T, ?, ?>, FromEnumT extends SplitEnumerator> + extends Serializable { + SourceT create(SourceSwitchContext<FromEnumT> context); + } + + private static class PassthroughSourceFactory< + T, SourceT extends Source<T, ?, ?>, FromEnumT extends SplitEnumerator> + implements SourceFactory<T, SourceT, FromEnumT> { + + private final SourceT source; + + private PassthroughSourceFactory(SourceT source) { + this.source = source; + } + + @Override + public SourceT create(SourceSwitchContext<FromEnumT> context) { + return source; + } + } + + /** Entry for list of underlying sources. */ + static class SourceListEntry implements Serializable { + protected final SourceFactory factory; + protected final Boundedness boundedness; + + private SourceListEntry(SourceFactory factory, Boundedness boundedness) { + this.factory = Preconditions.checkNotNull(factory); + this.boundedness = Preconditions.checkNotNull(boundedness); + } + + static SourceListEntry of(SourceFactory configurer, Boundedness boundedness) { + return new SourceListEntry(configurer, boundedness); + } + } + + /** Builder for HybridSource. */ + public static class HybridSourceBuilder<T, EnumT extends SplitEnumerator> + implements Serializable { + private final List<SourceListEntry> sources; + + public HybridSourceBuilder() { + sources = new ArrayList<>(); + } + + /** Add pre-configured source (without switch time modification). */ + public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, ?, ?>> + HybridSourceBuilder<T, ToEnumT> addSource(NextSourceT source) { + return addSource(new PassthroughSourceFactory<>(source), source.getBoundedness()); + } + + /** Add source with deferred instantiation based on previous enumerator. */ + public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, ?, ?>> + HybridSourceBuilder<T, ToEnumT> addSource( + SourceFactory<T, NextSourceT, ? 
super EnumT> sourceFactory, + Boundedness boundedness) { + if (!sources.isEmpty()) { + Preconditions.checkArgument( + Boundedness.BOUNDED.equals(sources.get(sources.size() - 1).boundedness), + "All sources except the final source need to be bounded."); + } + ClosureCleaner.clean( + sourceFactory, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + sources.add(SourceListEntry.of(sourceFactory, boundedness)); + return (HybridSourceBuilder) this; + } + + /** Build the source. */ + public HybridSource<T> build() { + return new HybridSource(sources); + } + } +} diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java new file mode 100644 index 0000000..2da99ee --- /dev/null +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +/** The state of hybrid source enumerator. 
*/ +public class HybridSourceEnumeratorState { + private final int currentSourceIndex; + private final Object wrappedState; + + HybridSourceEnumeratorState(int currentSourceIndex, Object wrappedState) { + this.currentSourceIndex = currentSourceIndex; + this.wrappedState = wrappedState; + } + + public int getCurrentSourceIndex() { + return this.currentSourceIndex; + } + + public Object getWrappedState() { + return wrappedState; + } +} diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorStateSerializer.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorStateSerializer.java new file mode 100644 index 0000000..721e010 --- /dev/null +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorStateSerializer.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** The {@link SimpleVersionedSerializer Serializer} for the enumerator state. */ +public class HybridSourceEnumeratorStateSerializer + implements SimpleVersionedSerializer<HybridSourceEnumeratorState> { + + private static final int CURRENT_VERSION = 0; + + private final Map<Integer, SimpleVersionedSerializer<Object>> cachedSerializers; + private final Map<Integer, Source> switchedSources; + + public HybridSourceEnumeratorStateSerializer(Map<Integer, Source> switchedSources) { + this.switchedSources = switchedSources; + this.cachedSerializers = new HashMap<>(); + } + + @Override + public int getVersion() { + return CURRENT_VERSION; + } + + @Override + public byte[] serialize(HybridSourceEnumeratorState enumState) throws IOException { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(baos)) { + out.writeInt(enumState.getCurrentSourceIndex()); + SimpleVersionedSerializer<Object> serializer = + serializerOf(enumState.getCurrentSourceIndex()); + out.writeInt(serializer.getVersion()); + byte[] enumStateBytes = serializer.serialize(enumState.getWrappedState()); + out.writeInt(enumStateBytes.length); + out.write(enumStateBytes); + out.flush(); + return baos.toByteArray(); + } + } + + @Override + public HybridSourceEnumeratorState deserialize(int version, byte[] serialized) + throws IOException { + if (version == 0) { + return deserializeV0(serialized); + } + throw new IOException( + String.format( + "The bytes are serialized with version %d, " + + "while this 
deserializer only supports version up to %d", + version, CURRENT_VERSION)); + } + + private HybridSourceEnumeratorState deserializeV0(byte[] serialized) throws IOException { + try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); + DataInputStream in = new DataInputStream(bais)) { + int sourceIndex = in.readInt(); + int nestedVersion = in.readInt(); + int length = in.readInt(); + byte[] nestedBytes = new byte[length]; + in.readFully(nestedBytes); + Object nested = serializerOf(sourceIndex).deserialize(nestedVersion, nestedBytes); + return new HybridSourceEnumeratorState(sourceIndex, nested); + } + } + + private SimpleVersionedSerializer<Object> serializerOf(int sourceIndex) { + return cachedSerializers.computeIfAbsent( + sourceIndex, + (k -> { + Source source = + Preconditions.checkNotNull( + switchedSources.get(k), + "Source for index=%s not available", + sourceIndex); + return source.getEnumeratorCheckpointSerializer(); + })); + } +} diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java new file mode 100644 index 0000000..cdd2e8b --- /dev/null +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Hybrid source reader that delegates to the actual source reader. + * + * <p>This reader processes splits from a sequence of sources as determined by the enumerator. The + * current source is provided with {@link SwitchSourceEvent} and the reader does not require upfront + * knowledge of the number and order of sources. At a given point in time one underlying reader is + * active. + * + * <p>When the underlying reader has consumed all input for a source, {@link HybridSourceReader} + * sends {@link SourceReaderFinishedEvent} to the coordinator. + * + * <p>This reader does not make assumptions about the order in which sources are activated. When + * recovering from a checkpoint it may start processing splits for a previous source, which is + * indicated via {@link SwitchSourceEvent}. 
+ */ +public class HybridSourceReader<T> implements SourceReader<T, HybridSourceSplit> { + private static final Logger LOG = LoggerFactory.getLogger(HybridSourceReader.class); + private final SourceReaderContext readerContext; + private final Map<Integer, Source> switchedSources; + private int currentSourceIndex = -1; + private boolean isFinalSource; + private SourceReader<T, ? extends SourceSplit> currentReader; + private CompletableFuture<Void> availabilityFuture = new CompletableFuture<>(); + private List<HybridSourceSplit> restoredSplits = new ArrayList<>(); + + public HybridSourceReader( + SourceReaderContext readerContext, Map<Integer, Source> switchedSources) { + this.readerContext = readerContext; + this.switchedSources = switchedSources; + } + + @Override + public void start() { + // underlying reader starts on demand with split assignment + int initialSourceIndex = currentSourceIndex; + if (!restoredSplits.isEmpty()) { + initialSourceIndex = restoredSplits.get(0).sourceIndex() - 1; + } + readerContext.sendSourceEventToCoordinator( + new SourceReaderFinishedEvent(initialSourceIndex)); + } + + @Override + public InputStatus pollNext(ReaderOutput output) throws Exception { + if (currentReader == null) { + return InputStatus.NOTHING_AVAILABLE; + } + + InputStatus status = currentReader.pollNext(output); + if (status == InputStatus.END_OF_INPUT) { + // trap END_OF_INPUT unless all sources have finished + LOG.info( + "End of input subtask={} sourceIndex={} {}", + readerContext.getIndexOfSubtask(), + currentSourceIndex, + currentReader); + // Signal the coordinator that this reader has consumed all input and the + // next source can potentially be activated. + readerContext.sendSourceEventToCoordinator( + new SourceReaderFinishedEvent(currentSourceIndex)); + if (!isFinalSource) { + // More splits may arrive for a subsequent reader. 
+ // InputStatus.NOTHING_AVAILABLE suspends poll, requires completion of the + // availability future after receiving more splits to resume. + if (availabilityFuture.isDone()) { + // reset to avoid continued polling + availabilityFuture = new CompletableFuture(); + } + return InputStatus.NOTHING_AVAILABLE; + } + } + return status; + } + + @Override + public List<HybridSourceSplit> snapshotState(long checkpointId) { + List<? extends SourceSplit> state = + currentReader != null + ? currentReader.snapshotState(checkpointId) + : Collections.emptyList(); + return HybridSourceSplit.wrapSplits(currentSourceIndex, state); + } + + @Override + public CompletableFuture<Void> isAvailable() { + return availabilityFuture; + } + + @Override + public void addSplits(List<HybridSourceSplit> splits) { + LOG.info( + "Adding splits subtask={} sourceIndex={} currentReader={} {}", + readerContext.getIndexOfSubtask(), + currentSourceIndex, + currentReader, + splits); + if (currentSourceIndex < 0) { + // splits added back during reader recovery + restoredSplits.addAll(splits); + } else { + List<SourceSplit> realSplits = new ArrayList<>(splits.size()); + for (HybridSourceSplit split : splits) { + Preconditions.checkState( + split.sourceIndex() == currentSourceIndex, + "Split %s while current source is %s", + split, + currentSourceIndex); + realSplits.add(split.getWrappedSplit()); + } + currentReader.addSplits((List) realSplits); + } + } + + @Override + public void notifyNoMoreSplits() { + if (currentReader != null) { + currentReader.notifyNoMoreSplits(); + } + LOG.debug( + "No more splits for subtask={} sourceIndex={} currentReader={}", + readerContext.getIndexOfSubtask(), + currentSourceIndex, + currentReader); + } + + @Override + public void handleSourceEvents(SourceEvent sourceEvent) { + if (sourceEvent instanceof SwitchSourceEvent) { + SwitchSourceEvent sse = (SwitchSourceEvent) sourceEvent; + LOG.info( + "Switch source event: subtask={} sourceIndex={} source={}", + 
readerContext.getIndexOfSubtask(), + sse.sourceIndex(), + sse.source()); + switchedSources.put(sse.sourceIndex(), sse.source()); + setCurrentReader(sse.sourceIndex()); + isFinalSource = sse.isFinalSource(); + if (!availabilityFuture.isDone()) { + // continue polling + availabilityFuture.complete(null); + } + } else { + currentReader.handleSourceEvents(sourceEvent); + } + } + + @Override + public void close() throws Exception { + if (currentReader != null) { + currentReader.close(); + } + LOG.debug( + "Reader closed: subtask={} sourceIndex={} currentReader={}", + readerContext.getIndexOfSubtask(), + currentSourceIndex, + currentReader); + } + + private void setCurrentReader(int index) { + Preconditions.checkArgument(index != currentSourceIndex); + if (currentReader != null) { + try { + currentReader.close(); + } catch (Exception e) { + throw new RuntimeException("Failed to close current reader", e); + } + LOG.debug( + "Reader closed: subtask={} sourceIndex={} currentReader={}", + readerContext.getIndexOfSubtask(), + currentSourceIndex, + currentReader); + } + // TODO: track previous readers splits till checkpoint + Source source = + Preconditions.checkNotNull( + switchedSources.get(index), "Source for index=%s not available", index); + SourceReader<T, ?> reader; + try { + reader = source.createReader(readerContext); + } catch (Exception e) { + throw new RuntimeException("Failed tp create reader", e); + } + reader.start(); + currentSourceIndex = index; + currentReader = reader; + currentReader + .isAvailable() + .whenComplete( + (result, ex) -> { + if (ex == null) { + availabilityFuture.complete(result); + } else { + availabilityFuture.completeExceptionally(ex); + } + }); + LOG.debug( + "Reader started: subtask={} sourceIndex={} {}", + readerContext.getIndexOfSubtask(), + currentSourceIndex, + reader); + // add restored splits + if (!restoredSplits.isEmpty()) { + List<HybridSourceSplit> splits = new ArrayList<>(restoredSplits.size()); + Iterator<HybridSourceSplit> it 
= restoredSplits.iterator(); + while (it.hasNext()) { + HybridSourceSplit hybridSplit = it.next(); + if (hybridSplit.sourceIndex() == index) { + splits.add(hybridSplit); + it.remove(); + } + } + addSplits(splits); + } + } +} diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplit.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplit.java new file mode 100644 index 0000000..9057aa6 --- /dev/null +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplit.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.SourceSplit; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** Source split that wraps the actual split type. 
*/ +public class HybridSourceSplit implements SourceSplit { + + private final SourceSplit wrappedSplit; + private final int sourceIndex; + + public HybridSourceSplit(int sourceIndex, SourceSplit wrappedSplit) { + this.sourceIndex = sourceIndex; + this.wrappedSplit = wrappedSplit; + } + + public int sourceIndex() { + return this.sourceIndex; + } + + public SourceSplit getWrappedSplit() { + return wrappedSplit; + } + + @Override + public String splitId() { + return wrappedSplit.splitId(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + HybridSourceSplit that = (HybridSourceSplit) o; + return sourceIndex == that.sourceIndex && wrappedSplit.equals(that.wrappedSplit); + } + + @Override + public int hashCode() { + return Objects.hash(wrappedSplit, sourceIndex); + } + + @Override + public String toString() { + return "HybridSourceSplit{" + + "realSplit=" + + wrappedSplit + + ", sourceIndex=" + + sourceIndex + + '}'; + } + + public static List<HybridSourceSplit> wrapSplits( + int readerIndex, List<? 
extends SourceSplit> state) { + List<HybridSourceSplit> wrappedSplits = new ArrayList<>(state.size()); + for (SourceSplit split : state) { + wrappedSplits.add(new HybridSourceSplit(readerIndex, split)); + } + return wrappedSplits; + } + + public static List<SourceSplit> unwrapSplits(List<HybridSourceSplit> splits) { + List<SourceSplit> unwrappedSplits = new ArrayList<>(splits.size()); + for (HybridSourceSplit split : splits) { + unwrappedSplits.add(split.getWrappedSplit()); + } + return unwrappedSplits; + } +} diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java new file mode 100644 index 0000000..f213492 --- /dev/null +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java @@ -0,0 +1,390 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; + +/** + * Wraps the actual split enumerators and facilitates source switching. Enumerators are created + * lazily when source switch occurs to support runtime position conversion. + * + * <p>This enumerator delegates to the current underlying split enumerator and transitions to the + * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input + * was consumed. + * + * <p>Switching between enumerators occurs by creating the new enumerator via {@link + * Source#createEnumerator(SplitEnumeratorContext)}. The start position can be fixed at pipeline + * construction time through the source or supplied at switch time through a converter function by + * using the end state of the previous enumerator. + * + * <p>During subtask recovery, splits that have been assigned since the last checkpoint will be + * added back by the source coordinator. These splits may originate from a previous enumerator that + * is no longer active. 
In that case {@link HybridSourceSplitEnumerator} will suspend forwarding to + * the current enumerator and replay the returned splits by activating the previous readers. After + * returned splits were processed, delegation to the current underlying enumerator resumes. + */ +public class HybridSourceSplitEnumerator + implements SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> { + private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class); + + private final SplitEnumeratorContext<HybridSourceSplit> context; + private final List<HybridSource.SourceListEntry> sources; + private final Map<Integer, Source> switchedSources; + // Splits that have been returned due to subtask reset + private final Map<Integer, TreeMap<Integer, List<HybridSourceSplit>>> pendingSplits; + private final Set<Integer> finishedReaders; + private final Map<Integer, Integer> readerSourceIndex; + private int currentSourceIndex; + private Object restoredEnumeratorState; + private SplitEnumerator<SourceSplit, Object> currentEnumerator; + + public HybridSourceSplitEnumerator( + SplitEnumeratorContext<HybridSourceSplit> context, + List<HybridSource.SourceListEntry> sources, + int initialSourceIndex, + Map<Integer, Source> switchedSources, + Object restoredEnumeratorState) { + Preconditions.checkArgument(initialSourceIndex < sources.size()); + this.context = context; + this.sources = sources; + this.currentSourceIndex = initialSourceIndex; + this.pendingSplits = new HashMap<>(); + this.finishedReaders = new HashSet<>(); + this.readerSourceIndex = new HashMap<>(); + this.switchedSources = switchedSources; + this.restoredEnumeratorState = restoredEnumeratorState; + } + + @Override + public void start() { + switchEnumerator(); + } + + @Override + public void handleSplitRequest(int subtaskId, String requesterHostname) { + LOG.debug( + "handleSplitRequest subtask={} sourceIndex={} pendingSplits={}", + subtaskId, + currentSourceIndex, + pendingSplits); + 
Preconditions.checkState(pendingSplits.isEmpty() || !pendingSplits.containsKey(subtaskId)); + currentEnumerator.handleSplitRequest(subtaskId, requesterHostname); + } + + @Override + public void addSplitsBack(List<HybridSourceSplit> splits, int subtaskId) { + LOG.debug("Adding splits back for subtask={} splits={}", subtaskId, splits); + + // Splits returned can belong to multiple sources, after switching since last checkpoint + TreeMap<Integer, List<HybridSourceSplit>> splitsBySourceIndex = new TreeMap<>(); + + for (HybridSourceSplit split : splits) { + splitsBySourceIndex + .computeIfAbsent(split.sourceIndex(), k -> new ArrayList<>()) + .add(split); + } + + splitsBySourceIndex.forEach( + (k, splitsPerSource) -> { + if (k == currentSourceIndex) { + currentEnumerator.addSplitsBack( + HybridSourceSplit.unwrapSplits(splitsPerSource), subtaskId); + } else { + pendingSplits + .computeIfAbsent(subtaskId, sourceIndex -> new TreeMap<>()) + .put(k, splitsPerSource); + } + }); + } + + @Override + public void addReader(int subtaskId) { + LOG.debug("addReader subtaskId={}", subtaskId); + readerSourceIndex.remove(subtaskId); + } + + private void sendSwitchSourceEvent(int subtaskId, int sourceIndex) { + readerSourceIndex.put(subtaskId, sourceIndex); + Source source = Preconditions.checkNotNull(switchedSources.get(sourceIndex)); + context.sendEventToSourceReader( + subtaskId, + new SwitchSourceEvent(sourceIndex, source, sourceIndex >= (sources.size() - 1))); + // send pending splits, if any + TreeMap<Integer, List<HybridSourceSplit>> splitsBySource = pendingSplits.get(subtaskId); + if (splitsBySource != null) { + List<HybridSourceSplit> splits = splitsBySource.remove(sourceIndex); + if (splits != null && !splits.isEmpty()) { + LOG.debug("Restoring splits to subtask={} {}", subtaskId, splits); + context.assignSplits( + new SplitsAssignment<>(Collections.singletonMap(subtaskId, splits))); + context.signalNoMoreSplits(subtaskId); + } + if (splitsBySource.isEmpty()) { + 
pendingSplits.remove(subtaskId); + } + } + + if (sourceIndex == currentSourceIndex) { + LOG.debug("adding reader subtask={} sourceIndex={}", subtaskId, currentSourceIndex); + currentEnumerator.addReader(subtaskId); + } + } + + @Override + public HybridSourceEnumeratorState snapshotState(long checkpointId) throws Exception { + Object enumState = currentEnumerator.snapshotState(checkpointId); + return new HybridSourceEnumeratorState(currentSourceIndex, enumState); + } + + @Override + public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { + LOG.debug( + "handleSourceEvent {} subtask={} pendingSplits={}", + sourceEvent, + subtaskId, + pendingSplits); + if (sourceEvent instanceof SourceReaderFinishedEvent) { + SourceReaderFinishedEvent srfe = (SourceReaderFinishedEvent) sourceEvent; + + int subtaskSourceIndex = + readerSourceIndex.computeIfAbsent( + subtaskId, + k -> { + // first time we see reader after cold start or recovery + LOG.debug( + "New reader subtask={} sourceIndex={}", + subtaskId, + srfe.sourceIndex()); + return srfe.sourceIndex(); + }); + + if (srfe.sourceIndex() < subtaskSourceIndex) { + // duplicate event + return; + } + + if (subtaskSourceIndex < currentSourceIndex) { + subtaskSourceIndex++; + sendSwitchSourceEvent(subtaskId, subtaskSourceIndex); + return; + } + + // track readers that have finished processing for current enumerator + finishedReaders.add(subtaskId); + if (finishedReaders.size() == context.currentParallelism()) { + LOG.debug("All readers finished, ready to switch enumerator!"); + if (currentSourceIndex + 1 < sources.size()) { + switchEnumerator(); + // switch all readers prior to sending split assignments + for (int i = 0; i < context.currentParallelism(); i++) { + sendSwitchSourceEvent(i, currentSourceIndex); + } + } + } + } else { + currentEnumerator.handleSourceEvent(subtaskId, sourceEvent); + } + } + + @Override + public void close() throws IOException { + currentEnumerator.close(); + } + + private void 
switchEnumerator() { + + SplitEnumerator<SourceSplit, Object> previousEnumerator = currentEnumerator; + if (currentEnumerator != null) { + try { + currentEnumerator.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + currentEnumerator = null; + currentSourceIndex++; + } + + HybridSource.SourceSwitchContext<?> switchContext = + new HybridSource.SourceSwitchContext<Object>() { + @Override + public Object getPreviousEnumerator() { + return previousEnumerator; + } + }; + + Source<?, ? extends SourceSplit, Object> source = + switchedSources.computeIfAbsent( + currentSourceIndex, + k -> { + return sources.get(currentSourceIndex).factory.create(switchContext); + }); + switchedSources.put(currentSourceIndex, source); + SplitEnumeratorContextProxy delegatingContext = + new SplitEnumeratorContextProxy(currentSourceIndex, context, readerSourceIndex); + try { + if (restoredEnumeratorState == null) { + currentEnumerator = source.createEnumerator(delegatingContext); + } else { + LOG.info("Restoring enumerator for sourceIndex={}", currentSourceIndex); + currentEnumerator = + source.restoreEnumerator(delegatingContext, restoredEnumeratorState); + restoredEnumeratorState = null; + } + } catch (Exception e) { + throw new RuntimeException( + "Failed to create enumerator for sourceIndex=" + currentSourceIndex, e); + } + LOG.info("Starting enumerator for sourceIndex={}", currentSourceIndex); + currentEnumerator.start(); + } + + /** + * The {@link SplitEnumeratorContext} that is provided to the currently active enumerator. + * + * <p>This context is used to wrap the splits into {@link HybridSourceSplit} and track + * assignment to readers. 
+ */ + private static class SplitEnumeratorContextProxy<SplitT extends SourceSplit> + implements SplitEnumeratorContext<SplitT> { + private static final Logger LOG = + LoggerFactory.getLogger(SplitEnumeratorContextProxy.class); + + private final SplitEnumeratorContext<HybridSourceSplit> realContext; + private final int sourceIndex; + private final Map<Integer, Integer> readerSourceIndex; + + private SplitEnumeratorContextProxy( + int sourceIndex, + SplitEnumeratorContext<HybridSourceSplit> realContext, + Map<Integer, Integer> readerSourceIndex) { + this.realContext = realContext; + this.sourceIndex = sourceIndex; + this.readerSourceIndex = readerSourceIndex; + } + + @Override + public MetricGroup metricGroup() { + return realContext.metricGroup(); + } + + @Override + public void sendEventToSourceReader(int subtaskId, SourceEvent event) { + realContext.sendEventToSourceReader(subtaskId, event); + } + + @Override + public int currentParallelism() { + return realContext.currentParallelism(); + } + + @Override + public Map<Integer, ReaderInfo> registeredReaders() { + // TODO: not start enumerator until readers are ready? 
+ Map<Integer, ReaderInfo> readers = realContext.registeredReaders(); + if (readers.size() != readerSourceIndex.size()) { + return filterRegisteredReaders(readers); + } + Integer lastIndex = null; + for (Integer sourceIndex : readerSourceIndex.values()) { + if (lastIndex != null && lastIndex != sourceIndex) { + return filterRegisteredReaders(readers); + } + lastIndex = sourceIndex; + } + return readers; + } + + private Map<Integer, ReaderInfo> filterRegisteredReaders(Map<Integer, ReaderInfo> readers) { + Map<Integer, ReaderInfo> readersForSource = new HashMap<>(readers.size()); + for (Map.Entry<Integer, ReaderInfo> e : readers.entrySet()) { + if (readerSourceIndex.get(e.getKey()) == (Integer) sourceIndex) { + readersForSource.put(e.getKey(), e.getValue()); + } + } + return readersForSource; + } + + @Override + public void assignSplits(SplitsAssignment<SplitT> newSplitAssignments) { + Map<Integer, List<HybridSourceSplit>> wrappedAssignmentMap = new HashMap<>(); + for (Map.Entry<Integer, List<SplitT>> e : newSplitAssignments.assignment().entrySet()) { + List<HybridSourceSplit> splits = + HybridSourceSplit.wrapSplits(sourceIndex, e.getValue()); + wrappedAssignmentMap.put(e.getKey(), splits); + } + SplitsAssignment<HybridSourceSplit> wrappedAssignments = + new SplitsAssignment<>(wrappedAssignmentMap); + LOG.debug("Assigning splits sourceIndex={} {}", sourceIndex, wrappedAssignments); + realContext.assignSplits(wrappedAssignments); + } + + @Override + public void assignSplit(SplitT split, int subtask) { + HybridSourceSplit wrappedSplit = new HybridSourceSplit(sourceIndex, split); + realContext.assignSplit(wrappedSplit, subtask); + } + + @Override + public void signalNoMoreSplits(int subtask) { + realContext.signalNoMoreSplits(subtask); + } + + @Override + public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) { + realContext.callAsync(callable, handler); + } + + @Override + public <T> void callAsync( + Callable<T> callable, + BiConsumer<T, 
Throwable> handler, + long initialDelay, + long period) { + realContext.callAsync(callable, handler, initialDelay, period); + } + + @Override + public void runInCoordinatorThread(Runnable runnable) { + realContext.runInCoordinatorThread(runnable); + } + } +} diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializer.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializer.java new file mode 100644 index 0000000..025733c --- /dev/null +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializer.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** Serializes splits by delegating to the source-indexed underlying split serializer. */ +public class HybridSourceSplitSerializer implements SimpleVersionedSerializer<HybridSourceSplit> { + + final Map<Integer, SimpleVersionedSerializer<SourceSplit>> cachedSerializers; + final Map<Integer, Source> switchedSources; + + public HybridSourceSplitSerializer(Map<Integer, Source> switchedSources) { + this.cachedSerializers = new HashMap<>(); + this.switchedSources = switchedSources; + } + + @Override + public int getVersion() { + return 0; + } + + @Override + public byte[] serialize(HybridSourceSplit split) throws IOException { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(baos)) { + out.writeInt(split.sourceIndex()); + out.writeInt(serializerOf(split.sourceIndex()).getVersion()); + byte[] serializedSplit = + serializerOf(split.sourceIndex()).serialize(split.getWrappedSplit()); + out.writeInt(serializedSplit.length); + out.write(serializedSplit); + out.flush(); + return baos.toByteArray(); + } + } + + @Override + public HybridSourceSplit deserialize(int version, byte[] serialized) throws IOException { + if (version == 0) { + return deserializeV0(serialized); + } + throw new IOException(String.format("Invalid version %d", version)); + } + + private HybridSourceSplit deserializeV0(byte[] serialized) throws IOException { + try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); + 
DataInputStream in = new DataInputStream(bais)) { + int sourceIndex = in.readInt(); + int nestedVersion = in.readInt(); + int length = in.readInt(); + byte[] splitBytes = new byte[length]; + in.readFully(splitBytes); + SourceSplit split = serializerOf(sourceIndex).deserialize(nestedVersion, splitBytes); + return new HybridSourceSplit(sourceIndex, split); + } + } + + private SimpleVersionedSerializer<SourceSplit> serializerOf(int sourceIndex) { + return cachedSerializers.computeIfAbsent( + sourceIndex, + (k -> { + Source source = + Preconditions.checkNotNull( + switchedSources.get(k), + "Source for index=%s not available", + sourceIndex); + return source.getSplitSerializer(); + })); + } +} diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SourceReaderFinishedEvent.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SourceReaderFinishedEvent.java new file mode 100644 index 0000000..df88d41 --- /dev/null +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SourceReaderFinishedEvent.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.SourceEvent; + +/** + * A source event sent from the HybridSourceReader to the enumerator to indicate that the current + * reader has finished and splits for the next reader can be sent. + */ +public class SourceReaderFinishedEvent implements SourceEvent { + + private static final long serialVersionUID = 1L; + private final int sourceIndex; + + /** + * Constructor. + * + * @param sourceIndex + */ + public SourceReaderFinishedEvent(int sourceIndex) { + this.sourceIndex = sourceIndex; + } + + public int sourceIndex() { + return sourceIndex; + } + + @Override + public String toString() { + return "SourceReaderFinishedEvent{" + "sourceIndex=" + sourceIndex + '}'; + } +} diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchSourceEvent.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchSourceEvent.java new file mode 100644 index 0000000..e24a5ef --- /dev/null +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchSourceEvent.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; + +/** + * Event sent from {@link HybridSourceSplitEnumerator} to {@link HybridSourceReader} to switch to + * the indicated reader. + */ +public class SwitchSourceEvent implements SourceEvent { + + private static final long serialVersionUID = 1L; + private final int sourceIndex; + private final Source source; + private final boolean finalSource; + + /** + * Constructor. + * + * @param sourceIndex + */ + public SwitchSourceEvent(int sourceIndex, Source source, boolean finalSource) { + this.sourceIndex = sourceIndex; + this.source = source; + this.finalSource = finalSource; + } + + public int sourceIndex() { + return sourceIndex; + } + + public Source source() { + return source; + } + + public boolean isFinalSource() { + return finalSource; + } + + @Override + public String toString() { + return this.getClass().getSimpleName() + '{' + "sourceIndex=" + sourceIndex + '}'; + } +} diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceITCase.java new file mode 100644 index 0000000..d510514 --- /dev/null +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceITCase.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource; +import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamUtils; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.collect.ClientAndIterator; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.TestLogger; + +import org.junit.Rule; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import 
java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;

/** MiniCluster-based integration test for the {@link HybridSource}. */
public class HybridSourceITCase extends TestLogger {

    // Parallelism cannot exceed number of splits, otherwise test may fail intermittently with:
    // Caused by: org.apache.flink.util.FlinkException: An OperatorEvent from an
    // OperatorCoordinator to a task was lost. Triggering task failover to ensure consistency.
    // Event: '[NoMoreSplitEvent]', targetTask: Source: hybrid-source -> Map (1/4) - execution
    // #3
    private static final int PARALLELISM = 2;

    @Rule
    public final MiniClusterWithClientResource miniClusterResource =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(PARALLELISM)
                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
                            .withHaLeadershipControl()
                            .build());

    // ------------------------------------------------------------------------
    //  test cases
    // ------------------------------------------------------------------------

    /** Test the source in the happy path. */
    @Test
    public void testHybridSource() throws Exception {
        testHybridSource(FailoverType.NONE, sourceWithFixedSwitchPosition());
    }

    /** Test the source in the happy path with runtime position transfer. */
    @Test
    public void testHybridSourceWithDynamicSwitchPosition() throws Exception {
        testHybridSource(FailoverType.NONE, sourceWithDynamicSwitchPosition());
    }

    /** Test the source with TaskManager restart. */
    @Test
    public void testHybridSourceWithTaskManagerFailover() throws Exception {
        testHybridSource(FailoverType.TM, sourceWithFixedSwitchPosition());
    }

    /** Test the source with JobManager failover. */
    @Test
    public void testHybridSourceWithJobManagerFailover() throws Exception {
        testHybridSource(FailoverType.JM, sourceWithFixedSwitchPosition());
    }

    /** Hybrid source switching at a position fixed at construction time (no raw types). */
    private Source<Integer, ?, ?> sourceWithFixedSwitchPosition() {
        int numSplits = 2;
        int numRecordsPerSplit = EXPECTED_RESULT.size() / 4;
        return HybridSource.builder(
                        new MockBaseSource(numSplits, numRecordsPerSplit, Boundedness.BOUNDED))
                .addSource(
                        new MockBaseSource(numSplits, numRecordsPerSplit, 20, Boundedness.BOUNDED))
                .build();
    }

    /** Hybrid source whose second source is created lazily at switch time. */
    private Source<Integer, ?, ?> sourceWithDynamicSwitchPosition() {
        return HybridSource.builder(new MockBaseSource(2, 10, Boundedness.BOUNDED))
                .addSource(
                        (enumerator) -> {
                            // lazily create source here
                            return new MockBaseSource(2, 10, 20, Boundedness.BOUNDED);
                        },
                        Boundedness.BOUNDED)
                .build();
    }

    /**
     * Runs the given source to completion, optionally injecting a TM/JM failover halfway through
     * the expected output, and verifies the collected records.
     */
    private void testHybridSource(FailoverType failoverType, Source<Integer, ?, ?> source)
            throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);
        env.setRestartStrategy(
                FailoverType.NONE == failoverType
                        ? RestartStrategies.noRestart()
                        : RestartStrategies.fixedDelayRestart(1, 0));

        final DataStream<Integer> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "hybrid-source")
                        .returns(Integer.class);

        final DataStream<Integer> streamFailingInTheMiddleOfReading =
                RecordCounterToFail.wrapWithFailureAfter(stream, EXPECTED_RESULT.size() / 2);

        final ClientAndIterator<Integer> client =
                DataStreamUtils.collectWithClient(
                        streamFailingInTheMiddleOfReading,
                        HybridSourceITCase.class.getSimpleName() + '-' + failoverType.name());
        final JobID jobId = client.client.getJobID();

        // block until half the input was consumed, then fail over and resume
        RecordCounterToFail.waitToFail();
        triggerFailover(
                failoverType,
                jobId,
                RecordCounterToFail::continueProcessing,
                miniClusterResource.getMiniCluster());

        final List<Integer> result = new ArrayList<>();
        while (result.size() < EXPECTED_RESULT.size() && client.iterator.hasNext()) {
            result.add(client.iterator.next());
        }

        verifyResult(result);
    }

    // ------------------------------------------------------------------------
    //  test utilities
    // ------------------------------------------------------------------------

    private enum FailoverType {
        NONE,
        TM,
        JM
    }

    private static void triggerFailover(
            FailoverType type, JobID jobId, Runnable afterFailAction, MiniCluster miniCluster)
            throws Exception {
        switch (type) {
            case NONE:
                afterFailAction.run();
                break;
            case TM:
                restartTaskManager(afterFailAction, miniCluster);
                break;
            case JM:
                triggerJobManagerFailover(jobId, afterFailAction, miniCluster);
                break;
        }
    }

    private static void triggerJobManagerFailover(
            JobID jobId, Runnable afterFailAction, MiniCluster miniCluster) throws Exception {
        final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get();
        haLeadershipControl.revokeJobMasterLeadership(jobId).get();
        afterFailAction.run();
        haLeadershipControl.grantJobMasterLeadership(jobId).get();
    }

    private static void restartTaskManager(Runnable afterFailAction, MiniCluster miniCluster)
            throws Exception {
        miniCluster.terminateTaskManager(0).get();
        afterFailAction.run();
        miniCluster.startTaskManager();
    }

    // ------------------------------------------------------------------------
    //  verification
    // ------------------------------------------------------------------------
    private static final List<Integer> EXPECTED_RESULT =
            IntStream.rangeClosed(0, 39).boxed().collect(Collectors.toList());

    private static void verifyResult(List<Integer> result) {
        Collections.sort(result);
        assertThat(result, equalTo(EXPECTED_RESULT));
    }

    // ------------------------------------------------------------------------
    //  mini cluster failover utilities
    // ------------------------------------------------------------------------

    /**
     * Map wrapper that pauses the pipeline after a fixed number of records so a failover can be
     * injected deterministically. State is static because the map function must survive
     * serialization; a single test uses it at a time.
     */
    private static class RecordCounterToFail {

        private static AtomicInteger records;
        private static CompletableFuture<Void> fail;
        private static CompletableFuture<Void> continueProcessing;

        private static <T> DataStream<T> wrapWithFailureAfter(DataStream<T> stream, int failAfter) {

            records = new AtomicInteger();
            fail = new CompletableFuture<>();
            continueProcessing = new CompletableFuture<>();
            return stream.map(
                    record -> {
                        final boolean halfOfInputIsRead = records.incrementAndGet() > failAfter;
                        final boolean notFailedYet = !fail.isDone();
                        if (notFailedYet && halfOfInputIsRead) {
                            fail.complete(null);
                            continueProcessing.get();
                        }
                        return record;
                    });
        }

        private static void waitToFail() throws ExecutionException, InterruptedException {
            fail.get();
        }

        private static void continueProcessing() {
            continueProcessing.complete(null);
        }
    }
}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
new file mode 100644 index 0000000..3bf77f3 --- /dev/null +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.mocks.MockSource; +import org.apache.flink.api.connector.source.mocks.MockSourceSplit; +import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource; +import org.apache.flink.connector.testutils.source.reader.TestingReaderContext; +import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.mock.Whitebox; + +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Tests for {@link HybridSourceReader}. */ +public class HybridSourceReaderTest { + + @Test + public void testReader() throws Exception { + TestingReaderContext readerContext = new TestingReaderContext(); + TestingReaderOutput<Integer> readerOutput = new TestingReaderOutput<>(); + MockBaseSource source = new MockBaseSource(1, 1, Boundedness.BOUNDED); + + // 2 underlying readers to exercise switch + SourceReader<Integer, MockSourceSplit> mockSplitReader1 = + source.createReader(readerContext); + SourceReader<Integer, MockSourceSplit> mockSplitReader2 = + source.createReader(readerContext); + + Map<Integer, Source> switchedSources = new HashMap<>(); + + HybridSourceReader<Integer> reader = + new HybridSourceReader<>(readerContext, switchedSources); + + Assert.assertThat(readerContext.getSentEvents(), Matchers.emptyIterable()); + reader.start(); + assertAndClearSourceReaderFinishedEvent(readerContext, -1); + Assert.assertNull(currentReader(reader)); + Assert.assertEquals(InputStatus.NOTHING_AVAILABLE, 
reader.pollNext(readerOutput)); + + Source source1 = + new MockSource(null, 0) { + @Override + public SourceReader<Integer, MockSourceSplit> createReader( + SourceReaderContext readerContext) { + return mockSplitReader1; + } + }; + reader.handleSourceEvents(new SwitchSourceEvent(0, source1, false)); + Assert.assertEquals(source1, switchedSources.get(0)); + MockSourceSplit mockSplit = new MockSourceSplit(0, 0, 1); + mockSplit.addRecord(0); + HybridSourceSplit hybridSplit = new HybridSourceSplit(0, mockSplit); + reader.addSplits(Collections.singletonList(hybridSplit)); + + // drain splits + InputStatus status = reader.pollNext(readerOutput); + while (readerOutput.getEmittedRecords().isEmpty() || status == InputStatus.MORE_AVAILABLE) { + status = reader.pollNext(readerOutput); + Thread.sleep(10); + } + Assert.assertThat(readerOutput.getEmittedRecords(), Matchers.contains(0)); + reader.pollNext(readerOutput); + Assert.assertEquals( + "before notifyNoMoreSplits", + InputStatus.NOTHING_AVAILABLE, + reader.pollNext(readerOutput)); + + reader.notifyNoMoreSplits(); + reader.pollNext(readerOutput); + assertAndClearSourceReaderFinishedEvent(readerContext, 0); + + Assert.assertEquals( + "reader before switch source event", mockSplitReader1, currentReader(reader)); + + Source source2 = + new MockSource(null, 0) { + @Override + public SourceReader<Integer, MockSourceSplit> createReader( + SourceReaderContext readerContext) { + return mockSplitReader2; + } + }; + reader.handleSourceEvents(new SwitchSourceEvent(1, source2, true)); + Assert.assertEquals( + "reader after switch source event", mockSplitReader2, currentReader(reader)); + + reader.notifyNoMoreSplits(); + Assert.assertEquals( + "reader 1 after notifyNoMoreSplits", + InputStatus.END_OF_INPUT, + reader.pollNext(readerOutput)); + + reader.close(); + } + + @Test + public void testReaderRecovery() throws Exception { + TestingReaderContext readerContext = new TestingReaderContext(); + TestingReaderOutput<Integer> readerOutput 
= new TestingReaderOutput<>(); + MockBaseSource source = new MockBaseSource(1, 1, Boundedness.BOUNDED); + + Map<Integer, Source> switchedSources = new HashMap<>(); + + HybridSourceReader<Integer> reader = + new HybridSourceReader<>(readerContext, switchedSources); + + reader.start(); + assertAndClearSourceReaderFinishedEvent(readerContext, -1); + reader.handleSourceEvents(new SwitchSourceEvent(0, source, false)); + Assert.assertEquals(source, switchedSources.get(0)); + + MockSourceSplit mockSplit = new MockSourceSplit(0, 0, 2147483647); + // mockSplit.addRecord(0); + HybridSourceSplit hybridSplit = new HybridSourceSplit(0, mockSplit); + reader.addSplits(Collections.singletonList(hybridSplit)); + + List<HybridSourceSplit> snapshot = reader.snapshotState(0); + Assert.assertThat(snapshot, Matchers.contains(hybridSplit)); + + // reader recovery + readerContext.clearSentEvents(); + switchedSources = new HashMap<>(); + reader = new HybridSourceReader<>(readerContext, switchedSources); + + reader.addSplits(snapshot); + Assert.assertNull(currentReader(reader)); + + reader.start(); + Assert.assertNull(currentReader(reader)); + + assertAndClearSourceReaderFinishedEvent(readerContext, -1); + reader.handleSourceEvents(new SwitchSourceEvent(0, source, false)); + Assert.assertNotNull(currentReader(reader)); + Assert.assertEquals(source, switchedSources.get(0)); + Assert.assertThat(reader.snapshotState(1), Matchers.contains(hybridSplit)); + + reader.close(); + } + + private static SourceReader<Integer, MockSourceSplit> currentReader( + HybridSourceReader<?> reader) { + return (SourceReader) Whitebox.getInternalState(reader, "currentReader"); + } + + private static void assertAndClearSourceReaderFinishedEvent( + TestingReaderContext context, int sourceIndex) { + Assert.assertThat(context.getSentEvents(), Matchers.iterableWithSize(1)); + Assert.assertEquals( + sourceIndex, + ((SourceReaderFinishedEvent) context.getSentEvents().get(0)).sourceIndex()); + context.clearSentEvents(); + 
} +} diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java new file mode 100644 index 0000000..2a8fd57 --- /dev/null +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.flink.connector.base.source.hybrid;

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
import org.apache.flink.connector.base.source.reader.mocks.MockSplitEnumerator;
import org.apache.flink.mock.Whitebox;

import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;

/** Tests for {@link HybridSourceSplitEnumerator}. */
public class HybridSourceSplitEnumeratorTest {

    private static final int SUBTASK0 = 0;
    private static final int SUBTASK1 = 1;

    // Shared fixture populated by setupEnumeratorAndTriggerSourceSwitch(); tests that need a
    // post-switch enumerator call that method first and then continue from its state.
    private HybridSource<Integer> source;
    private MockSplitEnumeratorContext<HybridSourceSplit> context;
    private HybridSourceSplitEnumerator enumerator;
    private HybridSourceSplit splitFromSource0;
    private HybridSourceSplit splitFromSource1;

    /**
     * Builds a two-source hybrid source with two subtasks, drives the enumerator through reader
     * registration and initial assignment, then finishes both readers on source 0 so the
     * enumerator switches to source 1. Leaves the fixture fields set for follow-up assertions.
     */
    private void setupEnumeratorAndTriggerSourceSwitch() {
        context = new MockSplitEnumeratorContext<>(2);
        source =
                HybridSource.builder(new MockBaseSource(1, 1, Boundedness.BOUNDED))
                        .addSource(new MockBaseSource(1, 1, Boundedness.BOUNDED))
                        .build();

        enumerator = (HybridSourceSplitEnumerator) source.createEnumerator(context);
        enumerator.start();
        // mock enumerator assigns splits once all readers are registered
        registerReader(context, enumerator, SUBTASK0);
        assertThat(context.getSplitsAssignmentSequence(), Matchers.emptyIterable());
        registerReader(context, enumerator, SUBTASK1);
        assertThat(context.getSplitsAssignmentSequence(), Matchers.emptyIterable());
        // sourceIndex -1 is the readers' startup handshake, not a completed source
        enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
        assertThat(context.getSplitsAssignmentSequence(), Matchers.iterableWithSize(0));
        enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(-1));
        assertThat(context.getSplitsAssignmentSequence(), Matchers.iterableWithSize(1));
        splitFromSource0 =
                context.getSplitsAssignmentSequence().get(0).assignment().get(SUBTASK0).get(0);
        assertEquals(0, splitFromSource0.sourceIndex());
        assertEquals(0, getCurrentSourceIndex(enumerator));

        // trigger source switch: the enumerator advances only after ALL readers finish source 0
        enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(0));
        assertEquals("one reader finished", 0, getCurrentSourceIndex(enumerator));
        enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(0));
        assertEquals("both readers finished", 1, getCurrentSourceIndex(enumerator));
        assertThat(
                "switch triggers split assignment",
                context.getSplitsAssignmentSequence(),
                Matchers.iterableWithSize(2));
        splitFromSource1 =
                context.getSplitsAssignmentSequence().get(1).assignment().get(SUBTASK0).get(0);
        assertEquals(1, splitFromSource1.sourceIndex());
        // a finished event from a reader with no pending assignment must not advance the index
        enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(SUBTASK1));
        assertEquals("reader without assignment", 1, getCurrentSourceIndex(enumerator));
    }

    /**
     * After a switch, a reader may be reset during recovery and return a split from the
     * PREVIOUS source. The enumerator must re-assign it, but only once the reader is both
     * registered and has completed the startup handshake.
     */
    @Test
    public void testRegisterReaderAfterSwitchAndReaderReset() {
        setupEnumeratorAndTriggerSourceSwitch();

        // add split of previous source back (simulates reader reset during recovery)
        context.getSplitsAssignmentSequence().clear();
        enumerator.addReader(SUBTASK0);
        enumerator.addSplitsBack(Collections.singletonList(splitFromSource0), SUBTASK0);
        assertThat(context.getSplitsAssignmentSequence(), Matchers.iterableWithSize(0));
        enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
        assertSplitAssignment(
                "addSplitsBack triggers assignment when reader registered",
                context,
                1,
                splitFromSource0,
                SUBTASK0);

        // remove reader from context
        context.getSplitsAssignmentSequence().clear();
        context.unregisterReader(SUBTASK0);
        enumerator.addSplitsBack(Collections.singletonList(splitFromSource0), SUBTASK0);
        assertThat(
                "addSplitsBack doesn't trigger assignment when reader not registered",
                context.getSplitsAssignmentSequence(),
                Matchers.emptyIterable());
        registerReader(context, enumerator, SUBTASK0);
        assertThat(context.getSplitsAssignmentSequence(), Matchers.iterableWithSize(0));
        enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
        assertSplitAssignment(
                "registerReader triggers assignment", context, 1, splitFromSource0, SUBTASK0);
    }

    /**
     * Split requests must be forwarded to the current underlying enumerator, and must be
     * rejected while a reader is reset to a previous source (pending splits outstanding).
     */
    @Test
    public void testHandleSplitRequestAfterSwitchAndReaderReset() {
        setupEnumeratorAndTriggerSourceSwitch();

        // wrap the active underlying enumerator so we can observe forwarded split requests
        UnderlyingEnumeratorWrapper underlyingEnumeratorWrapper =
                new UnderlyingEnumeratorWrapper(
                        (MockSplitEnumerator)
                                Whitebox.getInternalState(enumerator, "currentEnumerator"));
        Whitebox.setInternalState(enumerator, "currentEnumerator", underlyingEnumeratorWrapper);

        List<MockSourceSplit> mockSourceSplits =
                (List<MockSourceSplit>)
                        Whitebox.getInternalState(underlyingEnumeratorWrapper.enumerator, "splits");
        assertThat(mockSourceSplits, Matchers.emptyIterable());

        // simulate reader reset to before switch by adding split of previous source back
        context.getSplitsAssignmentSequence().clear();
        assertEquals("current enumerator", 1, getCurrentSourceIndex(enumerator));

        assertThat(underlyingEnumeratorWrapper.handleSplitRequests, Matchers.emptyIterable());
        enumerator.handleSplitRequest(SUBTASK0, "fakehostname");

        assertSplitAssignment(
                "handleSplitRequest triggers assignment of split by underlying enumerator",
                context,
                1,
                new HybridSourceSplit(1, UnderlyingEnumeratorWrapper.SPLIT_1),
                SUBTASK0);

        // handleSplitRequest invalid during reset
        enumerator.addSplitsBack(Collections.singletonList(splitFromSource0), SUBTASK0);
        try {
            enumerator.handleSplitRequest(SUBTASK0, "fakehostname");
            Assert.fail("expected exception");
        } catch (IllegalStateException ex) {
            // expected: split requests are not allowed while the reader is being reset
        }
    }

    /**
     * Snapshots the enumerator state after the switch and verifies that a restored enumerator
     * produces an equivalent snapshot again.
     */
    @Test
    public void testRestoreEnumerator() throws Exception {
        setupEnumeratorAndTriggerSourceSwitch();
        enumerator = (HybridSourceSplitEnumerator) source.createEnumerator(context);
        enumerator.start();
        HybridSourceEnumeratorState enumeratorState = enumerator.snapshotState(0);
        Assert.assertEquals(1, ((List) enumeratorState.getWrappedState()).size());
        enumerator =
                (HybridSourceSplitEnumerator) source.restoreEnumerator(context, enumeratorState);
        enumerator.start();
        enumeratorState = enumerator.snapshotState(0);
        Assert.assertEquals(1, ((List) enumeratorState.getWrappedState()).size());
    }

    /**
     * Delegating wrapper around {@link MockSplitEnumerator} that records
     * {@code handleSplitRequest} calls and answers each one with a fixed split.
     */
    private static class UnderlyingEnumeratorWrapper
            implements SplitEnumerator<MockSourceSplit, Object> {
        private static final MockSourceSplit SPLIT_1 = new MockSourceSplit(0, 0, 1);
        private final List<Integer> handleSplitRequests = new ArrayList<>();
        private final MockSplitEnumerator enumerator;
        private final SplitEnumeratorContext context;

        private UnderlyingEnumeratorWrapper(MockSplitEnumerator enumerator) {
            this.enumerator = enumerator;
            // reuse the wrapped enumerator's context so assignments reach the same mock context
            this.context =
                    (SplitEnumeratorContext) Whitebox.getInternalState(enumerator, "context");
        }

        @Override
        public void handleSplitRequest(int subtaskId, String requesterHostname) {
            handleSplitRequests.add(subtaskId);
            context.assignSplits(new SplitsAssignment(SPLIT_1, subtaskId));
        }

        @Override
        public void start() {
            // the wrapper is installed on an already-started enumerator; start() must not occur
            throw new UnsupportedOperationException();
        }

        @Override
        public void addSplitsBack(List splits, int subtaskId) {
            enumerator.addSplitsBack(splits, subtaskId);
        }

        @Override
        public void addReader(int subtaskId) {
            enumerator.addReader(subtaskId);
        }

        @Override
        public Object snapshotState(long checkpointId) throws Exception {
            return enumerator.snapshotState(checkpointId);
        }

        @Override
        public void close() throws IOException {
            enumerator.close();
        }
    }

    /**
     * Asserts that the assignment sequence has the expected size and that its latest entry
     * assigned {@code split} to {@code subtask}.
     */
    private static void assertSplitAssignment(
            String reason,
            MockSplitEnumeratorContext<HybridSourceSplit> context,
            int size,
            HybridSourceSplit split,
            int subtask) {
        assertThat(reason, context.getSplitsAssignmentSequence(), Matchers.iterableWithSize(size));
        assertEquals(
                reason,
                split,
                context.getSplitsAssignmentSequence()
                        .get(size - 1)
                        .assignment()
                        .get(subtask)
                        .get(0));
    }

    /** Registers a reader with both the mock context and the enumerator under test. */
    private static void registerReader(
            MockSplitEnumeratorContext<HybridSourceSplit> context,
            HybridSourceSplitEnumerator enumerator,
            int reader) {
        context.registerReader(new ReaderInfo(reader, "location 0"));
        enumerator.addReader(reader);
    }

    /** Reads the enumerator's private {@code currentSourceIndex} field via reflection. */
    private static int getCurrentSourceIndex(HybridSourceSplitEnumerator enumerator) {
        return (int) Whitebox.getInternalState(enumerator, "currentSourceIndex");
    }
}
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializerTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializerTest.java new file mode 100644 index 0000000..d43275f --- /dev/null +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializerTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.mocks.MockSource; +import org.apache.flink.api.connector.source.mocks.MockSourceSplit; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** Tests for {@link HybridSourceSplitSerializer}. 
*/ +public class HybridSourceSplitSerializerTest { + + @Test + public void testSerialization() throws Exception { + Map<Integer, Source> switchedSources = new HashMap<>(); + switchedSources.put(0, new MockSource(null, 0)); + HybridSourceSplitSerializer serializer = new HybridSourceSplitSerializer(switchedSources); + HybridSourceSplit split = new HybridSourceSplit(0, new MockSourceSplit(1)); + byte[] serialized = serializer.serialize(split); + HybridSourceSplit clonedSplit = serializer.deserialize(0, serialized); + Assert.assertEquals(split, clonedSplit); + + try { + serializer.deserialize(1, serialized); + Assert.fail(); + } catch (IOException e) { + // expected invalid version + } + } +} diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceTest.java new file mode 100644 index 0000000..786677b --- /dev/null +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.flink.connector.base.source.hybrid;

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
import org.apache.flink.connector.base.source.reader.mocks.MockSplitEnumerator;

import org.junit.Test;

import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;

/** Tests for {@link HybridSource}. */
public class HybridSourceTest {

    /**
     * Boundedness of the hybrid source follows the LAST child source; an unbounded source in
     * any earlier position is rejected at build time.
     */
    @Test
    public void testBoundedness() {
        HybridSource<Integer> source;

        // all sources bounded -> hybrid source is bounded
        source =
                HybridSource.builder(new MockBaseSource(1, 1, Boundedness.BOUNDED))
                        .addSource(new MockBaseSource(1, 1, Boundedness.BOUNDED))
                        .build();
        assertEquals(Boundedness.BOUNDED, source.getBoundedness());

        // unbounded final source -> hybrid source is unbounded
        source =
                HybridSource.builder(new MockBaseSource(1, 1, Boundedness.BOUNDED))
                        .addSource(new MockBaseSource(1, 1, Boundedness.CONTINUOUS_UNBOUNDED))
                        .build();
        assertEquals(Boundedness.CONTINUOUS_UNBOUNDED, source.getBoundedness());

        try {
            // unbounded source in a non-final position is invalid
            HybridSource.builder(new MockBaseSource(1, 1, Boundedness.CONTINUOUS_UNBOUNDED))
                    .addSource(new MockBaseSource(1, 1, Boundedness.CONTINUOUS_UNBOUNDED))
                    .build();
            fail("expected exception");
        } catch (IllegalArgumentException e) {
            // expected: only the last source may be unbounded
        }
    }

    /**
     * Verifies (largely at compile time) that a deferred {@link HybridSource.SourceFactory} can
     * consume the previous source's enumerator through the switch context.
     */
    @Test
    public void testBuilderWithSourceFactory() {
        HybridSource.SourceFactory<Integer, Source<Integer, ?, ?>, MockSplitEnumerator>
                sourceFactory =
                        new HybridSource.SourceFactory<
                                Integer, Source<Integer, ?, ?>, MockSplitEnumerator>() {
                            @Override
                            public Source<Integer, ?, ?> create(
                                    HybridSource.SourceSwitchContext<MockSplitEnumerator> context) {
                                // assignment exercises the generic typing of the switch context;
                                // the enumerator value itself is intentionally unused
                                MockSplitEnumerator enumerator = context.getPreviousEnumerator();
                                return new MockBaseSource(1, 1, Boundedness.BOUNDED);
                            }
                        };

        HybridSource<Integer> source =
                new HybridSource.HybridSourceBuilder<Integer, MockSplitEnumerator>()
                        .<MockSplitEnumerator, Source<Integer, ?, ?>>addSource(
                                new MockBaseSource(1, 1, Boundedness.BOUNDED))
                        .addSource(sourceFactory, Boundedness.BOUNDED)
                        .build();
        assertNotNull(source);
    }

    // Subclass used to check that a factory typed to the enumerator SUPERCLASS still composes
    // with a prior source whose enumerator is a subclass.
    private static class ExtendedMockSplitEnumerator extends MockSplitEnumerator {
        public ExtendedMockSplitEnumerator(
                List<MockSourceSplit> splits, SplitEnumeratorContext<MockSourceSplit> context) {
            super(splits, context);
        }
    }

    /**
     * Verifies (largely at compile time) that the builder accepts a source factory declared
     * against an enumerator superclass of the preceding source's enumerator type.
     */
    @Test
    public void testBuilderWithEnumeratorSuperclass() {
        HybridSource.SourceFactory<Integer, Source<Integer, ?, ?>, MockSplitEnumerator>
                sourceFactory =
                        (HybridSource.SourceFactory<
                                        Integer, Source<Integer, ?, ?>, MockSplitEnumerator>)
                                context -> {
                                    // assignment exercises the generic typing of the switch
                                    // context; the enumerator value itself is intentionally unused
                                    MockSplitEnumerator enumerator =
                                            context.getPreviousEnumerator();
                                    return new MockBaseSource(1, 1, Boundedness.BOUNDED);
                                };

        HybridSource<Integer> source =
                new HybridSource.HybridSourceBuilder<Integer, MockSplitEnumerator>()
                        .<ExtendedMockSplitEnumerator, Source<Integer, ?, ?>>addSource(
                                new MockBaseSource(1, 1, Boundedness.BOUNDED))
                        .addSource(sourceFactory, Boundedness.BOUNDED)
                        .build();
        assertNotNull(source);
    }
}
