This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 6a7df83b3e [Fix][Zeta] Fix task that cannot end because locking metrics failed (#7357)
6a7df83b3e is described below
commit 6a7df83b3e95affcce0edd66e25f0e236f3d04d7
Author: Jia Fan <[email protected]>
AuthorDate: Tue Aug 13 17:41:08 2024 +0800
[Fix][Zeta] Fix task that cannot end because locking metrics failed (#7357)
---
.../seatunnel/engine/e2e/JobExecutionIT.java | 61 ++++++++++++----------
.../engine/server/TaskExecutionService.java | 10 ++--
.../engine/server/dag/physical/SubPlan.java | 9 +++-
.../seatunnel/engine/server/master/JobMaster.java | 12 ++++-
.../engine/server/TaskExecutionServiceTest.java | 13 -----
5 files changed, 58 insertions(+), 47 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index 0d1a647ded..86ec21a375 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -22,12 +22,15 @@ import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
+import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
@@ -36,8 +39,10 @@ import org.junit.jupiter.api.Test;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import com.hazelcast.map.IMap;
import lombok.extern.slf4j.Slf4j;
+import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -73,11 +78,17 @@ public class JobExecutionIT {
@Test
public void testExecuteJob() throws Exception {
+ runJobFileWithAssertEndStatus(
+ "batch_fakesource_to_file.conf", "fake_to_file",
JobStatus.FINISHED);
+ }
+
+ private static void runJobFileWithAssertEndStatus(
+ String confFile, String name, JobStatus finished)
+ throws ExecutionException, InterruptedException {
Common.setDeployMode(DeployMode.CLIENT);
- String filePath =
TestUtils.getResource("batch_fakesource_to_file.conf");
+ String filePath = TestUtils.getResource(confFile);
JobConfig jobConfig = new JobConfig();
- jobConfig.setName("fake_to_file");
-
+ jobConfig.setName(name);
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
try (SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig))
{
@@ -94,11 +105,25 @@ public class JobExecutionIT {
() ->
Assertions.assertTrue(
objectCompletableFuture.isDone()
- &&
JobStatus.FINISHED.equals(
+ && finished.equals(
objectCompletableFuture.get())));
}
}
+ @Test
+ public void testExecuteJobWithLockMetrics() throws Exception {
+ // lock metrics map
+ IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap
=
+ hazelcastInstance.getMap(Constant.IMAP_RUNNING_JOB_METRICS);
+ metricsImap.lock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
+ try {
+ runJobFileWithAssertEndStatus(
+ "batch_fakesource_to_file.conf", "fake_to_file",
JobStatus.FINISHED);
+ } finally {
+ metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
+ }
+ }
+
@Test
public void cancelJobTest() throws Exception {
Common.setDeployMode(DeployMode.CLIENT);
@@ -229,29 +254,9 @@ public class JobExecutionIT {
@Test
public void testLastCheckpointErrorJob() throws Exception {
- Common.setDeployMode(DeployMode.CLIENT);
- String filePath =
TestUtils.getResource("batch_last_checkpoint_error.conf");
- JobConfig jobConfig = new JobConfig();
- jobConfig.setName("batch_last_checkpoint_error");
-
- ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
-
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
- try (SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig))
{
- ClientJobExecutionEnvironment jobExecutionEnv =
- engineClient.createExecutionContext(filePath, jobConfig,
SEATUNNEL_CONFIG);
-
- final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
-
- CompletableFuture<JobStatus> objectCompletableFuture =
-
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
-
- await().atMost(600000, TimeUnit.MILLISECONDS)
- .untilAsserted(
- () ->
- Assertions.assertTrue(
- objectCompletableFuture.isDone()
- && JobStatus.FAILED.equals(
-
objectCompletableFuture.get())));
- }
+ runJobFileWithAssertEndStatus(
+ "batch_last_checkpoint_error.conf",
+ "batch_last_checkpoint_error",
+ JobStatus.FAILED);
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 00716f2c90..a4717c6a81 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -968,10 +968,14 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
cancellationFutures.remove(taskGroupLocation);
try {
cancelAsyncFunction(taskGroupLocation);
- } catch (Throwable e) {
- throw new RuntimeException(e);
+ } catch (Throwable t) {
+ logger.severe("cancel async function failed", t);
+ }
+ try {
+ updateMetricsContextInImap();
+ } catch (Throwable t) {
+ logger.severe("update metrics context in imap failed", t);
}
- updateMetricsContextInImap();
if (ex == null) {
logger.info(
String.format(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 5e023f81e8..6e6667ddda 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -308,7 +308,14 @@ public class SubPlan {
RetryUtils.retryWithException(
() -> {
jobMaster.savePipelineMetricsToHistory(getPipelineLocation());
- jobMaster.removeMetricsContext(getPipelineLocation(),
pipelineStatus);
+ try {
+
jobMaster.removeMetricsContext(getPipelineLocation(), pipelineStatus);
+ } catch (Throwable e) {
+ log.error(
+ "Remove metrics context for pipeline {}
failed, with exception: {}",
+ pipelineFullName,
+ ExceptionUtils.getMessage(e));
+ }
notifyCheckpointManagerPipelineEnd(pipelineStatus);
jobMaster.releasePipelineResource(this);
return null;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 888114bec9..f521c05492 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -95,6 +95,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
@@ -678,8 +679,13 @@ public class JobMaster {
boolean lockedIMap = false;
try {
- metricsImap.lock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
- lockedIMap = true;
+ lockedIMap =
+ metricsImap.tryLock(
+ Constant.IMAP_RUNNING_JOB_METRICS_KEY, 5,
TimeUnit.SECONDS);
+ if (!lockedIMap) {
+ LOGGER.severe("lock imap failed in update metrics");
+ return;
+ }
HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
metricsImap.get(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
@@ -697,6 +703,8 @@ public class JobMaster {
collect.forEach(centralMap::remove);
metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY,
centralMap);
}
+ } catch (Exception e) {
+ LOGGER.warning("failed to remove metrics context", e);
} finally {
if (lockedIMap) {
boolean unLockedIMap = false;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
index 3658f32a13..75659668ab 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
@@ -28,7 +28,6 @@ import
org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TestTask;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
@@ -65,8 +64,6 @@ public class TaskExecutionServiceTest extends
AbstractSeaTunnelServerTest {
}
@Test
- @Disabled(
- "As we have more and more test cases the test the load of the test
container will up, the test case may failed")
public void testCancel() {
TaskExecutionService taskExecutionService =
server.getTaskExecutionService();
@@ -92,8 +89,6 @@ public class TaskExecutionServiceTest extends
AbstractSeaTunnelServerTest {
}
@Test
- @Disabled(
- "As we have more and more test cases the test the load of the test
container will up, the test case may failed")
public void testCancelBlockTask() throws InterruptedException {
TaskExecutionService taskExecutionService =
server.getTaskExecutionService();
@@ -118,8 +113,6 @@ public class TaskExecutionServiceTest extends
AbstractSeaTunnelServerTest {
}
@Test
- @Disabled(
- "As we have more and more test cases the test the load of the test
container will up, the test case may failed")
public void testFinish() {
TaskExecutionService taskExecutionService =
server.getTaskExecutionService();
@@ -150,8 +143,6 @@ public class TaskExecutionServiceTest extends
AbstractSeaTunnelServerTest {
/** Test task execution time is the same as the timer timeout */
@Test
- @Disabled(
- "As we have more and more test cases the test the load of the test
container will up, the test case may failed")
public void testCriticalCallTime() throws InterruptedException {
AtomicBoolean stopMark = new AtomicBoolean(false);
CopyOnWriteArrayList<Long> stopTime = new CopyOnWriteArrayList<>();
@@ -189,8 +180,6 @@ public class TaskExecutionServiceTest extends
AbstractSeaTunnelServerTest {
}
@Test
- @Disabled(
- "As we have more and more test cases the test the load of the test
container will up, the test case may failed")
public void testThrowException() throws InterruptedException {
TaskExecutionService taskExecutionService =
server.getTaskExecutionService();
@@ -264,8 +253,6 @@ public class TaskExecutionServiceTest extends
AbstractSeaTunnelServerTest {
}
@RepeatedTest(2)
- @Disabled(
- "As we have more and more test cases the test the load of the test
container will up, the test case may failed")
public void testDelay() throws InterruptedException {
long lowLagSleep = 10;