This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f4e33b591 [feature][connector][common] Add
`SingleThreadMultiplexSourceReaderBase` (#3335)
f4e33b591 is described below
commit f4e33b591278fac52e025ed0e9c4e7a9cb5acfd4
Author: hailin0 <[email protected]>
AuthorDate: Fri Nov 11 10:13:19 2022 +0800
[feature][connector][common] Add `SingleThreadMultiplexSourceReaderBase`
(#3335)
* [feature][connector][common] Add `SingleThreadMultiplexSourceReaderBase`
* [translation][spark] Adapt to spark micro batch
---
LICENSE | 1 +
.../common/source/reader/RecordEmitter.java | 47 +++++
.../common/source/reader/RecordsBySplits.java | 62 ++++++
.../common/source/reader/RecordsWithSplitIds.java | 52 +++++
.../SingleThreadMultiplexSourceReaderBase.java | 74 +++++++
.../common/source/reader/SourceReaderBase.java | 235 +++++++++++++++++++++
.../common/source/reader/SourceReaderOptions.java | 54 +++++
.../source/reader/fetcher/AddSplitsTask.java | 48 +++++
.../common/source/reader/fetcher/FetchTask.java | 91 ++++++++
.../reader/fetcher/SingleThreadFetcherManager.java | 67 ++++++
.../common/source/reader/fetcher/SplitFetcher.java | 220 +++++++++++++++++++
.../source/reader/fetcher/SplitFetcherManager.java | 146 +++++++++++++
.../source/reader/fetcher/SplitFetcherTask.java | 34 +++
.../source/reader/splitreader/SplitReader.java | 63 ++++++
.../source/reader/splitreader/SplitsAddition.java | 31 +++
.../source/reader/splitreader/SplitsChange.java | 32 +++
.../spark/common/InternalRowCollector.java | 9 +
.../source/batch/ParallelBatchPartitionReader.java | 5 +-
.../CoordinatedMicroBatchPartitionReader.java | 15 +-
.../micro/ParallelMicroBatchPartitionReader.java | 32 ++-
20 files changed, 1306 insertions(+), 12 deletions(-)
diff --git a/LICENSE b/LICENSE
index 2927f5026..5d51f440a 100644
--- a/LICENSE
+++ b/LICENSE
@@ -213,6 +213,7 @@
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/RowKind.java fro
seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointListener.java
from https://github.com/apache/flink
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/
from https://github.com/lightbend/config
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/
from https://github.com/apache/flink
+seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/
from https://github.com/apache/flink
seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/
from https://github.com/apache/iceberg
generate_client_protocol.sh
from
https://github.com/hazelcast/hazelcast
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
from https://github.com/hazelcast/hazelcast
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/RecordEmitter.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/RecordEmitter.java
new file mode 100644
index 000000000..9d2313f87
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/RecordEmitter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.reader;
+
+import org.apache.seatunnel.api.source.Collector;
+
+/**
+ * Emit a record to the downstream.
+ *
+ * @param <E>
+ *
+ * @param <T>
+ *
+ * @param <SplitStateT>
+ *
+ */
+public interface RecordEmitter<E, T, SplitStateT> {
+
+ /**
+ * Process and emit the records to the {@link Collector}.
+ *
+ * @param element
+ *
+ * @param collector
+ *
+ * @param splitState
+ *
+ * @throws Exception
+ *
+ */
+ void emitRecord(E element, Collector<T> collector, SplitStateT splitState)
throws Exception;
+}
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/RecordsBySplits.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/RecordsBySplits.java
new file mode 100644
index 000000000..82d0807ae
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/RecordsBySplits.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.reader;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+public class RecordsBySplits<E> implements RecordsWithSplitIds<E> {
+
+ private final Set<String> finishedSplits;
+ private final Iterator<Map.Entry<String, Collection<E>>> splitsIterator;
+ private Iterator<E> recordsInCurrentSplit;
+
+ public RecordsBySplits(Map<String, Collection<E>> recordsBySplit,
+ Set<String> finishedSplits) {
+ this.splitsIterator = checkNotNull(recordsBySplit,
"recordsBySplit").entrySet().iterator();
+ this.finishedSplits = checkNotNull(finishedSplits, "finishedSplits");
+ }
+
+ @Override
+ public String nextSplit() {
+ if (splitsIterator.hasNext()) {
+ Map.Entry<String, Collection<E>> next = splitsIterator.next();
+ recordsInCurrentSplit = next.getValue().iterator();
+ return next.getKey();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public E nextRecordFromSplit() {
+ if (recordsInCurrentSplit == null) {
+ throw new IllegalStateException();
+ }
+ return recordsInCurrentSplit.hasNext() ? recordsInCurrentSplit.next()
: null;
+ }
+
+ @Override
+ public Set<String> finishedSplits() {
+ return finishedSplits;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/RecordsWithSplitIds.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/RecordsWithSplitIds.java
new file mode 100644
index 000000000..10cd11b5b
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/RecordsWithSplitIds.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.reader;
+
+import java.util.Set;
+
/**
 * A batch of records handed from the split fetchers to the source reader, grouped by split, plus
 * the ids of the splits that the batch reports as finished.
 *
 * @param <E> the type of the records
 */
public interface RecordsWithSplitIds<E> {

    /**
     * Advances to the next split in this batch.
     *
     * @return the id of the next split, or {@code null} when no splits are left
     */
    String nextSplit();

    /**
     * Returns the next record of the split selected by the last {@link #nextSplit()} call.
     *
     * @return the next record, or {@code null} when the current split has no records left
     */
    E nextRecordFromSplit();

    /**
     * Returns the ids of the splits this batch reports as finished.
     *
     * @return the finished split ids
     */
    Set<String> finishedSplits();

    /**
     * Hook invoked once the reader has finished consuming this batch. Does nothing by default.
     */
    default void recycle() {
        // no-op by default
    }
}
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SingleThreadMultiplexSourceReaderBase.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SingleThreadMultiplexSourceReaderBase.java
new file mode 100644
index 000000000..3f670458c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SingleThreadMultiplexSourceReaderBase.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.reader;
+
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplit;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SingleThreadFetcherManager;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.function.Supplier;
+
+/**
+ * A base for {@link SourceReader}s that read splits with one thread using one
{@link SplitReader}.
+ *
+ * @param <E> The type of the records (the raw type that typically contains
checkpointing information).
+ *
+ * @param <T> The final type of the records emitted by the source.
+ *
+ * @param <SplitT>
+ *
+ * @param <SplitStateT>
+ *
+ */
+public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT
extends SourceSplit, SplitStateT>
+ extends SourceReaderBase<E, T, SplitT, SplitStateT> {
+
+ public SingleThreadMultiplexSourceReaderBase(Supplier<SplitReader<E,
SplitT>> splitReaderSupplier,
+ RecordEmitter<E, T,
SplitStateT> recordEmitter,
+ SourceReaderOptions options,
+ SourceReader.Context context)
{
+ this(new ArrayBlockingQueue<>(options.getElementQueueCapacity()),
+ splitReaderSupplier,
+ recordEmitter,
+ options,
+ context);
+ }
+
+ public
SingleThreadMultiplexSourceReaderBase(BlockingQueue<RecordsWithSplitIds<E>>
elementsQueue,
+ Supplier<SplitReader<E,
SplitT>> splitReaderSupplier,
+ RecordEmitter<E, T,
SplitStateT> recordEmitter,
+ SourceReaderOptions options,
+ SourceReader.Context context)
{
+ super(elementsQueue,
+ new SingleThreadFetcherManager<>(elementsQueue,
splitReaderSupplier),
+ recordEmitter,
+ options,
+ context);
+ }
+
+ public
SingleThreadMultiplexSourceReaderBase(BlockingQueue<RecordsWithSplitIds<E>>
elementsQueue,
+ SingleThreadFetcherManager<E,
SplitT> splitFetcherManager,
+ RecordEmitter<E, T,
SplitStateT> recordEmitter,
+ SourceReaderOptions options,
+ SourceReader.Context context)
{
+ super(elementsQueue, splitFetcherManager, recordEmitter, options,
context);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
new file mode 100644
index 000000000..9c81c9b73
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.reader;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceEvent;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplit;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * An abstract implementation of {@link SourceReader} which provides some
synchronization between
+ * the mail box main thread and the SourceReader internal threads. This class
allows user to just
+ * provide a {@link SplitReader} and snapshot the split state.
+ *
+ * @param <E> The type of the records (the raw type that typically
contains checkpointing information).
+ *
+ * @param <T> The final type of the records emitted by the source.
+ *
+ * @param <SplitT>
+ *
+ * @param <SplitStateT>
+ *
+ */
+@Slf4j
+public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit,
SplitStateT>
+ implements SourceReader<T, SplitT> {
+ private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
+ private final Map<String, SplitContext<T, SplitStateT>> splitStates;
+ protected final RecordEmitter<E, T, SplitStateT> recordEmitter;
+ protected final SplitFetcherManager<E, SplitT> splitFetcherManager;
+ protected final SourceReaderOptions options;
+ protected final SourceReader.Context context;
+
+ private RecordsWithSplitIds<E> currentFetch;
+ private SplitContext<T, SplitStateT> currentSplitContext;
+ private Collector<T> currentSplitOutput;
+ private boolean noMoreSplitsAssignment;
+
+ public SourceReaderBase(BlockingQueue<RecordsWithSplitIds<E>>
elementsQueue,
+ SplitFetcherManager<E, SplitT> splitFetcherManager,
+ RecordEmitter<E, T, SplitStateT> recordEmitter,
+ SourceReaderOptions options,
+ SourceReader.Context context) {
+ this.elementsQueue = elementsQueue;
+ this.splitFetcherManager = splitFetcherManager;
+ this.recordEmitter = recordEmitter;
+ this.splitStates = new HashMap<>();
+ this.options = options;
+ this.context = context;
+ }
+
+ @Override
+ public void open() {
+ log.info("Open Source Reader.");
+ }
+
+ @Override
+ public void pollNext(Collector<T> output) throws Exception {
+ RecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch;
+ if (recordsWithSplitId == null) {
+ recordsWithSplitId = getNextFetch(output);
+ if (recordsWithSplitId == null) {
+ if (Boundedness.BOUNDED.equals(context.getBoundedness())
+ && noMoreSplitsAssignment
+ && splitFetcherManager.maybeShutdownFinishedFetchers()
+ && elementsQueue.isEmpty()) {
+ context.signalNoMoreElement();
+ log.info("Send NoMoreElement event");
+ }
+ return;
+ }
+ }
+
+ E record = recordsWithSplitId.nextRecordFromSplit();
+ if (record != null) {
+ synchronized (output.getCheckpointLock()) {
+ recordEmitter.emitRecord(record, currentSplitOutput,
currentSplitContext.state);
+ }
+ log.trace("Emitted record: {}", record);
+ } else if (!moveToNextSplit(recordsWithSplitId, output)) {
+ pollNext(output);
+ }
+ }
+
+ @Override
+ public List<SplitT> snapshotState(long checkpointId) {
+ List<SplitT> splits = new ArrayList<>();
+ splitStates.forEach((id, context) -> splits.add(toSplitType(id,
context.state)));
+ log.info("Snapshot state from splits: {}", splits);
+ return splits;
+ }
+
+ @Override
+ public void addSplits(List<SplitT> splits) {
+ log.info("Adding split(s) to reader: {}", splits);
+ splits.forEach(split -> {
+ // Initialize the state for each split.
+ splitStates.put(split.splitId(), new
SplitContext<>(split.splitId(), initializedState(split)));
+ });
+ splitFetcherManager.addSplits(splits);
+ }
+
+ @Override
+ public void handleNoMoreSplits() {
+ log.info("Reader received NoMoreSplits event.");
+ noMoreSplitsAssignment = true;
+ }
+
+ @Override
+ public void handleSourceEvent(SourceEvent sourceEvent) {
+ log.info("Received unhandled source event: {}", sourceEvent);
+ }
+
+ @Override
+ public void close() {
+ log.info("Closing Source Reader.");
+ try {
+ splitFetcherManager.close(options.getSourceReaderCloseTimeout());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private RecordsWithSplitIds<E> getNextFetch(Collector<T> output) {
+ splitFetcherManager.checkErrors();
+ RecordsWithSplitIds<E> recordsWithSplitId = elementsQueue.poll();
+ if (recordsWithSplitId == null || !moveToNextSplit(recordsWithSplitId,
output)) {
+ log.info("Current fetch is finished.");
+ return null;
+ }
+
+ currentFetch = recordsWithSplitId;
+ return recordsWithSplitId;
+ }
+
+ private boolean moveToNextSplit(RecordsWithSplitIds<E>
recordsWithSplitIds, Collector<T> output) {
+ final String nextSplitId = recordsWithSplitIds.nextSplit();
+ if (nextSplitId == null) {
+ log.trace("Current fetch is finished.");
+ finishCurrentFetch(recordsWithSplitIds, output);
+ return false;
+ }
+
+ currentSplitContext = splitStates.get(nextSplitId);
+ checkState(currentSplitContext != null, "Have records for a split that
was not registered");
+ currentSplitOutput =
currentSplitContext.getOrCreateSplitOutput(output);
+ log.trace("Emitting records from fetch for split {}", nextSplitId);
+ return true;
+ }
+
+ private void finishCurrentFetch(final RecordsWithSplitIds<E> fetch, final
Collector<T> output) {
+ currentFetch = null;
+ currentSplitContext = null;
+ currentSplitOutput = null;
+
+ Set<String> finishedSplits = fetch.finishedSplits();
+ if (!finishedSplits.isEmpty()) {
+ log.info("Finished reading split(s) {}", finishedSplits);
+ Map<String, SplitStateT> stateOfFinishedSplits = new HashMap<>();
+ for (String finishedSplitId : finishedSplits) {
+ stateOfFinishedSplits.put(finishedSplitId,
splitStates.remove(finishedSplitId).state);
+ }
+ onSplitFinished(stateOfFinishedSplits);
+ }
+
+ fetch.recycle();
+ }
+
+ /**
+ * Handles the finished splits to clean the state if needed.
+ *
+ * @param finishedSplitIds
+ *
+ */
+ protected abstract void onSplitFinished(Map<String, SplitStateT>
finishedSplitIds);
+
+ /**
+ * When new splits are added to the reader. The initialize the state of
the new splits.
+ *
+ * @param split a newly added split.
+ */
+ protected abstract SplitStateT initializedState(SplitT split);
+
+ /**
+ * Convert a mutable SplitStateT to immutable SplitT.
+ *
+ * @param splitState splitState.
+ * @return an immutable Split state.
+ */
+ protected abstract SplitT toSplitType(String splitId, SplitStateT
splitState);
+
+ @RequiredArgsConstructor
+ private static final class SplitContext<T, SplitStateT> {
+ final String splitId;
+ final SplitStateT state;
+ Collector<T> splitOutput;
+
+ Collector<T> getOrCreateSplitOutput(Collector<T> output) {
+ if (splitOutput == null) {
+ splitOutput = output;
+ }
+ return splitOutput;
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderOptions.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderOptions.java
new file mode 100644
index 000000000..5df2efe55
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderOptions.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.reader;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Getter;
+
+@Getter
+@SuppressWarnings("MagicNumber")
+public class SourceReaderOptions {
+ public static final Option<Long> SOURCE_READER_CLOSE_TIMEOUT =
+ Options.key("source.reader.close.timeout")
+ .longType()
+ .defaultValue(30000L)
+ .withDescription("The timeout when closing the source reader");
+
+ public static final Option<Integer> ELEMENT_QUEUE_CAPACITY =
+ Options.key("source.reader.element.queue.capacity")
+ .intType()
+ .defaultValue(2)
+ .withDescription("The capacity of the element queue in the source
reader.");
+
+ public final long sourceReaderCloseTimeout;
+ public final int elementQueueCapacity;
+
+ public SourceReaderOptions(Config config) {
+ this(ReadonlyConfig.fromConfig(config));
+ }
+
+ public SourceReaderOptions(ReadonlyConfig config) {
+ this.sourceReaderCloseTimeout =
config.get(SOURCE_READER_CLOSE_TIMEOUT);
+ this.elementQueueCapacity = config.get(ELEMENT_QUEUE_CAPACITY);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/AddSplitsTask.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/AddSplitsTask.java
new file mode 100644
index 000000000..41e527a36
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/AddSplitsTask.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitsAddition;
+
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+
+import java.util.Collection;
+import java.util.Map;
+
+@RequiredArgsConstructor
+@ToString(of = {"splitsToAdd"})
+class AddSplitsTask<SplitT extends SourceSplit> implements SplitFetcherTask {
+ private final SplitReader<?, SplitT> splitReader;
+ private final Collection<SplitT> splitsToAdd;
+ private final Map<String, SplitT> assignedSplits;
+
+ @Override
+ public void run() {
+ for (SplitT s : splitsToAdd) {
+ assignedSplits.put(s.splitId(), s);
+ }
+ splitReader.handleSplitsChanges(new SplitsAddition<>(splitsToAdd));
+ }
+
+ @Override
+ public void wakeUp() {
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/FetchTask.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/FetchTask.java
new file mode 100644
index 000000000..42b71c927
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/FetchTask.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+@Slf4j
+@RequiredArgsConstructor
+class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
+ private static final int OFFER_TIMEOUT_MILLIS = 10000;
+
+ private final SplitReader<E, SplitT> splitReader;
+ private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
+ private final Consumer<Collection<String>> splitFinishedCallback;
+ private final int fetcherIndex;
+
+ @Getter(value = AccessLevel.PRIVATE)
+ private volatile boolean wakeup;
+ private volatile RecordsWithSplitIds<E> lastRecords;
+
+ @Override
+ public void run() throws IOException {
+ try {
+ if (!isWakeup() && lastRecords == null) {
+ lastRecords = splitReader.fetch();
+ log.debug("Fetch records from split fetcher {}", fetcherIndex);
+ }
+
+ if (!isWakeup()) {
+ if (elementsQueue.offer(lastRecords, OFFER_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS)) {
+ if (!lastRecords.finishedSplits().isEmpty()) {
+
splitFinishedCallback.accept(lastRecords.finishedSplits());
+ }
+ lastRecords = null;
+ log.debug("Enqueued records from split fetcher {}",
fetcherIndex);
+ } else {
+ log.debug("Enqueuing timed out in split fetcher {}, queue
is blocked", fetcherIndex);
+ }
+ }
+ } catch (InterruptedException e) {
+ // this should only happen on shutdown
+ throw new IOException("Source fetch execution was interrupted", e);
+ } finally {
+ // clean up the potential wakeup effect.
+ if (isWakeup()) {
+ wakeup = false;
+ }
+ }
+ }
+
+ @Override
+ public void wakeUp() {
+ // Set the wakeup flag first.
+ wakeup = true;
+
+ if (lastRecords == null) {
+ splitReader.wakeUp();
+ } else {
+ // interrupt enqueuing the records
+ // or waitting records offer into queue timeout, see {@link #run()}
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SingleThreadFetcherManager.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SingleThreadFetcherManager.java
new file mode 100644
index 000000000..137cbe79b
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SingleThreadFetcherManager.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
+
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * A {@link SplitFetcherManager} with a single fetching thread (I/O thread) that handles all splits
+ * concurrently.
+ *
+ * <p>The first call to {@link #addSplits(Collection)} creates and starts the fetcher. Later calls
+ * hand the new splits to that same fetcher for as long as it is still registered.
+ *
+ * @param <E> the type of the records fetched by the {@link SplitReader}
+ * @param <SplitT> the type of the splits handled by the fetcher
+ */
+public class SingleThreadFetcherManager<E, SplitT extends SourceSplit>
+        extends SplitFetcherManager<E, SplitT> {
+
+    public SingleThreadFetcherManager(BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+                                      Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
+        super(elementsQueue, splitReaderSupplier);
+    }
+
+    public SingleThreadFetcherManager(BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+                                      Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
+                                      Consumer<Collection<String>> splitFinishedHook) {
+        super(elementsQueue, splitReaderSupplier, splitFinishedHook);
+    }
+
+    /**
+     * Assigns the splits to the single fetcher, creating and starting it if none is registered.
+     *
+     * @param splitsToAdd the splits to assign
+     */
+    @Override
+    public void addSplits(Collection<SplitT> splitsToAdd) {
+        SplitFetcher<E, SplitT> fetcher = getRunningFetcher();
+        if (fetcher == null) {
+            fetcher = createSplitFetcher();
+            // Queue the splits before the fetcher thread starts.
+            fetcher.addSplits(splitsToAdd);
+            startFetcher(fetcher);
+        } else {
+            fetcher.addSplits(splitsToAdd);
+        }
+    }
+
+    /**
+     * Returns the currently registered fetcher, or {@code null} if there is none.
+     *
+     * <p>The fetcher map can shrink concurrently, because a fetcher's shutdown hook removes it when
+     * its thread exits. The lookup therefore must not test for emptiness and then take the first
+     * element in two separate steps.
+     */
+    protected SplitFetcher<E, SplitT> getRunningFetcher() {
+        return fetchers.values().stream().findFirst().orElse(null);
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SplitFetcher.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SplitFetcher.java
new file mode 100644
index 000000000..b48b4d5c6
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SplitFetcher.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
+
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+@Slf4j
+public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
+ @Getter
+ private final int fetcherId;
+ private final Deque<SplitFetcherTask> taskQueue = new ArrayDeque<>();
+ @Getter
+ private final Map<String, SplitT> assignedSplits = new HashMap<>();
+ @Getter
+ private final SplitReader<E, SplitT> splitReader;
+ private final Consumer<Throwable> errorHandler;
+ private final Runnable shutdownHook;
+ private final FetchTask fetchTask;
+
+ private volatile boolean closed;
+ private volatile SplitFetcherTask runningTask = null;
+
+ private final ReentrantLock lock = new ReentrantLock();
+ private final Condition nonEmpty = lock.newCondition();
+
+ SplitFetcher(int fetcherId,
+ @NonNull BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+ @NonNull SplitReader<E, SplitT> splitReader,
+ @NonNull Consumer<Throwable> errorHandler,
+ @NonNull Runnable shutdownHook,
+ @NonNull Consumer<Collection<String>> splitFinishedHook) {
+ this.fetcherId = fetcherId;
+ this.splitReader = splitReader;
+ this.errorHandler = errorHandler;
+ this.shutdownHook = shutdownHook;
+ this.fetchTask = new FetchTask<>(
+ splitReader,
+ elementsQueue,
+ finishedSplits -> {
+ finishedSplits.forEach(assignedSplits::remove);
+ splitFinishedHook.accept(finishedSplits);
+ log.info("Finished reading from splits {}", finishedSplits);
+ },
+ fetcherId);
+ }
+
+ @Override
+ public void run() {
+ log.info("Starting split fetcher {}", fetcherId);
+ try {
+ while (runOnce()) {
+ // nothing to do, everything is inside #runOnce.
+ }
+ } catch (Throwable t) {
+ errorHandler.accept(t);
+ } finally {
+ try {
+ splitReader.close();
+ } catch (Exception e) {
+ errorHandler.accept(e);
+ } finally {
+ log.info("Split fetcher {} exited.", fetcherId);
+ shutdownHook.run();
+ }
+ }
+ }
+
+ public void addSplits(@NonNull Collection<SplitT> splitsToAdd) {
+ lock.lock();
+ try {
+ addTaskUnsafe(new AddSplitsTask<>(splitReader, splitsToAdd,
assignedSplits));
+ wakeUpUnsafe(true);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void addTask(@NonNull SplitFetcherTask task) {
+ lock.lock();
+ try {
+ addTaskUnsafe(task);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void shutdown() {
+ lock.lock();
+ try {
+ if (!closed) {
+ closed = true;
+ log.info("Shutting down split fetcher {}", fetcherId);
+ wakeUpUnsafe(false);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean isIdle() {
+ lock.lock();
+ try {
+ return assignedSplits.isEmpty() && taskQueue.isEmpty() &&
runningTask == null;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private boolean runOnce() {
+ lock.lock();
+ SplitFetcherTask nextTask;
+ try {
+ if (closed) {
+ return false;
+ }
+
+ nextTask = getNextTaskUnsafe();
+ if (nextTask == null) {
+ // (spurious) wakeup, so just repeat
+ return true;
+ }
+
+ log.debug("Prepare to run {}", nextTask);
+ // store task for #wakeUp
+ this.runningTask = nextTask;
+ } finally {
+ lock.unlock();
+ }
+
+ // execute the task outside of lock, so that it can be woken up
+ try {
+ nextTask.run();
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("SplitFetcher thread %d received unexpected
exception while polling the records",
+ fetcherId), e);
+ }
+
+ // re-acquire lock as all post-processing steps, need it
+ lock.lock();
+ try {
+ this.runningTask = null;
+ } finally {
+ lock.unlock();
+ }
+ return true;
+ }
+
+ private SplitFetcherTask getNextTaskUnsafe() {
+ assert lock.isHeldByCurrentThread();
+
+ try {
+ if (!taskQueue.isEmpty()) {
+ // execute tasks in taskQueue first
+ return taskQueue.poll();
+ } else if (!assignedSplits.isEmpty()) {
+ // use fallback task = fetch if there is at least one split
+ return fetchTask;
+ } else {
+ // nothing to do, wait for signal
+ nonEmpty.await();
+ return taskQueue.poll();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("The thread was interrupted while
waiting for a fetcher task.");
+ }
+ }
+
+ private void wakeUpUnsafe(boolean taskOnly) {
+ assert lock.isHeldByCurrentThread();
+
+ SplitFetcherTask currentTask = runningTask;
+ if (currentTask != null) {
+ log.debug("Waking up running task {}", currentTask);
+ currentTask.wakeUp();
+ } else if (!taskOnly) {
+ log.debug("Waking up fetcher thread.");
+ nonEmpty.signal();
+ }
+ }
+
+ private void addTaskUnsafe(SplitFetcherTask task) {
+ assert lock.isHeldByCurrentThread();
+
+ taskQueue.add(task);
+ nonEmpty.signal();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SplitFetcherManager.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SplitFetcherManager.java
new file mode 100644
index 000000000..203372774
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SplitFetcherManager.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * Base class that owns the {@link SplitFetcher}s of a source reader and runs each of them on a
+ * thread of a shared cached thread pool.
+ *
+ * <p>Subclasses implement different threading models through {@link #addSplits(Collection)}. For
+ * example, a single-thread manager starts one fetcher and assigns all splits to it. A
+ * one-thread-per-split manager may instead spawn a new fetcher every time a split is assigned.
+ *
+ * <p>Exceptions raised in fetcher threads are collected and rethrown by {@link #checkErrors()}.
+ *
+ * @param <E> the type of the records fetched by the split readers
+ * @param <SplitT> the type of the splits handled by the fetchers
+ */
+@Slf4j
+public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
+    /** Registered fetchers by id; a fetcher's shutdown hook removes it when its thread exits. */
+    protected final Map<Integer, SplitFetcher<E, SplitT>> fetchers;
+    private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
+    private final Supplier<SplitReader<E, SplitT>> splitReaderFactory;
+    private final Consumer<Collection<String>> splitFinishedHook;
+    private final AtomicInteger fetcherIdGenerator;
+    /** First exception reported by any fetcher; later ones are attached to it as suppressed. */
+    private final AtomicReference<Throwable> uncaughtFetcherException;
+    private final Consumer<Throwable> errorHandler;
+    private final ExecutorService executors;
+    private volatile boolean closed;
+
+    public SplitFetcherManager(BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+                               Supplier<SplitReader<E, SplitT>> splitReaderFactory) {
+        this(elementsQueue, splitReaderFactory, ignore -> {});
+    }
+
+    public SplitFetcherManager(BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+                               Supplier<SplitReader<E, SplitT>> splitReaderFactory,
+                               Consumer<Collection<String>> splitFinishedHook) {
+        this.fetchers = new ConcurrentHashMap<>();
+        this.elementsQueue = elementsQueue;
+        this.splitReaderFactory = splitReaderFactory;
+        this.splitFinishedHook = splitFinishedHook;
+        this.fetcherIdGenerator = new AtomicInteger(0);
+        this.uncaughtFetcherException = new AtomicReference<>(null);
+        this.errorHandler = throwable -> {
+            log.error("Received uncaught exception.", throwable);
+            if (!uncaughtFetcherException.compareAndSet(null, throwable)) {
+                Throwable first = uncaughtFetcherException.get();
+                // Throwable#addSuppressed throws IllegalArgumentException on self-suppression,
+                // which would happen if the same exception instance were reported twice.
+                if (first != throwable) {
+                    // Add the exception to the exception list.
+                    first.addSuppressed(throwable);
+                }
+            }
+        };
+        // Fetcher threads are named after the thread that creates the manager.
+        String taskThreadName = Thread.currentThread().getName();
+        this.executors = Executors.newCachedThreadPool(
+                r -> new Thread(r, "Source Data Fetcher for " + taskThreadName));
+    }
+
+    /**
+     * Assigns new splits to one or more fetchers according to the threading model.
+     *
+     * @param splitsToAdd the splits to assign
+     */
+    public abstract void addSplits(Collection<SplitT> splitsToAdd);
+
+    /** Submits the fetcher to the thread pool so that its run loop starts. */
+    protected void startFetcher(SplitFetcher<E, SplitT> fetcher) {
+        executors.submit(fetcher);
+    }
+
+    /**
+     * Creates and registers a new fetcher backed by a fresh split reader. The fetcher is not
+     * started.
+     *
+     * @return the new fetcher
+     * @throws IllegalStateException if this manager has been closed
+     */
+    protected synchronized SplitFetcher<E, SplitT> createSplitFetcher() {
+        if (closed) {
+            throw new IllegalStateException("The split fetcher manager has closed.");
+        }
+        // Create SplitReader.
+        SplitReader<E, SplitT> splitReader = splitReaderFactory.get();
+        int fetcherId = fetcherIdGenerator.getAndIncrement();
+        SplitFetcher<E, SplitT> splitFetcher =
+                new SplitFetcher<>(
+                        fetcherId,
+                        elementsQueue,
+                        splitReader,
+                        errorHandler,
+                        // unregister the fetcher once its thread exits
+                        () -> fetchers.remove(fetcherId),
+                        this.splitFinishedHook);
+        fetchers.put(fetcherId, splitFetcher);
+        return splitFetcher;
+    }
+
+    /**
+     * Shuts down and unregisters every idle fetcher.
+     *
+     * @return whether no fetcher remains registered
+     */
+    public synchronized boolean maybeShutdownFinishedFetchers() {
+        Iterator<Map.Entry<Integer, SplitFetcher<E, SplitT>>> iter = fetchers.entrySet().iterator();
+        while (iter.hasNext()) {
+            Map.Entry<Integer, SplitFetcher<E, SplitT>> entry = iter.next();
+            SplitFetcher<E, SplitT> fetcher = entry.getValue();
+            if (fetcher.isIdle()) {
+                log.info("Closing splitFetcher {} because it is idle.", entry.getKey());
+                fetcher.shutdown();
+                iter.remove();
+            }
+        }
+        return fetchers.isEmpty();
+    }
+
+    /**
+     * Closes the manager and shuts down all fetchers. Waits up to {@code timeoutMs} for their
+     * threads to finish and logs a warning if they do not.
+     *
+     * @param timeoutMs maximum time to wait, in milliseconds
+     * @throws Exception if interrupted while waiting
+     */
+    public synchronized void close(long timeoutMs) throws Exception {
+        closed = true;
+        fetchers.values().forEach(SplitFetcher::shutdown);
+        executors.shutdown();
+        if (!executors.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
+            log.warn("Failed to close the source reader in {} ms. There are still {} split fetchers running",
+                    timeoutMs,
+                    fetchers.size());
+        }
+    }
+
+    /**
+     * Rethrows the first exception reported by any fetcher thread, if there is one.
+     *
+     * @throws RuntimeException wrapping the first uncaught fetcher exception
+     */
+    public void checkErrors() {
+        Throwable uncaught = uncaughtFetcherException.get();
+        if (uncaught != null) {
+            throw new RuntimeException("One or more fetchers have encountered exception", uncaught);
+        }
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SplitFetcherTask.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SplitFetcherTask.java
new file mode 100644
index 000000000..d36f976de
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SplitFetcherTask.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher;
+
+import java.io.IOException;
+
+/**
+ * A unit of work executed by a split fetcher that can be woken up while it runs.
+ */
+public interface SplitFetcherTask {
+
+    /**
+     * Wakes up the thread that is currently executing {@link #run()}.
+     */
+    void wakeUp();
+
+    /**
+     * Executes the task. When woken up, an implementation may abort by throwing an exception, but
+     * it is not required to.
+     *
+     * @throws IOException if the work fails
+     */
+    void run() throws IOException;
+}
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/splitreader/SplitReader.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/splitreader/SplitReader.java
new file mode 100644
index 000000000..d5ec80754
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/splitreader/SplitReader.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
+
+import java.io.IOException;
+
+/**
+ * Reads records from the splits that are handed to it through
+ * {@link #handleSplitsChanges(SplitsChange)}.
+ *
+ * @param <E> the type of the records returned by {@link #fetch()}
+ * @param <SplitT> the type of the splits this reader handles
+ */
+public interface SplitReader<E, SplitT extends SourceSplit> {
+
+    /**
+     * Applies a change to the set of splits handled by this reader. Implementations must not block.
+     *
+     * @param splitsChanges the change to apply
+     */
+    void handleSplitsChanges(SplitsChange<SplitT> splitsChanges);
+
+    /**
+     * Fetches the next records from the current splits.
+     *
+     * <p>This call may block, but it must be unblocked by {@link #wakeUp()}. It then either returns
+     * normally or throws an interrupted exception. In both cases the method must be reentrant: the
+     * next call resumes where the woken-up or interrupted call stopped.
+     *
+     * @return the fetched records
+     * @throws IOException if reading from the splits fails
+     */
+    RecordsWithSplitIds<E> fetch() throws IOException;
+
+    /**
+     * Unblocks a fetcher thread that is currently waiting inside {@link #fetch()}.
+     */
+    void wakeUp();
+
+    /**
+     * Closes this split reader.
+     *
+     * @throws Exception if closing fails
+     */
+    void close() throws Exception;
+}
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/splitreader/SplitsAddition.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/splitreader/SplitsAddition.java
new file mode 100644
index 000000000..79d365330
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/splitreader/SplitsAddition.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader;
+
+import java.util.Collection;
+
+/**
+ * A {@link SplitsChange} that adds new splits to a split reader.
+ *
+ * @param <SplitT> the split type
+ */
+public class SplitsAddition<SplitT> extends SplitsChange<SplitT> {
+
+    /**
+     * @param splits the splits to add
+     */
+    public SplitsAddition(Collection<SplitT> splits) {
+        super(splits);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("SplitAddition:[%s]", splits());
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/splitreader/SplitsChange.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/splitreader/SplitsChange.java
new file mode 100644
index 000000000..1b21a9a2d
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/splitreader/SplitsChange.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader;
+
+import lombok.AllArgsConstructor;
+import lombok.NonNull;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Base class for a change to the set of splits handled by a {@link SplitReader}.
+ *
+ * @param <SplitT> the split type
+ */
+@AllArgsConstructor
+public abstract class SplitsChange<SplitT> {
+    /** The affected splits; the Lombok-generated constructor rejects {@code null}. */
+    @NonNull
+    private final Collection<SplitT> splits;
+
+    /**
+     * Returns the splits affected by this change.
+     *
+     * @return an unmodifiable view of the splits
+     */
+    public Collection<SplitT> splits() {
+        return Collections.unmodifiableCollection(splits);
+    }
+}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/InternalRowCollector.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/InternalRowCollector.java
index 1a02c3a43..892873feb 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/InternalRowCollector.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/InternalRowCollector.java
@@ -25,15 +25,19 @@ import
org.apache.seatunnel.translation.spark.common.serialization.InternalRowCo
import org.apache.spark.sql.catalyst.InternalRow;
+import java.util.concurrent.atomic.AtomicLong;
+
public class InternalRowCollector implements Collector<SeaTunnelRow> {
private final Handover<InternalRow> handover;
private final Object checkpointLock;
private final InternalRowConverter rowSerialization;
+ private final AtomicLong collectTotalCount;
public InternalRowCollector(Handover<InternalRow> handover, Object
checkpointLock, SeaTunnelDataType<?> dataType) {
this.handover = handover;
this.checkpointLock = checkpointLock;
this.rowSerialization = new InternalRowConverter(dataType);
+ this.collectTotalCount = new AtomicLong(0);
}
@Override
@@ -42,11 +46,16 @@ public class InternalRowCollector implements
Collector<SeaTunnelRow> {
synchronized (checkpointLock) {
handover.produce(rowSerialization.convert(record));
}
+ collectTotalCount.incrementAndGet();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
+ /**
+ * Returns how many records this collector has handed over so far.
+ *
+ * @return the number of records converted and produced to the handover without an exception
+ */
+ public long collectTotalCount() {
+ return collectTotalCount.get();
+ }
+
@Override
public Object getCheckpointLock() {
return this.checkpointLock;
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java
index 81c491e74..6fa2aa94a 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java
@@ -53,6 +53,7 @@ public class ParallelBatchPartitionReader {
protected volatile boolean prepare = true;
protected volatile BaseSourceFunction<SeaTunnelRow> internalSource;
+ protected volatile InternalRowCollector internalRowCollector;
public ParallelBatchPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?>
source, Integer parallelism, Integer subtaskId) {
this.source = source;
@@ -90,9 +91,11 @@ public class ParallelBatchPartitionReader {
running = false;
throw new RuntimeException("Failed to open internal source.", e);
}
+
+ this.internalRowCollector = new InternalRowCollector(handover,
checkpointLock, source.getProducedType());
executorService.execute(() -> {
try {
- internalSource.run(new InternalRowCollector(handover,
checkpointLock, source.getProducedType()));
+ internalSource.run(internalRowCollector);
} catch (Exception e) {
handover.reportError(e);
log.error("BatchPartitionReader execute failed.", e);
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/micro/CoordinatedMicroBatchPartitionReader.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/micro/CoordinatedMicroBatchPartitionReader.java
index 83ca10cca..9aa326844 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/micro/CoordinatedMicroBatchPartitionReader.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/micro/CoordinatedMicroBatchPartitionReader.java
@@ -55,7 +55,20 @@ public class CoordinatedMicroBatchPartitionReader extends
ParallelMicroBatchPart
@Override
public void virtualCheckpoint() {
try {
- internalCheckpoint(collectorMap.values().iterator(), 0);
+ int checkpointRetries = Math.max(1, CHECKPOINT_RETRIES);
+ do {
+ checkpointRetries--;
+ long collectedReader =
collectorMap.values().stream().mapToLong(e -> e.collectTotalCount() > 0 ? 1 :
0).sum();
+ if (collectedReader == 0) {
+ Thread.sleep(CHECKPOINT_SLEEP_INTERVAL);
+ }
+
+ collectedReader = collectorMap.values().stream().mapToLong(e
-> e.collectTotalCount() > 0 ? 1 : 0).sum();
+ if (collectedReader != 0 || checkpointRetries == 0) {
+ checkpointRetries = 0;
+ internalCheckpoint(collectorMap.values().iterator(), 0);
+ }
+ } while (checkpointRetries > 0);
} catch (Exception e) {
throw new RuntimeException("An error occurred in virtual
checkpoint execution.", e);
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/micro/ParallelMicroBatchPartitionReader.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/micro/ParallelMicroBatchPartitionReader.java
index 30c5c3402..35a6e09a7 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/micro/ParallelMicroBatchPartitionReader.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/micro/ParallelMicroBatchPartitionReader.java
@@ -44,6 +44,7 @@ import java.util.concurrent.TimeUnit;
public class ParallelMicroBatchPartitionReader extends
ParallelBatchPartitionReader {
protected static final Integer CHECKPOINT_SLEEP_INTERVAL = 10;
+ protected static final Integer CHECKPOINT_RETRIES = 3;
protected volatile Integer checkpointId;
protected final Integer checkpointInterval;
protected final String checkpointPath;
@@ -117,19 +118,30 @@ public class ParallelMicroBatchPartitionReader extends
ParallelBatchPartitionRea
public void virtualCheckpoint() {
try {
- synchronized (checkpointLock) {
- while (!handover.isEmpty()) {
+ int checkpointRetries = Math.max(1, CHECKPOINT_RETRIES);
+ do {
+ checkpointRetries--;
+ if (internalRowCollector.collectTotalCount() == 0) {
Thread.sleep(CHECKPOINT_SLEEP_INTERVAL);
}
- // Block #next() method
- synchronized (handover) {
- final int currentCheckpoint = checkpointId;
- ReaderState readerState = snapshotState();
- saveState(readerState, currentCheckpoint);
- internalSource.notifyCheckpointComplete(currentCheckpoint);
- running = false;
+ synchronized (checkpointLock) {
+ if (internalRowCollector.collectTotalCount() != 0 ||
checkpointRetries == 0) {
+ checkpointRetries = 0;
+
+ while (!handover.isEmpty()) {
+ Thread.sleep(CHECKPOINT_SLEEP_INTERVAL);
+ }
+ // Block #next() method
+ synchronized (handover) {
+ final int currentCheckpoint = checkpointId;
+ ReaderState readerState = snapshotState();
+ saveState(readerState, currentCheckpoint);
+
internalSource.notifyCheckpointComplete(currentCheckpoint);
+ running = false;
+ }
+ }
}
- }
+ } while (checkpointRetries > 0);
} catch (Exception e) {
throw new RuntimeException("An error occurred in virtual
checkpoint execution.", e);
}