This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f03753348 [BugFix][Zeta] Fix finding TaskGroup deployment node bug (#4449)
f03753348 is described below
commit f03753348e6de80e41121016ed685659fd44182d
Author: ic4y <[email protected]>
AuthorDate: Sat Apr 1 15:37:26 2023 +0800
[BugFix][Zeta] Fix finding TaskGroup deployment node bug (#4449)
---
.../common/config/server/ServerConfigOptions.java | 2 +-
.../server/checkpoint/CheckpointManager.java | 10 +++++++-
.../seatunnel/engine/server/master/JobMaster.java | 29 +++++++++++-----------
.../server/scheduler/PipelineBaseScheduler.java | 5 +++-
.../operation/GetTaskGroupAddressOperation.java | 5 +---
5 files changed, 29 insertions(+), 22 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index c49929586..b5d02c034 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -49,7 +49,7 @@ public class ServerConfigOptions {
public static final Option<Integer> JOB_METRICS_BACKUP_INTERVAL =
Options.key("job-metrics-backup-interval")
.intType()
- .defaultValue(60)
+ .defaultValue(10)
.withDescription("The interval (in seconds) of job metrics
backups");
public static final Option<ThreadShareMode>
TASK_EXECUTION_THREAD_SHARE_MODE =
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 3e18a458e..971266e09 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -269,11 +269,19 @@ public class CheckpointManager {
}
protected InvocationFuture<?> sendOperationToMemberNode(TaskOperation
operation) {
+ log.debug(
+ "Sead Operation : "
+ + operation.getClass().getSimpleName()
+ + " to "
+ + jobMaster.queryTaskGroupAddress(
+
operation.getTaskLocation().getTaskGroupLocation())
+ + " for task group:"
+ + operation.getTaskLocation().getTaskGroupLocation());
return NodeEngineUtil.sendOperationToMemberNode(
nodeEngine,
operation,
jobMaster.queryTaskGroupAddress(
-
operation.getTaskLocation().getTaskGroupLocation().getTaskGroupId()));
+ operation.getTaskLocation().getTaskGroupLocation()));
}
/**
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 0b840e22c..23d4010d3 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -78,7 +78,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -400,23 +399,23 @@ public class JobMaster {
removeJobIMap();
}
- public Address queryTaskGroupAddress(long taskGroupId) {
- for (PipelineLocation pipelineLocation :
ownedSlotProfilesIMap.keySet()) {
- Optional<TaskGroupLocation> currentVertex =
-
ownedSlotProfilesIMap.get(pipelineLocation).keySet().stream()
- .filter(
- taskGroupLocation ->
- taskGroupLocation.getTaskGroupId()
== taskGroupId)
- .findFirst();
- if (currentVertex.isPresent()) {
- return ownedSlotProfilesIMap
- .get(pipelineLocation)
- .get(currentVertex.get())
- .getWorker();
+ public Address queryTaskGroupAddress(TaskGroupLocation taskGroupLocation) {
+
+ PipelineLocation pipelineLocation =
+ new PipelineLocation(
+ taskGroupLocation.getJobId(),
taskGroupLocation.getPipelineId());
+
+ Map<TaskGroupLocation, SlotProfile> taskGroupLocationSlotProfileMap =
+ ownedSlotProfilesIMap.get(pipelineLocation);
+
+ if (null != taskGroupLocationSlotProfileMap) {
+ SlotProfile slotProfile =
taskGroupLocationSlotProfileMap.get(taskGroupLocation);
+ if (null != slotProfile) {
+ return slotProfile.getWorker();
}
}
throw new IllegalArgumentException(
- "can't find task group address from task group id: " +
taskGroupId);
+ "can't find task group address from taskGroupLocation: " +
taskGroupLocation);
}
public ClassLoader getClassLoader() {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
index 2bc3b2a5b..fc6aff87f 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
@@ -97,7 +97,10 @@ public class PipelineBaseScheduler implements JobScheduler {
pipeline,
jobMaster.getOwnedSlotProfiles(pipeline.getPipelineLocation()));
- log.debug("slotProfiles: {}", slotProfiles);
+ log.debug(
+ "slotProfiles: {}, PipelineLocation: {}",
+ slotProfiles,
+ pipeline.getPipelineLocation());
// To ensure release pipeline resource after new master node
active, we need store
// slotProfiles first and then deploy tasks.
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
index 50a887176..ecb94e85c 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
@@ -52,10 +52,7 @@ public class GetTaskGroupAddressOperation extends Operation
implements Identifie
() ->
server.getCoordinatorService()
.getJobMaster(taskLocation.getJobId())
- .queryTaskGroupAddress(
- taskLocation
- .getTaskGroupLocation()
- .getTaskGroupId()),
+
.queryTaskGroupAddress(taskLocation.getTaskGroupLocation()),
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
true,