This is an automated email from the ASF dual-hosted git repository.
zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 73632bad2b [Hotfix][Zeta] Fix task cannot be stopped when system is busy (#7292)
73632bad2b is described below
commit 73632bad2b93e6879c673e0f00bb83035aa51408
Author: hailin0 <[email protected]>
AuthorDate: Tue Aug 6 10:08:45 2024 +0800
[Hotfix][Zeta] Fix task cannot be stopped when system is busy (#7292)
---
.../engine/server/TaskExecutionService.java | 22 ++++++++++++++++------
.../seatunnel/engine/server/master/JobMaster.java | 17 ++++++++++++++++-
2 files changed, 32 insertions(+), 7 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 94f0fa324f..00716f2c90 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -55,6 +55,7 @@ import org.apache.commons.collections4.CollectionUtils;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.instance.impl.NodeState;
import com.hazelcast.internal.metrics.DynamicMetricsProvider;
import com.hazelcast.internal.metrics.MetricDescriptor;
@@ -624,9 +625,12 @@ public class TaskExecutionService implements DynamicMetricsProvider {
});
});
if (localMap.size() > 0) {
+ boolean lockedIMap = false;
try {
- if (!metricsImap.tryLock(
- Constant.IMAP_RUNNING_JOB_METRICS_KEY, 5, TimeUnit.SECONDS)) {
+ lockedIMap =
+ metricsImap.tryLock(
+ Constant.IMAP_RUNNING_JOB_METRICS_KEY, 5, TimeUnit.SECONDS);
+ if (!lockedIMap) {
logger.warning("try lock failed in update metrics");
return;
}
@@ -640,10 +644,16 @@ public class TaskExecutionService implements DynamicMetricsProvider {
"The Imap acquisition failed due to the hazelcast node being offline or restarted, and will be retried next time",
e);
} finally {
- try {
- metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
- } catch (Throwable e) {
- logger.warning("unlock imap failed in update metrics", e);
+ if (lockedIMap) {
+ boolean unLockedIMap = false;
+ while (!unLockedIMap) {
+ try {
+ metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
+ unLockedIMap = true;
+ } catch (OperationTimeoutException e) {
+ logger.warning("unlock imap failed in update metrics", e);
+ }
+ }
}
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index aa74460b05..888114bec9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -72,6 +72,7 @@ import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import com.hazelcast.cluster.Address;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
+import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.jet.datamodel.Tuple2;
@@ -674,8 +675,12 @@ public class JobMaster {
if ((pipelineStatus.equals(PipelineStatus.FINISHED)
&&
!checkpointManager.isPipelineSavePointEnd(pipelineLocation))
|| pipelineStatus.equals(PipelineStatus.CANCELED)) {
+
+ boolean lockedIMap = false;
try {
metricsImap.lock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
+ lockedIMap = true;
+
HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
metricsImap.get(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
if (centralMap != null) {
@@ -693,7 +698,17 @@ public class JobMaster {
metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, centralMap);
}
} finally {
- metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
+ if (lockedIMap) {
+ boolean unLockedIMap = false;
+ while (!unLockedIMap) {
+ try {
+ metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
+ unLockedIMap = true;
+ } catch (OperationTimeoutException e) {
+ LOGGER.warning("unlock imap failed in update metrics", e);
+ }
+ }
+ }
}
}
}