This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f4aa219a9b [Feature][Zeta] Support history service record job execute 
error (#5114)
f4aa219a9b is described below

commit f4aa219a9b121c48e63c9be9e42450de33963ebc
Author: wu-a-ge <[email protected]>
AuthorDate: Tue Aug 8 12:20:48 2023 +0800

    [Feature][Zeta] Support history service record job execute error (#5114)
    
    * fix:hdfs Checkpoint Storage management fails to delete historical files
    
    * fix:hdfs Checkpoint Storage management fails to delete historical files
    
    * fix after the savepoint job is restored, the checkpoint file cannot be 
generated
    
    * [Feature][Zeta] Support history service record job execute error
    
    * Improve Jobstate-related class additions add serialVersionUID
    
    * add e2e test
---
 .../connector-seatunnel-e2e-base/pom.xml           | 12 ++++++
 .../seatunnel/engine/e2e/JobExecutionIT.java       | 25 +++++++++++
 .../batch_fakesource_to_console_error.conf         | 50 ++++++++++++++++++++++
 .../engine/server/CoordinatorService.java          |  7 +--
 .../server/dag/physical/PipelineLocation.java      |  1 +
 .../engine/server/execution/TaskGroupLocation.java |  1 +
 .../engine/server/master/JobHistoryService.java    |  7 ++-
 .../seatunnel/engine/server/master/JobMaster.java  |  7 +++
 8 files changed, 106 insertions(+), 4 deletions(-)

diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
index a131556534..20a2e612a6 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
@@ -92,6 +92,18 @@
             <version>${netty-buffer.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-transforms-v2</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index 4609a10dc4..f8891da740 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.engine.client.job.ClientJobProxy;
 import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.core.job.JobResult;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
 
@@ -38,6 +39,7 @@ import com.hazelcast.instance.impl.HazelcastInstanceImpl;
 import lombok.extern.slf4j.Slf4j;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 @Slf4j
@@ -129,6 +131,29 @@ public class JobExecutionIT {
                                                         
objectCompletableFuture.get())));
     }
 
