This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4ec25f345f [Fix][Zeta] Fix release slot resource twice (#7236)
4ec25f345f is described below
commit 4ec25f345f57debb1b48fcd96a723881161ff492
Author: Jia Fan <[email protected]>
AuthorDate: Mon Jul 22 12:01:36 2024 +0800
[Fix][Zeta] Fix release slot resource twice (#7236)
---
.../seatunnel/engine/e2e/JobClientJobProxyIT.java | 9 ++++++++
.../seatunnel/engine/server/master/JobMaster.java | 25 +++++++++++++++++-----
.../CheckTaskGroupIsExecutingOperation.java | 3 ++-
3 files changed, 31 insertions(+), 6 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
index 3d871adb5a..e6966875e6 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
@@ -63,6 +63,15 @@ public class JobClientJobProxyIT extends SeaTunnelContainer {
"Restore time 3, pipeline Job stream_fake_to_inmemory_with_error.conf"));
}
+ @Test
+ public void testNoDuplicatedReleaseSlot() throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ executeJob(server, "/savemode/fake_to_inmemory_savemode.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertFalse(
+ server.getLogs().contains("wrong target release operation with job"));
+ }
+
@Test
public void testMultiTableSinkFailedWithThrowable() throws IOException, InterruptedException {
Container.ExecResult execResult =
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 29d8611f13..e9928a018a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -70,7 +70,6 @@ import org.apache.seatunnel.engine.server.task.operation.CleanTaskGroupContextOp
import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupMetricsOperation;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
-import com.google.common.collect.Lists;
import com.hazelcast.cluster.Address;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
@@ -92,6 +91,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
@@ -146,6 +147,8 @@ public class JobMaster {
private Map<Integer, CheckpointPlan> checkpointPlanMap;
+ private final Map<Integer, List<SlotProfile>> releasedSlotWhenTaskGroupFinished;
+
private final IMap<Long, JobInfo> runningJobInfoIMap;
private final IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap;
@@ -190,6 +193,7 @@ public class JobMaster {
this.engineConfig = engineConfig;
this.metricsImap = metricsImap;
this.seaTunnelServer = seaTunnelServer;
+ this.releasedSlotWhenTaskGroupFinished = new ConcurrentHashMap<>();
}
public synchronized void init(long initializationTimestamp, boolean restart) throws Exception {
@@ -464,13 +468,17 @@ public class JobMaster {
jobImmutableInformation.getJobId(),
Collections.singletonList(taskGroupSlotProfile))
.join();
-
+ releasedSlotWhenTaskGroupFinished
+ .computeIfAbsent(
+ pipelineLocation.getPipelineId(),
+ k -> new CopyOnWriteArrayList<>())
+ .add(taskGroupSlotProfile);
return null;
},
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
true,
- exception -> ExceptionUtil.isOperationNeedRetryException(exception),
+ ExceptionUtil::isOperationNeedRetryException,
Constant.OPERATION_RETRY_SLEEP));
} catch (Exception e) {
LOGGER.warning(
@@ -487,6 +495,11 @@ public class JobMaster {
if (taskGroupLocationSlotProfileMap == null) {
return;
}
+ List<SlotProfile> alreadyReleased = new ArrayList<>();
+ if (releasedSlotWhenTaskGroupFinished.containsKey(subPlan.getPipelineId())) {
+ alreadyReleased.addAll(
+ releasedSlotWhenTaskGroupFinished.get(subPlan.getPipelineId()));
+ }
RetryUtils.retryWithException(
() -> {
@@ -497,10 +510,12 @@ public class JobMaster {
resourceManager
.releaseResources(
jobImmutableInformation.getJobId(),
- Lists.newArrayList(
- taskGroupLocationSlotProfileMap.values()))
+ taskGroupLocationSlotProfileMap.values().stream()
+ .filter(p -> !alreadyReleased.contains(p))
+ .collect(Collectors.toList()))
.join();
ownedSlotProfilesIMap.remove(subPlan.getPipelineLocation());
+ releasedSlotWhenTaskGroupFinished.remove(subPlan.getPipelineId());
return null;
},
new RetryUtils.RetryMaterial(
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/CheckTaskGroupIsExecutingOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/CheckTaskGroupIsExecutingOperation.java
index c43381b785..d4e158abdb 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/CheckTaskGroupIsExecutingOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/CheckTaskGroupIsExecutingOperation.java
@@ -46,7 +46,8 @@ public class CheckTaskGroupIsExecutingOperation extends Operation
SeaTunnelServer server = getService();
try {
response =
- server.getTaskExecutionService().getExecutionContext(taskGroupLocation) != null;
+ server.getTaskExecutionService().getActiveExecutionContext(taskGroupLocation)
+ != null;
} catch (TaskGroupContextNotFoundException e) {
response = false;
}