This is an automated email from the ASF dual-hosted git repository.

zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 73632bad2b [Hotfix][Zeta] Fix task cannot be stopped when system is busy (#7292)
73632bad2b is described below

commit 73632bad2b93e6879c673e0f00bb83035aa51408
Author: hailin0 <[email protected]>
AuthorDate: Tue Aug 6 10:08:45 2024 +0800

    [Hotfix][Zeta] Fix task cannot be stopped when system is busy (#7292)
---
 .../engine/server/TaskExecutionService.java        | 22 ++++++++++++++++------
 .../seatunnel/engine/server/master/JobMaster.java  | 17 ++++++++++++++++-
 2 files changed, 32 insertions(+), 7 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 94f0fa324f..00716f2c90 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -55,6 +55,7 @@ import org.apache.commons.collections4.CollectionUtils;
 
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.hazelcast.core.OperationTimeoutException;
 import com.hazelcast.instance.impl.NodeState;
 import com.hazelcast.internal.metrics.DynamicMetricsProvider;
 import com.hazelcast.internal.metrics.MetricDescriptor;
@@ -624,9 +625,12 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
                                     });
                 });
         if (localMap.size() > 0) {
+            boolean lockedIMap = false;
             try {
-                if (!metricsImap.tryLock(
-                        Constant.IMAP_RUNNING_JOB_METRICS_KEY, 5, 
TimeUnit.SECONDS)) {
+                lockedIMap =
+                        metricsImap.tryLock(
+                                Constant.IMAP_RUNNING_JOB_METRICS_KEY, 5, 
TimeUnit.SECONDS);
+                if (!lockedIMap) {
                     logger.warning("try lock failed in update metrics");
                     return;
                 }
@@ -640,10 +644,16 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
                         "The Imap acquisition failed due to the hazelcast node 
being offline or restarted, and will be retried next time",
                         e);
             } finally {
-                try {
-                    metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
-                } catch (Throwable e) {
-                    logger.warning("unlock imap failed in update metrics", e);
+                if (lockedIMap) {
+                    boolean unLockedIMap = false;
+                    while (!unLockedIMap) {
+                        try {
+                            
metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
+                            unLockedIMap = true;
+                        } catch (OperationTimeoutException e) {
+                            logger.warning("unlock imap failed in update 
metrics", e);
+                        }
+                    }
                 }
             }
         }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index aa74460b05..888114bec9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -72,6 +72,7 @@ import 
org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
 import com.hazelcast.cluster.Address;
 import com.hazelcast.core.HazelcastInstanceNotActiveException;
+import com.hazelcast.core.OperationTimeoutException;
 import com.hazelcast.flakeidgen.FlakeIdGenerator;
 import com.hazelcast.internal.serialization.Data;
 import com.hazelcast.jet.datamodel.Tuple2;
@@ -674,8 +675,12 @@ public class JobMaster {
         if ((pipelineStatus.equals(PipelineStatus.FINISHED)
                         && 
!checkpointManager.isPipelineSavePointEnd(pipelineLocation))
                 || pipelineStatus.equals(PipelineStatus.CANCELED)) {
+
+            boolean lockedIMap = false;
             try {
                 metricsImap.lock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
+                lockedIMap = true;
+
                 HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
                         metricsImap.get(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
                 if (centralMap != null) {
@@ -693,7 +698,17 @@ public class JobMaster {
                     metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, 
centralMap);
                 }
             } finally {
-                metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
+                if (lockedIMap) {
+                    boolean unLockedIMap = false;
+                    while (!unLockedIMap) {
+                        try {
+                            
metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
+                            unLockedIMap = true;
+                        } catch (OperationTimeoutException e) {
+                            LOGGER.warning("unlock imap failed in update 
metrics", e);
+                        }
+                    }
+                }
             }
         }
     }

Reply via email to