This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6a7df83b3e [Fix][Zeta] Fix task can not end cause by lock metrics failed (#7357)
6a7df83b3e is described below

commit 6a7df83b3e95affcce0edd66e25f0e236f3d04d7
Author: Jia Fan <[email protected]>
AuthorDate: Tue Aug 13 17:41:08 2024 +0800

    [Fix][Zeta] Fix task can not end cause by lock metrics failed (#7357)
---
 .../seatunnel/engine/e2e/JobExecutionIT.java       | 61 ++++++++++++----------
 .../engine/server/TaskExecutionService.java        | 10 ++--
 .../engine/server/dag/physical/SubPlan.java        |  9 +++-
 .../seatunnel/engine/server/master/JobMaster.java  | 12 ++++-
 .../engine/server/TaskExecutionServiceTest.java    | 13 -----
 5 files changed, 58 insertions(+), 47 deletions(-)

diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index 0d1a647ded..86ec21a375 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -22,12 +22,15 @@ import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.engine.client.SeaTunnelClient;
 import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
 import org.apache.seatunnel.engine.client.job.ClientJobProxy;
+import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.core.job.JobResult;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -36,8 +39,10 @@ import org.junit.jupiter.api.Test;
 
 import com.hazelcast.client.config.ClientConfig;
 import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import com.hazelcast.map.IMap;
 import lombok.extern.slf4j.Slf4j;
 
+import java.util.HashMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -73,11 +78,17 @@ public class JobExecutionIT {
 
     @Test
     public void testExecuteJob() throws Exception {
+        runJobFileWithAssertEndStatus(
+                "batch_fakesource_to_file.conf", "fake_to_file", 
JobStatus.FINISHED);
+    }
+
+    private static void runJobFileWithAssertEndStatus(
+            String confFile, String name, JobStatus finished)
+            throws ExecutionException, InterruptedException {
         Common.setDeployMode(DeployMode.CLIENT);
-        String filePath = 
TestUtils.getResource("batch_fakesource_to_file.conf");
+        String filePath = TestUtils.getResource(confFile);
         JobConfig jobConfig = new JobConfig();
-        jobConfig.setName("fake_to_file");
-
+        jobConfig.setName(name);
         ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
         
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
         try (SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig)) 
{
@@ -94,11 +105,25 @@ public class JobExecutionIT {
                             () ->
                                     Assertions.assertTrue(
                                             objectCompletableFuture.isDone()
-                                                    && 
JobStatus.FINISHED.equals(
+                                                    && finished.equals(
                                                             
objectCompletableFuture.get())));
         }
     }
 
+    @Test
+    public void testExecuteJobWithLockMetrics() throws Exception {
+        // lock metrics map
+        IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap 
=
+                hazelcastInstance.getMap(Constant.IMAP_RUNNING_JOB_METRICS);
+        metricsImap.lock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
+        try {
+            runJobFileWithAssertEndStatus(
+                    "batch_fakesource_to_file.conf", "fake_to_file", 
JobStatus.FINISHED);
+        } finally {
+            metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
+        }
+    }
+
     @Test
     public void cancelJobTest() throws Exception {
         Common.setDeployMode(DeployMode.CLIENT);
@@ -229,29 +254,9 @@ public class JobExecutionIT {
 
     @Test
     public void testLastCheckpointErrorJob() throws Exception {
-        Common.setDeployMode(DeployMode.CLIENT);
-        String filePath = 
TestUtils.getResource("batch_last_checkpoint_error.conf");
-        JobConfig jobConfig = new JobConfig();
-        jobConfig.setName("batch_last_checkpoint_error");
-
-        ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
-        
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
-        try (SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig)) 
{
-            ClientJobExecutionEnvironment jobExecutionEnv =
-                    engineClient.createExecutionContext(filePath, jobConfig, 
SEATUNNEL_CONFIG);
-
-            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
-
-            CompletableFuture<JobStatus> objectCompletableFuture =
-                    
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
-
-            await().atMost(600000, TimeUnit.MILLISECONDS)
-                    .untilAsserted(
-                            () ->
-                                    Assertions.assertTrue(
-                                            objectCompletableFuture.isDone()
-                                                    && JobStatus.FAILED.equals(
-                                                            
objectCompletableFuture.get())));
-        }
+        runJobFileWithAssertEndStatus(
+                "batch_last_checkpoint_error.conf",
+                "batch_last_checkpoint_error",
+                JobStatus.FAILED);
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 00716f2c90..a4717c6a81 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -968,10 +968,14 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
                 cancellationFutures.remove(taskGroupLocation);
                 try {
                     cancelAsyncFunction(taskGroupLocation);
-                } catch (Throwable e) {
-                    throw new RuntimeException(e);
+                } catch (Throwable t) {
+                    logger.severe("cancel async function failed", t);
+                }
+                try {
+                    updateMetricsContextInImap();
+                } catch (Throwable t) {
+                    logger.severe("update metrics context in imap failed", t);
                 }
-                updateMetricsContextInImap();
                 if (ex == null) {
                     logger.info(
                             String.format(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 5e023f81e8..6e6667ddda 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -308,7 +308,14 @@ public class SubPlan {
             RetryUtils.retryWithException(
                     () -> {
                         
jobMaster.savePipelineMetricsToHistory(getPipelineLocation());
-                        jobMaster.removeMetricsContext(getPipelineLocation(), 
pipelineStatus);
+                        try {
+                            
jobMaster.removeMetricsContext(getPipelineLocation(), pipelineStatus);
+                        } catch (Throwable e) {
+                            log.error(
+                                    "Remove metrics context for pipeline {} 
failed, with exception: {}",
+                                    pipelineFullName,
+                                    ExceptionUtils.getMessage(e));
+                        }
                         notifyCheckpointManagerPipelineEnd(pipelineStatus);
                         jobMaster.releasePipelineResource(this);
                         return null;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 888114bec9..f521c05492 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -95,6 +95,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
@@ -678,8 +679,13 @@ public class JobMaster {
 
             boolean lockedIMap = false;
             try {
-                metricsImap.lock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
-                lockedIMap = true;
+                lockedIMap =
+                        metricsImap.tryLock(
+                                Constant.IMAP_RUNNING_JOB_METRICS_KEY, 5, 
TimeUnit.SECONDS);
+                if (!lockedIMap) {
+                    LOGGER.severe("lock imap failed in update metrics");
+                    return;
+                }
 
                 HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
                         metricsImap.get(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
@@ -697,6 +703,8 @@ public class JobMaster {
                     collect.forEach(centralMap::remove);
                     metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, 
centralMap);
                 }
+            } catch (Exception e) {
+                LOGGER.warning("failed to remove metrics context", e);
             } finally {
                 if (lockedIMap) {
                     boolean unLockedIMap = false;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
index 3658f32a13..75659668ab 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
@@ -28,7 +28,6 @@ import 
org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.execution.TestTask;
 
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 
@@ -65,8 +64,6 @@ public class TaskExecutionServiceTest extends 
AbstractSeaTunnelServerTest {
     }
 
     @Test
-    @Disabled(
-            "As we have more and more test cases the test the load of the test 
container will up, the test case may failed")
     public void testCancel() {
         TaskExecutionService taskExecutionService = 
server.getTaskExecutionService();
 
@@ -92,8 +89,6 @@ public class TaskExecutionServiceTest extends 
AbstractSeaTunnelServerTest {
     }
 
     @Test
-    @Disabled(
-            "As we have more and more test cases the test the load of the test 
container will up, the test case may failed")
     public void testCancelBlockTask() throws InterruptedException {
         TaskExecutionService taskExecutionService = 
server.getTaskExecutionService();
 
@@ -118,8 +113,6 @@ public class TaskExecutionServiceTest extends 
AbstractSeaTunnelServerTest {
     }
 
     @Test
-    @Disabled(
-            "As we have more and more test cases the test the load of the test 
container will up, the test case may failed")
     public void testFinish() {
         TaskExecutionService taskExecutionService = 
server.getTaskExecutionService();
 
@@ -150,8 +143,6 @@ public class TaskExecutionServiceTest extends 
AbstractSeaTunnelServerTest {
 
     /** Test task execution time is the same as the timer timeout */
     @Test
-    @Disabled(
-            "As we have more and more test cases the test the load of the test 
container will up, the test case may failed")
     public void testCriticalCallTime() throws InterruptedException {
         AtomicBoolean stopMark = new AtomicBoolean(false);
         CopyOnWriteArrayList<Long> stopTime = new CopyOnWriteArrayList<>();
@@ -189,8 +180,6 @@ public class TaskExecutionServiceTest extends 
AbstractSeaTunnelServerTest {
     }
 
     @Test
-    @Disabled(
-            "As we have more and more test cases the test the load of the test 
container will up, the test case may failed")
     public void testThrowException() throws InterruptedException {
         TaskExecutionService taskExecutionService = 
server.getTaskExecutionService();
 
@@ -264,8 +253,6 @@ public class TaskExecutionServiceTest extends 
AbstractSeaTunnelServerTest {
     }
 
     @RepeatedTest(2)
-    @Disabled(
-            "As we have more and more test cases the test the load of the test 
container will up, the test case may failed")
     public void testDelay() throws InterruptedException {
 
         long lowLagSleep = 10;

Reply via email to