+    @Test
+    public void testGetErrorInfo() throws ExecutionException, 
InterruptedException {
+        Common.setDeployMode(DeployMode.CLIENT);
+        String filePath = 
TestUtils.getResource("batch_fakesource_to_console_error.conf");
+        JobConfig jobConfig = new JobConfig();
+        jobConfig.setName("fake_to_console_error");
+        ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+        
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
+        SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
+        JobExecutionEnvironment jobExecutionEnv =
+                engineClient.createExecutionContext(filePath, jobConfig);
+        final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+        JobStatus jobStatus = clientJobProxy.getJobStatus();
+        while (jobStatus == JobStatus.RUNNING) {
+            Thread.sleep(1 * 1000L);
+            jobStatus = clientJobProxy.getJobStatus();
+        }
+        CompletableFuture<JobResult> future = 
clientJobProxy.doWaitForJobComplete();
+        JobResult result = future.get();
+        Assertions.assertEquals(result.getStatus(), JobStatus.FAILED);
+        
Assertions.assertTrue(result.getError().startsWith("java.lang.NumberFormatException"));
+    }
+
     @AfterAll
     static void afterClass() {
         if (hazelcastInstance != null) {
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_console_error.conf
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_console_error.conf
new file mode 100644
index 0000000000..5fb9b3b80b
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_console_error.conf
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+  FakeSource {
+    result_table_name = "fake"
+    schema {
+      fields {
+        id = "int"
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+transform {
+  sql {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    query ="select cast(name as int) as name, id,age from fake"
+  }
+}
+sink {
+  console {
+    source_table_name = "fake1"
+  }
+
+}
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index d2931d0c37..ef2e356c3d 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -495,10 +495,11 @@ public class CoordinatorService {
     public PassiveCompletableFuture<JobResult> waitForJobComplete(long jobId) {
         JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
         if (runningJobMaster == null) {
-            JobStatus jobStatus = 
jobHistoryService.getJobDetailState(jobId).getJobStatus();
+            JobHistoryService.JobState jobState = 
jobHistoryService.getJobDetailState(jobId);
             CompletableFuture<JobResult> future = new CompletableFuture<>();
-            // TODO support history service record job execute error
-            future.complete(new JobResult(jobStatus, null));
+            if (jobState == null) future.complete(new 
JobResult(JobStatus.FAILED, null));
+            else
+                future.complete(new JobResult(jobState.getJobStatus(), 
jobState.getErrorMessage()));
             return new PassiveCompletableFuture<>(future);
         } else {
             return new 
PassiveCompletableFuture<>(runningJobMaster.getJobMasterCompleteFuture());
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PipelineLocation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PipelineLocation.java
index 45609e5cef..c7a2c3caae 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PipelineLocation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PipelineLocation.java
@@ -25,6 +25,7 @@ import java.io.Serializable;
 @AllArgsConstructor
 @Data
 public class PipelineLocation implements Serializable {
+    private static final long serialVersionUID = 2510281765212372549L;
     private long jobId;
     private int pipelineId;
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupLocation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupLocation.java
index 83686745a8..6dc7cadad6 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupLocation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupLocation.java
@@ -30,6 +30,7 @@ import java.io.Serializable;
 @Data
 @AllArgsConstructor
 public class TaskGroupLocation implements Serializable {
+    private static final long serialVersionUID = -8321526709920799751L;
     private final long jobId;
 
     private final int pipelineId;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
index dda9a2d0f3..20961adf70 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
@@ -168,6 +168,7 @@ public class JobHistoryService {
     public void storeFinishedJobState(JobMaster jobMaster) {
         JobState jobState = toJobStateMapper(jobMaster, false);
         jobState.setFinishTime(System.currentTimeMillis());
+        jobState.setErrorMessage(jobMaster.getErrorMessage());
         finishedJobStateImap.put(jobState.jobId, jobState, 14, TimeUnit.DAYS);
     }
 
@@ -230,7 +231,8 @@ public class JobHistoryService {
         JobStatus jobStatus = (JobStatus) runningJobStateIMap.get(jobId);
         String jobName = jobMaster.getJobImmutableInformation().getJobName();
         long submitTime = 
jobMaster.getJobImmutableInformation().getCreateTime();
-        return new JobState(jobId, jobName, jobStatus, submitTime, null, 
pipelineStateMapperMap);
+        return new JobState(
+                jobId, jobName, jobStatus, submitTime, null, 
pipelineStateMapperMap, null);
     }
 
     public void storeJobInfo(long jobId, JobDAGInfo jobInfo) {
@@ -240,17 +242,20 @@ public class JobHistoryService {
     @AllArgsConstructor
     @Data
     public static final class JobState implements Serializable {
+        private static final long serialVersionUID = -1176348098833918960L;
         private Long jobId;
         private String jobName;
         private JobStatus jobStatus;
         private long submitTime;
         private Long finishTime;
         private Map<PipelineLocation, PipelineStateData> 
pipelineStateMapperMap;
+        private String errorMessage;
     }
 
     @AllArgsConstructor
     @Data
     public static final class PipelineStateData implements Serializable {
+        private static final long serialVersionUID = -7875004875757861958L;
         private PipelineStatus pipelineStatus;
         private Map<TaskGroupLocation, ExecutionState> executionStateMap;
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index e14d946c81..404956a7e7 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -147,6 +147,12 @@ public class JobMaster {
 
     private CheckpointConfig jobCheckpointConfig;
 
+    public String getErrorMessage() {
+        return errorMessage;
+    }
+
+    private String errorMessage;
+
     public JobMaster(
             @NonNull Data jobImmutableInformationData,
             @NonNull NodeEngine nodeEngine,
@@ -290,6 +296,7 @@ public class JobMaster {
                             if (JobStatus.FAILING.equals(v.getStatus())) {
                                 physicalPlan.updateJobState(JobStatus.FAILING, 
JobStatus.FAILED);
                             }
+                            JobMaster.this.errorMessage = v.getError();
                             JobResult jobResult =
                                     new JobResult(physicalPlan.getJobStatus(), 
v.getError());
                             cleanJob();

Reply via email to