This is an automated email from the ASF dual-hosted git repository.
jarvis pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 67c8b7932d [improve] improve TaskLocation/TaskLocationGroup info (#8862)
67c8b7932d is described below
commit 67c8b7932d43df8d242c3139d3a163c6a0ac7501
Author: Jarvis <[email protected]>
AuthorDate: Thu Mar 6 21:24:49 2025 +0800
[improve] improve TaskLocation/TaskLocationGroup info (#8862)
---
.../server/dag/physical/PhysicalPlanGenerator.java | 57 +++++-----------
.../engine/server/dag/physical/PhysicalVertex.java | 34 +++-------
.../engine/server/execution/TaskLocation.java | 21 +++---
.../seatunnel/engine/server/dag/TaskTest.java | 79 ++++++++++++++++++++++
4 files changed, 119 insertions(+), 72 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 5bd7e642a3..0bc7e46d5f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -91,7 +91,7 @@ public class PhysicalPlanGenerator {
private final List<Pipeline> pipelines;
- private final IdGenerator idGenerator = new IdGenerator();
+ private final IdGenerator taskGroupIdGenerator = new IdGenerator();
private final JobImmutableInformation jobImmutableInformation;
@@ -274,15 +274,14 @@ public class PhysicalPlanGenerator {
}
// if sinkAggregatedCommitter is empty, don't create task.
if (sinkAggregatedCommitter.isPresent()) {
- long taskGroupID = idGenerator.getNextId();
- long taskTypeId = idGenerator.getNextId();
+ long taskGroupID =
taskGroupIdGenerator.getNextId();
TaskGroupLocation taskGroupLocation =
new TaskGroupLocation(
jobImmutableInformation.getJobId(),
pipelineIndex,
taskGroupID);
TaskLocation taskLocation =
- new TaskLocation(taskGroupLocation,
taskTypeId, 0);
+ new TaskLocation(taskGroupLocation, 0,
0);
SinkAggregatedCommitterTask<?, ?> t =
new SinkAggregatedCommitterTask(
jobImmutableInformation.getJobId(),
@@ -342,6 +341,7 @@ public class PhysicalPlanGenerator {
if (shuffleStrategy instanceof
ShuffleMultipleRowStrategy) {
ShuffleMultipleRowStrategy
shuffleMultipleRowStrategy =
(ShuffleMultipleRowStrategy)
shuffleStrategy;
+ AtomicInteger atomicInteger = new
AtomicInteger(0);
for (Flow nextFlow : flow.getNext()) {
PhysicalExecutionFlow sinkFlow =
(PhysicalExecutionFlow) nextFlow;
@@ -349,10 +349,7 @@ public class PhysicalPlanGenerator {
String sinkTableId =
sinkAction.getConfig().getTablePath().toString();
- long taskIDPrefix =
idGenerator.getNextId();
- long taskGroupIDPrefix =
idGenerator.getNextId();
- int parallelismIndex = 0;
-
+ int parallelismIndex =
atomicInteger.getAndIncrement();
ShuffleStrategy shuffleStrategyOfSinkFlow =
shuffleMultipleRowStrategy
.toBuilder()
@@ -363,7 +360,6 @@ public class PhysicalPlanGenerator {
.toBuilder()
.shuffleStrategy(shuffleStrategyOfSinkFlow)
.build();
- long shuffleActionId =
idGenerator.getNextId();
String shuffleActionName =
String.format(
"%s -> %s -> %s",
@@ -372,7 +368,7 @@ public class PhysicalPlanGenerator {
sinkAction.getName());
ShuffleAction shuffleActionOfSinkFlow =
new ShuffleAction(
- shuffleActionId,
+ parallelismIndex,
shuffleActionName,
shuffleConfigOfSinkFlow);
shuffleActionOfSinkFlow.setParallelism(1);
@@ -382,9 +378,7 @@ public class PhysicalPlanGenerator {
Collections.singletonList(sinkFlow));
setFlowConfig(shuffleFlow);
- long taskGroupID =
- mixIDPrefixAndIndex(
- taskGroupIDPrefix,
parallelismIndex);
+ long taskGroupID =
taskGroupIdGenerator.getNextId();
TaskGroupLocation taskGroupLocation =
new TaskGroupLocation(
jobImmutableInformation.getJobId(),
@@ -392,9 +386,7 @@ public class PhysicalPlanGenerator {
taskGroupID);
TaskLocation taskLocation =
new TaskLocation(
- taskGroupLocation,
- taskIDPrefix,
- parallelismIndex);
+ taskGroupLocation, 0,
parallelismIndex);
SeaTunnelTask seaTunnelTask =
new TransformSeaTunnelTask(
jobImmutableInformation.getJobId(),
@@ -428,17 +420,15 @@ public class PhysicalPlanGenerator {
runningJobStateTimestampsIMap));
}
} else {
- long taskIDPrefix = idGenerator.getNextId();
- long taskGroupIDPrefix =
idGenerator.getNextId();
for (int i = 0; i <
flow.getAction().getParallelism(); i++) {
- long taskGroupID =
mixIDPrefixAndIndex(taskGroupIDPrefix, i);
+ long taskGroupID =
taskGroupIdGenerator.getNextId();
TaskGroupLocation taskGroupLocation =
new TaskGroupLocation(
jobImmutableInformation.getJobId(),
pipelineIndex,
taskGroupID);
TaskLocation taskLocation =
- new
TaskLocation(taskGroupLocation, taskIDPrefix, i);
+ new
TaskLocation(taskGroupLocation, 0, i);
setFlowConfig(flow);
SeaTunnelTask seaTunnelTask =
new TransformSeaTunnelTask(
@@ -483,15 +473,13 @@ public class PhysicalPlanGenerator {
return sources.stream()
.map(
sourceAction -> {
- long taskGroupID = idGenerator.getNextId();
- long taskTypeId = idGenerator.getNextId();
+ long taskGroupID =
taskGroupIdGenerator.getNextId();
TaskGroupLocation taskGroupLocation =
new TaskGroupLocation(
jobImmutableInformation.getJobId(),
pipelineIndex,
taskGroupID);
- TaskLocation taskLocation =
- new TaskLocation(taskGroupLocation,
taskTypeId, 0);
+ TaskLocation taskLocation = new
TaskLocation(taskGroupLocation, 0, 0);
SourceSplitEnumeratorTask<?> t =
new SourceSplitEnumeratorTask<>(
jobImmutableInformation.getJobId(),
@@ -541,32 +529,25 @@ public class PhysicalPlanGenerator {
if (sourceWithSink(flow)) {
flows.addAll(splitSinkFromFlow(flow));
}
- long taskGroupIDPrefix = idGenerator.getNextId();
- Map<Long, Long> flowTaskIDPrefixMap = new
HashMap<>();
for (int i = 0; i <
flow.getAction().getParallelism(); i++) {
+ long taskGroupId =
taskGroupIdGenerator.getNextId();
int finalParallelismIndex = i;
- long taskGroupID =
mixIDPrefixAndIndex(taskGroupIDPrefix, i);
TaskGroupLocation taskGroupLocation =
new TaskGroupLocation(
jobImmutableInformation.getJobId(),
pipelineIndex,
- taskGroupID);
+ taskGroupId);
+ AtomicInteger taskInTaskGroupIndex = new
AtomicInteger(0);
List<SeaTunnelTask> taskList =
flows.stream()
.map(
f -> {
setFlowConfig(f);
- long taskIDPrefix =
-
flowTaskIDPrefixMap
-
.computeIfAbsent(
-
f.getFlowID(),
-
id ->
-
idGenerator
-
.getNextId());
final TaskLocation
taskLocation =
new
TaskLocation(
taskGroupLocation,
-
taskIDPrefix,
+
taskInTaskGroupIndex
+
.getAndIncrement(),
finalParallelismIndex);
if (f
instanceof
@@ -768,10 +749,6 @@ public class PhysicalPlanGenerator {
.contains(true);
}
- private long mixIDPrefixAndIndex(long idPrefix, int index) {
- return idPrefix * 10000 + index;
- }
-
private List<Flow> getNextWrapper(List<ExecutionEdge> edges, Action start)
{
List<Action> actions =
edges.stream()
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index b65b3cc56a..435cdc1e3c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -162,30 +162,16 @@ public class PhysicalVertex {
this.currExecutionState = (ExecutionState)
runningJobStateIMap.get(taskGroupLocation);
this.nodeEngine = nodeEngine;
- if (log.isDebugEnabled() || log.isTraceEnabled()) {
- this.taskFullName =
- String.format(
- "Job %s (%s), Pipeline: [(%d/%d)], task: [%s
(%d/%d)], taskGroupLocation: [%s]",
- jobImmutableInformation.getJobConfig().getName(),
- jobImmutableInformation.getJobId(),
- pipelineId,
- totalPipelineNum,
- taskGroup.getTaskGroupName(),
- subTaskGroupIndex + 1,
- parallelism,
- taskGroupLocation);
- } else {
- this.taskFullName =
- String.format(
- "Job %s (%s), Pipeline: [(%d/%d)], task: [%s
(%d/%d)]",
- jobImmutableInformation.getJobConfig().getName(),
- jobImmutableInformation.getJobId(),
- pipelineId,
- totalPipelineNum,
- taskGroup.getTaskGroupName(),
- subTaskGroupIndex + 1,
- parallelism);
- }
+ this.taskFullName =
+ String.format(
+ "Job (%s), Pipeline: [(%d/%d)], task: [%s (%d/%d)],
taskGroupLocation: [%s]",
+ jobImmutableInformation.getJobId(),
+ pipelineId,
+ totalPipelineNum,
+ taskGroup.getTaskGroupName(),
+ subTaskGroupIndex + 1,
+ parallelism,
+ taskGroupLocation);
this.taskFuture = new CompletableFuture<>();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
index 5469b55556..0a136742c7 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
@@ -37,16 +37,21 @@ public class TaskLocation implements
IdentifiedDataSerializable, Serializable {
private long taskID;
private int index;
+ private static final long SUB_PIPELINE_ID_FACTORY = 10000L * 10000L *
10000L;
+ private static final long GROUP_ID_FACTOR = 10000L * 10000L;
+ private static final long TASK_GROUP_FACTOR = 10000L;
+
public TaskLocation() {}
- public TaskLocation(TaskGroupLocation taskGroupLocation, long idPrefix,
int index) {
+ public TaskLocation(
+ TaskGroupLocation taskGroupLocation, long taskInGroupIndex, int
taskParallelismIndex) {
this.taskGroupLocation = taskGroupLocation;
- this.taskID = mixIDPrefixAndIndex(idPrefix, index);
- this.index = index;
- }
-
- private long mixIDPrefixAndIndex(long idPrefix, int index) {
- return idPrefix * 10000 + index;
+ this.taskID =
+ taskGroupLocation.getPipelineId() * SUB_PIPELINE_ID_FACTORY
+ + taskGroupLocation.getTaskGroupId() * GROUP_ID_FACTOR
+ + taskInGroupIndex * TASK_GROUP_FACTOR
+ + taskParallelismIndex;
+ this.index = taskParallelismIndex;
}
public TaskGroupLocation getTaskGroupLocation() {
@@ -66,7 +71,7 @@ public class TaskLocation implements
IdentifiedDataSerializable, Serializable {
}
public long getTaskVertexId() {
- return taskID / 10000;
+ return taskID;
}
public int getTaskIndex() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index aa75fa563b..911bdc169d 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -50,7 +50,11 @@ import
org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import org.apache.seatunnel.engine.server.TestUtils;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
+import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
+import org.apache.seatunnel.engine.server.execution.Task;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -228,6 +232,81 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
Sets.newHashSet(new URL("file:///console.jar")));
}
+ @Test
+ public void testTaskGroupAndTaskLocationInfos() {
+ Long jobId = 1L;
+ LogicalDag testLogicalDag =
+ TestUtils.createTestLogicalPlan(
+ "stream_fake_to_console.conf", "test_task_group_info",
jobId);
+ JobImmutableInformation jobImmutableInformation =
+ new JobImmutableInformation(
+ jobId,
+ "Test",
+ nodeEngine.getSerializationService(),
+ testLogicalDag,
+ Collections.emptyList(),
+ Collections.emptyList());
+ IMap<Object, Object> runningJobState =
+
nodeEngine.getHazelcastInstance().getMap("testRunningJobState");
+ IMap<Object, Long[]> runningJobStateTimestamp =
+
nodeEngine.getHazelcastInstance().getMap("testRunningJobStateTimestamp");
+ PhysicalPlan physicalPlan =
+ PlanUtils.fromLogicalDAG(
+ testLogicalDag,
+ nodeEngine,
+ jobImmutableInformation,
+ System.currentTimeMillis(),
+ Executors.newCachedThreadPool(),
+ server.getClassLoaderService(),
+
instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME),
+ runningJobState,
+ runningJobStateTimestamp,
+ QueueType.BLOCKINGQUEUE,
+ new EngineConfig())
+ .f0();
+ Assertions.assertEquals(2, physicalPlan.getPipelineList().size());
+ for (int i = 0; i < physicalPlan.getPipelineList().size(); i++) {
+ SubPlan subPlan = physicalPlan.getPipelineList().get(i);
+ int pipelineId = subPlan.getPipelineId();
+
+ for (int j = 0; j < subPlan.getCoordinatorVertexList().size();
j++) {
+ PhysicalVertex physicalVertex =
subPlan.getCoordinatorVertexList().get(j);
+ TaskGroupLocation taskGroupLocation =
physicalVertex.getTaskGroupLocation();
+ List<Task> physicalTasks =
+ new
ArrayList<>(physicalVertex.getTaskGroup().getTasks());
+ for (int taskInGroupIndex = 0;
+ taskInGroupIndex < physicalTasks.size();
+ taskInGroupIndex++) {
+ Task task = physicalTasks.get(taskInGroupIndex);
+ long expectedTaskId =
+ pipelineId * 10000L * 10000L * 10000L
+ + taskGroupLocation.getTaskGroupId() *
10000L * 10000L
+ + taskInGroupIndex * 10000L;
+ Assertions.assertEquals(expectedTaskId, task.getTaskID());
+ }
+ }
+
+ for (int j = 0; j < subPlan.getPhysicalVertexList().size(); j++) {
+ PhysicalVertex physicalVertex =
subPlan.getPhysicalVertexList().get(j);
+ TaskGroupLocation taskGroupLocation =
physicalVertex.getTaskGroupLocation();
+ List<Task> physicalTasks =
+ new
ArrayList<>(physicalVertex.getTaskGroup().getTasks());
+ for (int taskInGroupIndex = 0;
+ taskInGroupIndex < physicalTasks.size();
+ taskInGroupIndex++) {
+ Task task = physicalTasks.get(taskInGroupIndex);
+ // can't get job parallel index, use prefix check
+ long expectedTaskIdPrefix =
+ pipelineId * 10000L * 10000L * 10000L
+ + taskGroupLocation.getTaskGroupId() *
10000L * 10000L
+ + taskInGroupIndex * 10000L;
+ Assertions.assertEquals(
+ expectedTaskIdPrefix / 10000L, task.getTaskID() /
10000L);
+ }
+ }
+ }
+ }
+
private static FakeSource createFakeSource() {
Config fakeSourceConfig =
ConfigFactory.parseMap(