This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git


The following commit(s) were added to refs/heads/main by this push:
     new 12459d19 [Bug] [Seatunnel-web] When job execution initialization 
fails, the job execution status remains unchanged. (#194)
12459d19 is described below

commit 12459d1998f7c6c8717bb9564a806b0f3d4a5de3
Author: Mohammad Arshad <[email protected]>
AuthorDate: Sun Aug 25 14:49:39 2024 +0530

    [Bug] [Seatunnel-web] When job execution initialization fails, the job 
execution status remains unchanged. (#194)
---
 .../app/service/impl/JobExecutorServiceImpl.java   | 59 +++++++++++++---------
 .../server/common/SeatunnelErrorEnum.java          |  1 +
 .../app/test/JobExecutorControllerTest.java        | 22 ++++++++
 3 files changed, 59 insertions(+), 23 deletions(-)

diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
index fbcf0190..b8ca731e 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
@@ -30,7 +30,6 @@ import 
org.apache.seatunnel.app.thirdparty.metrics.EngineMetricsExtractorFactory
 import org.apache.seatunnel.app.thirdparty.metrics.IEngineMetricsExtractor;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.engine.client.SeaTunnelClient;
 import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
 import org.apache.seatunnel.engine.client.job.ClientJobProxy;
@@ -39,6 +38,7 @@ import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.common.config.YamlSeaTunnelConfigBuilder;
 import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
 
 import org.springframework.stereotype.Service;
 
@@ -52,6 +52,7 @@ import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.util.Date;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -73,9 +74,18 @@ public class JobExecutorServiceImpl implements 
IJobExecutorService {
 
         String configFile = writeJobConfigIntoConfFile(jobConfig, jobDefineId);
 
-        Long jobInstanceId =
-                executeJobBySeaTunnel(userId, configFile, 
executeResource.getJobInstanceId());
-        return Result.success(jobInstanceId);
+        try {
+            executeJobBySeaTunnel(userId, configFile, 
executeResource.getJobInstanceId());
+            return Result.success(executeResource.getJobInstanceId());
+        } catch (RuntimeException e) {
+            Result<Long> failure =
+                    
Result.failure(SeatunnelErrorEnum.JUB_EXEC_SUBMISSION_ERROR, e.getMessage());
+            // Even though job execution submission failed, we still need to 
return the
+            // jobInstanceId to the user
+            // as the job instance has been created in the database.
+            failure.setData(executeResource.getJobInstanceId());
+            return failure;
+        }
     }
 
     public String writeJobConfigIntoConfFile(String jobConfig, Long 
jobDefineId) {
@@ -101,35 +111,38 @@ public class JobExecutorServiceImpl implements 
IJobExecutorService {
         return filePath;
     }
 
-    public Long executeJobBySeaTunnel(Integer userId, String filePath, Long 
jobInstanceId) {
+    private void executeJobBySeaTunnel(Integer userId, String filePath, Long 
jobInstanceId) {
         Common.setDeployMode(DeployMode.CLIENT);
         JobConfig jobConfig = new JobConfig();
         jobConfig.setName(jobInstanceId + "_job");
-        SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
+        SeaTunnelClient seaTunnelClient;
+        ClientJobProxy clientJobProxy;
         try {
+            seaTunnelClient = createSeaTunnelClient();
             SeaTunnelConfig seaTunnelConfig = new 
YamlSeaTunnelConfigBuilder().build();
             ClientJobExecutionEnvironment jobExecutionEnv =
                     seaTunnelClient.createExecutionContext(filePath, 
jobConfig, seaTunnelConfig);
-            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+            clientJobProxy = jobExecutionEnv.execute();
+        } catch (Throwable e) {
+            log.error("Job execution submission failed.", e);
             JobInstance jobInstance = 
jobInstanceDao.getJobInstance(jobInstanceId);
-            
jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId()));
+            jobInstance.setJobStatus(JobStatus.FAILED.name());
+            jobInstance.setEndTime(new Date());
             jobInstanceDao.update(jobInstance);
-
-            CompletableFuture.runAsync(
-                    () -> {
-                        waitJobFinish(
-                                clientJobProxy,
-                                userId,
-                                jobInstanceId,
-                                Long.toString(clientJobProxy.getJobId()),
-                                seaTunnelClient);
-                    });
-
-        } catch (ExecutionException | InterruptedException e) {
-            ExceptionUtils.getMessage(e);
-            throw new RuntimeException(e);
+            throw new RuntimeException(e.getMessage(), e);
         }
-        return jobInstanceId;
+        JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
+        jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId()));
+        jobInstanceDao.update(jobInstance);
+        CompletableFuture.runAsync(
+                () -> {
+                    waitJobFinish(
+                            clientJobProxy,
+                            userId,
+                            jobInstanceId,
+                            Long.toString(clientJobProxy.getJobId()),
+                            seaTunnelClient);
+                });
     }
 
     public void waitJobFinish(
diff --git 
a/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java
 
b/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java
index b81cec09..5f4b218e 100644
--- 
a/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java
+++ 
b/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java
@@ -82,6 +82,7 @@ public enum SeatunnelErrorEnum {
             "load job state from engine error",
             "load job statue from engine [%s] error, error msg is [%s]"),
     UNSUPPORTED_ENGINE(40003, "unsupported engine", "unsupported engine [%s] 
version [%s]"),
+    JUB_EXEC_SUBMISSION_ERROR(40004, "Job execution submission error.", "%s"),
 
     JOB_RUN_GENERATE_UUID_ERROR(50001, "generate uuid error", "generate uuid 
error"),
     /* datasource and virtual table */
diff --git 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
index 7c942e1d..965ad200 100644
--- 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
+++ 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
@@ -243,6 +243,28 @@ public class JobExecutorControllerTest {
         assertTrue(result.isSuccess());
     }
 
+    @Test
+    public void executeJob_JobStatusUpdate_WhenSubmissionFailed() {
+        String jobName = "execJobStatus" + uniqueId;
+        JobCreateReq jobCreateReq = 
JobUtils.populateMySQLJobCreateReqFromFile();
+        jobCreateReq.getJobConfig().setName(jobName);
+        jobCreateReq.getJobConfig().setDescription(jobName + " description");
+        String datasourceName = "execJobStatus_db_1" + uniqueId;
+        String mysqlDatasourceId =
+                
seatunnelDatasourceControllerWrapper.createMysqlDatasource(datasourceName);
+        for (PluginConfig pluginConfig : jobCreateReq.getPluginConfigs()) {
+            pluginConfig.setDataSourceId(Long.parseLong(mysqlDatasourceId));
+        }
+        Result<Long> job = jobControllerWrapper.createJob(jobCreateReq);
+        assertTrue(job.isSuccess());
+        Long jobVersionId = job.getData();
+        Result<Long> result = 
jobExecutorControllerWrapper.jobExecutor(jobVersionId);
+        // Fails because of the wrong database credentials.
+        assertFalse(result.isSuccess());
+        // Even though job failed but job instance is created into the 
database.
+        assertTrue(result.getData() > 0);
+    }
+
     @AfterAll
     public static void tearDown() {
         seaTunnelWebCluster.stop();

Reply via email to