This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3be0d1cc61 [Improve][Zeta] Split the classloader of task group (#7580)
3be0d1cc61 is described below
commit 3be0d1cc61724d4b9abcf369fac64b1c4fe218a3
Author: Jia Fan <[email protected]>
AuthorDate: Fri Sep 27 10:08:34 2024 +0800
[Improve][Zeta] Split the classloader of task group (#7580)
---
.github/workflows/backend.yml | 2 +-
.../hive/source/config/HiveSourceConfig.java | 3 +-
.../seatunnel/engine/server/SeaTunnelServer.java | 3 +-
.../engine/server/TaskExecutionService.java | 115 ++++++++++++---------
.../operation/CheckpointEndOperation.java | 4 +-
.../operation/CheckpointFinishedOperation.java | 4 +-
.../operation/NotifyTaskRestoreOperation.java | 3 +-
.../server/dag/physical/PhysicalPlanGenerator.java | 53 +++++-----
.../engine/server/dag/physical/PhysicalVertex.java | 51 ++++-----
.../engine/server/execution/TaskGroup.java | 2 +
.../engine/server/execution/TaskGroupContext.java | 13 ++-
.../server/execution/TaskGroupDefaultImpl.java | 13 ++-
.../{TaskGroupContext.java => TaskGroupType.java} | 17 +--
.../engine/server/execution/TaskGroupUtils.java | 45 ++++++++
.../server/task/TaskGroupImmutableInformation.java | 33 +++++-
.../TaskGroupWithIntermediateBlockingQueue.java | 6 ++
.../group/TaskGroupWithIntermediateDisruptor.java | 6 ++
.../operation/sink/SinkPrepareCommitOperation.java | 2 +-
.../operation/source/AssignSplitOperation.java | 2 +-
.../operation/source/RequestSplitOperation.java | 14 +--
.../operation/source/RestoredSplitOperation.java | 2 +-
.../source/SourceNoMoreElementOperation.java | 2 +-
.../source/SourceReaderEventOperation.java | 2 +-
.../operation/source/SourceRegisterOperation.java | 2 +-
.../engine/server/TaskExecutionServiceTest.java | 110 +++++++++++++++++---
.../seatunnel/engine/server/dag/TaskTest.java | 30 ++++++
.../engine/server/execution/TestTask.java | 23 +++--
27 files changed, 394 insertions(+), 168 deletions(-)
diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 564b153aed..6afc981bed 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -131,7 +131,7 @@ jobs:
echo "engine-e2e=$true_or_false" >> $GITHUB_OUTPUT
echo "engine-e2e_files=$file_list" >> $GITHUB_OUTPUT
- api_files=`python tools/update_modules_check/check_file_updates.py
ua $workspace apache/dev origin/$current_branch "seatunnel-api/**"
"seatunnel-common/**" "seatunnel-config/**" "seatunnel-connectors/**"
"seatunnel-core/**" "seatunnel-e2e/seatunnel-e2e-common/**"
"seatunnel-formats/**" "seatunnel-plugin-discovery/**"
"seatunnel-transforms-v2/**" "seatunnel-translation/**"
"seatunnel-e2e/seatunnel-transforms-v2-e2e/**" "seatunnel-connectors/**"
"pom.xml" "**/workflows/**" "tools [...]
+ api_files=`python tools/update_modules_check/check_file_updates.py
ua $workspace apache/dev origin/$current_branch "seatunnel-api/**"
"seatunnel-common/**" "seatunnel-config/**" "seatunnel-engine/**"
"seatunnel-core/**" "seatunnel-e2e/seatunnel-e2e-common/**"
"seatunnel-formats/**" "seatunnel-plugin-discovery/**"
"seatunnel-transforms-v2/**" "seatunnel-translation/**"
"seatunnel-e2e/seatunnel-transforms-v2-e2e/**" "pom.xml" "**/workflows/**"
"tools/**" "seatunnel-dist/**"`
true_or_false=${api_files%%$'\n'*}
file_list=${api_files#*$'\n'}
if [[ $repository_owner == 'apache' ]];then
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
index 203491dcf9..e98143fcf0 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
@@ -69,7 +69,6 @@ public class HiveSourceConfig implements Serializable {
private static final long serialVersionUID = 1L;
- private final Table table;
private final CatalogTable catalogTable;
private final FileFormat fileFormat;
private final ReadStrategy readStrategy;
@@ -81,7 +80,7 @@ public class HiveSourceConfig implements Serializable {
readonlyConfig
.getOptional(HdfsSourceConfigOptions.READ_PARTITIONS)
.ifPresent(this::validatePartitions);
- this.table = HiveTableUtils.getTableInfo(readonlyConfig);
+ Table table = HiveTableUtils.getTableInfo(readonlyConfig);
this.hadoopConf = parseHiveHadoopConfig(readonlyConfig, table);
this.fileFormat = HiveTableUtils.parseFileFormat(table);
this.readStrategy = parseReadStrategy(table, readonlyConfig,
fileFormat, hadoopConf);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 73462c53c9..234ec0109f 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -153,8 +153,7 @@ public class SeaTunnelServer
private void startWorker() {
taskExecutionService =
- new TaskExecutionService(
- classLoaderService, nodeEngine,
nodeEngine.getProperties(), eventService);
+ new TaskExecutionService(classLoaderService, nodeEngine,
eventService);
nodeEngine.getMetricsRegistry().registerDynamicMetricsProvider(taskExecutionService);
taskExecutionService.start();
getSlotService();
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index b32dd7c6a9..5afd410e1c 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -42,6 +42,7 @@ import
org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroup;
import org.apache.seatunnel.engine.server.execution.TaskGroupContext;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.execution.TaskGroupUtils;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.execution.TaskTracker;
import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
@@ -65,13 +66,12 @@ import com.hazelcast.logging.ILogger;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
-import com.hazelcast.spi.properties.HazelcastProperties;
-import lombok.Getter;
import lombok.NonNull;
import lombok.SneakyThrows;
import java.io.IOException;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -149,7 +149,6 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
public TaskExecutionService(
ClassLoaderService classLoaderService,
NodeEngineImpl nodeEngine,
- HazelcastProperties properties,
EventService eventService) {
seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
@@ -282,33 +281,50 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
taskImmutableInfo.getExecutionId()));
TaskGroup taskGroup = null;
try {
- Set<ConnectorJarIdentifier> connectorJarIdentifiers =
+ List<Set<ConnectorJarIdentifier>> connectorJarIdentifiersList =
taskImmutableInfo.getConnectorJarIdentifiers();
- Set<URL> jars = new HashSet<>();
- ClassLoader classLoader;
- if (!CollectionUtils.isEmpty(connectorJarIdentifiers)) {
- // Prioritize obtaining the jar package file required for the
current task execution
- // from the local, if it does not exist locally, it will be
downloaded from the
- // master node.
- jars =
- serverConnectorPackageClient.getConnectorJarFromLocal(
- connectorJarIdentifiers);
- } else if (!CollectionUtils.isEmpty(taskImmutableInfo.getJars())) {
- jars = taskImmutableInfo.getJars();
- }
- classLoader =
- classLoaderService.getClassLoader(
- taskImmutableInfo.getJobId(),
Lists.newArrayList(jars));
- if (jars.isEmpty()) {
- taskGroup =
-
nodeEngine.getSerializationService().toObject(taskImmutableInfo.getGroup());
- } else {
- taskGroup =
-
CustomClassLoadedObject.deserializeWithCustomClassLoader(
- nodeEngine.getSerializationService(),
- classLoader,
- taskImmutableInfo.getGroup());
+ List<Data> taskData = taskImmutableInfo.getTasksData();
+ ConcurrentHashMap<Long, ClassLoader> classLoaders = new
ConcurrentHashMap<>();
+ List<Task> tasks = new ArrayList<>();
+ ConcurrentHashMap<Long, Collection<URL>> taskJars = new
ConcurrentHashMap<>();
+ for (int i = 0; i < taskData.size(); i++) {
+ Set<URL> jars = new HashSet<>();
+ Set<ConnectorJarIdentifier> connectorJarIdentifiers =
+ connectorJarIdentifiersList.get(i);
+ if (!CollectionUtils.isEmpty(connectorJarIdentifiers)) {
+ // Prioritize obtaining the jar package file required for
the current task
+ // execution
+ // from the local, if it does not exist locally, it will
be downloaded from the
+ // master node.
+ jars =
+
serverConnectorPackageClient.getConnectorJarFromLocal(
+ connectorJarIdentifiers);
+ } else if
(!CollectionUtils.isEmpty(taskImmutableInfo.getJars().get(i))) {
+ jars = taskImmutableInfo.getJars().get(i);
+ }
+ ClassLoader classLoader =
+ classLoaderService.getClassLoader(
+ taskImmutableInfo.getJobId(),
Lists.newArrayList(jars));
+ Task task;
+ if (jars.isEmpty()) {
+ task =
nodeEngine.getSerializationService().toObject(taskData.get(i));
+ } else {
+ task =
+
CustomClassLoadedObject.deserializeWithCustomClassLoader(
+ nodeEngine.getSerializationService(),
+ classLoader,
+ taskData.get(i));
+ }
+ tasks.add(task);
+ classLoaders.put(task.getTaskID(), classLoader);
+ taskJars.put(task.getTaskID(), jars);
}
+ taskGroup =
+ TaskGroupUtils.createTaskGroup(
+ taskImmutableInfo.getTaskGroupType(),
+ taskImmutableInfo.getTaskGroupLocation(),
+ taskImmutableInfo.getTaskGroupName(),
+ tasks);
logger.info(
String.format(
@@ -322,7 +338,7 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
"TaskGroupLocation: %s already exists",
taskGroup.getTaskGroupLocation()));
}
- deployLocalTask(taskGroup, classLoader, jars);
+ deployLocalTask(taskGroup, classLoaders, taskJars);
return TaskDeployState.success();
}
} catch (Throwable t) {
@@ -337,15 +353,10 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
}
}
- @Deprecated
public PassiveCompletableFuture<TaskExecutionState> deployLocalTask(
- @NonNull TaskGroup taskGroup) {
- return deployLocalTask(
- taskGroup, Thread.currentThread().getContextClassLoader(),
emptyList());
- }
-
- public PassiveCompletableFuture<TaskExecutionState> deployLocalTask(
- @NonNull TaskGroup taskGroup, @NonNull ClassLoader classLoader,
Collection<URL> jars) {
+ @NonNull TaskGroup taskGroup,
+ @NonNull ConcurrentHashMap<Long, ClassLoader> classLoaders,
+ ConcurrentHashMap<Long, Collection<URL>> jars) {
CompletableFuture<TaskExecutionState> resultFuture = new
CompletableFuture<>();
try {
taskGroup.init();
@@ -389,7 +400,7 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
}));
executionContexts.put(
taskGroup.getTaskGroupLocation(),
- new TaskGroupContext(taskGroup, classLoader, jars));
+ new TaskGroupContext(taskGroup, classLoaders, jars));
cancellationFutures.put(taskGroup.getTaskGroupLocation(),
cancellationFuture);
submitThreadShareTask(executionTracker, byCooperation.get(true));
submitBlockingTask(executionTracker, byCooperation.get(false));
@@ -591,7 +602,7 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
}
});
});
- if (localMap.size() > 0) {
+ if (!localMap.isEmpty()) {
boolean lockedIMap = false;
try {
lockedIMap =
@@ -669,7 +680,8 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
ClassLoader classLoader =
executionContexts
.get(taskGroupExecutionTracker.taskGroup.getTaskGroupLocation())
- .getClassLoader();
+ .getClassLoaders()
+ .get(tracker.task.getTaskID());
ClassLoader oldClassLoader =
Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(classLoader);
final Task t = tracker.task;
@@ -728,16 +740,16 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
public AtomicReference<TaskTracker> exclusiveTaskTracker = new
AtomicReference<>();
final TaskCallTimer timer;
private Thread myThread;
- public LinkedBlockingDeque<TaskTracker> taskqueue;
+ public LinkedBlockingDeque<TaskTracker> taskQueue;
private Future<?> thisTaskFuture;
private BlockingQueue<Future<?>> futureBlockingQueue;
public CooperativeTaskWorker(
- LinkedBlockingDeque<TaskTracker> taskqueue,
+ LinkedBlockingDeque<TaskTracker> taskQueue,
RunBusWorkSupplier runBusWorkSupplier,
BlockingQueue<Future<?>> futureBlockingQueue) {
logger.info(String.format("Created new BusWork : %s",
this.hashCode()));
- this.taskqueue = taskqueue;
+ this.taskQueue = taskQueue;
this.timer = new TaskCallTimer(50, keep, runBusWorkSupplier, this);
this.futureBlockingQueue = futureBlockingQueue;
}
@@ -752,7 +764,7 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
TaskTracker taskTracker =
null != exclusiveTaskTracker.get()
? exclusiveTaskTracker.get()
- : taskqueue.takeFirst();
+ : taskQueue.takeFirst();
TaskGroupExecutionTracker taskGroupExecutionTracker =
taskTracker.taskGroupExecutionTracker;
if
(taskGroupExecutionTracker.executionCompletedExceptionally()) {
@@ -777,7 +789,8 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
myThread.setContextClassLoader(
executionContexts
.get(taskGroupExecutionTracker.taskGroup.getTaskGroupLocation())
- .getClassLoader());
+ .getClassLoaders()
+ .get(taskTracker.task.getTaskID()));
call = taskTracker.task.call();
synchronized (timer) {
timer.timerStop();
@@ -819,7 +832,7 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
// Task is not completed. Put task to the end of the
queue
// If the current work has an exclusive tracker, it
will not be put back
if (null == exclusiveTaskTracker.get()) {
- taskqueue.offer(taskTracker);
+ taskQueue.offer(taskTracker);
}
}
}
@@ -840,7 +853,7 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
}
public boolean runNewBusWork(boolean checkTaskQueue) {
- if (!checkTaskQueue || taskQueue.size() > 0) {
+ if (!checkTaskQueue || !taskQueue.isEmpty()) {
BlockingQueue<Future<?>> futureBlockingQueue = new
LinkedBlockingQueue<>();
CooperativeTaskWorker cooperativeTaskWorker =
new CooperativeTaskWorker(taskQueue, this,
futureBlockingQueue);
@@ -867,7 +880,7 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
private final AtomicBoolean isCancel = new AtomicBoolean(false);
- @Getter private Map<Long, Future<?>> currRunningTaskFuture = new
ConcurrentHashMap<>();
+ private final Map<Long, Future<?>> currRunningTaskFuture = new
ConcurrentHashMap<>();
TaskGroupExecutionTracker(
@NonNull CompletableFuture<Void> cancellationFuture,
@@ -972,8 +985,10 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
private void recycleClassLoader(TaskGroupLocation taskGroupLocation) {
TaskGroupContext context =
executionContexts.get(taskGroupLocation);
- executionContexts.get(taskGroupLocation).setClassLoader(null);
-
classLoaderService.releaseClassLoader(taskGroupLocation.getJobId(),
context.getJars());
+ executionContexts.get(taskGroupLocation).setClassLoaders(null);
+ for (Collection<URL> jars : context.getJars().values()) {
+
classLoaderService.releaseClassLoader(taskGroupLocation.getJobId(), jars);
+ }
}
boolean executionCompletedExceptionally() {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointEndOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointEndOperation.java
index 4457b7eda2..9dadf250a0 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointEndOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointEndOperation.java
@@ -86,7 +86,9 @@ public class CheckpointEndOperation extends TaskOperation {
.getExecutionContext(taskLocation.getTaskGroupLocation());
Task task =
groupContext.getTaskGroup().getTask(taskLocation.getTaskID());
ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
-
Thread.currentThread().setContextClassLoader(groupContext.getClassLoader());
+ Thread.currentThread()
+ .setContextClassLoader(
+
groupContext.getClassLoader(taskLocation.getTaskID()));
task.notifyCheckpointEnd(checkpointId);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
index 1e97bd4b46..65dd3a7274 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
@@ -86,7 +86,9 @@ public class CheckpointFinishedOperation extends
TaskOperation {
.getExecutionContext(taskLocation.getTaskGroupLocation());
Task task =
groupContext.getTaskGroup().getTask(taskLocation.getTaskID());
ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
-
Thread.currentThread().setContextClassLoader(groupContext.getClassLoader());
+ Thread.currentThread()
+ .setContextClassLoader(
+
groupContext.getClassLoader(taskLocation.getTaskID()));
if (successful) {
task.notifyCheckpointComplete(checkpointId);
} else {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java
index 4cd83941c4..cd681f432e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java
@@ -98,7 +98,8 @@ public class NotifyTaskRestoreOperation extends TaskOperation
{
() -> {
Thread.currentThread()
.setContextClassLoader(
-
groupContext.getClassLoader());
+
groupContext.getClassLoader(
+
task.getTaskID()));
try {
log.debug(
"NotifyTaskRestoreOperation.restoreState "
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 2a18984a95..34ac96aacb 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -282,7 +282,6 @@ public class PhysicalPlanGenerator {
return new PhysicalVertex(
atomicInteger.incrementAndGet(),
- executorService,
collect.size(),
new TaskGroupDefaultImpl(
taskGroupLocation,
@@ -291,8 +290,9 @@ public class PhysicalPlanGenerator {
flakeIdGenerator,
pipelineIndex,
totalPipelineNum,
- sinkAction.getJarUrls(),
-
sinkAction.getConnectorJarIdentifiers(),
+
Collections.singletonList(sinkAction.getJarUrls()),
+ Collections.singletonList(
+
sinkAction.getConnectorJarIdentifiers()),
jobImmutableInformation,
initializationTimestamp,
nodeEngine,
@@ -362,7 +362,7 @@ public class PhysicalPlanGenerator {
new PhysicalExecutionFlow(
shuffleActionOfSinkFlow,
Collections.singletonList(sinkFlow));
- setFlowConfig(shuffleFlow,
parallelismIndex);
+ setFlowConfig(shuffleFlow);
long taskGroupID =
mixIDPrefixAndIndex(
@@ -389,7 +389,6 @@ public class PhysicalPlanGenerator {
physicalVertices.add(
new PhysicalVertex(
parallelismIndex,
- executorService,
shuffleFlow.getAction().getParallelism(),
new TaskGroupDefaultImpl(
taskGroupLocation,
@@ -400,8 +399,10 @@ public class PhysicalPlanGenerator {
flakeIdGenerator,
pipelineIndex,
totalPipelineNum,
- seaTunnelTask.getJarsUrl(),
-
seaTunnelTask.getConnectorPluginJars(),
+ Collections.singletonList(
+
seaTunnelTask.getJarsUrl()),
+ Collections.singletonList(
+
seaTunnelTask.getConnectorPluginJars()),
jobImmutableInformation,
initializationTimestamp,
nodeEngine,
@@ -420,7 +421,7 @@ public class PhysicalPlanGenerator {
taskGroupID);
TaskLocation taskLocation =
new
TaskLocation(taskGroupLocation, taskIDPrefix, i);
- setFlowConfig(flow, i);
+ setFlowConfig(flow);
SeaTunnelTask seaTunnelTask =
new TransformSeaTunnelTask(
jobImmutableInformation.getJobId(),
@@ -432,7 +433,6 @@ public class PhysicalPlanGenerator {
physicalVertices.add(
new PhysicalVertex(
i,
- executorService,
flow.getAction().getParallelism(),
new TaskGroupDefaultImpl(
taskGroupLocation,
@@ -442,8 +442,10 @@ public class PhysicalPlanGenerator {
flakeIdGenerator,
pipelineIndex,
totalPipelineNum,
- seaTunnelTask.getJarsUrl(),
-
seaTunnelTask.getConnectorPluginJars(),
+ Collections.singletonList(
+
seaTunnelTask.getJarsUrl()),
+ Collections.singletonList(
+
seaTunnelTask.getConnectorPluginJars()),
jobImmutableInformation,
initializationTimestamp,
nodeEngine,
@@ -488,7 +490,6 @@ public class PhysicalPlanGenerator {
return new PhysicalVertex(
atomicInteger.incrementAndGet(),
- executorService,
sources.size(),
new TaskGroupDefaultImpl(
taskGroupLocation,
@@ -497,8 +498,8 @@ public class PhysicalPlanGenerator {
flakeIdGenerator,
pipelineIndex,
totalPipelineNum,
- t.getJarsUrl(),
- t.getConnectorPluginJars(),
+ Collections.singletonList(t.getJarsUrl()),
+
Collections.singletonList(t.getConnectorPluginJars()),
jobImmutableInformation,
initializationTimestamp,
nodeEngine,
@@ -536,7 +537,7 @@ public class PhysicalPlanGenerator {
flows.stream()
.map(
f -> {
- setFlowConfig(f,
finalParallelismIndex);
+ setFlowConfig(f);
long taskIDPrefix =
flowTaskIDPrefixMap
.computeIfAbsent(
@@ -575,18 +576,15 @@ public class PhysicalPlanGenerator {
})
.peek(this::fillCheckpointPlan)
.collect(Collectors.toList());
- Set<URL> jars =
+ List<Set<URL>> jars =
taskList.stream()
- .flatMap(task ->
task.getJarsUrl().stream())
- .collect(Collectors.toSet());
+ .map(SeaTunnelTask::getJarsUrl)
+ .collect(Collectors.toList());
- Set<ConnectorJarIdentifier> jarIdentifiers =
+ List<Set<ConnectorJarIdentifier>>
jarIdentifiers =
taskList.stream()
- .flatMap(
- task ->
-
task.getConnectorPluginJars()
-
.stream())
- .collect(Collectors.toSet());
+
.map(SeaTunnelTask::getConnectorPluginJars)
+ .collect(Collectors.toList());
if (taskList.stream()
.anyMatch(TransformSeaTunnelTask.class::isInstance)) {
@@ -612,7 +610,6 @@ public class PhysicalPlanGenerator {
t.add(
new PhysicalVertex(
i,
- executorService,
flow.getAction().getParallelism(),
taskGroup,
flakeIdGenerator,
@@ -629,7 +626,6 @@ public class PhysicalPlanGenerator {
t.add(
new PhysicalVertex(
i,
- executorService,
flow.getAction().getParallelism(),
new TaskGroupDefaultImpl(
taskGroupLocation,
@@ -671,10 +667,9 @@ public class PhysicalPlanGenerator {
* set config for flow, some flow should have config support for execute
on task.
*
* @param f flow
- * @param parallelismIndex the parallelism index of flow
*/
@SuppressWarnings("unchecked")
- private void setFlowConfig(Flow f, int parallelismIndex) {
+ private void setFlowConfig(Flow f) {
if (f instanceof PhysicalExecutionFlow) {
PhysicalExecutionFlow<?, FlowConfig> flow =
(PhysicalExecutionFlow<?, FlowConfig>) f;
@@ -702,7 +697,7 @@ public class PhysicalPlanGenerator {
}
if (!f.getNext().isEmpty()) {
- f.getNext().forEach(n -> setFlowConfig(n, parallelismIndex));
+ f.getNext().forEach(this::setFlowConfig);
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index b6ec234bf2..7f75f489e0 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -47,6 +47,7 @@ import
org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.Member;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
+import com.hazelcast.internal.serialization.Data;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
@@ -59,7 +60,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -79,13 +79,12 @@ public class PhysicalVertex {
private final TaskGroupDefaultImpl taskGroup;
- private final ExecutorService executorService;
-
private final FlakeIdGenerator flakeIdGenerator;
- private final Set<URL> pluginJarsUrls;
+ private final List<Set<URL>> pluginJarsUrls;
- // Set<URL> pluginJarsUrls is a collection of paths stored on the engine
for all connector Jar
+ // List<Set<URL>> pluginJarsUrls is a collection of paths stored on the
engine for all connector
+ // Jar
// packages and third-party Jar packages that the connector relies on.
// All storage paths come from the unique identifier obtained after
uploading the Jar package
// through the client.
@@ -93,9 +92,10 @@ public class PhysicalVertex {
// file,
// which contains more information about the Jar package file, including
the name of the
// connector plugin using the current Jar, the type of the current Jar
package, and so on.
- // TODO: Only use Set<ConnectorJarIdentifier>to save more information
about the Jar package,
+ // TODO: Only use List<Set<ConnectorJarIdentifier>>to save more
information about the Jar
+ // package,
// including the storage path of the Jar package on the server.
- private final Set<ConnectorJarIdentifier> connectorJarIdentifiers;
+ private final List<Set<ConnectorJarIdentifier>> connectorJarIdentifiers;
private final IMap<Object, Object> runningJobStateIMap;
@@ -117,7 +117,7 @@ public class PhysicalVertex {
private JobMaster jobMaster;
- private volatile ExecutionState currExecutionState =
ExecutionState.CREATED;
+ private volatile ExecutionState currExecutionState;
public volatile boolean isRunning = false;
@@ -126,21 +126,19 @@ public class PhysicalVertex {
public PhysicalVertex(
int subTaskGroupIndex,
- @NonNull ExecutorService executorService,
int parallelism,
@NonNull TaskGroupDefaultImpl taskGroup,
@NonNull FlakeIdGenerator flakeIdGenerator,
int pipelineId,
int totalPipelineNum,
- Set<URL> pluginJarsUrls,
- Set<ConnectorJarIdentifier> connectorJarIdentifiers,
+ List<Set<URL>> pluginJarsUrls,
+ List<Set<ConnectorJarIdentifier>> connectorJarIdentifiers,
@NonNull JobImmutableInformation jobImmutableInformation,
long initializationTimestamp,
@NonNull NodeEngine nodeEngine,
@NonNull IMap runningJobStateIMap,
@NonNull IMap runningJobStateTimestampsIMap) {
this.taskGroupLocation = taskGroup.getTaskGroupLocation();
- this.executorService = executorService;
this.taskGroup = taskGroup;
this.flakeIdGenerator = flakeIdGenerator;
this.pluginJarsUrls = pluginJarsUrls;
@@ -238,11 +236,9 @@ public class PhysicalVertex {
.collect(Collectors.toList());
if (!members.contains(worker)) {
log.warn(
- "The node:"
- + worker.toString()
- + " running the taskGroup "
- + taskGroupLocation
- + " no longer exists, return false.");
+ "The node:{} running the taskGroup {} no longer
exists, return false.",
+ worker.toString(),
+ taskGroupLocation);
return false;
}
InvocationFuture<Object> invoke =
@@ -257,9 +253,8 @@ public class PhysicalVertex {
return (Boolean) invoke.get();
} catch (InterruptedException | ExecutionException e) {
log.warn(
- "Execution of CheckTaskGroupIsExecutingOperation "
- + taskGroupLocation
- + " failed, checkTaskGroupIsExecuting return
false. ",
+ "Execution of CheckTaskGroupIsExecutingOperation {}
failed, checkTaskGroupIsExecuting return false. ",
+ taskGroupLocation,
e);
}
}
@@ -344,11 +339,19 @@ public class PhysicalVertex {
return state;
}
- private TaskGroupImmutableInformation getTaskGroupImmutableInformation() {
+ @VisibleForTesting
+ public TaskGroupImmutableInformation getTaskGroupImmutableInformation() {
+ List<Data> tasksData =
+ this.taskGroup.getTasks().stream()
+ .map(task -> (Data)
nodeEngine.getSerializationService().toData(task))
+ .collect(Collectors.toList());
return new TaskGroupImmutableInformation(
this.taskGroup.getTaskGroupLocation().getJobId(),
flakeIdGenerator.newId(),
- nodeEngine.getSerializationService().toData(this.taskGroup),
+ this.taskGroup.getTaskGroupType(),
+ this.taskGroup.getTaskGroupLocation(),
+ this.taskGroup.getTaskGroupName(),
+ tasksData,
this.pluginJarsUrls,
this.connectorJarIdentifiers);
}
@@ -391,7 +394,7 @@ public class PhysicalVertex {
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
true,
- exception ->
ExceptionUtil.isOperationNeedRetryException(exception),
+ ExceptionUtil::isOperationNeedRetryException,
Constant.OPERATION_RETRY_SLEEP));
this.currExecutionState = targetState;
log.info(
@@ -492,7 +495,7 @@ public class PhysicalVertex {
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
true,
- exception ->
ExceptionUtil.isOperationNeedRetryException(exception),
+ ExceptionUtil::isOperationNeedRetryException,
Constant.OPERATION_RETRY_SLEEP));
} catch (Exception e) {
log.warn(ExceptionUtils.getMessage(e));
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
index 5e2c654eb2..7c1b19938c 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroup.java
@@ -32,4 +32,6 @@ public interface TaskGroup extends Serializable {
<T extends Task> T getTask(long taskID);
void setTasksContext(Map<Long, TaskExecutionContext>
taskExecutionContextMap);
+
+ TaskGroupType getTaskGroupType();
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupContext.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupContext.java
index 3190edb372..e048e50486 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupContext.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupContext.java
@@ -22,12 +22,21 @@ import lombok.Data;
import java.net.URL;
import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
@Data
@AllArgsConstructor
public class TaskGroupContext {
private TaskGroup taskGroup;
- private ClassLoader classLoader;
- private Collection<URL> jars;
+ private ConcurrentHashMap<Long, ClassLoader> classLoaders;
+ private ConcurrentHashMap<Long, Collection<URL>> jars;
+
+ public ClassLoader getClassLoader(long taskId) {
+ if (classLoaders != null) {
+ return classLoaders.get(taskId);
+ } else {
+ return null;
+ }
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java
index 67f863c46c..ce39391fba 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupDefaultImpl.java
@@ -18,9 +18,8 @@
package org.apache.seatunnel.engine.server.execution;
import java.util.Collection;
+import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
public class TaskGroupDefaultImpl implements TaskGroup {
private final TaskGroupLocation taskGroupLocation;
@@ -33,7 +32,10 @@ public class TaskGroupDefaultImpl implements TaskGroup {
TaskGroupLocation taskGroupLocation, String taskGroupName,
Collection<Task> tasks) {
this.taskGroupLocation = taskGroupLocation;
this.taskGroupName = taskGroupName;
- this.tasks = tasks.stream().collect(Collectors.toMap(Task::getTaskID,
Function.identity()));
+ // keep the order of tasks, make sure the order of tasks is the same
as the jars order in
+ // {@link PhysicalVertex#pluginJarsUrls}
+ this.tasks = new LinkedHashMap<>();
+ tasks.forEach(t -> this.tasks.put(t.getTaskID(), t));
}
public String getTaskGroupName() {
@@ -60,4 +62,9 @@ public class TaskGroupDefaultImpl implements TaskGroup {
@Override
public void setTasksContext(Map<Long, TaskExecutionContext>
taskExecutionContextMap) {}
+
+ @Override
+ public TaskGroupType getTaskGroupType() {
+ return TaskGroupType.DEFAULT;
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupContext.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupType.java
similarity index 76%
copy from
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupContext.java
copy to
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupType.java
index 3190edb372..a4925238ea 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupContext.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupType.java
@@ -17,17 +17,8 @@
package org.apache.seatunnel.engine.server.execution;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
-import java.net.URL;
-import java.util.Collection;
-
-@Data
-@AllArgsConstructor
-public class TaskGroupContext {
- private TaskGroup taskGroup;
-
- private ClassLoader classLoader;
- private Collection<URL> jars;
+public enum TaskGroupType {
+ DEFAULT,
+ INTERMEDIATE_BLOCKING_QUEUE,
+ INTERMEDIATE_DISRUPTOR_QUEUE,
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupUtils.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupUtils.java
new file mode 100644
index 0000000000..16f12ece1f
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.execution;
+
+import
org.apache.seatunnel.engine.server.task.group.TaskGroupWithIntermediateBlockingQueue;
+import
org.apache.seatunnel.engine.server.task.group.TaskGroupWithIntermediateDisruptor;
+
+import java.util.Collection;
+
+/** Static factory that builds the {@link TaskGroup} for a {@link TaskGroupType}. */
+public final class TaskGroupUtils {
+
+ /** Utility class; not meant to be instantiated. */
+ private TaskGroupUtils() {}
+
+ /**
+ * Creates the task group implementation selected by {@code type}.
+ *
+ * @param type selects which {@link TaskGroup} implementation is instantiated
+ * @param taskGroupLocation location handed to the task group constructor
+ * @param taskGroupName name handed to the task group constructor
+ * @param tasks tasks handed to the task group constructor
+ * @return a new task group of the requested type
+ * @throws IllegalArgumentException if {@code type} is not a handled constant
+ */
+ public static TaskGroup createTaskGroup(
+ TaskGroupType type,
+ TaskGroupLocation taskGroupLocation,
+ String taskGroupName,
+ Collection<Task> tasks) {
+ switch (type) {
+ case DEFAULT:
+ return new TaskGroupDefaultImpl(taskGroupLocation,
taskGroupName, tasks);
+ case INTERMEDIATE_BLOCKING_QUEUE:
+ return new TaskGroupWithIntermediateBlockingQueue(
+ taskGroupLocation, taskGroupName, tasks);
+ case INTERMEDIATE_DISRUPTOR_QUEUE:
+ return new TaskGroupWithIntermediateDisruptor(
+ taskGroupLocation, taskGroupName, tasks);
+ default:
+ throw new IllegalArgumentException("Unsupported task group
type: " + type);
+ }
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupImmutableInformation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupImmutableInformation.java
index c90c73b55e..0743c6a3ab 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupImmutableInformation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupImmutableInformation.java
@@ -18,6 +18,8 @@
package org.apache.seatunnel.engine.server.task;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.execution.TaskGroupType;
import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
import com.hazelcast.internal.nio.IOUtil;
@@ -29,6 +31,8 @@ import lombok.AllArgsConstructor;
import java.io.IOException;
import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Set;
@lombok.Data
@@ -38,9 +42,15 @@ public class TaskGroupImmutableInformation implements
IdentifiedDataSerializable
// Each deployment generates a new executionId
private long executionId;
- private Data group;
+ private TaskGroupType taskGroupType;
- private Set<URL> jars;
+ private TaskGroupLocation taskGroupLocation;
+
+ private String taskGroupName;
+
+ private List<Data> tasksData;
+
+ private List<Set<URL>> jars;
// Set<URL> pluginJarsUrls is a collection of paths stored on the engine
for all connector Jar
// packages and third-party Jar packages that the connector relies on.
@@ -52,7 +62,7 @@ public class TaskGroupImmutableInformation implements
IdentifiedDataSerializable
// connector plugin using the current Jar, the type of the current Jar
package, and so on.
// TODO: Only use Set<ConnectorJarIdentifier>to save more information
about the Jar package,
// including the storage path of the Jar package on the server.
- private Set<ConnectorJarIdentifier> connectorJarIdentifiers;
+ private List<Set<ConnectorJarIdentifier>> connectorJarIdentifiers;
public TaskGroupImmutableInformation() {}
@@ -70,17 +80,30 @@ public class TaskGroupImmutableInformation implements
IdentifiedDataSerializable
public void writeData(ObjectDataOutput out) throws IOException {
out.writeLong(jobId);
out.writeLong(executionId);
+ out.writeObject(taskGroupType);
out.writeObject(jars);
out.writeObject(connectorJarIdentifiers);
- IOUtil.writeData(out, group);
+ out.writeInt(tasksData.size());
+ for (Data data : tasksData) {
+ IOUtil.writeData(out, data);
+ }
+ out.writeObject(taskGroupLocation);
+ out.writeString(taskGroupName);
}
@Override
public void readData(ObjectDataInput in) throws IOException {
jobId = in.readLong();
executionId = in.readLong();
+ taskGroupType = in.readObject();
jars = in.readObject();
connectorJarIdentifiers = in.readObject();
- group = IOUtil.readData(in);
+ int size = in.readInt();
+ tasksData = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ tasksData.add(IOUtil.readData(in));
+ }
+ taskGroupLocation = in.readObject();
+ taskGroupName = in.readString();
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateBlockingQueue.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateBlockingQueue.java
index 6828b92b7e..f1153364f6 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateBlockingQueue.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateBlockingQueue.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.task.group;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.engine.server.execution.Task;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.execution.TaskGroupType;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import
org.apache.seatunnel.engine.server.task.group.queue.AbstractIntermediateQueue;
import
org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue;
@@ -55,4 +56,9 @@ public class TaskGroupWithIntermediateBlockingQueue extends
AbstractTaskGroupWit
blockingQueueCache.computeIfAbsent(id, i -> new
ArrayBlockingQueue<>(QUEUE_SIZE));
return new IntermediateBlockingQueue(blockingQueueCache.get(id));
}
+
+ @Override
+ public TaskGroupType getTaskGroupType() {
+ return TaskGroupType.INTERMEDIATE_BLOCKING_QUEUE;
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateDisruptor.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateDisruptor.java
index d6544c1662..aabb53ae3f 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateDisruptor.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateDisruptor.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.task.group;
import org.apache.seatunnel.engine.server.execution.Task;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.execution.TaskGroupType;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import
org.apache.seatunnel.engine.server.task.group.queue.AbstractIntermediateQueue;
import
org.apache.seatunnel.engine.server.task.group.queue.IntermediateDisruptor;
@@ -69,4 +70,9 @@ public class TaskGroupWithIntermediateDisruptor extends
AbstractTaskGroupWithInt
this.disruptor.putIfAbsent(id, disruptor);
return new IntermediateDisruptor(this.disruptor.get(id));
}
+
+ @Override
+ public TaskGroupType getTaskGroupType() {
+ return TaskGroupType.INTERMEDIATE_DISRUPTOR_QUEUE;
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
index db3ae487a3..dbce951249 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java
@@ -77,7 +77,7 @@ public class SinkPrepareCommitOperation<CommitInfoT> extends
BarrierFlowOperatio
ClassLoader taskClassLoader =
taskExecutionService
.getExecutionContext(taskLocation.getTaskGroupLocation())
- .getClassLoader();
+ .getClassLoader(committerTask.getTaskID());
ClassLoader mainClassLoader =
Thread.currentThread().getContextClassLoader();
if (commitInfos != null) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
index b080d2f1ba..afb70b0f93 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
@@ -58,7 +58,7 @@ public class AssignSplitOperation<SplitT extends SourceSplit>
extends TracingOpe
ClassLoader taskClassLoader =
server.getTaskExecutionService()
.getExecutionContext(taskID.getTaskGroupLocation())
- .getClassLoader();
+ .getClassLoader(task.getTaskID());
ClassLoader mainClassLoader =
Thread.currentThread().getContextClassLoader();
List<SplitT> deserializeSplits = new ArrayList<>();
try {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
index 1a4b7469db..eaf2dd2a21 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
@@ -36,13 +36,13 @@ public class RequestSplitOperation extends TracingOperation
implements Identifie
private TaskLocation enumeratorTaskID;
- private TaskLocation taskID;
+ private TaskLocation taskLocation;
public RequestSplitOperation() {}
- public RequestSplitOperation(TaskLocation taskID, TaskLocation
enumeratorTaskID) {
+ public RequestSplitOperation(TaskLocation taskLocation, TaskLocation
enumeratorTaskID) {
this.enumeratorTaskID = enumeratorTaskID;
- this.taskID = taskID;
+ this.taskLocation = taskLocation;
}
@Override
@@ -54,12 +54,12 @@ public class RequestSplitOperation extends TracingOperation
implements Identifie
ClassLoader classLoader =
server.getTaskExecutionService()
.getExecutionContext(enumeratorTaskID.getTaskGroupLocation())
- .getClassLoader();
+ .getClassLoader(enumeratorTaskID.getTaskID());
ClassLoader oldClassLoader =
Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(classLoader);
SourceSplitEnumeratorTask<?> task =
server.getTaskExecutionService().getTask(enumeratorTaskID);
- task.requestSplit(taskID.getTaskIndex());
+ task.requestSplit(taskLocation.getTaskIndex());
Thread.currentThread().setContextClassLoader(oldClassLoader);
return null;
},
@@ -81,14 +81,14 @@ public class RequestSplitOperation extends TracingOperation
implements Identifie
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
super.writeInternal(out);
- out.writeObject(taskID);
+ out.writeObject(taskLocation);
out.writeObject(enumeratorTaskID);
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
super.readInternal(in);
- taskID = in.readObject();
+ taskLocation = in.readObject();
enumeratorTaskID = in.readObject();
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
index a6ee60f6f1..99189c3d4d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
@@ -91,7 +91,7 @@ public class RestoredSplitOperation extends TaskOperation {
ClassLoader taskClassLoader =
taskExecutionService
.getExecutionContext(taskLocation.getTaskGroupLocation())
- .getClassLoader();
+ .getClassLoader(task.getTaskID());
ClassLoader mainClassLoader =
Thread.currentThread().getContextClassLoader();
List<SourceSplit> deserializeSplits = new ArrayList<>();
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
index 35667bf9b5..13701d41a4 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
@@ -53,7 +53,7 @@ public class SourceNoMoreElementOperation extends
TracingOperation
ClassLoader classLoader =
server.getTaskExecutionService()
.getExecutionContext(enumeratorTaskID.getTaskGroupLocation())
- .getClassLoader();
+
.getClassLoader(enumeratorTaskID.getTaskID());
ClassLoader oldClassLoader =
Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(classLoader);
SourceSplitEnumeratorTask<?> task =
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
index 56a376dc5d..3bf8a9c315 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
@@ -54,7 +54,7 @@ public class SourceReaderEventOperation extends
SourceEventOperation {
ClassLoader classLoader =
server.getTaskExecutionService()
.getExecutionContext(taskLocation.getTaskGroupLocation())
- .getClassLoader();
+ .getClassLoader(task.getTaskID());
task.handleSourceEvent(
currentTaskLocation.getTaskIndex(),
SerializationUtils.deserialize(sourceEvent,
classLoader));
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
index 12685ae8b4..688597c031 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
@@ -60,7 +60,7 @@ public class SourceRegisterOperation extends TracingOperation
ClassLoader classLoader =
server.getTaskExecutionService()
.getExecutionContext(enumeratorTaskID.getTaskGroupLocation())
- .getClassLoader();
+
.getClassLoader(enumeratorTaskID.getTaskID());
ClassLoader oldClassLoader =
Thread.currentThread().getContextClassLoader();
SourceSplitEnumeratorTask<?> task =
server.getTaskExecutionService().getTask(enumeratorTaskID);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
index 75659668ab..6b701dd38b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
@@ -17,32 +17,47 @@
package org.apache.seatunnel.engine.server;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.server.execution.BlockTask;
import org.apache.seatunnel.engine.server.execution.ExceptionTestTask;
import org.apache.seatunnel.engine.server.execution.FixedCallTestTimeTask;
import org.apache.seatunnel.engine.server.execution.StopTimeTestTask;
import org.apache.seatunnel.engine.server.execution.Task;
+import org.apache.seatunnel.engine.server.execution.TaskDeployState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
+import org.apache.seatunnel.engine.server.execution.TaskGroup;
+import org.apache.seatunnel.engine.server.execution.TaskGroupContext;
import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.execution.TaskGroupType;
import org.apache.seatunnel.engine.server.execution.TestTask;
+import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import com.google.common.collect.Lists;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
+import com.hazelcast.internal.serialization.Data;
+import lombok.NonNull;
import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import static java.util.Collections.emptySet;
import static
org.apache.seatunnel.engine.server.execution.ExecutionState.CANCELED;
import static
org.apache.seatunnel.engine.server.execution.ExecutionState.FAILED;
import static
org.apache.seatunnel.engine.server.execution.ExecutionState.FINISHED;
@@ -63,6 +78,15 @@ public class TaskExecutionServiceTest extends
AbstractSeaTunnelServerTest {
FLAKE_ID_GENERATOR = instance.getFlakeIdGenerator("test");
}
+ private PassiveCompletableFuture<TaskExecutionState> deployLocalTask(
+ TaskExecutionService taskExecutionService, @NonNull TaskGroup
taskGroup) {
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ ConcurrentHashMap<Long, ClassLoader> classLoaders = new
ConcurrentHashMap<>();
+ taskGroup.getTasks().forEach(task -> classLoaders.put(task.getTaskID(), classLoader));
+ return taskExecutionService.deployLocalTask(
+ taskGroup, classLoaders, new ConcurrentHashMap<>());
+ }
+
@Test
public void testCancel() {
TaskExecutionService taskExecutionService =
server.getTaskExecutionService();
@@ -70,8 +94,8 @@ public class TaskExecutionServiceTest extends
AbstractSeaTunnelServerTest {
long sleepTime = 300;
AtomicBoolean stop = new AtomicBoolean(false);
- TestTask testTask1 = new TestTask(stop, LOGGER, sleepTime, true);
- TestTask testTask2 = new TestTask(stop, LOGGER, sleepTime, false);
+ TestTask testTask1 = new TestTask(stop, sleepTime, true);
+ TestTask testTask2 = new TestTask(stop, sleepTime, false);
TaskGroupDefaultImpl ts =
new TaskGroupDefaultImpl(
@@ -79,7 +103,7 @@ public class TaskExecutionServiceTest extends
AbstractSeaTunnelServerTest {
"ts",
Lists.newArrayList(testTask1, testTask2));
CompletableFuture<TaskExecutionState> completableFuture =
- taskExecutionService.deployLocalTask(ts);
+ deployLocalTask(taskExecutionService, ts);
taskExecutionService.cancelTaskGroup(ts.getTaskGroupLocation());
@@ -101,7 +125,7 @@ public class TaskExecutionServiceTest extends
AbstractSeaTunnelServerTest {
"ts",
Lists.newArrayList(testTask1, testTask2));
CompletableFuture<TaskExecutionState> completableFuture =
- taskExecutionService.deployLocalTask(ts);
+ deployLocalTask(taskExecutionService, ts);
Thread.sleep(5000);
@@ -120,11 +144,12 @@ public class TaskExecutionServiceTest extends
AbstractSeaTunnelServerTest {
AtomicBoolean stop = new AtomicBoolean(false);
AtomicBoolean futureMark = new AtomicBoolean(false);
- TestTask testTask1 = new TestTask(stop, LOGGER, sleepTime, true);
- TestTask testTask2 = new TestTask(stop, LOGGER, sleepTime, false);
+ TestTask testTask1 = new TestTask(stop, sleepTime, true);
+ TestTask testTask2 = new TestTask(stop, sleepTime, false);
final CompletableFuture<TaskExecutionState> completableFuture =
- taskExecutionService.deployLocalTask(
+ deployLocalTask(
+ taskExecutionService,
new TaskGroupDefaultImpl(
new TaskGroupLocation(
jobId, pipeLineId,
FLAKE_ID_GENERATOR.newId()),
@@ -141,6 +166,63 @@ public class TaskExecutionServiceTest extends
AbstractSeaTunnelServerTest {
assertTrue(futureMark.get());
}
+ @Test
+ public void testClassloaderSplit() throws MalformedURLException {
+ TaskExecutionService taskExecutionService =
server.getTaskExecutionService();
+
+ long sleepTime = 300;
+
+ AtomicBoolean stop = new AtomicBoolean(false);
+ TestTask testTask1 = new TestTask(stop, sleepTime, true);
+ TestTask testTask2 = new TestTask(stop, sleepTime, false);
+
+ long jobId = System.currentTimeMillis();
+
+ TaskGroupLocation location = new TaskGroupLocation(jobId, 1, 1);
+ TaskGroupImmutableInformation taskGroupImmutableInformation =
+ new TaskGroupImmutableInformation(
+ jobId,
+ 1,
+ TaskGroupType.INTERMEDIATE_BLOCKING_QUEUE,
+ location,
+ "testClassloaderSplit",
+ Arrays.asList(
+
nodeEngine.getSerializationService().toData(testTask1),
+
nodeEngine.getSerializationService().toData(testTask2)),
+ Arrays.asList(
+ Collections.singleton(new
URL("file:///fake.jar")),
+ Collections.singleton(new
URL("file:///console.jar"))),
+ Arrays.asList(emptySet(), emptySet()));
+
+ Data data =
nodeEngine.getSerializationService().toData(taskGroupImmutableInformation);
+
+ final TaskDeployState taskDeployState =
taskExecutionService.deployTask(data);
+
+ Assertions.assertEquals(TaskDeployState.success(), taskDeployState);
+
+ TaskGroupContext taskGroupContext =
+ taskExecutionService.getActiveExecutionContext(location);
+ Assertions.assertIterableEquals(
+ Collections.singleton(new URL("file:///fake.jar")),
+ taskGroupContext.getJars().get(testTask1.getTaskID()));
+ Assertions.assertIterableEquals(
+ Collections.singleton(new URL("file:///console.jar")),
+ taskGroupContext.getJars().get(testTask2.getTaskID()));
+
+ Assertions.assertIterableEquals(
+ Collections.singletonList(new URL("file:///fake.jar")),
+ Arrays.asList(
+ ((URLClassLoader)
taskGroupContext.getClassLoader(testTask1.getTaskID()))
+ .getURLs()));
+ Assertions.assertIterableEquals(
+ Collections.singletonList(new URL("file:///console.jar")),
+ Arrays.asList(
+ ((URLClassLoader)
taskGroupContext.getClassLoader(testTask2.getTaskID()))
+ .getURLs()));
+
+ taskExecutionService.cancelTaskGroup(location);
+ }
+
/** Test task execution time is the same as the timer timeout */
@Test
public void testCriticalCallTime() throws InterruptedException {
@@ -158,7 +240,8 @@ public class TaskExecutionServiceTest extends
AbstractSeaTunnelServerTest {
TaskExecutionService taskExecutionService =
server.getTaskExecutionService();
CompletableFuture<TaskExecutionState> taskCts =
- taskExecutionService.deployLocalTask(
+ deployLocalTask(
+ taskExecutionService,
new TaskGroupDefaultImpl(
new TaskGroupLocation(
jobId, pipeLineId,
FLAKE_ID_GENERATOR.newId()),
@@ -211,7 +294,8 @@ public class TaskExecutionServiceTest extends
AbstractSeaTunnelServerTest {
Collections.shuffle(tasks);
CompletableFuture<TaskExecutionState> taskCts =
- taskExecutionService.deployLocalTask(
+ deployLocalTask(
+ taskExecutionService,
new TaskGroupDefaultImpl(
new TaskGroupLocation(
jobId, pipeLineId,
FLAKE_ID_GENERATOR.newId()),
@@ -219,7 +303,8 @@ public class TaskExecutionServiceTest extends
AbstractSeaTunnelServerTest {
Lists.newArrayList(tasks)));
CompletableFuture<TaskExecutionState> t1c =
- taskExecutionService.deployLocalTask(
+ deployLocalTask(
+ taskExecutionService,
new TaskGroupDefaultImpl(
new TaskGroupLocation(
jobId, pipeLineId,
FLAKE_ID_GENERATOR.newId()),
@@ -227,7 +312,8 @@ public class TaskExecutionServiceTest extends
AbstractSeaTunnelServerTest {
Lists.newArrayList(t1)));
CompletableFuture<TaskExecutionState> t2c =
- taskExecutionService.deployLocalTask(
+ deployLocalTask(
+ taskExecutionService,
new TaskGroupDefaultImpl(
new TaskGroupLocation(
jobId, pipeLineId,
FLAKE_ID_GENERATOR.newId()),
@@ -285,7 +371,7 @@ public class TaskExecutionServiceTest extends
AbstractSeaTunnelServerTest {
TaskExecutionService taskExecutionService =
server.getTaskExecutionService();
CompletableFuture<TaskExecutionState> completableFuture =
- taskExecutionService.deployLocalTask(taskGroup);
+ deployLocalTask(taskExecutionService, taskGroup);
// stop tasks
Thread.sleep(taskRunTime);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index 949903ed55..10d426f589 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -170,6 +170,36 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
physicalPlan.getPipelineList().get(0).getCoordinatorVertexList().size(), 1);
Assertions.assertEquals(
physicalPlan.getPipelineList().get(0).getPhysicalVertexList().size(), 2);
+ Assertions.assertEquals(
+ physicalPlan
+ .getPipelineList()
+ .get(0)
+ .getPhysicalVertexList()
+ .get(0)
+ .getTaskGroupImmutableInformation()
+ .getTasksData()
+ .size(),
+ 2);
+ Assertions.assertEquals(
+ physicalPlan
+ .getPipelineList()
+ .get(0)
+ .getPhysicalVertexList()
+ .get(0)
+ .getTaskGroupImmutableInformation()
+ .getJars()
+ .get(0),
+ Sets.newHashSet(new URL("file:///fake.jar")));
+ Assertions.assertEquals(
+ physicalPlan
+ .getPipelineList()
+ .get(0)
+ .getPhysicalVertexList()
+ .get(0)
+ .getTaskGroupImmutableInformation()
+ .getJars()
+ .get(1),
+ Sets.newHashSet(new URL("file:///console.jar")));
}
private static FakeSource createFakeSource() {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/execution/TestTask.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/execution/TestTask.java
index f27330352e..5c174a9293 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/execution/TestTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/execution/TestTask.java
@@ -19,24 +19,29 @@ package org.apache.seatunnel.engine.server.execution;
import org.apache.seatunnel.common.utils.ExceptionUtils;
-import com.hazelcast.logging.ILogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import lombok.NonNull;
+import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
/** For test use, only print logs */
public class TestTask implements Task {
- AtomicBoolean stop;
- long sleep;
- private final ILogger logger;
- boolean isThreadsShare;
+ private static final Logger logger =
LoggerFactory.getLogger(TestTask.class);
+
+ private final AtomicBoolean stop;
+ private final long sleep;
+ private final boolean isThreadsShare;
+ private final long taskId;
- public TestTask(AtomicBoolean stop, ILogger logger, long sleep, boolean
isThreadsShare) {
+ public TestTask(AtomicBoolean stop, long sleep, boolean isThreadsShare) {
this.stop = stop;
- this.logger = logger;
this.sleep = sleep;
this.isThreadsShare = isThreadsShare;
+ this.taskId = new Random().nextInt();
}
@NonNull @Override
@@ -47,7 +52,7 @@ public class TestTask implements Task {
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
- logger.severe(ExceptionUtils.getMessage(e));
+ logger.error(ExceptionUtils.getMessage(e));
}
progressState = ProgressState.MADE_PROGRESS;
} else {
@@ -58,7 +63,7 @@ public class TestTask implements Task {
@NonNull @Override
public Long getTaskID() {
- return (long) this.hashCode();
+ return taskId;
}
@Override