This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new a7d1296fe [Engine][Task] Add fixed taskId when parallelism changed. (#2455)
a7d1296fe is described below
commit a7d1296fe6c73de9cb8b2963bfb4191088b79dbc
Author: Hisoka <[email protected]>
AuthorDate: Thu Aug 18 18:59:28 2022 +0800
[Engine][Task] Add fixed taskId when parallelism changed. (#2455)
* [Engine][Task] Add fixed taskId when parallelism changed.
* [Engine][Task] fix checkstyle
---
.../server/dag/physical/PhysicalPlanGenerator.java | 26 +++++++++++++++-------
.../engine/server/dag/physical/flow/Flow.java | 2 ++
.../physical/flow/IntermediateExecutionFlow.java | 5 +++++
.../dag/physical/flow/PhysicalExecutionFlow.java | 5 +++++
.../server/task/statemachine/EnumeratorState.java | 13 +++++------
.../seatunnel/engine/server/dag/TaskTest.java | 9 +++++---
6 files changed, 42 insertions(+), 18 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 3e6ca9acf..d2b2e1950 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -185,7 +185,7 @@ public class PhysicalPlanGenerator {
long taskGroupID = idGenerator.getNextId();
SinkAggregatedCommitterTask<?> t =
new
SinkAggregatedCommitterTask(jobImmutableInformation.getJobId(),
- new TaskLocation(taskGroupID,
idGenerator.getNextId()), s,
+ new TaskLocation(taskGroupID,
convertToTaskID(idGenerator.getNextId(), 1)), s,
sinkAggregatedCommitter.get());
committerTaskIDMap.put(s, new TaskLocation(taskGroupID,
t.getTaskID()));
CompletableFuture<TaskExecutionState> taskFuture = new
CompletableFuture<>();
@@ -220,11 +220,12 @@ public class PhysicalPlanGenerator {
.map(q -> new PhysicalExecutionFlow(q, getNextWrapper(edges, q)))
.flatMap(flow -> {
List<PhysicalVertex> t = new ArrayList<>();
+ long taskIDPrefix = idGenerator.getNextId();
for (int i = 0; i < flow.getAction().getParallelism(); i++) {
long taskGroupID = idGenerator.getNextId();
setFlowConfig(flow, i);
SeaTunnelTask seaTunnelTask = new
TransformSeaTunnelTask(jobImmutableInformation.getJobId(),
- new TaskLocation(taskGroupID,
idGenerator.getNextId()), i, flow);
+ new TaskLocation(taskGroupID,
convertToTaskID(taskIDPrefix, i)), i, flow);
CompletableFuture<TaskExecutionState> taskFuture = new
CompletableFuture<>();
waitForCompleteByPhysicalVertexList.add(new
NonCompletableFuture<>(taskFuture));
@@ -257,7 +258,7 @@ public class PhysicalPlanGenerator {
return sources.stream().map(s -> {
long taskGroupID = idGenerator.getNextId();
SourceSplitEnumeratorTask<?> t = new
SourceSplitEnumeratorTask<>(jobImmutableInformation.getJobId(),
- new TaskLocation(taskGroupID, idGenerator.getNextId()), s);
+ new TaskLocation(taskGroupID,
convertToTaskID(idGenerator.getNextId(), 1)), s);
enumeratorTaskIDMap.put(s, new TaskLocation(taskGroupID,
t.getTaskID()));
CompletableFuture<TaskExecutionState> taskFuture = new
CompletableFuture<>();
waitForCompleteByPhysicalVertexList.add(new
NonCompletableFuture<>(taskFuture));
@@ -292,18 +293,22 @@ public class PhysicalPlanGenerator {
if (sourceWithSink(flow)) {
flows.addAll(splitSinkFromFlow(flow));
}
+ Map<Long, Long> flowTaskIDPrefixMap = new HashMap<>();
for (int i = 0; i < flow.getAction().getParallelism(); i++) {
int finalParallelismIndex = i;
List<SeaTunnelTask> taskList =
flows.stream().map(f -> {
setFlowConfig(f, finalParallelismIndex);
+ long taskIDPrefix =
flowTaskIDPrefixMap.computeIfAbsent(f.getFlowID(), id ->
idGenerator.getNextId());
if (f instanceof PhysicalExecutionFlow) {
return new
SourceSeaTunnelTask<>(jobImmutableInformation.getJobId(),
- new TaskLocation(taskGroupID,
idGenerator.getNextId()),
+ new TaskLocation(taskGroupID,
+
convertToTaskID(taskIDPrefix, finalParallelismIndex)),
finalParallelismIndex, f);
} else {
return new
TransformSeaTunnelTask(jobImmutableInformation.getJobId(),
- new TaskLocation(taskGroupID,
idGenerator.getNextId()),
+ new TaskLocation(taskGroupID,
+
convertToTaskID(taskIDPrefix, finalParallelismIndex)),
finalParallelismIndex, f);
}
}).collect(Collectors.toList());
@@ -428,12 +433,17 @@ public class PhysicalPlanGenerator {
.collect(Collectors.toList()).contains(true);
}
+ /**
+ * Composes a task ID from a prefix and an index as {@code taskTypeID * 10000 + index}.
+ * Callers reuse one prefix per flow (or per single-instance task), so all parallel
+ * instances of the same flow get IDs that differ only by their parallelism index.
+ * NOTE(review): an index of 10000 or more, or a negative index, produces an ID inside the
+ * range belonging to a neighbouring prefix, so this assumes parallelism stays below
+ * 10000 — TODO confirm, or add a guard / named constant for 10000.
+ *
+ * @param taskTypeID prefix shared by all instances of one flow or task
+ * @param index      parallelism index of the instance
+ * @return the composed task ID
+ */
+ @SuppressWarnings("checkstyle:MagicNumber")
+ private long convertToTaskID(long taskTypeID, int index) {
+ return taskTypeID * 10000 + index;
+ }
+
private List<Flow> getNextWrapper(List<ExecutionEdge> edges, Action start)
{
List<Action> actions = edges.stream().filter(e ->
e.getLeftVertex().getAction().equals(start))
- .map(e ->
e.getRightVertex().getAction()).collect(Collectors.toList());
+ .map(e ->
e.getRightVertex().getAction()).collect(Collectors.toList());
List<Flow> wrappers = actions.stream()
- .filter(a -> a instanceof PartitionTransformAction || a instanceof
SinkAction)
- .map(PhysicalExecutionFlow::new).collect(Collectors.toList());
+ .filter(a -> a instanceof PartitionTransformAction || a
instanceof SinkAction)
+ .map(PhysicalExecutionFlow::new).collect(Collectors.toList());
wrappers.addAll(actions.stream()
.filter(a -> !(a instanceof PartitionTransformAction || a
instanceof SinkAction))
.map(a -> new PhysicalExecutionFlow<>(a, getNextWrapper(edges,
a))).collect(Collectors.toList()));
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/Flow.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/Flow.java
index 0aa7149eb..bf5529ec3 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/Flow.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/Flow.java
@@ -31,4 +31,6 @@ public abstract class Flow implements Serializable {
public List<Flow> getNext() {
return next;
}
+
+ /**
+ * Returns the identifier of this flow. PhysicalPlanGenerator keys its per-flow
+ * task-ID prefix on this value, so every parallel instance of the same flow shares
+ * one prefix.
+ * NOTE(review): implementations return either an action ID or a queue ID; these are
+ * assumed never to collide with each other — TODO confirm they come from one ID generator.
+ *
+ * @return the identifier of this flow
+ */
+ public abstract long getFlowID();
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/IntermediateExecutionFlow.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/IntermediateExecutionFlow.java
index 59ccccd80..b3946d898 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/IntermediateExecutionFlow.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/IntermediateExecutionFlow.java
@@ -50,4 +50,9 @@ public class IntermediateExecutionFlow<F extends FlowConfig> extends Flow {
public IntermediateQueue getQueue() {
return queue;
}
+
+ /**
+ * Identifies this flow by the ID of its intermediate queue.
+ *
+ * @return {@code queue.getId()}
+ */
+ @Override
+ public long getFlowID() {
+ return queue.getId();
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/PhysicalExecutionFlow.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/PhysicalExecutionFlow.java
index db81256df..2a7accfec 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/PhysicalExecutionFlow.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/PhysicalExecutionFlow.java
@@ -49,4 +49,9 @@ public class PhysicalExecutionFlow<T extends Action, F extends FlowConfig> exten
public T getAction() {
return action;
}
+
+ /**
+ * Identifies this flow by the ID of the action it wraps.
+ *
+ * @return {@code action.getId()}
+ */
+ @Override
+ public long getFlowID() {
+ return action.getId();
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/statemachine/EnumeratorState.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/statemachine/EnumeratorState.java
index 483c6c62d..72a37b6a2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/statemachine/EnumeratorState.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/statemachine/EnumeratorState.java
@@ -23,18 +23,17 @@ import java.io.Serializable;
* The state of {@link
org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask},
* The task usually start in the state {@code CREATED} and switch states
according to this diagram:
* <p>
- * CREATED -> INIT -> READY -> READER_REGISTER_COMPLETE -> ASSIGN ->
WAITING_FEEDBACK -> READER_CLOSED -> CLOSED
- * | | | | |
- * | | | | |
- * | | | | |
- * | | | | |
- * +--------+----------+-------------------------+--------------+-->
CANCELLING ----> CANCELED
+ * CREATED -> INIT -> READER_REGISTER_COMPLETE -> ASSIGN -> WAITING_FEEDBACK
-> READER_CLOSED -> CLOSED
+ * | | | | |
+ * | | | | |
+ * | | | | |
+ * | | | | |
+ * +--------+----------+----------------------------+--------------+-->
CANCELLING ----> CANCELED
* ... -> FAILED
*/
public enum EnumeratorState implements Serializable {
CREATED,
INIT,
- READY,
READER_REGISTER_COMPLETE,
ASSIGN,
WAITING_FEEDBACK,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index 26ab0c51c..17c004ff2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -82,19 +82,22 @@ public class TaskTest {
Action fake = new SourceAction<>(idGenerator.getNextId(), "fake",
fakeSource,
Collections.singletonList(new URL("file:///fake.jar")));
- LogicalVertex fakeVertex = new LogicalVertex(fake.getId(), fake, 1);
+ fake.setParallelism(3);
+ LogicalVertex fakeVertex = new LogicalVertex(fake.getId(), fake, 3);
FakeSource fakeSource2 = new FakeSource();
fakeSource2.setSeaTunnelContext(SeaTunnelContext.getContext());
Action fake2 = new SourceAction<>(idGenerator.getNextId(), "fake",
fakeSource2,
Collections.singletonList(new URL("file:///fake.jar")));
- LogicalVertex fake2Vertex = new LogicalVertex(fake2.getId(), fake2, 1);
+ fake2.setParallelism(3);
+ LogicalVertex fake2Vertex = new LogicalVertex(fake2.getId(), fake2, 3);
ConsoleSink consoleSink = new ConsoleSink();
consoleSink.setSeaTunnelContext(SeaTunnelContext.getContext());
Action console = new SinkAction<>(idGenerator.getNextId(), "console",
consoleSink,
Collections.singletonList(new URL("file:///console.jar")));
- LogicalVertex consoleVertex = new LogicalVertex(console.getId(),
console, 1);
+ console.setParallelism(3);
+ LogicalVertex consoleVertex = new LogicalVertex(console.getId(),
console, 3);
LogicalEdge edge = new LogicalEdge(fakeVertex, consoleVertex);