This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3be0d1cc61 [Improve][Zeta] Split the classloader of task group (#7580)
3be0d1cc61 is described below

commit 3be0d1cc61724d4b9abcf369fac64b1c4fe218a3
Author: Jia Fan <[email protected]>
AuthorDate: Fri Sep 27 10:08:34 2024 +0800

    [Improve][Zeta] Split the classloader of task group (#7580)
---
 .github/workflows/backend.yml                      |   2 +-
 .../hive/source/config/HiveSourceConfig.java       |   3 +-
 .../seatunnel/engine/server/SeaTunnelServer.java   |   3 +-
 .../engine/server/TaskExecutionService.java        | 115 ++++++++++++---------
 .../operation/CheckpointEndOperation.java          |   4 +-
 .../operation/CheckpointFinishedOperation.java     |   4 +-
 .../operation/NotifyTaskRestoreOperation.java      |   3 +-
 .../server/dag/physical/PhysicalPlanGenerator.java |  53 +++++-----
 .../engine/server/dag/physical/PhysicalVertex.java |  51 ++++-----
 .../engine/server/execution/TaskGroup.java         |   2 +
 .../engine/server/execution/TaskGroupContext.java  |  13 ++-
 .../server/execution/TaskGroupDefaultImpl.java     |  13 ++-
 .../{TaskGroupContext.java => TaskGroupType.java}  |  17 +--
 .../engine/server/execution/TaskGroupUtils.java    |  45 ++++++++
 .../server/task/TaskGroupImmutableInformation.java |  33 +++++-
 .../TaskGroupWithIntermediateBlockingQueue.java    |   6 ++
 .../group/TaskGroupWithIntermediateDisruptor.java  |   6 ++
 .../operation/sink/SinkPrepareCommitOperation.java |   2 +-
 .../operation/source/AssignSplitOperation.java     |   2 +-
 .../operation/source/RequestSplitOperation.java    |  14 +--
 .../operation/source/RestoredSplitOperation.java   |   2 +-
 .../source/SourceNoMoreElementOperation.java       |   2 +-
 .../source/SourceReaderEventOperation.java         |   2 +-
 .../operation/source/SourceRegisterOperation.java  |   2 +-
 .../engine/server/TaskExecutionServiceTest.java    | 110 +++++++++++++++++---
 .../seatunnel/engine/server/dag/TaskTest.java      |  30 ++++++
 .../engine/server/execution/TestTask.java          |  23 +++--
 27 files changed, 394 insertions(+), 168 deletions(-)

diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 564b153aed..6afc981bed 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -131,7 +131,7 @@ jobs:
           echo "engine-e2e=$true_or_false" >> $GITHUB_OUTPUT
           echo "engine-e2e_files=$file_list" >> $GITHUB_OUTPUT
           
-          api_files=`python tools/update_modules_check/check_file_updates.py 
ua $workspace apache/dev origin/$current_branch "seatunnel-api/**" 
"seatunnel-common/**" "seatunnel-config/**" "seatunnel-connectors/**" 
"seatunnel-core/**" "seatunnel-e2e/seatunnel-e2e-common/**" 
"seatunnel-formats/**" "seatunnel-plugin-discovery/**" 
"seatunnel-transforms-v2/**" "seatunnel-translation/**" 
"seatunnel-e2e/seatunnel-transforms-v2-e2e/**" "seatunnel-connectors/**" 
"pom.xml" "**/workflows/**" "tools [...]
+          api_files=`python tools/update_modules_check/check_file_updates.py 
ua $workspace apache/dev origin/$current_branch "seatunnel-api/**" 
"seatunnel-common/**" "seatunnel-config/**" "seatunnel-engine/**" 
"seatunnel-core/**" "seatunnel-e2e/seatunnel-e2e-common/**" 
"seatunnel-formats/**" "seatunnel-plugin-discovery/**" 
"seatunnel-transforms-v2/**" "seatunnel-translation/**" 
"seatunnel-e2e/seatunnel-transforms-v2-e2e/**" "pom.xml" "**/workflows/**" 
"tools/**" "seatunnel-dist/**"`
           true_or_false=${api_files%%$'\n'*}
           file_list=${api_files#*$'\n'}
           if [[ $repository_owner == 'apache' ]];then
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
index 203491dcf9..e98143fcf0 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
@@ -69,7 +69,6 @@ public class HiveSourceConfig implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
-    private final Table table;
     private final CatalogTable catalogTable;
     private final FileFormat fileFormat;
     private final ReadStrategy readStrategy;
@@ -81,7 +80,7 @@ public class HiveSourceConfig implements Serializable {
         readonlyConfig
                 .getOptional(HdfsSourceConfigOptions.READ_PARTITIONS)
                 .ifPresent(this::validatePartitions);
-        this.table = HiveTableUtils.getTableInfo(readonlyConfig);
+        Table table = HiveTableUtils.getTableInfo(readonlyConfig);
         this.hadoopConf = parseHiveHadoopConfig(readonlyConfig, table);
         this.fileFormat = HiveTableUtils.parseFileFormat(table);
         this.readStrategy = parseReadStrategy(table, readonlyConfig, 
fileFormat, hadoopConf);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 73462c53c9..234ec0109f 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -153,8 +153,7 @@ public class SeaTunnelServer
 
     private void startWorker() {
         taskExecutionService =
-                new TaskExecutionService(
-                        classLoaderService, nodeEngine, 
nodeEngine.getProperties(), eventService);
+                new TaskExecutionService(classLoaderService, nodeEngine, 
eventService);
         
nodeEngine.getMetricsRegistry().registerDynamicMetricsProvider(taskExecutionService);
         taskExecutionService.start();
         getSlotService();
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index b32dd7c6a9..5afd410e1c 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -42,6 +42,7 @@ import 
org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroup;
 import org.apache.seatunnel.engine.server.execution.TaskGroupContext;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.execution.TaskGroupUtils;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.execution.TaskTracker;
 import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
@@ -65,13 +66,12 @@ import com.hazelcast.logging.ILogger;
 import com.hazelcast.map.IMap;
 import com.hazelcast.spi.impl.NodeEngineImpl;
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
-import com.hazelcast.spi.properties.HazelcastProperties;
-import lombok.Getter;
 import lombok.NonNull;
 import lombok.SneakyThrows;
 
 import java.io.IOException;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -149,7 +149,6 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
     public TaskExecutionService(
             ClassLoaderService classLoaderService,
             NodeEngineImpl nodeEngine,
-            HazelcastProperties properties,
             EventService eventService) {
         seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
         this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
@@ -282,33 +281,50 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
                         taskImmutableInfo.getExecutionId()));
         TaskGroup taskGroup = null;
         try {
-            Set<ConnectorJarIdentifier> connectorJarIdentifiers =
+            List<Set<ConnectorJarIdentifier>> connectorJarIdentifiersList =
                     taskImmutableInfo.getConnectorJarIdentifiers();
-            Set<URL> jars = new HashSet<>();
-            ClassLoader classLoader;
-            if (!CollectionUtils.isEmpty(connectorJarIdentifiers)) {
-                // Prioritize obtaining the jar package file required for the 
current task execution
-                // from the local, if it does not exist locally, it will be 
downloaded from the
-                // master node.
-                jars =
-                        serverConnectorPackageClient.getConnectorJarFromLocal(
-                                connectorJarIdentifiers);
-            } else if (!CollectionUtils.isEmpty(taskImmutableInfo.getJars())) {
-                jars = taskImmutableInfo.getJars();
-            }
-            classLoader =
-                    classLoaderService.getClassLoader(
-                            taskImmutableInfo.getJobId(), 
Lists.newArrayList(jars));
-            if (jars.isEmpty()) {
-                taskGroup =
-                        
nodeEngine.getSerializationService().toObject(taskImmutableInfo.getGroup());
-            } else {
-                taskGroup =
-                        
CustomClassLoadedObject.deserializeWithCustomClassLoader(
-                                nodeEngine.getSerializationService(),
-                                classLoader,
-                                taskImmutableInfo.getGroup());
+            List<Data> taskData = taskImmutableInfo.getTasksData();
+            ConcurrentHashMap<Long, ClassLoader> classLoaders = new 
ConcurrentHashMap<>();
+            List<Task> tasks = new ArrayList<>();
+            ConcurrentHashMap<Long, Collection<URL>> taskJars = new 
ConcurrentHashMap<>();
+            for (int i = 0; i < taskData.size(); i++) {
+                Set<URL> jars = new HashSet<>();
+                Set<ConnectorJarIdentifier> connectorJarIdentifiers =
+                        connectorJarIdentifiersList.get(i);
+                if (!CollectionUtils.isEmpty(connectorJarIdentifiers)) {
+                    // Prioritize obtaining the jar package file required for 
the current task
+                    // execution
+                    // from the local, if it does not exist locally, it will 
be downloaded from the
+                    // master node.
+                    jars =
+                            
serverConnectorPackageClient.getConnectorJarFromLocal(
+                                    connectorJarIdentifiers);
+                } else if 
(!CollectionUtils.isEmpty(taskImmutableInfo.getJars().get(i))) {
+                    jars = taskImmutableInfo.getJars().get(i);
+                }
+                ClassLoader classLoader =
+                        classLoaderService.getClassLoader(
+                                taskImmutableInfo.getJobId(), 
Lists.newArrayList(jars));
+                Task task;
+                if (jars.isEmpty()) {
+                    task = 
nodeEngine.getSerializationService().toObject(taskData.get(i));
+                } else {
+                    task =
+                            
CustomClassLoadedObject.deserializeWithCustomClassLoader(
+                                    nodeEngine.getSerializationService(),
+                                    classLoader,
+                                    taskData.get(i));
+                }
+                tasks.add(task);
+                classLoaders.put(task.getTaskID(), classLoader);
+                taskJars.put(task.getTaskID(), jars);
             }
+            taskGroup =
+                    TaskGroupUtils.createTaskGroup(
+                            taskImmutableInfo.getTaskGroupType(),
+                            taskImmutableInfo.getTaskGroupLocation(),
+                            taskImmutableInfo.getTaskGroupName(),
+                            tasks);
 
             logger.info(
                     String.format(
@@ -322,7 +338,7 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
                                     "TaskGroupLocation: %s already exists",
                                     taskGroup.getTaskGroupLocation()));
                 }
-                deployLocalTask(taskGroup, classLoader, jars);
+                deployLocalTask(taskGroup, classLoaders, taskJars);
                 return TaskDeployState.success();
             }
         } catch (Throwable t) {
@@ -337,15 +353,10 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
         }
     }
 
-    @Deprecated
     public PassiveCompletableFuture<TaskExecutionState> deployLocalTask(
-            @NonNull TaskGroup taskGroup) {
-        return deployLocalTask(
-                taskGroup, Thread.currentThread().getContextClassLoader(), 
emptyList());
-    }
-
-    public PassiveCompletableFuture<TaskExecutionState> deployLocalTask(
-            @NonNull TaskGroup taskGroup, @NonNull ClassLoader classLoader, 
Collection<URL> jars) {
+            @NonNull TaskGroup taskGroup,
+            @NonNull ConcurrentHashMap<Long, ClassLoader> classLoaders,
+            ConcurrentHashMap<Long, Collection<URL>> jars) {
         CompletableFuture<TaskExecutionState> resultFuture = new 
CompletableFuture<>();
         try {
             taskGroup.init();
@@ -389,7 +400,7 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
                                             }));
             executionContexts.put(
                     taskGroup.getTaskGroupLocation(),
-                    new TaskGroupContext(taskGroup, classLoader, jars));
+                    new TaskGroupContext(taskGroup, classLoaders, jars));
             cancellationFutures.put(taskGroup.getTaskGroupLocation(), 
cancellationFuture);
             submitThreadShareTask(executionTracker, byCooperation.get(true));
             submitBlockingTask(executionTracker, byCooperation.get(false));
@@ -591,7 +602,7 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
                                         }
                                     });
                 });
-        if (localMap.size() > 0) {
+        if (!localMap.isEmpty()) {
             boolean lockedIMap = false;
             try {
                 lockedIMap =
@@ -669,7 +680,8 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
             ClassLoader classLoader =
                     executionContexts
                             
.get(taskGroupExecutionTracker.taskGroup.getTaskGroupLocation())
-                            .getClassLoader();
+                            .getClassLoaders()
+                            .get(tracker.task.getTaskID());
             ClassLoader oldClassLoader = 
Thread.currentThread().getContextClassLoader();
             Thread.currentThread().setContextClassLoader(classLoader);
             final Task t = tracker.task;
@@ -728,16 +740,16 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
         public AtomicReference<TaskTracker> exclusiveTaskTracker = new 
AtomicReference<>();
         final TaskCallTimer timer;
         private Thread myThread;
-        public LinkedBlockingDeque<TaskTracker> taskqueue;
+        public LinkedBlockingDeque<TaskTracker> taskQueue;
         private Future<?> thisTaskFuture;
         private BlockingQueue<Future<?>> futureBlockingQueue;
 
         public CooperativeTaskWorker(
-                LinkedBlockingDeque<TaskTracker> taskqueue,
+                LinkedBlockingDeque<TaskTracker> taskQueue,
                 RunBusWorkSupplier runBusWorkSupplier,
                 BlockingQueue<Future<?>> futureBlockingQueue) {
             logger.info(String.format("Created new BusWork : %s", 
this.hashCode()));
-            this.taskqueue = taskqueue;
+            this.taskQueue = taskQueue;
             this.timer = new TaskCallTimer(50, keep, runBusWorkSupplier, this);
             this.futureBlockingQueue = futureBlockingQueue;
         }
@@ -752,7 +764,7 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
                 TaskTracker taskTracker =
                         null != exclusiveTaskTracker.get()
                                 ? exclusiveTaskTracker.get()
-                                : taskqueue.takeFirst();
+                                : taskQueue.takeFirst();
                 TaskGroupExecutionTracker taskGroupExecutionTracker =
                         taskTracker.taskGroupExecutionTracker;
                 if 
(taskGroupExecutionTracker.executionCompletedExceptionally()) {
@@ -777,7 +789,8 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
                     myThread.setContextClassLoader(
                             executionContexts
                                     
.get(taskGroupExecutionTracker.taskGroup.getTaskGroupLocation())
-                                    .getClassLoader());
+                                    .getClassLoaders()
+                                    .get(taskTracker.task.getTaskID()));
                     call = taskTracker.task.call();
                     synchronized (timer) {
                         timer.timerStop();
@@ -819,7 +832,7 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
                         // Task is not completed. Put task to the end of the 
queue
                         // If the current work has an exclusive tracker, it 
will not be put back
                         if (null == exclusiveTaskTracker.get()) {
-                            taskqueue.offer(taskTracker);
+                            taskQueue.offer(taskTracker);
                         }
                     }
                 }
@@ -840,7 +853,7 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
         }
 
         public boolean runNewBusWork(boolean checkTaskQueue) {
-            if (!checkTaskQueue || taskQueue.size() > 0) {
+            if (!checkTaskQueue || !taskQueue.isEmpty()) {
                 BlockingQueue<Future<?>> futureBlockingQueue = new 
LinkedBlockingQueue<>();
                 CooperativeTaskWorker cooperativeTaskWorker =
                         new CooperativeTaskWorker(taskQueue, this, 
futureBlockingQueue);
@@ -867,7 +880,7 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
 
         private final AtomicBoolean isCancel = new AtomicBoolean(false);
 
-        @Getter private Map<Long, Future<?>> currRunningTaskFuture = new 
ConcurrentHashMap<>();
+        private final Map<Long, Future<?>> currRunningTaskFuture = new 
ConcurrentHashMap<>();
 
         TaskGroupExecutionTracker(
                 @NonNull CompletableFuture<Void> cancellationFuture,
@@ -972,8 +985,10 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
 
         private void recycleClassLoader(TaskGroupLocation taskGroupLocation) {
             TaskGroupContext context = 
executionContexts.get(taskGroupLocation);
-            executionContexts.get(taskGroupLocation).setClassLoader(null);
-            
classLoaderService.releaseClassLoader(taskGroupLocation.getJobId(), 
context.getJars());
+            executionContexts.get(taskGroupLocation).setClassLoaders(null);
+            for (Collection<URL> jars : context.getJars().values()) {
+                
classLoaderService.releaseClassLoader(taskGroupLocation.getJobId(), jars);
+            }
         }
 
         boolean executionCompletedExceptionally() {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointEndOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointEndOperation.java
index 4457b7eda2..9dadf250a0 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointEndOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointEndOperation.java
@@ -86,7 +86,9 @@ public class CheckpointEndOperation extends TaskOperation {
                                         
.getExecutionContext(taskLocation.getTaskGroupLocation());
                         Task task = 
groupContext.getTaskGroup().getTask(taskLocation.getTaskID());
                         ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
-                        
Thread.currentThread().setContextClassLoader(groupContext.getClassLoader());
+                        Thread.currentThread()
+                                .setContextClassLoader(
+                                        
groupContext.getClassLoader(taskLocation.getTaskID()));
 
                         task.notifyCheckpointEnd(checkpointId);
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
index 1e97bd4b46..65dd3a7274 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
@@ -86,7 +86,9 @@ public class CheckpointFinishedOperation extends 
TaskOperation {
                                         
.getExecutionContext(taskLocation.getTaskGroupLocation());
                         Task task = 
groupContext.getTaskGroup().getTask(taskLocation.getTaskID());
                         ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
-                        
Thread.currentThread().setContextClassLoader(groupContext.getClassLoader());
+                        Thread.currentThread()
+                                .setContextClassLoader(
+                                        
groupContext.getClassLoader(taskLocation.getTaskID()));
                         if (successful) {
                             task.notifyCheckpointComplete(checkpointId);
                         } else {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java
index 4cd83941c4..cd681f432e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java
@@ -98,7 +98,8 @@ public class NotifyTaskRestoreOperation extends TaskOperation 
{
                                         () -> {
                                             Thread.currentThread()
                                                     .setContextClassLoader(
-                                                            
groupContext.getClassLoader());
+                                                            
groupContext.getClassLoader(
+                                                                    
task.getTaskID()));
                                             try {
                                                 log.debug(
                                                         
"NotifyTaskRestoreOperation.restoreState "
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 2a18984a95..34ac96aacb 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -282,7 +282,6 @@ public class PhysicalPlanGenerator {
 
                                 return new PhysicalVertex(
                                         atomicInteger.incrementAndGet(),
-                                        executorService,
                                         collect.size(),
                                         new TaskGroupDefaultImpl(
                                                 taskGroupLocation,
@@ -291,8 +290,9 @@ public class PhysicalPlanGenerator {
                                         flakeIdGenerator,
                                         pipelineIndex,
                                         totalPipelineNum,
-                                        sinkAction.getJarUrls(),
-                                        
sinkAction.getConnectorJarIdentifiers(),
+                                        
Collections.singletonList(sinkAction.getJarUrls()),
+                                        Collections.singletonList(
+                                                
sinkAction.getConnectorJarIdentifiers()),
                                         jobImmutableInformation,
                                         initializationTimestamp,
                                         nodeEngine,
@@ -362,7 +362,7 @@ public class PhysicalPlanGenerator {
                                             new PhysicalExecutionFlow(
                                                     shuffleActionOfSinkFlow,
                                                     
Collections.singletonList(sinkFlow));
-                                    setFlowConfig(shuffleFlow, 
parallelismIndex);
+                                    setFlowConfig(shuffleFlow);
 
                                     long taskGroupID =
                                             mixIDPrefixAndIndex(
@@ -389,7 +389,6 @@ public class PhysicalPlanGenerator {
                                     physicalVertices.add(
                                             new PhysicalVertex(
                                                     parallelismIndex,
-                                                    executorService,
                                                     
shuffleFlow.getAction().getParallelism(),
                                                     new TaskGroupDefaultImpl(
                                                             taskGroupLocation,
@@ -400,8 +399,10 @@ public class PhysicalPlanGenerator {
                                                     flakeIdGenerator,
                                                     pipelineIndex,
                                                     totalPipelineNum,
-                                                    seaTunnelTask.getJarsUrl(),
-                                                    
seaTunnelTask.getConnectorPluginJars(),
+                                                    Collections.singletonList(
+                                                            
seaTunnelTask.getJarsUrl()),
+                                                    Collections.singletonList(
+                                                            
seaTunnelTask.getConnectorPluginJars()),
                                                     jobImmutableInformation,
                                                     initializationTimestamp,
                                                     nodeEngine,
@@ -420,7 +421,7 @@ public class PhysicalPlanGenerator {
                                                     taskGroupID);
                                     TaskLocation taskLocation =
                                             new 
TaskLocation(taskGroupLocation, taskIDPrefix, i);
-                                    setFlowConfig(flow, i);
+                                    setFlowConfig(flow);
                                     SeaTunnelTask seaTunnelTask =
                                             new TransformSeaTunnelTask(
                                                     
jobImmutableInformation.getJobId(),
@@ -432,7 +433,6 @@ public class PhysicalPlanGenerator {
                                     physicalVertices.add(
                                             new PhysicalVertex(
                                                     i,
-                                                    executorService,
                                                     
flow.getAction().getParallelism(),
                                                     new TaskGroupDefaultImpl(
                                                             taskGroupLocation,
@@ -442,8 +442,10 @@ public class PhysicalPlanGenerator {
                                                     flakeIdGenerator,
                                                     pipelineIndex,
                                                     totalPipelineNum,
-                                                    seaTunnelTask.getJarsUrl(),
-                                                    
seaTunnelTask.getConnectorPluginJars(),
+                                                    Collections.singletonList(
+                                                            
seaTunnelTask.getJarsUrl()),
+                                                    Collections.singletonList(
+                                                            
seaTunnelTask.getConnectorPluginJars()),
                                                     jobImmutableInformation,
                                                     initializationTimestamp,
                                                     nodeEngine,
@@ -488,7 +490,6 @@ public class PhysicalPlanGenerator {
 
                             return new PhysicalVertex(
                                     atomicInteger.incrementAndGet(),
-                                    executorService,
                                     sources.size(),
                                     new TaskGroupDefaultImpl(
                                             taskGroupLocation,
@@ -497,8 +498,8 @@ public class PhysicalPlanGenerator {
                                     flakeIdGenerator,
                                     pipelineIndex,
                                     totalPipelineNum,
-                                    t.getJarsUrl(),
-                                    t.getConnectorPluginJars(),
+                                    Collections.singletonList(t.getJarsUrl()),
+                                    
Collections.singletonList(t.getConnectorPluginJars()),
                                     jobImmutableInformation,
                                     initializationTimestamp,
                                     nodeEngine,
@@ -536,7 +537,7 @@ public class PhysicalPlanGenerator {
                                         flows.stream()
                                                 .map(
                                                         f -> {
-                                                            setFlowConfig(f, 
finalParallelismIndex);
+                                                            setFlowConfig(f);
                                                             long taskIDPrefix =
                                                                     
flowTaskIDPrefixMap
                                                                             
.computeIfAbsent(
@@ -575,18 +576,15 @@ public class PhysicalPlanGenerator {
                                                         })
                                                 .peek(this::fillCheckpointPlan)
                                                 .collect(Collectors.toList());
-                                Set<URL> jars =
+                                List<Set<URL>> jars =
                                         taskList.stream()
-                                                .flatMap(task -> 
task.getJarsUrl().stream())
-                                                .collect(Collectors.toSet());
+                                                .map(SeaTunnelTask::getJarsUrl)
+                                                .collect(Collectors.toList());
 
-                                Set<ConnectorJarIdentifier> jarIdentifiers =
+                                List<Set<ConnectorJarIdentifier>> 
jarIdentifiers =
                                         taskList.stream()
-                                                .flatMap(
-                                                        task ->
-                                                                
task.getConnectorPluginJars()
-                                                                        
.stream())
-                                                .collect(Collectors.toSet());
+                                                
.map(SeaTunnelTask::getConnectorPluginJars)
+                                                .collect(Collectors.toList());
 
                                 if (taskList.stream()
                                         
.anyMatch(TransformSeaTunnelTask.class::isInstance)) {
@@ -612,7 +610,6 @@ public class PhysicalPlanGenerator {
                                     t.add(
                                             new PhysicalVertex(
                                                     i,
-                                                    executorService,
                                                     
flow.getAction().getParallelism(),
                                                     taskGroup,
                                                     flakeIdGenerator,
@@ -629,7 +626,6 @@ public class PhysicalPlanGenerator {
                                     t.add(
                                             new PhysicalVertex(
                                                     i,
-                                                    executorService,
                                                     
flow.getAction().getParallelism(),
                                                     new TaskGroupDefaultImpl(
                                                             taskGroupLocation,
@@ -671,10 +667,9 @@ public class PhysicalPlanGenerator {
      * set config for flow, some flow should have config support for execute 
on task.
      *
      * @param f flow
-     * @param parallelismIndex the parallelism index of flow
      */
     @SuppressWarnings("unchecked")
-    private void setFlowConfig(Flow f, int parallelismIndex) {
+    private void setFlowConfig(Flow f) {
 
         if (f instanceof PhysicalExecutionFlow) {
             PhysicalExecutionFlow<?, FlowConfig> flow = 
(PhysicalExecutionFlow<?, FlowConfig>) f;
@@ -702,7 +697,7 @@ public class PhysicalPlanGenerator {
         }
 
         if (!f.getNext().isEmpty()) {
-            f.getNext().forEach(n -> setFlowConfig(n, parallelismIndex));
+            f.getNext().forEach(this::setFlowConfig);
         }
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index b6ec234bf2..7f75f489e0 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -47,6 +47,7 @@ import 
org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 import com.hazelcast.cluster.Address;
 import com.hazelcast.cluster.Member;
 import com.hazelcast.flakeidgen.FlakeIdGenerator;
+import com.hazelcast.internal.serialization.Data;
 import com.hazelcast.map.IMap;
 import com.hazelcast.spi.impl.NodeEngine;
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
@@ -59,7 +60,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -79,13 +79,12 @@ public class PhysicalVertex {
 
     private final TaskGroupDefaultImpl taskGroup;
 
-    private final ExecutorService executorService;
-
     private final FlakeIdGenerator flakeIdGenerator;
 
-    private final Set<URL> pluginJarsUrls;
+    private final List<Set<URL>> pluginJarsUrls;
 
-    // Set<URL> pluginJarsUrls is a collection of paths stored on the engine 
for all connector Jar
+    // List<Set<URL>> pluginJarsUrls is a collection of paths stored on the 
engine for all connector
+    // Jar
     // packages and third-party Jar packages that the connector relies on.
     // All storage paths come from the unique identifier obtained after 
uploading the Jar package
     // through the client.
@@ -93,9 +92,10 @@ public class PhysicalVertex {
     // file,
     // which contains more information about the Jar package file, including 
the name of the
     // connector plugin using the current Jar, the type of the current Jar 
package, and so on.
-    // TODO: Only use Set<ConnectorJarIdentifier>to save more information 
about the Jar package,
+    // TODO: Only use List<Set<ConnectorJarIdentifier>>to save more 
information about the Jar
+    // package,
     // including the storage path of the Jar package on the server.
-    private final Set<ConnectorJarIdentifier> connectorJarIdentifiers;
+    private final List<Set<ConnectorJarIdentifier>> connectorJarIdentifiers;
 
     private final IMap<Object, Object> runningJobStateIMap;
 
@@ -117,7 +117,7 @@ public class PhysicalVertex {
 
     private JobMaster jobMaster;
 
-    private volatile ExecutionState currExecutionState = 
ExecutionState.CREATED;
+    private volatile ExecutionState currExecutionState;
 
     public volatile boolean isRunning = false;
 
@@ -126,21 +126,19 @@ public class PhysicalVertex {
 
     public PhysicalVertex(
             int subTaskGroupIndex,
-            @NonNull ExecutorService executorService,
             int parallelism,
             @NonNull TaskGroupDefaultImpl taskGroup,
             @NonNull FlakeIdGenerator flakeIdGenerator,
             int pipelineId,
             int totalPipelineNum,
-            Set<URL> pluginJarsUrls,
-            Set<ConnectorJarIdentifier> connectorJarIdentifiers,
+            List<Set<URL>> pluginJarsUrls,
+            List<Set<ConnectorJarIdentifier>> connectorJarIdentifiers,
             @NonNull JobImmutableInformation jobImmutableInformation,
             long initializationTimestamp,
             @NonNull NodeEngine nodeEngine,
             @NonNull IMap runningJobStateIMap,
             @NonNull IMap runningJobStateTimestampsIMap) {
         this.taskGroupLocation = taskGroup.getTaskGroupLocation();
-        this.executorService = executorService;
         this.taskGroup = taskGroup;
         this.flakeIdGenerator = flakeIdGenerator;
         this.pluginJarsUrls = pluginJarsUrls;
@@ -238,11 +236,9 @@ public class PhysicalVertex {
                             .collect(Collectors.toList());
             if (!members.contains(worker)) {
                 log.warn(
-                        "The node:"
-                                + worker.toString()
-                                + " running the taskGroup "
-                                + taskGroupLocation
-                                + " no longer exists, return false.");
+                        "The node:{} running the taskGroup {} no longer 
exists, return false.",
+                        worker.toString(),
+                        taskGroupLocation);
                 return false;
             }
             InvocationFuture<Object> invoke =
@@ -257,9 +253,8 @@ public class PhysicalVertex {
                 return (Boolean) invoke.get();
             } catch (InterruptedException | ExecutionException e) {
                 log.warn(
-                        "Execution of CheckTaskGroupIsExecutingOperation "
-                                + taskGroupLocation
-                                + " failed, checkTaskGroupIsExecuting return 
false. ",
+                        "Execution of CheckTaskGroupIsExecutingOperation {} 
failed, checkTaskGroupIsExecuting return false. ",
+                        taskGroupLocation,
                         e);
             }
         }
@@ -344,11 +339,19 @@ public class PhysicalVertex {
         return state;
     }
 
-    private TaskGroupImmutableInformation getTaskGroupImmutableInformation() {
+    @VisibleForTesting
+    public TaskGroupImmutableInformation getTaskGroupImmutableInformation() {
+        List<Data> tasksData =
+                this.taskGroup.getTasks().stream()
+                        .map(task -> (Data) 
nodeEngine.getSerializationService().toData(task))
+                        .collect(Collectors.toList());
         return new TaskGroupImmutableInformation(
                 this.taskGroup.getTaskGroupLocation().getJobId(),
                 flakeIdGenerator.newId(),
-                nodeEngine.getSerializationService().toData(this.taskGroup),
+                this.taskGroup.getTaskGroupType(),
+                this.taskGroup.getTaskGroupLocation(),
+                this.taskGroup.getTaskGroupName(),
+                tasksData,
                 this.pluginJarsUrls,
                 this.connectorJarIdentifiers);
     }
@@ -391,7 +394,7 @@ public class PhysicalVertex {
                     new RetryUtils.RetryMaterial(
                             Constant.OPERATION_RETRY_TIME,
                             true,
-                            exception -> 
ExceptionUtil.isOperationNeedRetryException(exception),
+                            ExceptionUtil::isOperationNeedRetryException,
                             Constant.OPERATION_RETRY_SLEEP));
             this.currExecutionState = targetState;
             log.info(
@@ -492,7 +495,7 @@ public class PhysicalVertex {
                         new RetryUtils.RetryMaterial(
                                 Constant.OPERATION_RETRY_TIME,
                                 true,
-                                exception -> 
ExceptionUtil.isOperationNeedRetryException(exception),
+                                ExceptionUtil::isOperationNeedRetryException,
                                 Constant.OPERATION_RETRY_SLEEP));
             } catch (Exception e) {
                 log.warn(ExceptionUtils.getMessage(e));
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
index 5e2c654eb2..7c1b19938c 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
@@ -32,4 +32,6 @@ public interface TaskGroup extends Serializable {
     <T extends Task> T getTask(long taskID);
 
     void setTasksContext(Map<Long, TaskExecutionContext> 
taskExecutionContextMap);
+
+    TaskGroupType getTaskGroupType();
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupContext.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupContext.java
index 3190edb372..e048e50486 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupContext.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupContext.java
@@ -22,12 +22,21 @@ import lombok.Data;
 
 import java.net.URL;
 import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
 
 @Data
 @AllArgsConstructor
 public class TaskGroupContext {
     private TaskGroup taskGroup;
 
-    private ClassLoader classLoader;
-    private Collection<URL> jars;
+    private ConcurrentHashMap<Long, ClassLoader> classLoaders;
+    private ConcurrentHashMap<Long, Collection<URL>> jars;
+
+    public ClassLoader getClassLoader(long taskId) {
+        if (classLoaders != null) {
+            return classLoaders.get(taskId);
+        } else {
+            return null;
+        }
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java
index 67f863c46c..ce39391fba 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java
@@ -18,9 +18,8 @@
 package org.apache.seatunnel.engine.server.execution;
 
 import java.util.Collection;
+import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 
 public class TaskGroupDefaultImpl implements TaskGroup {
     private final TaskGroupLocation taskGroupLocation;
@@ -33,7 +32,10 @@ public class TaskGroupDefaultImpl implements TaskGroup {
             TaskGroupLocation taskGroupLocation, String taskGroupName, 
Collection<Task> tasks) {
         this.taskGroupLocation = taskGroupLocation;
         this.taskGroupName = taskGroupName;
-        this.tasks = tasks.stream().collect(Collectors.toMap(Task::getTaskID, 
Function.identity()));
+        // keep the order of tasks, make sure the order of tasks is the same 
as the jars order in
+        // {@link PhysicalVertex::pluginJarsUrls}
+        this.tasks = new LinkedHashMap<>();
+        tasks.forEach(t -> this.tasks.put(t.getTaskID(), t));
     }
 
     public String getTaskGroupName() {
@@ -60,4 +62,9 @@ public class TaskGroupDefaultImpl implements TaskGroup {
 
     @Override
     public void setTasksContext(Map<Long, TaskExecutionContext> 
taskExecutionContextMap) {}
+
+    @Override
+    public TaskGroupType getTaskGroupType() {
+        return TaskGroupType.DEFAULT;
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupContext.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupType.java
similarity index 76%
copy from 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupContext.java
copy to 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupType.java
index 3190edb372..a4925238ea 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupContext.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupType.java
@@ -17,17 +17,8 @@
 
 package org.apache.seatunnel.engine.server.execution;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
-import java.net.URL;
-import java.util.Collection;
-
-@Data
-@AllArgsConstructor
-public class TaskGroupContext {
-    private TaskGroup taskGroup;
-
-    private ClassLoader classLoader;
-    private Collection<URL> jars;
+public enum TaskGroupType {
+    DEFAULT,
+    INTERMEDIATE_BLOCKING_QUEUE,
+    INTERMEDIATE_DISRUPTOR_QUEUE,
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupUtils.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupUtils.java
new file mode 100644
index 0000000000..16f12ece1f
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.execution;
+
+import 
org.apache.seatunnel.engine.server.task.group.TaskGroupWithIntermediateBlockingQueue;
+import 
org.apache.seatunnel.engine.server.task.group.TaskGroupWithIntermediateDisruptor;
+
+import java.util.Collection;
+
+public class TaskGroupUtils {
+
+    public static TaskGroup createTaskGroup(
+            TaskGroupType type,
+            TaskGroupLocation taskGroupLocation,
+            String taskGroupName,
+            Collection<Task> tasks) {
+        switch (type) {
+            case DEFAULT:
+                return new TaskGroupDefaultImpl(taskGroupLocation, 
taskGroupName, tasks);
+            case INTERMEDIATE_BLOCKING_QUEUE:
+                return new TaskGroupWithIntermediateBlockingQueue(
+                        taskGroupLocation, taskGroupName, tasks);
+            case INTERMEDIATE_DISRUPTOR_QUEUE:
+                return new TaskGroupWithIntermediateDisruptor(
+                        taskGroupLocation, taskGroupName, tasks);
+            default:
+                throw new IllegalArgumentException("Unsupported task group 
type: " + type);
+        }
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupImmutableInformation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupImmutableInformation.java
index c90c73b55e..0743c6a3ab 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupImmutableInformation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupImmutableInformation.java
@@ -18,6 +18,8 @@
 package org.apache.seatunnel.engine.server.task;
 
 import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.execution.TaskGroupType;
 import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
 
 import com.hazelcast.internal.nio.IOUtil;
@@ -29,6 +31,8 @@ import lombok.AllArgsConstructor;
 
 import java.io.IOException;
 import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 
 @lombok.Data
@@ -38,9 +42,15 @@ public class TaskGroupImmutableInformation implements 
IdentifiedDataSerializable
     // Each deployment generates a new executionId
     private long executionId;
 
-    private Data group;
+    private TaskGroupType taskGroupType;
 
-    private Set<URL> jars;
+    private TaskGroupLocation taskGroupLocation;
+
+    private String taskGroupName;
+
+    private List<Data> tasksData;
+
+    private List<Set<URL>> jars;
 
     // Set<URL> pluginJarsUrls is a collection of paths stored on the engine 
for all connector Jar
     // packages and third-party Jar packages that the connector relies on.
@@ -52,7 +62,7 @@ public class TaskGroupImmutableInformation implements 
IdentifiedDataSerializable
     // connector plugin using the current Jar, the type of the current Jar 
package, and so on.
     // TODO: Only use Set<ConnectorJarIdentifier>to save more information 
about the Jar package,
     // including the storage path of the Jar package on the server.
-    private Set<ConnectorJarIdentifier> connectorJarIdentifiers;
+    private List<Set<ConnectorJarIdentifier>> connectorJarIdentifiers;
 
     public TaskGroupImmutableInformation() {}
 
@@ -70,17 +80,30 @@ public class TaskGroupImmutableInformation implements 
IdentifiedDataSerializable
     public void writeData(ObjectDataOutput out) throws IOException {
         out.writeLong(jobId);
         out.writeLong(executionId);
+        out.writeObject(taskGroupType);
         out.writeObject(jars);
         out.writeObject(connectorJarIdentifiers);
-        IOUtil.writeData(out, group);
+        out.writeInt(tasksData.size());
+        for (Data data : tasksData) {
+            IOUtil.writeData(out, data);
+        }
+        out.writeObject(taskGroupLocation);
+        out.writeString(taskGroupName);
     }
 
     @Override
     public void readData(ObjectDataInput in) throws IOException {
         jobId = in.readLong();
         executionId = in.readLong();
+        taskGroupType = in.readObject();
         jars = in.readObject();
         connectorJarIdentifiers = in.readObject();
-        group = IOUtil.readData(in);
+        int size = in.readInt();
+        tasksData = new ArrayList<>(size);
+        for (int i = 0; i < size; i++) {
+            tasksData.add(IOUtil.readData(in));
+        }
+        taskGroupLocation = in.readObject();
+        taskGroupName = in.readString();
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateBlockingQueue.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateBlockingQueue.java
index 6828b92b7e..f1153364f6 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateBlockingQueue.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateBlockingQueue.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.task.group;
 import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.engine.server.execution.Task;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.execution.TaskGroupType;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import 
org.apache.seatunnel.engine.server.task.group.queue.AbstractIntermediateQueue;
 import 
org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue;
@@ -55,4 +56,9 @@ public class TaskGroupWithIntermediateBlockingQueue extends 
AbstractTaskGroupWit
         blockingQueueCache.computeIfAbsent(id, i -> new 
ArrayBlockingQueue<>(QUEUE_SIZE));
         return new IntermediateBlockingQueue(blockingQueueCache.get(id));
     }
+
+    @Override
+    public TaskGroupType getTaskGroupType() {
+        return TaskGroupType.INTERMEDIATE_BLOCKING_QUEUE;
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateDisruptor.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateDisruptor.java
index d6544c1662..aabb53ae3f 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateDisruptor.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateDisruptor.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.task.group;
 
 import org.apache.seatunnel.engine.server.execution.Task;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.execution.TaskGroupType;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import 
org.apache.seatunnel.engine.server.task.group.queue.AbstractIntermediateQueue;
 import 
org.apache.seatunnel.engine.server.task.group.queue.IntermediateDisruptor;
@@ -69,4 +70,9 @@ public class TaskGroupWithIntermediateDisruptor extends 
AbstractTaskGroupWithInt
         this.disruptor.putIfAbsent(id, disruptor);
         return new IntermediateDisruptor(this.disruptor.get(id));
     }
+
+    @Override
+    public TaskGroupType getTaskGroupType() {
+        return TaskGroupType.INTERMEDIATE_DISRUPTOR_QUEUE;
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
index db3ae487a3..dbce951249 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
@@ -77,7 +77,7 @@ public class SinkPrepareCommitOperation<CommitInfoT> extends 
BarrierFlowOperatio
         ClassLoader taskClassLoader =
                 taskExecutionService
                         
.getExecutionContext(taskLocation.getTaskGroupLocation())
-                        .getClassLoader();
+                        .getClassLoader(committerTask.getTaskID());
         ClassLoader mainClassLoader = 
Thread.currentThread().getContextClassLoader();
 
         if (commitInfos != null) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
index b080d2f1ba..afb70b0f93 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
@@ -58,7 +58,7 @@ public class AssignSplitOperation<SplitT extends SourceSplit> 
extends TracingOpe
                     ClassLoader taskClassLoader =
                             server.getTaskExecutionService()
                                     
.getExecutionContext(taskID.getTaskGroupLocation())
-                                    .getClassLoader();
+                                    .getClassLoader(task.getTaskID());
                     ClassLoader mainClassLoader = 
Thread.currentThread().getContextClassLoader();
                     List<SplitT> deserializeSplits = new ArrayList<>();
                     try {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
index 1a4b7469db..eaf2dd2a21 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
@@ -36,13 +36,13 @@ public class RequestSplitOperation extends TracingOperation 
implements Identifie
 
     private TaskLocation enumeratorTaskID;
 
-    private TaskLocation taskID;
+    private TaskLocation taskLocation;
 
     public RequestSplitOperation() {}
 
-    public RequestSplitOperation(TaskLocation taskID, TaskLocation 
enumeratorTaskID) {
+    public RequestSplitOperation(TaskLocation taskLocation, TaskLocation 
enumeratorTaskID) {
         this.enumeratorTaskID = enumeratorTaskID;
-        this.taskID = taskID;
+        this.taskLocation = taskLocation;
     }
 
     @Override
@@ -54,12 +54,12 @@ public class RequestSplitOperation extends TracingOperation 
implements Identifie
                     ClassLoader classLoader =
                             server.getTaskExecutionService()
                                     
.getExecutionContext(enumeratorTaskID.getTaskGroupLocation())
-                                    .getClassLoader();
+                                    .getClassLoader(taskLocation.getTaskID());
                     ClassLoader oldClassLoader = 
Thread.currentThread().getContextClassLoader();
                     Thread.currentThread().setContextClassLoader(classLoader);
                     SourceSplitEnumeratorTask<?> task =
                             
server.getTaskExecutionService().getTask(enumeratorTaskID);
-                    task.requestSplit(taskID.getTaskIndex());
+                    task.requestSplit(taskLocation.getTaskIndex());
                     
Thread.currentThread().setContextClassLoader(oldClassLoader);
                     return null;
                 },
@@ -81,14 +81,14 @@ public class RequestSplitOperation extends TracingOperation 
implements Identifie
     @Override
     protected void writeInternal(ObjectDataOutput out) throws IOException {
         super.writeInternal(out);
-        out.writeObject(taskID);
+        out.writeObject(taskLocation);
         out.writeObject(enumeratorTaskID);
     }
 
     @Override
     protected void readInternal(ObjectDataInput in) throws IOException {
         super.readInternal(in);
-        taskID = in.readObject();
+        taskLocation = in.readObject();
         enumeratorTaskID = in.readObject();
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
index a6ee60f6f1..99189c3d4d 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
@@ -91,7 +91,7 @@ public class RestoredSplitOperation extends TaskOperation {
                     ClassLoader taskClassLoader =
                             taskExecutionService
                                     
.getExecutionContext(taskLocation.getTaskGroupLocation())
-                                    .getClassLoader();
+                                    .getClassLoader(task.getTaskID());
                     ClassLoader mainClassLoader = 
Thread.currentThread().getContextClassLoader();
 
                     List<SourceSplit> deserializeSplits = new ArrayList<>();
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
index 35667bf9b5..13701d41a4 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
@@ -53,7 +53,7 @@ public class SourceNoMoreElementOperation extends 
TracingOperation
                     ClassLoader classLoader =
                             server.getTaskExecutionService()
                                     
.getExecutionContext(enumeratorTaskID.getTaskGroupLocation())
-                                    .getClassLoader();
+                                    
.getClassLoader(enumeratorTaskID.getTaskID());
                     ClassLoader oldClassLoader = 
Thread.currentThread().getContextClassLoader();
                     Thread.currentThread().setContextClassLoader(classLoader);
                     SourceSplitEnumeratorTask<?> task =
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
index 56a376dc5d..3bf8a9c315 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
@@ -54,7 +54,7 @@ public class SourceReaderEventOperation extends 
SourceEventOperation {
                     ClassLoader classLoader =
                             server.getTaskExecutionService()
                                     
.getExecutionContext(taskLocation.getTaskGroupLocation())
-                                    .getClassLoader();
+                                    .getClassLoader(task.getTaskID());
                     task.handleSourceEvent(
                             currentTaskLocation.getTaskIndex(),
                             SerializationUtils.deserialize(sourceEvent, 
classLoader));
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
index 12685ae8b4..688597c031 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
@@ -60,7 +60,7 @@ public class SourceRegisterOperation extends TracingOperation
                     ClassLoader classLoader =
                             server.getTaskExecutionService()
                                     
.getExecutionContext(enumeratorTaskID.getTaskGroupLocation())
-                                    .getClassLoader();
+                                    
.getClassLoader(enumeratorTaskID.getTaskID());
                     ClassLoader oldClassLoader = 
Thread.currentThread().getContextClassLoader();
                     SourceSplitEnumeratorTask<?> task =
                             
server.getTaskExecutionService().getTask(enumeratorTaskID);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
index 75659668ab..6b701dd38b 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
@@ -17,32 +17,47 @@
 
 package org.apache.seatunnel.engine.server;
 
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.server.execution.BlockTask;
 import org.apache.seatunnel.engine.server.execution.ExceptionTestTask;
 import org.apache.seatunnel.engine.server.execution.FixedCallTestTimeTask;
 import org.apache.seatunnel.engine.server.execution.StopTimeTestTask;
 import org.apache.seatunnel.engine.server.execution.Task;
+import org.apache.seatunnel.engine.server.execution.TaskDeployState;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
+import org.apache.seatunnel.engine.server.execution.TaskGroup;
+import org.apache.seatunnel.engine.server.execution.TaskGroupContext;
 import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.execution.TaskGroupType;
 import org.apache.seatunnel.engine.server.execution.TestTask;
+import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
 
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.Lists;
 import com.hazelcast.flakeidgen.FlakeIdGenerator;
+import com.hazelcast.internal.serialization.Data;
+import lombok.NonNull;
 
 import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static java.util.Collections.emptySet;
 import static 
org.apache.seatunnel.engine.server.execution.ExecutionState.CANCELED;
 import static 
org.apache.seatunnel.engine.server.execution.ExecutionState.FAILED;
 import static 
org.apache.seatunnel.engine.server.execution.ExecutionState.FINISHED;
@@ -63,6 +78,15 @@ public class TaskExecutionServiceTest extends 
AbstractSeaTunnelServerTest {
         FLAKE_ID_GENERATOR = instance.getFlakeIdGenerator("test");
     }
 
+    private PassiveCompletableFuture<TaskExecutionState> deployLocalTask(
+            TaskExecutionService taskExecutionService, @NonNull TaskGroup 
taskGroup) {
+        Long taskId = taskGroup.getTasks().iterator().next().getTaskID();
+        ConcurrentHashMap<Long, ClassLoader> classLoaders = new 
ConcurrentHashMap<>();
+        classLoaders.put(taskId, 
Thread.currentThread().getContextClassLoader());
+        return taskExecutionService.deployLocalTask(
+                taskGroup, classLoaders, new ConcurrentHashMap<>());
+    }
+
     @Test
     public void testCancel() {
         TaskExecutionService taskExecutionService = 
server.getTaskExecutionService();
@@ -70,8 +94,8 @@ public class TaskExecutionServiceTest extends 
AbstractSeaTunnelServerTest {
         long sleepTime = 300;
 
         AtomicBoolean stop = new AtomicBoolean(false);
-        TestTask testTask1 = new TestTask(stop, LOGGER, sleepTime, true);
-        TestTask testTask2 = new TestTask(stop, LOGGER, sleepTime, false);
+        TestTask testTask1 = new TestTask(stop, sleepTime, true);
+        TestTask testTask2 = new TestTask(stop, sleepTime, false);
 
         TaskGroupDefaultImpl ts =
                 new TaskGroupDefaultImpl(
@@ -79,7 +103,7 @@ public class TaskExecutionServiceTest extends 
AbstractSeaTunnelServerTest {
                         "ts",
                         Lists.newArrayList(testTask1, testTask2));
         CompletableFuture<TaskExecutionState> completableFuture =
-                taskExecutionService.deployLocalTask(ts);
+                deployLocalTask(taskExecutionService, ts);
 
         taskExecutionService.cancelTaskGroup(ts.getTaskGroupLocation());
 
@@ -101,7 +125,7 @@ public class TaskExecutionServiceTest extends 
AbstractSeaTunnelServerTest {
                         "ts",
                         Lists.newArrayList(testTask1, testTask2));
         CompletableFuture<TaskExecutionState> completableFuture =
-                taskExecutionService.deployLocalTask(ts);
+                deployLocalTask(taskExecutionService, ts);
 
         Thread.sleep(5000);
 
@@ -120,11 +144,12 @@ public class TaskExecutionServiceTest extends 
AbstractSeaTunnelServerTest {
 
         AtomicBoolean stop = new AtomicBoolean(false);
         AtomicBoolean futureMark = new AtomicBoolean(false);
-        TestTask testTask1 = new TestTask(stop, LOGGER, sleepTime, true);
-        TestTask testTask2 = new TestTask(stop, LOGGER, sleepTime, false);
+        TestTask testTask1 = new TestTask(stop, sleepTime, true);
+        TestTask testTask2 = new TestTask(stop, sleepTime, false);
 
         final CompletableFuture<TaskExecutionState> completableFuture =
-                taskExecutionService.deployLocalTask(
+                deployLocalTask(
+                        taskExecutionService,
                         new TaskGroupDefaultImpl(
                                 new TaskGroupLocation(
                                         jobId, pipeLineId, 
FLAKE_ID_GENERATOR.newId()),
@@ -141,6 +166,63 @@ public class TaskExecutionServiceTest extends 
AbstractSeaTunnelServerTest {
         assertTrue(futureMark.get());
     }
 
+    @Test
+    public void testClassloaderSplit() throws MalformedURLException {
+        TaskExecutionService taskExecutionService = 
server.getTaskExecutionService();
+
+        long sleepTime = 300;
+
+        AtomicBoolean stop = new AtomicBoolean(false);
+        TestTask testTask1 = new TestTask(stop, sleepTime, true);
+        TestTask testTask2 = new TestTask(stop, sleepTime, false);
+
+        long jobId = System.currentTimeMillis();
+
+        TaskGroupLocation location = new TaskGroupLocation(jobId, 1, 1);
+        TaskGroupImmutableInformation taskGroupImmutableInformation =
+                new TaskGroupImmutableInformation(
+                        jobId,
+                        1,
+                        TaskGroupType.INTERMEDIATE_BLOCKING_QUEUE,
+                        location,
+                        "testClassloaderSplit",
+                        Arrays.asList(
+                                
nodeEngine.getSerializationService().toData(testTask1),
+                                
nodeEngine.getSerializationService().toData(testTask2)),
+                        Arrays.asList(
+                                Collections.singleton(new 
URL("file://fake.jar")),
+                                Collections.singleton(new 
URL("file://console.jar"))),
+                        Arrays.asList(emptySet(), emptySet()));
+
+        Data data = 
nodeEngine.getSerializationService().toData(taskGroupImmutableInformation);
+
+        final TaskDeployState taskDeployState = 
taskExecutionService.deployTask(data);
+
+        Assertions.assertEquals(TaskDeployState.success(), taskDeployState);
+
+        TaskGroupContext taskGroupContext =
+                taskExecutionService.getActiveExecutionContext(location);
+        Assertions.assertIterableEquals(
+                Collections.singleton(new URL("file://fake.jar")),
+                taskGroupContext.getJars().get(testTask1.getTaskID()));
+        Assertions.assertIterableEquals(
+                Collections.singleton(new URL("file://console.jar")),
+                taskGroupContext.getJars().get(testTask2.getTaskID()));
+
+        Assertions.assertIterableEquals(
+                Collections.singletonList(new URL("file://fake.jar")),
+                Arrays.asList(
+                        ((URLClassLoader) 
taskGroupContext.getClassLoader(testTask1.getTaskID()))
+                                .getURLs()));
+        Assertions.assertIterableEquals(
+                Collections.singletonList(new URL("file://console.jar")),
+                Arrays.asList(
+                        ((URLClassLoader) 
taskGroupContext.getClassLoader(testTask2.getTaskID()))
+                                .getURLs()));
+
+        taskExecutionService.cancelTaskGroup(location);
+    }
+
     /** Test task execution time is the same as the timer timeout */
     @Test
     public void testCriticalCallTime() throws InterruptedException {
@@ -158,7 +240,8 @@ public class TaskExecutionServiceTest extends 
AbstractSeaTunnelServerTest {
         TaskExecutionService taskExecutionService = 
server.getTaskExecutionService();
 
         CompletableFuture<TaskExecutionState> taskCts =
-                taskExecutionService.deployLocalTask(
+                deployLocalTask(
+                        taskExecutionService,
                         new TaskGroupDefaultImpl(
                                 new TaskGroupLocation(
                                         jobId, pipeLineId, 
FLAKE_ID_GENERATOR.newId()),
@@ -211,7 +294,8 @@ public class TaskExecutionServiceTest extends 
AbstractSeaTunnelServerTest {
         Collections.shuffle(tasks);
 
         CompletableFuture<TaskExecutionState> taskCts =
-                taskExecutionService.deployLocalTask(
+                deployLocalTask(
+                        taskExecutionService,
                         new TaskGroupDefaultImpl(
                                 new TaskGroupLocation(
                                         jobId, pipeLineId, 
FLAKE_ID_GENERATOR.newId()),
@@ -219,7 +303,8 @@ public class TaskExecutionServiceTest extends 
AbstractSeaTunnelServerTest {
                                 Lists.newArrayList(tasks)));
 
         CompletableFuture<TaskExecutionState> t1c =
-                taskExecutionService.deployLocalTask(
+                deployLocalTask(
+                        taskExecutionService,
                         new TaskGroupDefaultImpl(
                                 new TaskGroupLocation(
                                         jobId, pipeLineId, 
FLAKE_ID_GENERATOR.newId()),
@@ -227,7 +312,8 @@ public class TaskExecutionServiceTest extends 
AbstractSeaTunnelServerTest {
                                 Lists.newArrayList(t1)));
 
         CompletableFuture<TaskExecutionState> t2c =
-                taskExecutionService.deployLocalTask(
+                deployLocalTask(
+                        taskExecutionService,
                         new TaskGroupDefaultImpl(
                                 new TaskGroupLocation(
                                         jobId, pipeLineId, 
FLAKE_ID_GENERATOR.newId()),
@@ -285,7 +371,7 @@ public class TaskExecutionServiceTest extends 
AbstractSeaTunnelServerTest {
         TaskExecutionService taskExecutionService = 
server.getTaskExecutionService();
 
         CompletableFuture<TaskExecutionState> completableFuture =
-                taskExecutionService.deployLocalTask(taskGroup);
+                deployLocalTask(taskExecutionService, taskGroup);
 
         // stop tasks
         Thread.sleep(taskRunTime);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index 949903ed55..10d426f589 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -170,6 +170,36 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
                 
physicalPlan.getPipelineList().get(0).getCoordinatorVertexList().size(), 1);
         Assertions.assertEquals(
                 
physicalPlan.getPipelineList().get(0).getPhysicalVertexList().size(), 2);
+        Assertions.assertEquals(
+                physicalPlan
+                        .getPipelineList()
+                        .get(0)
+                        .getPhysicalVertexList()
+                        .get(0)
+                        .getTaskGroupImmutableInformation()
+                        .getTasksData()
+                        .size(),
+                2);
+        Assertions.assertEquals(
+                physicalPlan
+                        .getPipelineList()
+                        .get(0)
+                        .getPhysicalVertexList()
+                        .get(0)
+                        .getTaskGroupImmutableInformation()
+                        .getJars()
+                        .get(0),
+                Sets.newHashSet(new URL("file:///fake.jar")));
+        Assertions.assertEquals(
+                physicalPlan
+                        .getPipelineList()
+                        .get(0)
+                        .getPhysicalVertexList()
+                        .get(0)
+                        .getTaskGroupImmutableInformation()
+                        .getJars()
+                        .get(1),
+                Sets.newHashSet(new URL("file:///console.jar")));
     }
 
     private static FakeSource createFakeSource() {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/execution/TestTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/execution/TestTask.java
index f27330352e..5c174a9293 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/execution/TestTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/execution/TestTask.java
@@ -19,24 +19,29 @@ package org.apache.seatunnel.engine.server.execution;
 
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 
-import com.hazelcast.logging.ILogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import lombok.NonNull;
 
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /** For test use, only print logs */
 public class TestTask implements Task {
 
-    AtomicBoolean stop;
-    long sleep;
-    private final ILogger logger;
-    boolean isThreadsShare;
+    private static final Logger logger = 
LoggerFactory.getLogger(TestTask.class);
+
+    private final AtomicBoolean stop;
+    private final long sleep;
+    private final boolean isThreadsShare;
+    private final long taskId;
 
-    public TestTask(AtomicBoolean stop, ILogger logger, long sleep, boolean 
isThreadsShare) {
+    public TestTask(AtomicBoolean stop, long sleep, boolean isThreadsShare) {
         this.stop = stop;
-        this.logger = logger;
         this.sleep = sleep;
         this.isThreadsShare = isThreadsShare;
+        this.taskId = new Random().nextInt();
     }
 
     @NonNull @Override
@@ -47,7 +52,7 @@ public class TestTask implements Task {
             try {
                 Thread.sleep(sleep);
             } catch (InterruptedException e) {
-                logger.severe(ExceptionUtils.getMessage(e));
+                logger.error(ExceptionUtils.getMessage(e));
             }
             progressState = ProgressState.MADE_PROGRESS;
         } else {
@@ -58,7 +63,7 @@ public class TestTask implements Task {
 
     @NonNull @Override
     public Long getTaskID() {
-        return (long) this.hashCode();
+        return taskId;
     }
 
     @Override

Reply via email to