This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 691aabcb [api-draft] Support coordinated source & Fix checkpoint lock implementation (#1951)
691aabcb is described below
commit 691aabcb873392349af9e6379bd29a1fb5dd9f01
Author: Zongwen Li <[email protected]>
AuthorDate: Fri May 27 09:55:50 2022 +0800
[api-draft] Support coordinated source & Fix checkpoint lock implementation (#1951)
* [api-draft][flink] Support coordinated source & Fix checkpoint lock implementation
* [api-draft][spark] Support coordinated source
* [api-draft][spark] Coordinated source support parallel checkpoint lock
---
.../org/apache/seatunnel/api/source/Collector.java | 4 +-
.../apache/seatunnel/api/state/CheckpointLock.java | 26 ---
.../translation/source/BaseSourceFunction.java} | 23 +-
.../source/CoordinatedEnumeratorContext.java | 10 +-
...rContext.java => CoordinatedReaderContext.java} | 24 +-
.../translation/source/CoordinatedSource.java | 241 ++++++++++++++++++++-
.../translation/source/ParallelReaderContext.java | 1 -
.../translation/source/ParallelSource.java | 49 +++--
.../seatunnel/translation/state/EmptyLock.java | 32 ---
...ource.java => BaseSeaTunnelSourceFunction.java} | 67 +++---
.../flink/source/FlinkCheckpointLock.java | 89 --------
.../translation/flink/source/RowCollector.java | 7 +-
.../flink/source/SeaTunnelCoordinatedSource.java | 43 ++++
.../flink/source/SeaTunnelParallelSource.java | 114 +---------
.../spark/source/InternalRowCollector.java | 12 +-
.../translation/spark/source/ReaderState.java | 7 +-
.../spark/source/SeaTunnelSourceSupport.java | 23 +-
.../spark/source/batch/BatchPartition.java | 7 +-
...lelSourceReader.java => BatchSourceReader.java} | 17 +-
.../batch/CoordinatedBatchPartitionReader.java | 100 +++++++++
...ader.java => ParallelBatchPartitionReader.java} | 45 ++--
.../source/continnous/ContinuousPartition.java | 7 +-
...urceReader.java => ContinuousSourceReader.java} | 7 +-
...java => ParallelContinuousPartitionReader.java} | 17 +-
.../CoordinatedMicroBatchPartitionReader.java | 134 ++++++++++++
.../spark/source/micro/MicroBatchPartition.java | 7 +-
...urceReader.java => MicroBatchSourceReader.java} | 17 +-
...java => ParallelMicroBatchPartitionReader.java} | 78 +++----
28 files changed, 756 insertions(+), 452 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
index a2bc77ba..0b924bb5 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
@@ -17,8 +17,6 @@
package org.apache.seatunnel.api.source;
-import org.apache.seatunnel.api.state.CheckpointLock;
-
/**
* A {@link Collector} is used to collect data from {@link SourceReader}.
*
@@ -33,5 +31,5 @@ public interface Collector<T> {
*
* @return The object to use as the lock
*/
- CheckpointLock getCheckpointLock();
+ Object getCheckpointLock();
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointLock.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointLock.java
deleted file mode 100644
index 4a63cb15..00000000
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointLock.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.state;
-
-public interface CheckpointLock {
-
- void lock();
-
- void unlock();
-
-}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/SingleReaderCheckpointLock.java
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/BaseSourceFunction.java
similarity index 63%
rename from
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/SingleReaderCheckpointLock.java
rename to
seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/BaseSourceFunction.java
index d0ceb189..eaa7325e 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/SingleReaderCheckpointLock.java
+++
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/BaseSourceFunction.java
@@ -15,22 +15,19 @@
* limitations under the License.
*/
-package org.apache.seatunnel.translation.spark.source.micro;
+package org.apache.seatunnel.translation.source;
-import org.apache.seatunnel.api.state.CheckpointLock;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.state.CheckpointListener;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.List;
+import java.util.Map;
-public class SingleReaderCheckpointLock implements CheckpointLock {
- private final ReentrantLock fairLock = new ReentrantLock(true);
+public interface BaseSourceFunction<T> extends AutoCloseable,
CheckpointListener {
- @Override
- public void lock() {
- fairLock.lock();
- }
+ void open() throws Exception;
- @Override
- public void unlock() {
- fairLock.unlock();
- }
+ void run(Collector<T> collector) throws Exception;
+
+ Map<Integer, List<byte[]>> snapshotState(long checkpointId) throws
Exception;
}
diff --git
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedEnumeratorContext.java
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedEnumeratorContext.java
index 8c06e3c8..5f585b97 100644
---
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedEnumeratorContext.java
+++
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedEnumeratorContext.java
@@ -34,27 +34,27 @@ public class CoordinatedEnumeratorContext<SplitT extends
SourceSplit> implements
@Override
public int currentParallelism() {
- return 0;
+ return coordinatedSource.currentReaderCount();
}
@Override
public Set<Integer> registeredReaders() {
- return null;
+ return coordinatedSource.registeredReaders();
}
@Override
public void assignSplit(int subtaskId, List<SplitT> splits) {
-
+ coordinatedSource.addSplits(subtaskId, splits);
}
@Override
public void signalNoMoreSplits(int subtaskId) {
-
+ coordinatedSource.handleNoMoreSplits(subtaskId);
}
@Override
public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
-
+ coordinatedSource.handleEnumeratorEvent(subtaskId, event);
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedReaderContext.java
similarity index 69%
copy from
seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
copy to
seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedReaderContext.java
index 9080c578..6f410844 100644
---
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
+++
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedReaderContext.java
@@ -21,44 +21,42 @@ import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceReader;
-public class ParallelReaderContext implements SourceReader.Context {
+public class CoordinatedReaderContext implements SourceReader.Context {
- protected final ParallelSource<?, ?, ?> parallelSource;
+ protected final CoordinatedSource<?, ?, ?> coordinatedSource;
protected final Boundedness boundedness;
protected final Integer subtaskId;
- public ParallelReaderContext(ParallelSource<?, ?, ?> parallelSource,
- Boundedness boundedness,
- Integer subtaskId) {
- this.parallelSource = parallelSource;
+ public CoordinatedReaderContext(CoordinatedSource<?, ?, ?>
coordinatedSource,
+ Boundedness boundedness,
+ Integer subtaskId) {
+ this.coordinatedSource = coordinatedSource;
this.boundedness = boundedness;
this.subtaskId = subtaskId;
}
@Override
public int getIndexOfSubtask() {
- return subtaskId;
+ return this.subtaskId;
}
@Override
public Boundedness getBoundedness() {
- return boundedness;
+ return this.boundedness;
}
@Override
public void signalNoMoreElement() {
- // todo: if we have multiple subtasks, we need to know if all subtask
is stopped
- parallelSource.handleNoMoreElement();
+ coordinatedSource.handleNoMoreElement(subtaskId);
}
@Override
public void sendSplitRequest() {
- parallelSource.handleSplitRequest(subtaskId);
+ coordinatedSource.handleSplitRequest(subtaskId);
}
@Override
public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {
- // TODO: exception
- throw new RuntimeException("");
+ coordinatedSource.handleReaderEvent(subtaskId, sourceEvent);
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
index 142bd8e3..9806a6d0 100644
---
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
+++
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
@@ -17,13 +17,250 @@
package org.apache.seatunnel.translation.source;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
-public class CoordinatedSource<T, SplitT extends SourceSplit, StateT> {
- protected volatile Map<Integer, SourceReader<T, SplitT>> readerMap = new
ConcurrentHashMap<>();
+public class CoordinatedSource<T, SplitT extends SourceSplit, StateT>
implements BaseSourceFunction<T> {
+ protected static final long SLEEP_TIME_INTERVAL = 5L;
+ protected final SeaTunnelSource<T, SplitT, StateT> source;
+ protected final Map<Integer, List<byte[]>> restoredState;
+ protected final Integer parallelism;
+ protected final Serializer<SplitT> splitSerializer;
+ protected final Serializer<StateT> enumeratorStateSerializer;
+
+ protected final CoordinatedEnumeratorContext<SplitT>
coordinatedEnumeratorContext;
+ protected final Map<Integer, CoordinatedReaderContext> readerContextMap;
+ protected final Map<Integer, List<SplitT>> restoredSplitStateMap = new
HashMap<>();
+
+ protected transient volatile SourceSplitEnumerator<SplitT, StateT>
splitEnumerator;
+ protected transient Map<Integer, SourceReader<T, SplitT>> readerMap = new
ConcurrentHashMap<>();
+ protected final Map<Integer, AtomicBoolean> readerRunningMap;
+ protected transient volatile ScheduledThreadPoolExecutor executorService;
+
+ /**
+ * Flag indicating whether the consumer is still running.
+ */
+ protected volatile boolean running = true;
+
+ public CoordinatedSource(SeaTunnelSource<T, SplitT, StateT> source,
+ Map<Integer, List<byte[]>> restoredState,
+ int parallelism) {
+ this.source = source;
+ this.restoredState = restoredState;
+ this.parallelism = parallelism;
+ this.splitSerializer = source.getSplitSerializer();
+ this.enumeratorStateSerializer = source.getEnumeratorStateSerializer();
+
+ this.coordinatedEnumeratorContext = new
CoordinatedEnumeratorContext<>(this);
+ this.readerContextMap = new ConcurrentHashMap<>(parallelism);
+ this.readerRunningMap = new ConcurrentHashMap<>(parallelism);
+ try {
+ createSplitEnumerator();
+ createReaders();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void createSplitEnumerator() throws Exception {
+ if (restoredState != null && restoredState.size() > 0) {
+ StateT restoredEnumeratorState =
enumeratorStateSerializer.deserialize(restoredState.get(-1).get(0));
+ splitEnumerator =
source.restoreEnumerator(coordinatedEnumeratorContext, restoredEnumeratorState);
+ restoredState.forEach((subtaskId, splitBytes) -> {
+ if (subtaskId == -1) {
+ return;
+ }
+ List<SplitT> restoredSplitState = new
ArrayList<>(splitBytes.size());
+ for (byte[] splitByte : splitBytes) {
+ try {
+
restoredSplitState.add(splitSerializer.deserialize(splitByte));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ restoredSplitStateMap.put(subtaskId, restoredSplitState);
+ });
+ } else {
+ splitEnumerator =
source.createEnumerator(coordinatedEnumeratorContext);
+ }
+ }
+
+ private void createReaders() throws Exception {
+ for (int subtaskId = 0; subtaskId < this.parallelism; subtaskId++) {
+ CoordinatedReaderContext readerContext = new
CoordinatedReaderContext(this, source.getBoundedness(), subtaskId);
+ readerContextMap.put(subtaskId, readerContext);
+ SourceReader<T, SplitT> reader =
source.createReader(readerContext);
+ readerMap.put(subtaskId, reader);
+ }
+ }
+
+ @Override
+ public void open() throws Exception {
+ executorService =
ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor(parallelism,
"parallel-split-enumerator-executor");
+ splitEnumerator.open();
+ restoredSplitStateMap.forEach((subtaskId, splits) -> {
+ splitEnumerator.addSplitsBack(splits, subtaskId);
+ });
+ readerMap.entrySet().parallelStream().forEach(entry -> {
+ try {
+ entry.getValue().open();
+ splitEnumerator.registerReader(entry.getKey());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @Override
+ public void run(Collector<T> collector) throws Exception {
+ readerMap.entrySet().parallelStream().forEach(entry -> {
+ final AtomicBoolean flag = readerRunningMap.get(entry.getKey());
+ final SourceReader<T, SplitT> reader = entry.getValue();
+ executorService.execute(() -> {
+ while (flag.get()) {
+ try {
+ reader.pollNext(collector);
+ } catch (Exception e) {
+ running = false;
+ flag.set(false);
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ });
+ splitEnumerator.run();
+ while (running) {
+ Thread.sleep(SLEEP_TIME_INTERVAL);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ running = false;
+
+ for (Map.Entry<Integer, SourceReader<T, SplitT>> entry :
readerMap.entrySet()) {
+ readerRunningMap.get(entry.getKey()).set(false);
+ entry.getValue().close();
+ }
+
+ if (executorService != null) {
+ executorService.shutdown();
+ }
+
+ try (SourceSplitEnumerator<SplitT, StateT> closed = splitEnumerator) {
+ // just close the resources
+ }
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Checkpoint & state
+ //
--------------------------------------------------------------------------------------------
+
+ @Override
+ public Map<Integer, List<byte[]>> snapshotState(long checkpointId) throws
Exception {
+ StateT enumeratorState = splitEnumerator.snapshotState(checkpointId);
+ byte[] enumeratorStateBytes =
enumeratorStateSerializer.serialize(enumeratorState);
+ Map<Integer, List<byte[]>> allStates = readerMap.entrySet()
+ .parallelStream()
+ .collect(Collectors.toMap(
+ Map.Entry<Integer, SourceReader<T, SplitT>>::getKey,
+ readerEntry -> {
+ try {
+ List<SplitT> splitStates =
readerEntry.getValue().snapshotState(checkpointId);
+ final List<byte[]> rawValues = new
ArrayList<>(splitStates.size());
+ for (SplitT splitState : splitStates) {
+
rawValues.add(splitSerializer.serialize(splitState));
+ }
+ return rawValues;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }));
+ allStates.put(-1, Collections.singletonList(enumeratorStateBytes));
+ return allStates;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ splitEnumerator.notifyCheckpointComplete(checkpointId);
+ readerMap.values().parallelStream().forEach(reader -> {
+ try {
+ reader.notifyCheckpointComplete(checkpointId);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @Override
+ public void notifyCheckpointAborted(long checkpointId) throws Exception {
+ splitEnumerator.notifyCheckpointAborted(checkpointId);
+ readerMap.values().parallelStream().forEach(reader -> {
+ try {
+ reader.notifyCheckpointAborted(checkpointId);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Reader context methods
+ //
--------------------------------------------------------------------------------------------
+
+ protected void handleNoMoreElement(int subtaskId) {
+ readerRunningMap.get(subtaskId).set(false);
+ readerContextMap.remove(subtaskId);
+ }
+
+ protected void handleSplitRequest(int subtaskId) {
+ splitEnumerator.handleSplitRequest(subtaskId);
+ }
+
+ protected void handleReaderEvent(int subtaskId, SourceEvent event) {
+ splitEnumerator.handleSourceEvent(subtaskId, event);
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Enumerator context methods
+ //
--------------------------------------------------------------------------------------------
+
+ public int currentReaderCount() {
+ return readerContextMap.size();
+ }
+
+ public Set<Integer> registeredReaders() {
+ return readerMap.keySet();
+ }
+
+ protected void addSplits(int subtaskId, List<SplitT> splits) {
+ readerMap.get(subtaskId).addSplits(splits);
+ }
+
+ protected void handleNoMoreSplits(int subtaskId) {
+ readerMap.get(subtaskId).handleNoMoreSplits();
+ }
+
+ protected void handleEnumeratorEvent(int subtaskId, SourceEvent event) {
+ readerMap.get(subtaskId).handleSourceEvent(event);
+ }
}
diff --git
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
index 9080c578..24c082cf 100644
---
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
+++
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
@@ -47,7 +47,6 @@ public class ParallelReaderContext implements
SourceReader.Context {
@Override
public void signalNoMoreElement() {
- // todo: if we have multiple subtasks, we need to know if all subtask
is stopped
parallelSource.handleNoMoreElement();
}
diff --git
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
index 83e61f78..1f25d736 100644
---
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
+++
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
@@ -23,19 +23,18 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.api.state.CheckpointListener;
import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
-public class ParallelSource<T, SplitT extends SourceSplit, StateT> implements
AutoCloseable, CheckpointListener {
-
- private final long splitEnumeratorTimeInterval = 5L;
+public class ParallelSource<T, SplitT extends SourceSplit, StateT> implements
BaseSourceFunction<T> {
protected final SeaTunnelSource<T, SplitT, StateT> source;
protected final ParallelEnumeratorContext<SplitT>
parallelEnumeratorContext;
@@ -48,8 +47,8 @@ public class ParallelSource<T, SplitT extends SourceSplit,
StateT> implements Au
protected final List<SplitT> restoredSplitState;
- protected transient volatile SourceSplitEnumerator<SplitT, StateT>
splitEnumerator;
- protected transient volatile SourceReader<T, SplitT> reader;
+ protected final SourceSplitEnumerator<SplitT, StateT> splitEnumerator;
+ protected final SourceReader<T, SplitT> reader;
protected transient volatile ScheduledThreadPoolExecutor executorService;
/**
@@ -58,7 +57,7 @@ public class ParallelSource<T, SplitT extends SourceSplit,
StateT> implements Au
private volatile boolean running = true;
public ParallelSource(SeaTunnelSource<T, SplitT, StateT> source,
- List<byte[]> restoredState,
+ Map<Integer, List<byte[]>> restoredState,
int parallelism,
int subtaskId) {
this.source = source;
@@ -73,10 +72,10 @@ public class ParallelSource<T, SplitT extends SourceSplit,
StateT> implements Au
// Create or restore split enumerator & reader
try {
if (restoredState != null && restoredState.size() > 0) {
- StateT restoredEnumeratorState =
enumeratorStateSerializer.deserialize(restoredState.get(0));
- restoredSplitState = new ArrayList<>(restoredState.size());
- for (int i = 1; i < restoredState.size(); i++) {
-
restoredSplitState.add(splitSerializer.deserialize(restoredState.get(i)));
+ StateT restoredEnumeratorState =
enumeratorStateSerializer.deserialize(restoredState.get(-1).get(0));
+ restoredSplitState = new
ArrayList<>(restoredState.get(subtaskId).size());
+ for (byte[] splitBytes : restoredState.get(subtaskId)) {
+
restoredSplitState.add(splitSerializer.deserialize(splitBytes));
}
splitEnumerator =
source.restoreEnumerator(parallelEnumeratorContext, restoredEnumeratorState);
@@ -90,17 +89,19 @@ public class ParallelSource<T, SplitT extends SourceSplit,
StateT> implements Au
}
}
- private transient volatile Thread splitEnumeratorThread;
-
+ @Override
public void open() throws Exception {
executorService =
ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor(1,
String.format("parallel-split-enumerator-executor-%s", subtaskId));
splitEnumerator.open();
- splitEnumerator.addSplitsBack(restoredSplitState, subtaskId);
+ if (restoredSplitState.size() > 0) {
+ splitEnumerator.addSplitsBack(restoredSplitState, subtaskId);
+ }
reader.open();
parallelEnumeratorContext.register();
splitEnumerator.registerReader(subtaskId);
}
+ @Override
public void run(Collector<T> collector) throws Exception {
Future<?> future = executorService.submit(() -> {
try {
@@ -161,16 +162,22 @@ public class ParallelSource<T, SplitT extends
SourceSplit, StateT> implements Au
reader.handleNoMoreSplits();
}
- public List<byte[]> snapshotState(long checkpointId) throws Exception {
- StateT enumeratorState = splitEnumerator.snapshotState(checkpointId);
- byte[] enumeratorStateBytes =
enumeratorStateSerializer.serialize(enumeratorState);
+ //
--------------------------------------------------------------------------------------------
+ // Checkpoint & state
+ //
--------------------------------------------------------------------------------------------
+
+ @Override
+ public Map<Integer, List<byte[]>> snapshotState(long checkpointId) throws
Exception {
+ byte[] enumeratorStateBytes =
enumeratorStateSerializer.serialize(splitEnumerator.snapshotState(checkpointId));
List<SplitT> splitStates = reader.snapshotState(checkpointId);
- final List<byte[]> rawValues = new ArrayList<>(splitStates.size() + 1);
- rawValues.add(enumeratorStateBytes);
+ final List<byte[]> readerStateBytes = new
ArrayList<>(splitStates.size());
for (SplitT splitState : splitStates) {
- rawValues.add(splitSerializer.serialize(splitState));
+ readerStateBytes.add(splitSerializer.serialize(splitState));
}
- return rawValues;
+ Map<Integer, List<byte[]>> allStates = new HashMap<>(2);
+ allStates.put(-1, Collections.singletonList(enumeratorStateBytes));
+ allStates.put(subtaskId, readerStateBytes);
+ return allStates;
}
@Override
diff --git
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/state/EmptyLock.java
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/state/EmptyLock.java
deleted file mode 100644
index 363956ad..00000000
---
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/state/EmptyLock.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.state;
-
-import org.apache.seatunnel.api.state.CheckpointLock;
-
-public class EmptyLock implements CheckpointLock {
- @Override
- public void lock() {
- // nothing
- }
-
- @Override
- public void unlock() {
- // nothing
- }
-}
diff --git
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
similarity index 64%
copy from
seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
copy to
seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
index 48374567..1778596f 100644
---
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
+++
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
@@ -21,77 +21,70 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
-import org.apache.seatunnel.translation.source.ParallelSource;
+import org.apache.seatunnel.translation.source.BaseSourceFunction;
import org.apache.flink.api.common.state.CheckpointListener;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
-public class SeaTunnelParallelSource extends RichParallelSourceFunction<Row>
+public abstract class BaseSeaTunnelSourceFunction extends
RichSourceFunction<Row>
implements CheckpointListener, ResultTypeQueryable<Row>,
CheckpointedFunction {
- private static final Logger LOG =
LoggerFactory.getLogger(SeaTunnelParallelSource.class);
- protected static final String PARALLEL_SOURCE_STATE_NAME =
"parallel-source-states";
+ private static final Logger LOG =
LoggerFactory.getLogger(BaseSeaTunnelSourceFunction.class);
protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
- protected transient volatile ParallelSource<SeaTunnelRow, ?, ?>
parallelSource;
+ protected transient volatile BaseSourceFunction<SeaTunnelRow>
internalSource;
- protected transient ListState<byte[]> sourceState;
- protected transient volatile List<byte[]> restoredState;
+ protected transient MapState<Integer, List<byte[]>> sourceState;
+ protected transient volatile Map<Integer, List<byte[]>> restoredState =
new HashMap<>();
/**
* Flag indicating whether the consumer is still running.
*/
private volatile boolean running = true;
- private FlinkCheckpointLock checkpointLock = null;
-
- public SeaTunnelParallelSource(SeaTunnelSource<SeaTunnelRow, ?, ?> source)
{
- // TODO: Make sure the source is uncoordinated.
+ public BaseSeaTunnelSourceFunction(SeaTunnelSource<SeaTunnelRow, ?, ?>
source) {
this.source = source;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
- this.parallelSource = new ParallelSource<>(source,
- restoredState,
- getRuntimeContext().getNumberOfParallelSubtasks(),
- getRuntimeContext().getIndexOfThisSubtask());
- this.parallelSource.open();
+ this.internalSource = createInternalSource();
+ this.internalSource.open();
}
+ protected abstract BaseSourceFunction<SeaTunnelRow> createInternalSource();
+
@Override
public void run(SourceFunction.SourceContext<Row> sourceContext) throws
Exception {
- checkpointLock = new
FlinkCheckpointLock(sourceContext.getCheckpointLock(),
getRuntimeContext().getIndexOfThisSubtask());
- parallelSource.run(new RowCollector(sourceContext, checkpointLock));
+ internalSource.run(new RowCollector(sourceContext,
sourceContext.getCheckpointLock()));
}
@Override
public void cancel() {
running = false;
try {
- parallelSource.close();
- if (checkpointLock != null) {
- checkpointLock.close();
- }
+ internalSource.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -99,12 +92,12 @@ public class SeaTunnelParallelSource extends
RichParallelSourceFunction<Row>
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
- parallelSource.notifyCheckpointComplete(checkpointId);
+ internalSource.notifyCheckpointComplete(checkpointId);
}
@Override
public void notifyCheckpointAborted(long checkpointId) throws Exception {
- parallelSource.notifyCheckpointAborted(checkpointId);
+ internalSource.notifyCheckpointAborted(checkpointId);
}
@Override
@@ -120,22 +113,26 @@ public class SeaTunnelParallelSource extends
RichParallelSourceFunction<Row>
if (!running) {
LOG.debug("snapshotState() called on closed source");
} else {
-
sourceState.update(parallelSource.snapshotState(snapshotContext.getCheckpointId()));
+ sourceState.clear();
+
sourceState.putAll(internalSource.snapshotState(snapshotContext.getCheckpointId()));
}
}
@Override
public void initializeState(FunctionInitializationContext
initializeContext) throws Exception {
- OperatorStateStore stateStore =
initializeContext.getOperatorStateStore();
- this.sourceState = stateStore.getListState(new
ListStateDescriptor<>(PARALLEL_SOURCE_STATE_NAME,
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
+ this.sourceState = initializeContext.getKeyedStateStore()
+ .getMapState(new MapStateDescriptor<>(
+ getStateName(),
+ BasicTypeInfo.INT_TYPE_INFO,
+
Types.LIST(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)));
if (initializeContext.isRestored()) {
- restoredState = new ArrayList<>();
// populate actual holder for restored state
- sourceState.get().forEach(restoredState::add);
-
+ sourceState.entries().forEach(entry ->
restoredState.put(entry.getKey(), entry.getValue()));
LOG.info("Consumer subtask {} restored state",
getRuntimeContext().getIndexOfThisSubtask());
} else {
LOG.info("Consumer subtask {} has no restore state.",
getRuntimeContext().getIndexOfThisSubtask());
}
}
+
+ protected abstract String getStateName();
}
diff --git
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkCheckpointLock.java
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkCheckpointLock.java
deleted file mode 100644
index 59c10514..00000000
---
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkCheckpointLock.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.source;
-
-import org.apache.seatunnel.api.state.CheckpointLock;
-import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
-
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class FlinkCheckpointLock implements CheckpointLock, AutoCloseable {
-
- private static final Integer LOOP_INTERVAL = 10;
- private final Object flinkLock;
- private final AtomicInteger lock = new AtomicInteger(0);
- private volatile boolean engineLock = false;
- private volatile boolean running = true;
- private final ScheduledThreadPoolExecutor executor;
- public FlinkCheckpointLock(Object flinkLock, int subtaskId) {
- this.flinkLock = flinkLock;
- this.executor =
ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor(1,
String.format("checkpoint-lock-monitor-%s", subtaskId));
- executor.execute(this::monitorLock);
- }
-
- private void monitorLock() {
- if (!running) {
- return;
- }
- while (lock.get() == 0 && running) {
- sleep();
- }
- if (!running) {
- return;
- }
- synchronized (flinkLock) {
- engineLock = true;
- while (lock.get() != 0 && running) {
- sleep();
- }
- }
- engineLock = false;
- monitorLock();
- }
-
- @Override
- public void lock() {
- lock.incrementAndGet();
- while (!engineLock) {
- sleep();
- }
- }
-
- @Override
- public void unlock() {
- int num = lock.decrementAndGet();
- while (engineLock && num == 0) {
- sleep();
- }
- }
-
- private void sleep() {
- try {
- Thread.sleep(LOOP_INTERVAL);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void close() throws Exception {
- running = false;
- executor.shutdown();
- }
-}
diff --git
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
index 1541cf31..f9727926 100644
---
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
+++
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
@@ -18,7 +18,6 @@
package org.apache.seatunnel.translation.flink.source;
import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.state.CheckpointLock;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import
org.apache.seatunnel.translation.flink.serialization.FlinkRowSerialization;
@@ -31,9 +30,9 @@ public class RowCollector implements Collector<SeaTunnelRow> {
protected final SourceFunction.SourceContext<Row> internalCollector;
protected final FlinkRowSerialization rowSerialization = new
FlinkRowSerialization();
- protected final CheckpointLock checkpointLock;
+ protected final Object checkpointLock;
- public RowCollector(SourceFunction.SourceContext<Row> internalCollector,
CheckpointLock checkpointLock) {
+ public RowCollector(SourceFunction.SourceContext<Row> internalCollector,
Object checkpointLock) {
this.internalCollector = internalCollector;
this.checkpointLock = checkpointLock;
}
@@ -48,7 +47,7 @@ public class RowCollector implements Collector<SeaTunnelRow> {
}
@Override
- public CheckpointLock getCheckpointLock() {
+ public Object getCheckpointLock() {
return this.checkpointLock;
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java
new file mode 100644
index 00000000..a6992341
--- /dev/null
+++
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.source;
+
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.translation.source.BaseSourceFunction;
+import org.apache.seatunnel.translation.source.CoordinatedSource;
+
+public class SeaTunnelCoordinatedSource extends BaseSeaTunnelSourceFunction {
+
+ protected static final String COORDINATED_SOURCE_STATE_NAME =
"coordinated-source-states";
+
+ public SeaTunnelCoordinatedSource(SeaTunnelSource<SeaTunnelRow, ?, ?>
source) {
+ // TODO: Make sure the source is coordinated.
+ super(source);
+ }
+
+ @Override
+ protected BaseSourceFunction<SeaTunnelRow> createInternalSource() {
+ return new CoordinatedSource<>(source, restoredState,
getRuntimeContext().getNumberOfParallelSubtasks());
+ }
+
+ @Override
+ protected String getStateName() {
+ return COORDINATED_SOURCE_STATE_NAME;
+ }
+}
diff --git
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
index 48374567..c3f8bfeb 100644
---
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
+++
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
@@ -19,123 +19,31 @@ package org.apache.seatunnel.translation.flink.source;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
-import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
+import org.apache.seatunnel.translation.source.BaseSourceFunction;
import org.apache.seatunnel.translation.source.ParallelSource;
-import org.apache.flink.api.common.state.CheckpointListener;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
+public class SeaTunnelParallelSource extends BaseSeaTunnelSourceFunction
implements ParallelSourceFunction<Row> {
-public class SeaTunnelParallelSource extends RichParallelSourceFunction<Row>
- implements CheckpointListener, ResultTypeQueryable<Row>,
CheckpointedFunction {
- private static final Logger LOG =
LoggerFactory.getLogger(SeaTunnelParallelSource.class);
protected static final String PARALLEL_SOURCE_STATE_NAME =
"parallel-source-states";
- protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
- protected transient volatile ParallelSource<SeaTunnelRow, ?, ?>
parallelSource;
-
- protected transient ListState<byte[]> sourceState;
- protected transient volatile List<byte[]> restoredState;
-
- /**
- * Flag indicating whether the consumer is still running.
- */
- private volatile boolean running = true;
-
- private FlinkCheckpointLock checkpointLock = null;
-
public SeaTunnelParallelSource(SeaTunnelSource<SeaTunnelRow, ?, ?> source)
{
// TODO: Make sure the source is uncoordinated.
- this.source = source;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- this.parallelSource = new ParallelSource<>(source,
- restoredState,
- getRuntimeContext().getNumberOfParallelSubtasks(),
- getRuntimeContext().getIndexOfThisSubtask());
- this.parallelSource.open();
- }
-
- @Override
- public void run(SourceFunction.SourceContext<Row> sourceContext) throws
Exception {
- checkpointLock = new
FlinkCheckpointLock(sourceContext.getCheckpointLock(),
getRuntimeContext().getIndexOfThisSubtask());
- parallelSource.run(new RowCollector(sourceContext, checkpointLock));
+ super(source);
}
@Override
- public void cancel() {
- running = false;
- try {
- parallelSource.close();
- if (checkpointLock != null) {
- checkpointLock.close();
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ protected BaseSourceFunction<SeaTunnelRow> createInternalSource() {
+ return new ParallelSource<>(source,
+ restoredState,
+ getRuntimeContext().getNumberOfParallelSubtasks(),
+ getRuntimeContext().getIndexOfThisSubtask());
}
@Override
- public void notifyCheckpointComplete(long checkpointId) throws Exception {
- parallelSource.notifyCheckpointComplete(checkpointId);
- }
-
- @Override
- public void notifyCheckpointAborted(long checkpointId) throws Exception {
- parallelSource.notifyCheckpointAborted(checkpointId);
- }
-
- @Override
- public TypeInformation<Row> getProducedType() {
- SeaTunnelRowTypeInfo rowTypeInfo = source.getRowTypeInfo();
- TypeInformation<?>[] typeInformation =
Arrays.stream(rowTypeInfo.getSeaTunnelDataTypes())
-
.map(TypeConverterUtils::convertType).toArray(TypeInformation[]::new);
- return new RowTypeInfo(typeInformation, rowTypeInfo.getFieldNames());
- }
-
- @Override
- public void snapshotState(FunctionSnapshotContext snapshotContext) throws
Exception {
- if (!running) {
- LOG.debug("snapshotState() called on closed source");
- } else {
-
sourceState.update(parallelSource.snapshotState(snapshotContext.getCheckpointId()));
- }
- }
-
- @Override
- public void initializeState(FunctionInitializationContext
initializeContext) throws Exception {
- OperatorStateStore stateStore =
initializeContext.getOperatorStateStore();
- this.sourceState = stateStore.getListState(new
ListStateDescriptor<>(PARALLEL_SOURCE_STATE_NAME,
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
- if (initializeContext.isRestored()) {
- restoredState = new ArrayList<>();
- // populate actual holder for restored state
- sourceState.get().forEach(restoredState::add);
-
- LOG.info("Consumer subtask {} restored state",
getRuntimeContext().getIndexOfThisSubtask());
- } else {
- LOG.info("Consumer subtask {} has no restore state.",
getRuntimeContext().getIndexOfThisSubtask());
- }
+ protected String getStateName() {
+ return PARALLEL_SOURCE_STATE_NAME;
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/InternalRowCollector.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/InternalRowCollector.java
index b5678eb8..88e1da93 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/InternalRowCollector.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/InternalRowCollector.java
@@ -18,7 +18,6 @@
package org.apache.seatunnel.translation.spark.source;
import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.state.CheckpointLock;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import
org.apache.seatunnel.translation.spark.serialization.InternalRowSerialization;
@@ -27,10 +26,10 @@ import org.apache.spark.sql.types.StructType;
public class InternalRowCollector implements Collector<SeaTunnelRow> {
private final Handover<InternalRow> handover;
- private final CheckpointLock checkpointLock;
+ private final Object checkpointLock;
private final InternalRowSerialization rowSerialization;
- public InternalRowCollector(Handover<InternalRow> handover, CheckpointLock
checkpointLock, StructType sparkSchema) {
+ public InternalRowCollector(Handover<InternalRow> handover, Object
checkpointLock, StructType sparkSchema) {
this.handover = handover;
this.checkpointLock = checkpointLock;
this.rowSerialization = new InternalRowSerialization(sparkSchema);
@@ -39,15 +38,16 @@ public class InternalRowCollector implements
Collector<SeaTunnelRow> {
@Override
public void collect(SeaTunnelRow record) {
try {
- // TODO: Lock InternalRowCollector while checkpoint is running
- handover.produce(rowSerialization.serialize(record));
+ synchronized (checkpointLock) {
+ handover.produce(rowSerialization.serialize(record));
+ }
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
- public CheckpointLock getCheckpointLock() {
+ public Object getCheckpointLock() {
return this.checkpointLock;
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/ReaderState.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/ReaderState.java
index 45765a36..5eed6425 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/ReaderState.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/ReaderState.java
@@ -20,19 +20,20 @@ package org.apache.seatunnel.translation.spark.source;
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
import java.util.List;
+import java.util.Map;
public class ReaderState implements PartitionOffset {
- private final List<byte[]> bytes;
+ private final Map<Integer, List<byte[]>> bytes;
private final Integer subtaskId;
private final Integer checkpointId;
- public ReaderState(List<byte[]> bytes, Integer subtaskId, Integer
checkpointId) {
+ public ReaderState(Map<Integer, List<byte[]>> bytes, Integer subtaskId,
Integer checkpointId) {
this.bytes = bytes;
this.subtaskId = subtaskId;
this.checkpointId = checkpointId;
}
- public List<byte[]> getBytes() {
+ public Map<Integer, List<byte[]>> getBytes() {
return bytes;
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
index 09b969b1..5d388e82 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
@@ -20,9 +20,9 @@ package org.apache.seatunnel.translation.spark.source;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.utils.SerializationUtils;
-import
org.apache.seatunnel.translation.spark.source.batch.BatchParallelSourceReader;
-import
org.apache.seatunnel.translation.spark.source.continnous.ContinuousParallelSourceReader;
-import
org.apache.seatunnel.translation.spark.source.micro.MicroBatchParallelSourceReader;
+import org.apache.seatunnel.translation.spark.source.batch.BatchSourceReader;
+import
org.apache.seatunnel.translation.spark.source.continnous.ContinuousSourceReader;
+import
org.apache.seatunnel.translation.spark.source.micro.MicroBatchSourceReader;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -38,11 +38,13 @@ import
org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Optional;
public class SeaTunnelSourceSupport implements DataSourceV2, ReadSupport,
MicroBatchReadSupport, ContinuousReadSupport, DataSourceRegister {
-
+ private static final Logger LOG =
LoggerFactory.getLogger(SeaTunnelSourceSupport.class);
public static final String SEA_TUNNEL_SOURCE_NAME = "SeaTunnelSource";
public static final Integer CHECKPOINT_INTERVAL_DEFAULT = 10000;
@@ -54,9 +56,8 @@ public class SeaTunnelSourceSupport implements DataSourceV2,
ReadSupport, MicroB
@Override
public DataSourceReader createReader(StructType rowType, DataSourceOptions
options) {
SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource =
getSeaTunnelSource(options);
- Integer parallelism = options.getInt("source.parallelism", 1);
- // TODO: case coordinated source
- return new BatchParallelSourceReader(seaTunnelSource, parallelism,
rowType);
+ int parallelism = options.getInt("source.parallelism", 1);
+ return new BatchSourceReader(seaTunnelSource, parallelism, rowType);
}
@Override
@@ -73,10 +74,9 @@ public class SeaTunnelSourceSupport implements DataSourceV2,
ReadSupport, MicroB
String checkpointPath = StringUtils.replacePattern(checkpointLocation,
"sources/\\d+", "sources-state");
Configuration configuration =
SparkSession.getActiveSession().get().sparkContext().hadoopConfiguration();
String hdfsRoot =
options.get("hdfs.root").orElse(FileSystem.getDefaultUri(configuration).toString());
- String hdfsUser = options.get("hdfs.user").orElseThrow(() -> new
UnsupportedOperationException("HDFS user is required."));
+ String hdfsUser = options.get("hdfs.user").orElse("");
Integer checkpointId = options.getInt("checkpoint.id", 1);
- // TODO: case coordinated source
- return new MicroBatchParallelSourceReader(seaTunnelSource,
parallelism, checkpointId, checkpointInterval, checkpointPath, hdfsRoot,
hdfsUser, rowType);
+ return new MicroBatchSourceReader(seaTunnelSource, parallelism,
checkpointId, checkpointInterval, checkpointPath, hdfsRoot, hdfsUser, rowType);
}
@Override
@@ -84,8 +84,7 @@ public class SeaTunnelSourceSupport implements DataSourceV2,
ReadSupport, MicroB
StructType rowType = checkRowType(rowTypeOptional);
SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource =
getSeaTunnelSource(options);
Integer parallelism = options.getInt("source.parallelism", 1);
- // TODO: case coordinated source
- return new ContinuousParallelSourceReader(seaTunnelSource,
parallelism, rowType);
+ return new ContinuousSourceReader(seaTunnelSource, parallelism,
rowType);
}
private SeaTunnelSource<SeaTunnelRow, ?, ?>
getSeaTunnelSource(DataSourceOptions options) {
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartition.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartition.java
index 321f8ae8..d8c050c1 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartition.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartition.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.translation.spark.source.batch;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SupportCoordinate;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -40,6 +41,10 @@ public class BatchPartition implements
InputPartition<InternalRow> {
@Override
public InputPartitionReader<InternalRow> createPartitionReader() {
- return new BatchPartitionReader(source, parallelism, subtaskId,
rowType);
+ if (source instanceof SupportCoordinate) {
+ return new CoordinatedBatchPartitionReader(source, parallelism,
subtaskId, rowType);
+ } else {
+ return new ParallelBatchPartitionReader(source, parallelism,
subtaskId, rowType);
+ }
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchParallelSourceReader.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchSourceReader.java
similarity index 72%
rename from
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchParallelSourceReader.java
rename to
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchSourceReader.java
index 0498bf28..d0a26d80 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchParallelSourceReader.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchSourceReader.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.translation.spark.source.batch;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SupportCoordinate;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -28,13 +29,13 @@ import org.apache.spark.sql.types.StructType;
import java.util.ArrayList;
import java.util.List;
-public class BatchParallelSourceReader implements DataSourceReader {
+public class BatchSourceReader implements DataSourceReader {
protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
protected final Integer parallelism;
protected final StructType rowType;
- public BatchParallelSourceReader(SeaTunnelSource<SeaTunnelRow, ?, ?>
source, Integer parallelism, StructType rowType) {
+ public BatchSourceReader(SeaTunnelSource<SeaTunnelRow, ?, ?> source,
Integer parallelism, StructType rowType) {
this.source = source;
this.parallelism = parallelism;
this.rowType = rowType;
@@ -47,9 +48,15 @@ public class BatchParallelSourceReader implements
DataSourceReader {
@Override
public List<InputPartition<InternalRow>> planInputPartitions() {
- List<InputPartition<InternalRow>> virtualPartitions = new
ArrayList<>(parallelism);
- for (int subtaskId = 0; subtaskId < parallelism; subtaskId++) {
- virtualPartitions.add(new BatchPartition(source, parallelism,
subtaskId, rowType));
+ List<InputPartition<InternalRow>> virtualPartitions;
+ if (source instanceof SupportCoordinate) {
+ virtualPartitions = new ArrayList<>(1);
+ virtualPartitions.add(new BatchPartition(source, parallelism, 0,
rowType));
+ } else {
+ virtualPartitions = new ArrayList<>(parallelism);
+ for (int subtaskId = 0; subtaskId < parallelism; subtaskId++) {
+ virtualPartitions.add(new BatchPartition(source, parallelism,
subtaskId, rowType));
+ }
}
return virtualPartitions;
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java
new file mode 100644
index 00000000..00c2cd71
--- /dev/null
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.source.batch;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.translation.source.BaseSourceFunction;
+import org.apache.seatunnel.translation.source.CoordinatedSource;
+import org.apache.seatunnel.translation.spark.source.InternalRowCollector;
+
+import org.apache.spark.sql.types.StructType;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class CoordinatedBatchPartitionReader extends
ParallelBatchPartitionReader {
+
+ protected final Map<Integer, InternalRowCollector> collectorMap;
+
+ public CoordinatedBatchPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?>
source, Integer parallelism, Integer subtaskId, StructType rowType) {
+ super(source, parallelism, subtaskId, rowType);
+ this.collectorMap = new HashMap<>(parallelism);
+ for (int i = 0; i < parallelism; i++) {
+ collectorMap.put(i, new InternalRowCollector(handover, new
Object(), rowType));
+ }
+ }
+
+ @Override
+ protected String getEnumeratorThreadName() {
+ return "coordinated-split-enumerator-executor";
+ }
+
+ @Override
+ protected BaseSourceFunction<SeaTunnelRow> createInternalSource() {
+ return new InternalCoordinatedSource<>(source,
+ null,
+ parallelism);
+ }
+
+ public class InternalCoordinatedSource<SplitT extends SourceSplit, StateT>
extends CoordinatedSource<SeaTunnelRow, SplitT, StateT> {
+ protected final AtomicInteger completedReader = new AtomicInteger(0);
+
+ public InternalCoordinatedSource(SeaTunnelSource<SeaTunnelRow, SplitT,
StateT> source, Map<Integer, List<byte[]>> restoredState, int parallelism) {
+ super(source, restoredState, parallelism);
+ }
+
+ @Override
+ public void run(Collector<SeaTunnelRow> collector) throws Exception {
+ readerMap.entrySet().parallelStream().forEach(entry -> {
+ final AtomicBoolean flag =
readerRunningMap.get(entry.getKey());
+ final SourceReader<SeaTunnelRow, SplitT> reader =
entry.getValue();
+ final Collector<SeaTunnelRow> rowCollector =
collectorMap.get(entry.getKey());
+ executorService.execute(() -> {
+ while (flag.get()) {
+ try {
+ reader.pollNext(rowCollector);
+ } catch (Exception e) {
+ this.running = false;
+ flag.set(false);
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ });
+ splitEnumerator.run();
+ while (this.running) {
+ Thread.sleep(SLEEP_TIME_INTERVAL);
+ }
+ }
+
+ @Override
+ protected void handleNoMoreElement(int subtaskId) {
+ super.handleNoMoreElement(subtaskId);
+ if (completedReader.incrementAndGet() == this.parallelism) {
+ CoordinatedBatchPartitionReader.this.running = false;
+ }
+ }
+ }
+}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/ParallelBatchPartitionReader.java
similarity index 76%
rename from
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
rename to
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/ParallelBatchPartitionReader.java
index 9f5beeb5..44320228 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/BatchPartitionReader.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/ParallelBatchPartitionReader.java
@@ -17,14 +17,13 @@
package org.apache.seatunnel.translation.spark.source.batch;
-import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.translation.source.BaseSourceFunction;
import org.apache.seatunnel.translation.source.ParallelSource;
import org.apache.seatunnel.translation.spark.source.Handover;
import org.apache.seatunnel.translation.spark.source.InternalRowCollector;
-import org.apache.seatunnel.translation.state.EmptyLock;
import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -35,11 +34,12 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
-public class BatchPartitionReader implements InputPartitionReader<InternalRow>
{
+public class ParallelBatchPartitionReader implements
InputPartitionReader<InternalRow> {
- private static final Logger LOGGER =
LoggerFactory.getLogger(BatchPartitionReader.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ParallelBatchPartitionReader.class);
protected static final Integer INTERVAL = 100;
@@ -51,21 +51,26 @@ public class BatchPartitionReader implements
InputPartitionReader<InternalRow> {
protected final ExecutorService executorService;
protected final Handover<InternalRow> handover;
+ protected final Object checkpointLock = new Object();
+
protected volatile boolean running = true;
protected volatile boolean prepare = true;
- protected volatile ParallelSource<SeaTunnelRow, ?, ?> parallelSource;
- protected volatile Collector<SeaTunnelRow> collector;
+ protected volatile BaseSourceFunction<SeaTunnelRow> internalSource;
- public BatchPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> source,
Integer parallelism, Integer subtaskId, StructType rowType) {
+ public ParallelBatchPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?>
source, Integer parallelism, Integer subtaskId, StructType rowType) {
this.source = source;
this.parallelism = parallelism;
this.subtaskId = subtaskId;
this.rowType = rowType;
- this.executorService =
ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor(1,
String.format("parallel-split-enumerator-executor-%s", subtaskId));
+ this.executorService =
ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor(1,
getEnumeratorThreadName());
this.handover = new Handover<>();
}
+    /**
+     * Returns the name handed to {@code ThreadPoolExecutorFactory} when the constructor
+     * creates the executor; this default embeds the reader's subtask id.
+     *
+     * <p>Invoked from the constructor, so an override must not depend on fields that the
+     * subclass initializes itself.
+     */
+    protected String getEnumeratorThreadName() {
+        return "parallel-split-enumerator-executor-" + subtaskId;
+    }
+
@Override
public boolean next() throws IOException {
prepare();
@@ -83,31 +88,27 @@ public class BatchPartitionReader implements
InputPartitionReader<InternalRow> {
if (!prepare) {
return;
}
- this.collector = createCollector();
- this.parallelSource = createParallelSource();
+
+ this.internalSource = createInternalSource();
try {
- this.parallelSource.open();
+ this.internalSource.open();
} catch (Exception e) {
running = false;
throw new RuntimeException("");
}
executorService.execute(() -> {
try {
- parallelSource.run(collector);
+ internalSource.run(new InternalRowCollector(handover,
checkpointLock, rowType));
} catch (Exception e) {
handover.reportError(e);
- LOGGER.error("ParallelSource execute failed.", e);
+ LOGGER.error("BatchPartitionReader execute failed.", e);
running = false;
}
});
prepare = false;
}
- protected Collector<SeaTunnelRow> createCollector() {
- return new InternalRowCollector(handover, new EmptyLock(), rowType);
- }
-
- protected ParallelSource<SeaTunnelRow, ?, ?> createParallelSource() {
+ protected BaseSourceFunction<SeaTunnelRow> createInternalSource() {
return new InternalParallelSource<>(source,
null,
parallelism,
@@ -126,13 +127,17 @@ public class BatchPartitionReader implements
InputPartitionReader<InternalRow> {
@Override
public void close() throws IOException {
running = false;
- parallelSource.close();
+ try {
+ internalSource.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
executorService.shutdown();
}
public class InternalParallelSource<SplitT extends SourceSplit, StateT>
extends ParallelSource<SeaTunnelRow, SplitT, StateT> {
- public InternalParallelSource(SeaTunnelSource<SeaTunnelRow, SplitT,
StateT> source, List<byte[]> restoredState, int parallelism, int subtaskId) {
+ public InternalParallelSource(SeaTunnelSource<SeaTunnelRow, SplitT,
StateT> source, Map<Integer, List<byte[]>> restoredState, int parallelism, int
subtaskId) {
super(source, restoredState, parallelism, subtaskId);
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartition.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartition.java
index 52f0a9d9..e0c8c915 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartition.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartition.java
@@ -26,6 +26,7 @@ import
org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.types.StructType;
import java.util.List;
+import java.util.Map;
public class ContinuousPartition implements InputPartition<InternalRow> {
protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
@@ -33,14 +34,14 @@ public class ContinuousPartition implements
InputPartition<InternalRow> {
protected final Integer subtaskId;
protected final StructType rowType;
protected final Integer checkpointId;
- protected final List<byte[]> restoredState;
+ protected final Map<Integer, List<byte[]>> restoredState;
public ContinuousPartition(SeaTunnelSource<SeaTunnelRow, ?, ?> source,
Integer parallelism,
Integer subtaskId,
StructType rowType,
Integer checkpointId,
- List<byte[]> restoredState) {
+ Map<Integer, List<byte[]>> restoredState) {
this.source = source;
this.parallelism = parallelism;
this.subtaskId = subtaskId;
@@ -51,6 +52,6 @@ public class ContinuousPartition implements
InputPartition<InternalRow> {
@Override
public InputPartitionReader<InternalRow> createPartitionReader() {
- return new ContinuousPartitionReader(source, parallelism, subtaskId,
rowType, checkpointId, restoredState);
+ return new ParallelContinuousPartitionReader(source, parallelism,
subtaskId, rowType, checkpointId, restoredState);
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousParallelSourceReader.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousSourceReader.java
similarity index 92%
rename from
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousParallelSourceReader.java
rename to
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousSourceReader.java
index 1c1f05ac..d4f1db31 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousParallelSourceReader.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousSourceReader.java
@@ -35,19 +35,20 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
-public class ContinuousParallelSourceReader implements ContinuousReader {
+public class ContinuousSourceReader implements ContinuousReader {
private final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
private final Integer parallelism;
private final StructType rowType;
- private Map<Integer, ReaderState> readerStateMap = new HashMap<>();
+ private final Map<Integer, ReaderState> readerStateMap = new HashMap<>();
private CoordinationState coordinationState;
private int checkpointId = 1;
- public ContinuousParallelSourceReader(SeaTunnelSource<SeaTunnelRow, ?, ?>
source, Integer parallelism, StructType rowType) {
+ public ContinuousSourceReader(SeaTunnelSource<SeaTunnelRow, ?, ?> source,
Integer parallelism, StructType rowType) {
this.source = source;
this.parallelism = parallelism;
this.rowType = rowType;
+ throw new UnsupportedOperationException("Continuous source is not
currently supported.");
}
@Override
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartitionReader.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ParallelContinuousPartitionReader.java
similarity index 75%
rename from
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartitionReader.java
rename to
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ParallelContinuousPartitionReader.java
index 180d3ef2..1276421a 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ContinuousPartitionReader.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/continnous/ParallelContinuousPartitionReader.java
@@ -21,7 +21,7 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.translation.source.ParallelSource;
import org.apache.seatunnel.translation.spark.source.ReaderState;
-import
org.apache.seatunnel.translation.spark.source.batch.BatchPartitionReader;
+import
org.apache.seatunnel.translation.spark.source.batch.ParallelBatchPartitionReader;
import org.apache.spark.sql.catalyst.InternalRow;
import
org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader;
@@ -30,19 +30,20 @@ import org.apache.spark.sql.types.StructType;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
-public class ContinuousPartitionReader extends BatchPartitionReader implements
ContinuousInputPartitionReader<InternalRow> {
+public class ParallelContinuousPartitionReader extends
ParallelBatchPartitionReader implements
ContinuousInputPartitionReader<InternalRow> {
protected volatile Integer checkpointId;
- protected final List<byte[]> restoredState;
+ protected final Map<Integer, List<byte[]>> restoredState;
- public ContinuousPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?>
source, Integer parallelism, Integer subtaskId, StructType rowType, Integer
checkpointId, List<byte[]> restoredState) {
+ public ParallelContinuousPartitionReader(SeaTunnelSource<SeaTunnelRow, ?,
?> source, Integer parallelism, Integer subtaskId, StructType rowType, Integer
checkpointId, Map<Integer, List<byte[]>> restoredState) {
super(source, parallelism, subtaskId, rowType);
this.checkpointId = checkpointId;
this.restoredState = restoredState;
}
@Override
- protected ParallelSource<SeaTunnelRow, ?, ?> createParallelSource() {
+ protected ParallelSource<SeaTunnelRow, ?, ?> createInternalSource() {
return new InternalParallelSource<>(source,
restoredState,
parallelism,
@@ -51,9 +52,9 @@ public class ContinuousPartitionReader extends
BatchPartitionReader implements C
@Override
public PartitionOffset getOffset() {
- List<byte[]> bytes;
+ Map<Integer, List<byte[]>> bytes;
try {
- bytes = parallelSource.snapshotState(checkpointId);
+ bytes = internalSource.snapshotState(checkpointId);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -66,7 +67,7 @@ public class ContinuousPartitionReader extends
BatchPartitionReader implements C
* The method is called by RPC
*/
public void notifyCheckpointComplete(long checkpointId) throws Exception {
- parallelSource.notifyCheckpointComplete(checkpointId);
+ internalSource.notifyCheckpointComplete(checkpointId);
}
@Override
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java
new file mode 100644
index 00000000..c2482588
--- /dev/null
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.source.micro;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.translation.source.BaseSourceFunction;
+import org.apache.seatunnel.translation.source.CoordinatedSource;
+import org.apache.seatunnel.translation.spark.source.InternalRowCollector;
+import org.apache.seatunnel.translation.spark.source.ReaderState;
+
+import org.apache.spark.sql.types.StructType;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Micro-batch partition reader that drives all readers of a coordinated source from a single
+ * partition. It keeps one {@link InternalRowCollector} per reader index, all writing into the
+ * shared {@code handover}, and takes a virtual checkpoint while holding every collector's
+ * checkpoint lock.
+ */
+public class CoordinatedMicroBatchPartitionReader extends ParallelMicroBatchPartitionReader {
+    /** Collector per reader index ({@code 0 .. parallelism - 1}), each with its own lock object. */
+    protected final Map<Integer, InternalRowCollector> collectorMap;
+
+    public CoordinatedMicroBatchPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> source, Integer parallelism, Integer subtaskId, StructType rowType, Integer checkpointId, Integer checkpointInterval, String checkpointPath, String hdfsRoot, String hdfsUser) {
+        super(source, parallelism, subtaskId, rowType, checkpointId, checkpointInterval, checkpointPath, hdfsRoot, hdfsUser);
+        this.collectorMap = new HashMap<>(parallelism);
+        for (int i = 0; i < parallelism; i++) {
+            // Every collector gets a distinct lock so readers do not contend with each other.
+            collectorMap.put(i, new InternalRowCollector(handover, new Object(), rowType));
+        }
+    }
+
+    /**
+     * Takes a checkpoint while holding the checkpoint lock of every collector.
+     *
+     * @throws RuntimeException wrapping any failure; if the thread was interrupted, its
+     *                          interrupt status is restored before the exception is thrown
+     */
+    @Override
+    public void virtualCheckpoint() {
+        try {
+            internalCheckpoint(collectorMap.values().iterator(), 0);
+        } catch (InterruptedException e) {
+            // Keep the interruption visible to whoever owns this thread.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("An error occurred in virtual checkpoint execution.", e);
+        } catch (Exception e) {
+            throw new RuntimeException("An error occurred in virtual checkpoint execution.", e);
+        }
+    }
+
+    /**
+     * Recursively enters the checkpoint lock of each remaining collector, so that the nested
+     * {@code synchronized} blocks end up holding all locks at once. Only the innermost frame
+     * (the one whose incremented {@code loop} equals {@code parallelism}) performs the
+     * checkpoint; the outer frames return while the recursion unwinds.
+     *
+     * @param iterator collectors whose locks still have to be acquired
+     * @param loop     number of locks already held when this frame is entered
+     */
+    private void internalCheckpoint(Iterator<InternalRowCollector> iterator, int loop) throws Exception {
+        if (!iterator.hasNext()) {
+            return;
+        }
+        synchronized (iterator.next().getCheckpointLock()) {
+            internalCheckpoint(iterator, ++loop);
+            if (loop != this.parallelism) {
+                // Avoid backtracking calls
+                return;
+            }
+            // Wait until everything already handed over has been consumed.
+            while (!handover.isEmpty()) {
+                Thread.sleep(CHECKPOINT_SLEEP_INTERVAL);
+            }
+            // Block #next() method
+            synchronized (handover) {
+                final int currentCheckpoint = checkpointId;
+                ReaderState readerState = snapshotState();
+                saveState(readerState, currentCheckpoint);
+                internalSource.notifyCheckpointComplete(currentCheckpoint);
+                running = false;
+            }
+        }
+    }
+
+    @Override
+    protected String getEnumeratorThreadName() {
+        return "coordinated-split-enumerator-executor";
+    }
+
+    /**
+     * Creates the coordinated source that hosts every reader inside this partition.
+     */
+    @Override
+    protected BaseSourceFunction<SeaTunnelRow> createInternalSource() {
+        // NOTE(review): the restored state is passed as null here, whereas the parallel
+        // micro-batch reader passes its restoredState field - confirm this is intended.
+        return new InternalCoordinatedSource<>(source,
+            null,
+            parallelism);
+    }
+
+    /**
+     * Coordinated source whose readers emit through the enclosing reader's per-index collectors.
+     */
+    public class InternalCoordinatedSource<SplitT extends SourceSplit, StateT> extends CoordinatedSource<SeaTunnelRow, SplitT, StateT> {
+        /** Number of readers that have reported no more elements. */
+        protected final AtomicInteger completedReader = new AtomicInteger(0);
+
+        public InternalCoordinatedSource(SeaTunnelSource<SeaTunnelRow, SplitT, StateT> source, Map<Integer, List<byte[]>> restoredState, int parallelism) {
+            super(source, restoredState, parallelism);
+        }
+
+        /**
+         * Starts one polling task per reader on {@code executorService}, runs the split
+         * enumerator, then blocks until this source's {@code running} flag is cleared.
+         *
+         * @param collector unused; rows are emitted through the per-reader collectors
+         * @throws Exception if the enumerator fails, the wait is interrupted, or any reader
+         *                   task failed while polling (the first such failure is rethrown)
+         */
+        @Override
+        public void run(Collector<SeaTunnelRow> collector) throws Exception {
+            // First failure raised by a reader task. Without it the error would only be thrown
+            // inside the executor task and run() would return as if everything succeeded.
+            final java.util.concurrent.atomic.AtomicReference<Exception> failure =
+                new java.util.concurrent.atomic.AtomicReference<>();
+            // Submitting a task is cheap and non-blocking, so a sequential loop is sufficient.
+            readerMap.entrySet().forEach(entry -> {
+                final AtomicBoolean flag = readerRunningMap.get(entry.getKey());
+                final SourceReader<SeaTunnelRow, SplitT> reader = entry.getValue();
+                final Collector<SeaTunnelRow> rowCollector = collectorMap.get(entry.getKey());
+                executorService.execute(() -> {
+                    while (flag.get()) {
+                        try {
+                            reader.pollNext(rowCollector);
+                        } catch (Exception e) {
+                            // Record the failure before releasing the waiting loop below,
+                            // so that run() is guaranteed to see it once it wakes up.
+                            failure.compareAndSet(null, e);
+                            flag.set(false);
+                            this.running = false;
+                            throw new RuntimeException(e);
+                        }
+                    }
+                });
+            });
+            splitEnumerator.run();
+            while (this.running) {
+                Thread.sleep(SLEEP_TIME_INTERVAL);
+            }
+            // Surface a reader failure to the caller instead of ending silently.
+            final Exception readerFailure = failure.get();
+            if (readerFailure != null) {
+                throw readerFailure;
+            }
+        }
+
+        /**
+         * Delegates to the parent handler and, once the number of finished readers reaches
+         * {@code parallelism}, clears the enclosing partition reader's {@code running} flag.
+         */
+        @Override
+        protected void handleNoMoreElement(int subtaskId) {
+            super.handleNoMoreElement(subtaskId);
+            if (completedReader.incrementAndGet() == this.parallelism) {
+                CoordinatedMicroBatchPartitionReader.this.running = false;
+            }
+        }
+    }
+}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartition.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartition.java
index 82457f82..e0636d2a 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartition.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartition.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.translation.spark.source.micro;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SupportCoordinate;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -58,6 +59,10 @@ public class MicroBatchPartition implements
InputPartition<InternalRow> {
@Override
public InputPartitionReader<InternalRow> createPartitionReader() {
- return new MicroBatchPartitionReader(source, parallelism, subtaskId,
rowType, checkpointId, checkpointInterval, checkpointPath, hdfsRoot, hdfsUser);
+ if (source instanceof SupportCoordinate) {
+ return new CoordinatedMicroBatchPartitionReader(source,
parallelism, subtaskId, rowType, checkpointId, checkpointInterval,
checkpointPath, hdfsRoot, hdfsUser);
+ } else {
+ return new ParallelMicroBatchPartitionReader(source, parallelism,
subtaskId, rowType, checkpointId, checkpointInterval, checkpointPath, hdfsRoot,
hdfsUser);
+ }
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchParallelSourceReader.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchSourceReader.java
similarity index 77%
rename from
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchParallelSourceReader.java
rename to
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchSourceReader.java
index 0e12d6d2..a2128251 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchParallelSourceReader.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchSourceReader.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.translation.spark.source.micro;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SupportCoordinate;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.utils.SerializationUtils;
@@ -31,7 +32,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
-public class MicroBatchParallelSourceReader implements MicroBatchReader {
+public class MicroBatchSourceReader implements MicroBatchReader {
protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
protected final Integer parallelism;
@@ -45,7 +46,7 @@ public class MicroBatchParallelSourceReader implements
MicroBatchReader {
protected MicroBatchState startOffset;
protected MicroBatchState endOffset;
- public MicroBatchParallelSourceReader(SeaTunnelSource<SeaTunnelRow, ?, ?>
source, Integer parallelism, Integer checkpointId, Integer checkpointInterval,
String checkpointPath, String hdfsRoot, String hdfsUser, StructType rowType) {
+ public MicroBatchSourceReader(SeaTunnelSource<SeaTunnelRow, ?, ?> source,
Integer parallelism, Integer checkpointId, Integer checkpointInterval, String
checkpointPath, String hdfsRoot, String hdfsUser, StructType rowType) {
this.source = source;
this.parallelism = parallelism;
this.checkpointId = checkpointId;
@@ -95,9 +96,15 @@ public class MicroBatchParallelSourceReader implements
MicroBatchReader {
@Override
public List<InputPartition<InternalRow>> planInputPartitions() {
- List<InputPartition<InternalRow>> virtualPartitions = new
ArrayList<>(parallelism);
- for (int subtaskId = 0; subtaskId < parallelism; subtaskId++) {
- virtualPartitions.add(new MicroBatchPartition(source, parallelism,
subtaskId, rowType, checkpointId, checkpointInterval, checkpointPath, hdfsRoot,
hdfsUser));
+ List<InputPartition<InternalRow>> virtualPartitions;
+ if (source instanceof SupportCoordinate) {
+ virtualPartitions = new ArrayList<>(1);
+ virtualPartitions.add(new MicroBatchPartition(source, parallelism,
0, rowType, checkpointId, checkpointInterval, checkpointPath, hdfsRoot,
hdfsUser));
+ } else {
+ virtualPartitions = new ArrayList<>(parallelism);
+ for (int subtaskId = 0; subtaskId < parallelism; subtaskId++) {
+ virtualPartitions.add(new MicroBatchPartition(source,
parallelism, subtaskId, rowType, checkpointId, checkpointInterval,
checkpointPath, hdfsRoot, hdfsUser));
+ }
}
checkpointId++;
return virtualPartitions;
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartitionReader.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/ParallelMicroBatchPartitionReader.java
similarity index 70%
rename from
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartitionReader.java
rename to
seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/ParallelMicroBatchPartitionReader.java
index 28f74e8c..33a1427b 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchPartitionReader.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/ParallelMicroBatchPartitionReader.java
@@ -17,17 +17,15 @@
package org.apache.seatunnel.translation.spark.source.micro;
-import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.state.CheckpointLock;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.utils.SerializationUtils;
-import org.apache.seatunnel.translation.source.ParallelSource;
-import org.apache.seatunnel.translation.spark.source.InternalRowCollector;
+import org.apache.seatunnel.translation.source.BaseSourceFunction;
import org.apache.seatunnel.translation.spark.source.ReaderState;
-import
org.apache.seatunnel.translation.spark.source.batch.BatchPartitionReader;
+import
org.apache.seatunnel.translation.spark.source.batch.ParallelBatchPartitionReader;
import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -39,33 +37,33 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.URI;
+import java.net.URISyntaxException;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-public class MicroBatchPartitionReader extends BatchPartitionReader {
+public class ParallelMicroBatchPartitionReader extends
ParallelBatchPartitionReader {
protected static final Integer CHECKPOINT_SLEEP_INTERVAL = 10;
protected volatile Integer checkpointId;
-
- protected final CheckpointLock checkpointLock = new
SingleReaderCheckpointLock();
protected final Integer checkpointInterval;
protected final String checkpointPath;
protected final String hdfsRoot;
protected final String hdfsUser;
- protected List<byte[]> restoredState;
+ protected Map<Integer, List<byte[]>> restoredState;
protected ScheduledThreadPoolExecutor executor;
protected FileSystem fileSystem;
- public MicroBatchPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?>
source,
- Integer parallelism,
- Integer subtaskId,
- StructType rowType,
- Integer checkpointId,
- Integer checkpointInterval,
- String checkpointPath,
- String hdfsRoot,
- String hdfsUser) {
+ public ParallelMicroBatchPartitionReader(SeaTunnelSource<SeaTunnelRow, ?,
?> source,
+ Integer parallelism,
+ Integer subtaskId,
+ StructType rowType,
+ Integer checkpointId,
+ Integer checkpointInterval,
+ String checkpointPath,
+ String hdfsRoot,
+ String hdfsUser) {
super(source, parallelism, subtaskId, rowType);
this.checkpointId = checkpointId;
this.checkpointInterval = checkpointInterval;
@@ -75,24 +73,17 @@ public class MicroBatchPartitionReader extends
BatchPartitionReader {
}
@Override
- protected ParallelSource<SeaTunnelRow, ?, ?> createParallelSource() {
+ protected BaseSourceFunction<SeaTunnelRow> createInternalSource() {
return new InternalParallelSource<>(source,
restoredState,
parallelism,
subtaskId);
}
- @Override
- protected Collector<SeaTunnelRow> createCollector() {
- return new InternalRowCollector(handover, checkpointLock, rowType);
- }
-
@Override
protected void prepare() {
- Configuration configuration = new Configuration();
- configuration.set("fs.defaultFS", hdfsRoot);
try {
- this.fileSystem = FileSystem.get(new URI(hdfsRoot), configuration,
hdfsUser);
+ this.fileSystem = getFileSystem();
this.restoredState = restoreState(checkpointId - 1);
} catch (Exception e) {
throw new RuntimeException(e);
@@ -101,10 +92,20 @@ public class MicroBatchPartitionReader extends
BatchPartitionReader {
prepareCheckpoint();
}
+    /**
+     * Opens the Hadoop file system rooted at {@code hdfsRoot}, which is also set as
+     * {@code fs.defaultFS}. The file system is requested on behalf of {@code hdfsUser}
+     * when that value is not blank; otherwise the user-less lookup is used.
+     *
+     * @return the file system for {@code hdfsRoot}
+     * @throws URISyntaxException   if {@code hdfsRoot} is not a valid URI
+     * @throws IOException          if the file system cannot be obtained
+     * @throws InterruptedException if the lookup on behalf of {@code hdfsUser} is interrupted
+     */
+    protected FileSystem getFileSystem() throws URISyntaxException, IOException, InterruptedException {
+        final Configuration hadoopConf = new Configuration();
+        hadoopConf.set("fs.defaultFS", hdfsRoot);
+        final URI rootUri = new URI(hdfsRoot);
+        if (StringUtils.isBlank(hdfsUser)) {
+            return FileSystem.get(rootUri, hadoopConf);
+        }
+        return FileSystem.get(rootUri, hadoopConf, hdfsUser);
+    }
+
protected ReaderState snapshotState() {
- List<byte[]> bytes;
+ Map<Integer, List<byte[]>> bytes;
try {
- bytes = parallelSource.snapshotState(checkpointId);
+ bytes = internalSource.snapshotState(checkpointId);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -117,9 +118,8 @@ public class MicroBatchPartitionReader extends
BatchPartitionReader {
}
public void virtualCheckpoint() {
- checkpointLock.lock();
try {
- synchronized (collector) {
+ synchronized (checkpointLock) {
while (!handover.isEmpty()) {
Thread.sleep(CHECKPOINT_SLEEP_INTERVAL);
}
@@ -128,19 +128,17 @@ public class MicroBatchPartitionReader extends
BatchPartitionReader {
final int currentCheckpoint = checkpointId;
ReaderState readerState = snapshotState();
saveState(readerState, currentCheckpoint);
- parallelSource.notifyCheckpointComplete(currentCheckpoint);
+ internalSource.notifyCheckpointComplete(currentCheckpoint);
running = false;
}
}
} catch (Exception e) {
throw new RuntimeException("An error occurred in virtual
checkpoint execution.", e);
- } finally {
- checkpointLock.unlock();
}
}
- private List<byte[]> restoreState(int checkpointId) throws IOException {
- Path hdfsPath = new Path(checkpointPath + File.separator +
checkpointId);
+ private Map<Integer, List<byte[]>> restoreState(int checkpointId) throws
IOException {
+ Path hdfsPath = getCheckpointPathWithId(checkpointId);
if (!fileSystem.exists(hdfsPath)) {
return null;
}
@@ -159,9 +157,9 @@ public class MicroBatchPartitionReader extends
BatchPartitionReader {
}
}
- private void saveState(ReaderState readerState, int checkpointId) throws
IOException {
+ protected void saveState(ReaderState readerState, int checkpointId) throws
IOException {
byte[] bytes = SerializationUtils.serialize(readerState);
- Path hdfsPath = new Path(checkpointPath + File.separator +
checkpointId);
+ Path hdfsPath = getCheckpointPathWithId(checkpointId);
if (!fileSystem.exists(hdfsPath)) {
fileSystem.createNewFile(hdfsPath);
}
@@ -173,6 +171,10 @@ public class MicroBatchPartitionReader extends
BatchPartitionReader {
}
}
+    /**
+     * Builds the checkpoint file location {@code <checkpointPath>/<subtaskId>/<checkpointId>}.
+     *
+     * @param checkpointId id of the checkpoint whose file is addressed
+     * @return the Hadoop path of that checkpoint for this reader's subtask
+     */
+    private Path getCheckpointPathWithId(int checkpointId) {
+        // Hadoop paths use '/' (Path.SEPARATOR) on every platform; java.io.File.separator is
+        // the local OS separator and would yield '\' when the job runs on Windows.
+        return new Path(this.checkpointPath + Path.SEPARATOR + this.subtaskId + Path.SEPARATOR + checkpointId);
+    }
+
@Override
public void close() throws IOException {
fileSystem.close();