This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4191c62bae [Improve][API & Zeta] Using connector custom serializer encode/decode states (#5238)
4191c62bae is described below

commit 4191c62bae33dbe68a6dd1f4855cd193105c31cf
Author: hailin0 <[email protected]>
AuthorDate: Fri Aug 11 15:45:38 2023 +0800

    [Improve][API & Zeta] Using connector custom serializer encode/decode states (#5238)
    
    * API: Using DefaultSerializer as connector sink default serializer
    * Zeta: Using connector custom serializer encode/decode states
---
 .../api/serialization/DefaultSerializer.java       |  3 ++
 .../server/dag/physical/PhysicalPlanGenerator.java |  5 ++-
 .../server/task/SinkAggregatedCommitterTask.java   |  6 +++
 .../engine/server/task/SourceSeaTunnelTask.java    | 15 +++++++-
 .../server/task/SourceSplitEnumeratorTask.java     |  3 ++
 .../context/SeaTunnelSplitEnumeratorContext.java   | 16 +++++---
 .../engine/server/task/flow/SinkFlowLifeCycle.java | 14 +++++--
 .../server/task/flow/SourceFlowLifeCycle.java      | 10 +----
 .../operation/sink/SinkPrepareCommitOperation.java | 20 +++++++---
 .../operation/source/AssignSplitOperation.java     | 37 ++++++++++++------
 .../operation/source/RestoredSplitOperation.java   | 45 +++++++++++++---------
 11 files changed, 119 insertions(+), 55 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
index 2100b9529c..5fabe2a284 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
@@ -35,6 +35,9 @@ public class DefaultSerializer<T extends Serializable> 
implements Serializer<T>
 
     @Override
     public T deserialize(byte[] serialized) throws IOException {
+        if (serialized == null) {
+            return null;
+        }
         return SerializationUtils.deserialize(serialized);
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 69d72d7130..a238ae134c 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -552,7 +552,10 @@ public class PhysicalPlanGenerator {
                                                                                
 .getJobId(),
                                                                         
taskLocation,
                                                                         
finalParallelismIndex,
-                                                                        f);
+                                                                        
(PhysicalExecutionFlow<
+                                                                               
         SourceAction,
+                                                                               
         SourceConfig>)
+                                                                               
 f);
                                                             } else {
                                                                 return new 
TransformSeaTunnelTask(
                                                                         
jobImmutableInformation
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index 1a8ecf29c8..a904146a1d 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -34,6 +34,7 @@ import 
org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
 import org.apache.commons.collections4.CollectionUtils;
 
 import com.hazelcast.cluster.Address;
+import lombok.Getter;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 
@@ -45,6 +46,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -75,6 +77,8 @@ public class SinkAggregatedCommitterTask<CommandInfoT, 
AggregatedCommitInfoT>
     private final SinkAggregatedCommitter<CommandInfoT, AggregatedCommitInfoT> 
aggregatedCommitter;
 
     private transient Serializer<AggregatedCommitInfoT> 
aggregatedCommitInfoSerializer;
+    @Getter private transient Serializer<CommandInfoT> commitInfoSerializer;
+
     private Map<Long, Address> writerAddressMap;
 
     private ConcurrentMap<Long, List<CommandInfoT>> commitInfoCache;
@@ -107,6 +111,7 @@ public class SinkAggregatedCommitterTask<CommandInfoT, 
AggregatedCommitInfoT>
         this.writerAddressMap = new ConcurrentHashMap<>();
         this.checkpointCommitInfoMap = new ConcurrentHashMap<>();
         this.completableFuture = new CompletableFuture<>();
+        this.commitInfoSerializer = 
sink.getSink().getCommitInfoSerializer().get();
         this.aggregatedCommitInfoSerializer =
                 sink.getSink().getAggregatedCommitInfoSerializer().get();
         log.debug(
@@ -250,6 +255,7 @@ public class SinkAggregatedCommitterTask<CommandInfoT, 
AggregatedCommitInfoT>
                 actionStateList.stream()
                         .map(ActionSubtaskState::getState)
                         .flatMap(Collection::stream)
+                        .filter(Objects::nonNull)
                         .map(
                                 bytes ->
                                         sneaky(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
index 842cf8a602..8650dc7f2a 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
@@ -18,10 +18,11 @@
 package org.apache.seatunnel.engine.server.task;
 
 import org.apache.seatunnel.api.common.metrics.MetricsContext;
+import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig;
-import org.apache.seatunnel.engine.server.dag.physical.flow.Flow;
+import 
org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlow;
 import org.apache.seatunnel.engine.server.execution.ProgressState;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle;
@@ -29,6 +30,7 @@ import org.apache.seatunnel.engine.server.task.record.Barrier;
 
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
+import lombok.Getter;
 import lombok.NonNull;
 
 import java.util.List;
@@ -41,15 +43,24 @@ public class SourceSeaTunnelTask<T, SplitT extends 
SourceSplit> extends SeaTunne
     private transient SeaTunnelSourceCollector<T> collector;
 
     private transient Object checkpointLock;
+    @Getter private transient Serializer<SplitT> splitSerializer;
+    private final PhysicalExecutionFlow<SourceAction, SourceConfig> sourceFlow;
 
-    public SourceSeaTunnelTask(long jobID, TaskLocation taskID, int indexID, 
Flow executionFlow) {
+    public SourceSeaTunnelTask(
+            long jobID,
+            TaskLocation taskID,
+            int indexID,
+            PhysicalExecutionFlow<SourceAction, SourceConfig> executionFlow) {
         super(jobID, taskID, indexID, executionFlow);
+        this.sourceFlow = executionFlow;
     }
 
     @Override
     public void init() throws Exception {
         super.init();
         this.checkpointLock = new Object();
+        this.splitSerializer = 
sourceFlow.getAction().getSource().getSplitSerializer();
+
         LOGGER.info("starting seatunnel source task, index " + indexID);
         if (!(startFlowLifeCycle instanceof SourceFlowLifeCycle)) {
             throw new TaskRuntimeException(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index da5fa8aeb3..25fdbc9638 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -37,6 +37,7 @@ import 
org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
 import com.hazelcast.cluster.Address;
 import com.hazelcast.spi.impl.operationservice.Operation;
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
+import lombok.Getter;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 
@@ -77,6 +78,7 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
     private SeaTunnelSplitEnumeratorContext<SplitT> enumeratorContext;
 
     private Serializer<Serializable> enumeratorStateSerializer;
+    @Getter private Serializer<SplitT> splitSerializer;
 
     private int maxReaderSize;
     private Set<Long> unfinishedReaders;
@@ -102,6 +104,7 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
                 new SeaTunnelSplitEnumeratorContext<>(
                         this.source.getParallelism(), this, 
getMetricsContext());
         enumeratorStateSerializer = 
this.source.getSource().getEnumeratorStateSerializer();
+        splitSerializer = this.source.getSource().getSplitSerializer();
         taskMemberMapping = new ConcurrentHashMap<>();
         taskIDToTaskLocationMapping = new ConcurrentHashMap<>();
         taskIndexToTaskLocationMapping = new ConcurrentHashMap<>();
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
index c3cce03d3b..110562e494 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
@@ -21,7 +21,6 @@ import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.source.SourceEvent;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask;
 import 
org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation;
 
@@ -31,6 +30,9 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky;
 
 @Slf4j
 public class SeaTunnelSplitEnumeratorContext<SplitT extends SourceSplit>
@@ -67,22 +69,26 @@ public class SeaTunnelSplitEnumeratorContext<SplitT extends 
SourceSplit>
             log.warn("No reader is obtained, skip this assign!");
             return;
         }
+
+        List<byte[]> splitBytes =
+                splits.stream()
+                        .map(split -> sneaky(() -> 
task.getSplitSerializer().serialize(split)))
+                        .collect(Collectors.toList());
         task.getExecutionContext()
                 .sendToMember(
                         new AssignSplitOperation<>(
-                                
task.getTaskMemberLocationByIndex(subtaskIndex),
-                                
SerializationUtils.serialize(splits.toArray())),
+                                
task.getTaskMemberLocationByIndex(subtaskIndex), splitBytes),
                         task.getTaskMemberAddressByIndex(subtaskIndex))
                 .join();
     }
 
     @Override
     public void signalNoMoreSplits(int subtaskIndex) {
+        List<byte[]> emptySplits = Collections.emptyList();
         task.getExecutionContext()
                 .sendToMember(
                         new AssignSplitOperation<>(
-                                
task.getTaskMemberLocationByIndex(subtaskIndex),
-                                
SerializationUtils.serialize(Collections.emptyList().toArray())),
+                                
task.getTaskMemberLocationByIndex(subtaskIndex), emptySplits),
                         task.getTaskMemberAddressByIndex(subtaskIndex))
                 .join();
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 7e6d73c549..c51e3483c0 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -25,7 +25,6 @@ import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
 import org.apache.seatunnel.api.table.type.Record;
-import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
@@ -48,6 +47,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -66,6 +66,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
     private final SinkAction<T, StateT, CommitInfoT, AggregatedCommitInfoT> 
sinkAction;
     private SinkWriter<T, CommitInfoT, StateT> writer;
 
+    private transient Optional<Serializer<CommitInfoT>> commitInfoSerializer;
     private transient Optional<Serializer<StateT>> writerStateSerializer;
 
     private final int indexID;
@@ -110,6 +111,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
 
     @Override
     public void init() throws Exception {
+        this.commitInfoSerializer = 
sinkAction.getSink().getCommitInfoSerializer();
         this.writerStateSerializer = 
sinkAction.getSink().getWriterStateSerializer();
         this.committer = sinkAction.getSink().createCommitter();
         this.lastCommitInfo = Optional.empty();
@@ -184,10 +186,14 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
                         runningTask
                                 .getExecutionContext()
                                 .sendToMember(
-                                        new SinkPrepareCommitOperation(
+                                        new 
SinkPrepareCommitOperation<CommitInfoT>(
                                                 barrier,
                                                 committerTaskLocation,
-                                                
SerializationUtils.serialize(commitInfoT)),
+                                                
commitInfoSerializer.isPresent()
+                                                        ? commitInfoSerializer
+                                                                .get()
+                                                                
.serialize(commitInfoT)
+                                                        : null),
                                         committerTaskAddress)
                                 .join();
                     }
@@ -247,9 +253,9 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
         if (writerStateSerializer.isPresent()) {
             states =
                     actionStateList.stream()
-                            .filter(state -> writerStateSerializer.isPresent())
                             .map(ActionSubtaskState::getState)
                             .flatMap(Collection::stream)
+                            .filter(Objects::nonNull)
                             .map(
                                     bytes ->
                                             sneaky(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index b883bd8ffd..572836fe51 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.source.SourceEvent;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.type.Record;
-import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
 import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
@@ -59,7 +58,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
-import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky;
 import static 
org.apache.seatunnel.engine.server.task.AbstractTask.serializeStates;
 
 @Slf4j
@@ -338,21 +336,17 @@ public class SourceFlowLifeCycle<T, SplitT extends 
SourceSplit> extends ActionFl
         if (actionStateList.isEmpty()) {
             return;
         }
-        List<SplitT> splits =
+        List<byte[]> splits =
                 actionStateList.stream()
                         .map(ActionSubtaskState::getState)
                         .flatMap(Collection::stream)
                         .filter(Objects::nonNull)
-                        .map(bytes -> sneaky(() -> 
splitSerializer.deserialize(bytes)))
                         .collect(Collectors.toList());
         try {
             runningTask
                     .getExecutionContext()
                     .sendToMember(
-                            new RestoredSplitOperation(
-                                    enumeratorTaskLocation,
-                                    
SerializationUtils.serialize(splits.toArray()),
-                                    indexID),
+                            new RestoredSplitOperation(enumeratorTaskLocation, 
splits, indexID),
                             enumeratorTaskAddress)
                     .get();
         } catch (InterruptedException | ExecutionException e) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
index 06945a61b2..5ed6f81a7a 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
@@ -17,7 +17,6 @@
 
 package org.apache.seatunnel.engine.server.task.operation.sink;
 
-import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.TaskExecutionService;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
@@ -33,7 +32,7 @@ import lombok.NoArgsConstructor;
 import java.io.IOException;
 
 @NoArgsConstructor
-public class SinkPrepareCommitOperation extends BarrierFlowOperation {
+public class SinkPrepareCommitOperation<CommitInfoT> extends 
BarrierFlowOperation {
     private byte[] commitInfos;
 
     public SinkPrepareCommitOperation(
@@ -73,15 +72,24 @@ public class SinkPrepareCommitOperation extends 
BarrierFlowOperation {
     public void run() throws Exception {
         TaskExecutionService taskExecutionService =
                 ((SeaTunnelServer) getService()).getTaskExecutionService();
-        SinkAggregatedCommitterTask<?, ?> committerTask =
+        SinkAggregatedCommitterTask<CommitInfoT, ?> committerTask =
                 taskExecutionService.getTask(taskLocation);
-        ClassLoader classLoader =
+        ClassLoader taskClassLoader =
                 taskExecutionService
                         
.getExecutionContext(taskLocation.getTaskGroupLocation())
                         .getClassLoader();
+        ClassLoader mainClassLoader = 
Thread.currentThread().getContextClassLoader();
+
         if (commitInfos != null) {
-            committerTask.receivedWriterCommitInfo(
-                    barrier.getId(), 
SerializationUtils.deserialize(commitInfos, classLoader));
+            CommitInfoT deserializeCommitInfo = null;
+            try {
+                Thread.currentThread().setContextClassLoader(taskClassLoader);
+                deserializeCommitInfo =
+                        
committerTask.getCommitInfoSerializer().deserialize(commitInfos);
+            } finally {
+                Thread.currentThread().setContextClassLoader(mainClassLoader);
+            }
+            committerTask.receivedWriterCommitInfo(barrier.getId(), 
deserializeCommitInfo);
         }
         committerTask.triggerBarrier(barrier);
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
index 637a48e8ab..b21111e18f 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
@@ -19,7 +19,6 @@ package 
org.apache.seatunnel.engine.server.task.operation.source;
 
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.common.utils.RetryUtils;
-import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import 
org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException;
@@ -33,18 +32,18 @@ import 
com.hazelcast.nio.serialization.IdentifiedDataSerializable;
 import com.hazelcast.spi.impl.operationservice.Operation;
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.stream.Collectors;
+import java.util.ArrayList;
+import java.util.List;
 
 public class AssignSplitOperation<SplitT extends SourceSplit> extends Operation
         implements IdentifiedDataSerializable {
 
-    private byte[] splits;
+    private List<byte[]> splits;
     private TaskLocation taskID;
 
     public AssignSplitOperation() {}
 
-    public AssignSplitOperation(TaskLocation taskID, byte[] splits) {
+    public AssignSplitOperation(TaskLocation taskID, List<byte[]> splits) {
         this.taskID = taskID;
         this.splits = splits;
     }
@@ -56,13 +55,22 @@ public class AssignSplitOperation<SplitT extends 
SourceSplit> extends Operation
                 () -> {
                     SourceSeaTunnelTask<?, SplitT> task =
                             server.getTaskExecutionService().getTask(taskID);
-                    ClassLoader classLoader =
+                    ClassLoader taskClassLoader =
                             server.getTaskExecutionService()
                                     
.getExecutionContext(taskID.getTaskGroupLocation())
                                     .getClassLoader();
-                    Object[] o = SerializationUtils.deserialize(splits, 
classLoader);
-                    task.receivedSourceSplit(
-                            Arrays.stream(o).map(i -> (SplitT) 
i).collect(Collectors.toList()));
+                    ClassLoader mainClassLoader = 
Thread.currentThread().getContextClassLoader();
+                    List<SplitT> deserializeSplits = new ArrayList<>();
+                    try {
+                        
Thread.currentThread().setContextClassLoader(taskClassLoader);
+                        for (byte[] split : this.splits) {
+                            
deserializeSplits.add(task.getSplitSerializer().deserialize(split));
+                        }
+                    } finally {
+                        
Thread.currentThread().setContextClassLoader(mainClassLoader);
+                    }
+
+                    task.receivedSourceSplit(deserializeSplits);
                     return null;
                 },
                 new RetryUtils.RetryMaterial(
@@ -76,13 +84,20 @@ public class AssignSplitOperation<SplitT extends 
SourceSplit> extends Operation
 
     @Override
     protected void writeInternal(ObjectDataOutput out) throws IOException {
-        out.writeByteArray(splits);
+        out.writeInt(splits.size());
+        for (byte[] split : splits) {
+            out.writeByteArray(split);
+        }
         out.writeObject(taskID);
     }
 
     @Override
     protected void readInternal(ObjectDataInput in) throws IOException {
-        splits = in.readByteArray();
+        int splitCount = in.readInt();
+        splits = new ArrayList<>(splitCount);
+        for (int i = 0; i < splitCount; i++) {
+            splits.add(in.readByteArray());
+        }
         taskID = in.readObject();
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
index 0c9c3d95c9..05fbf6537e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
@@ -19,7 +19,6 @@ package 
org.apache.seatunnel.engine.server.task.operation.source;
 
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.common.utils.RetryUtils;
-import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.TaskExecutionService;
@@ -34,19 +33,18 @@ import com.hazelcast.nio.ObjectDataInput;
 import com.hazelcast.nio.ObjectDataOutput;
 
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
 
 public class RestoredSplitOperation extends TaskOperation {
 
-    private byte[] splits;
+    private List<byte[]> splits;
     private Integer subtaskIndex;
 
     public RestoredSplitOperation() {}
 
     public RestoredSplitOperation(
-            TaskLocation enumeratorLocation, byte[] splits, int subtaskIndex) {
+            TaskLocation enumeratorLocation, List<byte[]> splits, int 
subtaskIndex) {
         super(enumeratorLocation);
         this.splits = splits;
         this.subtaskIndex = subtaskIndex;
@@ -55,14 +53,21 @@ public class RestoredSplitOperation extends TaskOperation {
     @Override
     protected void writeInternal(ObjectDataOutput out) throws IOException {
         super.writeInternal(out);
-        out.writeByteArray(splits);
+        out.writeInt(splits.size());
+        for (byte[] split : splits) {
+            out.writeByteArray(split);
+        }
         out.writeInt(subtaskIndex);
     }
 
     @Override
     protected void readInternal(ObjectDataInput in) throws IOException {
         super.readInternal(in);
-        splits = in.readByteArray();
+        int splitCount = in.readInt();
+        splits = new ArrayList<>(splitCount);
+        for (int i = 0; i < splitCount; i++) {
+            splits.add(in.readByteArray());
+        }
         subtaskIndex = in.readInt();
     }
 
@@ -82,27 +87,31 @@ public class RestoredSplitOperation extends TaskOperation {
         TaskExecutionService taskExecutionService = 
server.getTaskExecutionService();
         RetryUtils.retryWithException(
                 () -> {
-                    ClassLoader classLoader =
+                    SourceSplitEnumeratorTask<SourceSplit> task =
+                            taskExecutionService.getTask(taskLocation);
+                    ClassLoader taskClassLoader =
                             taskExecutionService
                                     
.getExecutionContext(taskLocation.getTaskGroupLocation())
                                     .getClassLoader();
+                    ClassLoader mainClassLoader = 
Thread.currentThread().getContextClassLoader();
+
+                    List<SourceSplit> deserializeSplits = new ArrayList<>();
+                    try {
+                        
Thread.currentThread().setContextClassLoader(taskClassLoader);
+                        for (byte[] split : splits) {
+                            
deserializeSplits.add(task.getSplitSerializer().deserialize(split));
+                        }
+                    } finally {
+                        
Thread.currentThread().setContextClassLoader(mainClassLoader);
+                    }
 
-                    List<SourceSplit> deserialize =
-                            Arrays.stream(
-                                            (Object[])
-                                                    
SerializationUtils.deserialize(
-                                                            splits, 
classLoader))
-                                    .map(o -> (SourceSplit) o)
-                                    .collect(Collectors.toList());
-                    SourceSplitEnumeratorTask<SourceSplit> task =
-                            taskExecutionService.getTask(taskLocation);
                     task.getExecutionContext()
                             .getTaskExecutionService()
                             .asyncExecuteFunction(
                                     taskLocation.getTaskGroupLocation(),
                                     () -> {
                                         try {
-                                            task.addSplitsBack(deserialize, 
subtaskIndex);
+                                            
task.addSplitsBack(deserializeSplits, subtaskIndex);
                                         } catch (Exception e) {
                                             task.getExecutionContext()
                                                     .sendToMaster(

Reply via email to