This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4191c62bae [Improve][API & Zeta] Using connector custom serializer
encode/decode states (#5238)
4191c62bae is described below
commit 4191c62bae33dbe68a6dd1f4855cd193105c31cf
Author: hailin0 <[email protected]>
AuthorDate: Fri Aug 11 15:45:38 2023 +0800
[Improve][API & Zeta] Using connector custom serializer encode/decode
states (#5238)
* API: Using DefaultSerializer as connector sink default serializer
* Zeta: Using connector custom serializer encode/decode states
---
.../api/serialization/DefaultSerializer.java | 3 ++
.../server/dag/physical/PhysicalPlanGenerator.java | 5 ++-
.../server/task/SinkAggregatedCommitterTask.java | 6 +++
.../engine/server/task/SourceSeaTunnelTask.java | 15 +++++++-
.../server/task/SourceSplitEnumeratorTask.java | 3 ++
.../context/SeaTunnelSplitEnumeratorContext.java | 16 +++++---
.../engine/server/task/flow/SinkFlowLifeCycle.java | 14 +++++--
.../server/task/flow/SourceFlowLifeCycle.java | 10 +----
.../operation/sink/SinkPrepareCommitOperation.java | 20 +++++++---
.../operation/source/AssignSplitOperation.java | 37 ++++++++++++------
.../operation/source/RestoredSplitOperation.java | 45 +++++++++++++---------
11 files changed, 119 insertions(+), 55 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
index 2100b9529c..5fabe2a284 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
@@ -35,6 +35,9 @@ public class DefaultSerializer<T extends Serializable>
implements Serializer<T>
@Override
public T deserialize(byte[] serialized) throws IOException {
+ if (serialized == null) {
+ return null;
+ }
return SerializationUtils.deserialize(serialized);
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 69d72d7130..a238ae134c 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -552,7 +552,10 @@ public class PhysicalPlanGenerator {
.getJobId(),
taskLocation,
finalParallelismIndex,
- f);
+
(PhysicalExecutionFlow<
+
SourceAction,
+
SourceConfig>)
+
f);
} else {
return new
TransformSeaTunnelTask(
jobImmutableInformation
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index 1a8ecf29c8..a904146a1d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -34,6 +34,7 @@ import
org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
import org.apache.commons.collections4.CollectionUtils;
import com.hazelcast.cluster.Address;
+import lombok.Getter;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@@ -45,6 +46,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -75,6 +77,8 @@ public class SinkAggregatedCommitterTask<CommandInfoT,
AggregatedCommitInfoT>
private final SinkAggregatedCommitter<CommandInfoT, AggregatedCommitInfoT>
aggregatedCommitter;
private transient Serializer<AggregatedCommitInfoT>
aggregatedCommitInfoSerializer;
+ @Getter private transient Serializer<CommandInfoT> commitInfoSerializer;
+
private Map<Long, Address> writerAddressMap;
private ConcurrentMap<Long, List<CommandInfoT>> commitInfoCache;
@@ -107,6 +111,7 @@ public class SinkAggregatedCommitterTask<CommandInfoT,
AggregatedCommitInfoT>
this.writerAddressMap = new ConcurrentHashMap<>();
this.checkpointCommitInfoMap = new ConcurrentHashMap<>();
this.completableFuture = new CompletableFuture<>();
+ this.commitInfoSerializer =
sink.getSink().getCommitInfoSerializer().get();
this.aggregatedCommitInfoSerializer =
sink.getSink().getAggregatedCommitInfoSerializer().get();
log.debug(
@@ -250,6 +255,7 @@ public class SinkAggregatedCommitterTask<CommandInfoT,
AggregatedCommitInfoT>
actionStateList.stream()
.map(ActionSubtaskState::getState)
.flatMap(Collection::stream)
+ .filter(Objects::nonNull)
.map(
bytes ->
sneaky(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
index 842cf8a602..8650dc7f2a 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
@@ -18,10 +18,11 @@
package org.apache.seatunnel.engine.server.task;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
+import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig;
-import org.apache.seatunnel.engine.server.dag.physical.flow.Flow;
+import
org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlow;
import org.apache.seatunnel.engine.server.execution.ProgressState;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle;
@@ -29,6 +30,7 @@ import org.apache.seatunnel.engine.server.task.record.Barrier;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
+import lombok.Getter;
import lombok.NonNull;
import java.util.List;
@@ -41,15 +43,24 @@ public class SourceSeaTunnelTask<T, SplitT extends
SourceSplit> extends SeaTunne
private transient SeaTunnelSourceCollector<T> collector;
private transient Object checkpointLock;
+ @Getter private transient Serializer<SplitT> splitSerializer;
+ private final PhysicalExecutionFlow<SourceAction, SourceConfig> sourceFlow;
- public SourceSeaTunnelTask(long jobID, TaskLocation taskID, int indexID,
Flow executionFlow) {
+ public SourceSeaTunnelTask(
+ long jobID,
+ TaskLocation taskID,
+ int indexID,
+ PhysicalExecutionFlow<SourceAction, SourceConfig> executionFlow) {
super(jobID, taskID, indexID, executionFlow);
+ this.sourceFlow = executionFlow;
}
@Override
public void init() throws Exception {
super.init();
this.checkpointLock = new Object();
+ this.splitSerializer =
sourceFlow.getAction().getSource().getSplitSerializer();
+
LOGGER.info("starting seatunnel source task, index " + indexID);
if (!(startFlowLifeCycle instanceof SourceFlowLifeCycle)) {
throw new TaskRuntimeException(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index da5fa8aeb3..25fdbc9638 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -37,6 +37,7 @@ import
org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
import com.hazelcast.cluster.Address;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
+import lombok.Getter;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@@ -77,6 +78,7 @@ public class SourceSplitEnumeratorTask<SplitT extends
SourceSplit> extends Coord
private SeaTunnelSplitEnumeratorContext<SplitT> enumeratorContext;
private Serializer<Serializable> enumeratorStateSerializer;
+ @Getter private Serializer<SplitT> splitSerializer;
private int maxReaderSize;
private Set<Long> unfinishedReaders;
@@ -102,6 +104,7 @@ public class SourceSplitEnumeratorTask<SplitT extends
SourceSplit> extends Coord
new SeaTunnelSplitEnumeratorContext<>(
this.source.getParallelism(), this,
getMetricsContext());
enumeratorStateSerializer =
this.source.getSource().getEnumeratorStateSerializer();
+ splitSerializer = this.source.getSource().getSplitSerializer();
taskMemberMapping = new ConcurrentHashMap<>();
taskIDToTaskLocationMapping = new ConcurrentHashMap<>();
taskIndexToTaskLocationMapping = new ConcurrentHashMap<>();
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
index c3cce03d3b..110562e494 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
@@ -21,7 +21,6 @@ import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask;
import
org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation;
@@ -31,6 +30,9 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky;
@Slf4j
public class SeaTunnelSplitEnumeratorContext<SplitT extends SourceSplit>
@@ -67,22 +69,26 @@ public class SeaTunnelSplitEnumeratorContext<SplitT extends
SourceSplit>
log.warn("No reader is obtained, skip this assign!");
return;
}
+
+ List<byte[]> splitBytes =
+ splits.stream()
+ .map(split -> sneaky(() ->
task.getSplitSerializer().serialize(split)))
+ .collect(Collectors.toList());
task.getExecutionContext()
.sendToMember(
new AssignSplitOperation<>(
-
task.getTaskMemberLocationByIndex(subtaskIndex),
-
SerializationUtils.serialize(splits.toArray())),
+
task.getTaskMemberLocationByIndex(subtaskIndex), splitBytes),
task.getTaskMemberAddressByIndex(subtaskIndex))
.join();
}
@Override
public void signalNoMoreSplits(int subtaskIndex) {
+ List<byte[]> emptySplits = Collections.emptyList();
task.getExecutionContext()
.sendToMember(
new AssignSplitOperation<>(
-
task.getTaskMemberLocationByIndex(subtaskIndex),
-
SerializationUtils.serialize(Collections.emptyList().toArray())),
+
task.getTaskMemberLocationByIndex(subtaskIndex), emptySplits),
task.getTaskMemberAddressByIndex(subtaskIndex))
.join();
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 7e6d73c549..c51e3483c0 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -25,7 +25,6 @@ import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.Record;
-import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
@@ -48,6 +47,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -66,6 +66,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
private final SinkAction<T, StateT, CommitInfoT, AggregatedCommitInfoT>
sinkAction;
private SinkWriter<T, CommitInfoT, StateT> writer;
+ private transient Optional<Serializer<CommitInfoT>> commitInfoSerializer;
private transient Optional<Serializer<StateT>> writerStateSerializer;
private final int indexID;
@@ -110,6 +111,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
@Override
public void init() throws Exception {
+ this.commitInfoSerializer =
sinkAction.getSink().getCommitInfoSerializer();
this.writerStateSerializer =
sinkAction.getSink().getWriterStateSerializer();
this.committer = sinkAction.getSink().createCommitter();
this.lastCommitInfo = Optional.empty();
@@ -184,10 +186,14 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
runningTask
.getExecutionContext()
.sendToMember(
- new SinkPrepareCommitOperation(
+ new
SinkPrepareCommitOperation<CommitInfoT>(
barrier,
committerTaskLocation,
-
SerializationUtils.serialize(commitInfoT)),
+
commitInfoSerializer.isPresent()
+ ? commitInfoSerializer
+ .get()
+
.serialize(commitInfoT)
+ : null),
committerTaskAddress)
.join();
}
@@ -247,9 +253,9 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
if (writerStateSerializer.isPresent()) {
states =
actionStateList.stream()
- .filter(state -> writerStateSerializer.isPresent())
.map(ActionSubtaskState::getState)
.flatMap(Collection::stream)
+ .filter(Objects::nonNull)
.map(
bytes ->
sneaky(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index b883bd8ffd..572836fe51 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.type.Record;
-import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
@@ -59,7 +58,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
-import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky;
import static
org.apache.seatunnel.engine.server.task.AbstractTask.serializeStates;
@Slf4j
@@ -338,21 +336,17 @@ public class SourceFlowLifeCycle<T, SplitT extends
SourceSplit> extends ActionFl
if (actionStateList.isEmpty()) {
return;
}
- List<SplitT> splits =
+ List<byte[]> splits =
actionStateList.stream()
.map(ActionSubtaskState::getState)
.flatMap(Collection::stream)
.filter(Objects::nonNull)
- .map(bytes -> sneaky(() ->
splitSerializer.deserialize(bytes)))
.collect(Collectors.toList());
try {
runningTask
.getExecutionContext()
.sendToMember(
- new RestoredSplitOperation(
- enumeratorTaskLocation,
-
SerializationUtils.serialize(splits.toArray()),
- indexID),
+ new RestoredSplitOperation(enumeratorTaskLocation,
splits, indexID),
enumeratorTaskAddress)
.get();
} catch (InterruptedException | ExecutionException e) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
index 06945a61b2..5ed6f81a7a 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
@@ -17,7 +17,6 @@
package org.apache.seatunnel.engine.server.task.operation.sink;
-import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.TaskExecutionService;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
@@ -33,7 +32,7 @@ import lombok.NoArgsConstructor;
import java.io.IOException;
@NoArgsConstructor
-public class SinkPrepareCommitOperation extends BarrierFlowOperation {
+public class SinkPrepareCommitOperation<CommitInfoT> extends
BarrierFlowOperation {
private byte[] commitInfos;
public SinkPrepareCommitOperation(
@@ -73,15 +72,24 @@ public class SinkPrepareCommitOperation extends
BarrierFlowOperation {
public void run() throws Exception {
TaskExecutionService taskExecutionService =
((SeaTunnelServer) getService()).getTaskExecutionService();
- SinkAggregatedCommitterTask<?, ?> committerTask =
+ SinkAggregatedCommitterTask<CommitInfoT, ?> committerTask =
taskExecutionService.getTask(taskLocation);
- ClassLoader classLoader =
+ ClassLoader taskClassLoader =
taskExecutionService
.getExecutionContext(taskLocation.getTaskGroupLocation())
.getClassLoader();
+ ClassLoader mainClassLoader =
Thread.currentThread().getContextClassLoader();
+
if (commitInfos != null) {
- committerTask.receivedWriterCommitInfo(
- barrier.getId(),
SerializationUtils.deserialize(commitInfos, classLoader));
+ CommitInfoT deserializeCommitInfo = null;
+ try {
+ Thread.currentThread().setContextClassLoader(taskClassLoader);
+ deserializeCommitInfo =
+
committerTask.getCommitInfoSerializer().deserialize(commitInfos);
+ } finally {
+ Thread.currentThread().setContextClassLoader(mainClassLoader);
+ }
+ committerTask.receivedWriterCommitInfo(barrier.getId(),
deserializeCommitInfo);
}
committerTask.triggerBarrier(barrier);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
index 637a48e8ab..b21111e18f 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
@@ -19,7 +19,6 @@ package
org.apache.seatunnel.engine.server.task.operation.source;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.common.utils.RetryUtils;
-import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import
org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException;
@@ -33,18 +32,18 @@ import
com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.spi.impl.operationservice.Operation;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.stream.Collectors;
+import java.util.ArrayList;
+import java.util.List;
public class AssignSplitOperation<SplitT extends SourceSplit> extends Operation
implements IdentifiedDataSerializable {
- private byte[] splits;
+ private List<byte[]> splits;
private TaskLocation taskID;
public AssignSplitOperation() {}
- public AssignSplitOperation(TaskLocation taskID, byte[] splits) {
+ public AssignSplitOperation(TaskLocation taskID, List<byte[]> splits) {
this.taskID = taskID;
this.splits = splits;
}
@@ -56,13 +55,22 @@ public class AssignSplitOperation<SplitT extends
SourceSplit> extends Operation
() -> {
SourceSeaTunnelTask<?, SplitT> task =
server.getTaskExecutionService().getTask(taskID);
- ClassLoader classLoader =
+ ClassLoader taskClassLoader =
server.getTaskExecutionService()
.getExecutionContext(taskID.getTaskGroupLocation())
.getClassLoader();
- Object[] o = SerializationUtils.deserialize(splits,
classLoader);
- task.receivedSourceSplit(
- Arrays.stream(o).map(i -> (SplitT)
i).collect(Collectors.toList()));
+ ClassLoader mainClassLoader =
Thread.currentThread().getContextClassLoader();
+ List<SplitT> deserializeSplits = new ArrayList<>();
+ try {
+
Thread.currentThread().setContextClassLoader(taskClassLoader);
+ for (byte[] split : this.splits) {
+
deserializeSplits.add(task.getSplitSerializer().deserialize(split));
+ }
+ } finally {
+
Thread.currentThread().setContextClassLoader(mainClassLoader);
+ }
+
+ task.receivedSourceSplit(deserializeSplits);
return null;
},
new RetryUtils.RetryMaterial(
@@ -76,13 +84,20 @@ public class AssignSplitOperation<SplitT extends
SourceSplit> extends Operation
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
- out.writeByteArray(splits);
+ out.writeInt(splits.size());
+ for (byte[] split : splits) {
+ out.writeByteArray(split);
+ }
out.writeObject(taskID);
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
- splits = in.readByteArray();
+ int splitCount = in.readInt();
+ splits = new ArrayList<>(splitCount);
+ for (int i = 0; i < splitCount; i++) {
+ splits.add(in.readByteArray());
+ }
taskID = in.readObject();
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
index 0c9c3d95c9..05fbf6537e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
@@ -19,7 +19,6 @@ package
org.apache.seatunnel.engine.server.task.operation.source;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.common.utils.RetryUtils;
-import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.TaskExecutionService;
@@ -34,19 +33,18 @@ import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import java.io.IOException;
-import java.util.Arrays;
+import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Collectors;
public class RestoredSplitOperation extends TaskOperation {
- private byte[] splits;
+ private List<byte[]> splits;
private Integer subtaskIndex;
public RestoredSplitOperation() {}
public RestoredSplitOperation(
- TaskLocation enumeratorLocation, byte[] splits, int subtaskIndex) {
+ TaskLocation enumeratorLocation, List<byte[]> splits, int
subtaskIndex) {
super(enumeratorLocation);
this.splits = splits;
this.subtaskIndex = subtaskIndex;
@@ -55,14 +53,21 @@ public class RestoredSplitOperation extends TaskOperation {
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
super.writeInternal(out);
- out.writeByteArray(splits);
+ out.writeInt(splits.size());
+ for (byte[] split : splits) {
+ out.writeByteArray(split);
+ }
out.writeInt(subtaskIndex);
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
super.readInternal(in);
- splits = in.readByteArray();
+ int splitCount = in.readInt();
+ splits = new ArrayList<>(splitCount);
+ for (int i = 0; i < splitCount; i++) {
+ splits.add(in.readByteArray());
+ }
subtaskIndex = in.readInt();
}
@@ -82,27 +87,31 @@ public class RestoredSplitOperation extends TaskOperation {
TaskExecutionService taskExecutionService =
server.getTaskExecutionService();
RetryUtils.retryWithException(
() -> {
- ClassLoader classLoader =
+ SourceSplitEnumeratorTask<SourceSplit> task =
+ taskExecutionService.getTask(taskLocation);
+ ClassLoader taskClassLoader =
taskExecutionService
.getExecutionContext(taskLocation.getTaskGroupLocation())
.getClassLoader();
+ ClassLoader mainClassLoader =
Thread.currentThread().getContextClassLoader();
+
+ List<SourceSplit> deserializeSplits = new ArrayList<>();
+ try {
+
Thread.currentThread().setContextClassLoader(taskClassLoader);
+ for (byte[] split : splits) {
+
deserializeSplits.add(task.getSplitSerializer().deserialize(split));
+ }
+ } finally {
+
Thread.currentThread().setContextClassLoader(mainClassLoader);
+ }
- List<SourceSplit> deserialize =
- Arrays.stream(
- (Object[])
-
SerializationUtils.deserialize(
- splits,
classLoader))
- .map(o -> (SourceSplit) o)
- .collect(Collectors.toList());
- SourceSplitEnumeratorTask<SourceSplit> task =
- taskExecutionService.getTask(taskLocation);
task.getExecutionContext()
.getTaskExecutionService()
.asyncExecuteFunction(
taskLocation.getTaskGroupLocation(),
() -> {
try {
- task.addSplitsBack(deserialize,
subtaskIndex);
+
task.addSplitsBack(deserializeSplits, subtaskIndex);
} catch (Exception e) {
task.getExecutionContext()
.sendToMaster(