This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f4aa219a9b [Feature][Zeta] Support history service record job execute
error (#5114)
f4aa219a9b is described below
commit f4aa219a9b121c48e63c9be9e42450de33963ebc
Author: wu-a-ge <[email protected]>
AuthorDate: Tue Aug 8 12:20:48 2023 +0800
[Feature][Zeta] Support history service record job execute error (#5114)
* fix:hdfs Checkpoint Storage management fails to delete historical files
* fix:hdfs Checkpoint Storage management fails to delete historical files
* fix: after the savepoint job is restored, the checkpoint file cannot be
generated
* [Feature][Zeta] Support history service record job execute error
* Improve JobState-related classes by adding serialVersionUID
* add e2e test
---
.../connector-seatunnel-e2e-base/pom.xml | 12 ++++++
.../seatunnel/engine/e2e/JobExecutionIT.java | 25 +++++++++++
.../batch_fakesource_to_console_error.conf | 50 ++++++++++++++++++++++
.../engine/server/CoordinatorService.java | 7 +--
.../server/dag/physical/PipelineLocation.java | 1 +
.../engine/server/execution/TaskGroupLocation.java | 1 +
.../engine/server/master/JobHistoryService.java | 7 ++-
.../seatunnel/engine/server/master/JobMaster.java | 7 +++
8 files changed, 106 insertions(+), 4 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
index a131556534..20a2e612a6 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
@@ -92,6 +92,18 @@
<version>${netty-buffer.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-transforms-v2</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index 4609a10dc4..f8891da740 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
@@ -38,6 +39,7 @@ import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@Slf4j
@@ -129,6 +131,29 @@ public class JobExecutionIT {
objectCompletableFuture.get())));
}
+ @Test
+ public void testGetErrorInfo() throws ExecutionException,
InterruptedException {
+ Common.setDeployMode(DeployMode.CLIENT);
+ String filePath =
TestUtils.getResource("batch_fakesource_to_console_error.conf");
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setName("fake_to_console_error");
+ ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
+ SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
+ JobExecutionEnvironment jobExecutionEnv =
+ engineClient.createExecutionContext(filePath, jobConfig);
+ final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+ JobStatus jobStatus = clientJobProxy.getJobStatus();
+ while (jobStatus == JobStatus.RUNNING) {
+ Thread.sleep(1 * 1000L);
+ jobStatus = clientJobProxy.getJobStatus();
+ }
+ CompletableFuture<JobResult> future =
clientJobProxy.doWaitForJobComplete();
+ JobResult result = future.get();
+ Assertions.assertEquals(result.getStatus(), JobStatus.FAILED);
+
Assertions.assertTrue(result.getError().startsWith("java.lang.NumberFormatException"));
+ }
+
@AfterAll
static void afterClass() {
if (hazelcastInstance != null) {
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_console_error.conf
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_console_error.conf
new file mode 100644
index 0000000000..5fb9b3b80b
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_console_error.conf
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in
seatunnel config
+######
+
+env {
+ job.mode = "BATCH"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the
feature source plugin**
+ FakeSource {
+ result_table_name = "fake"
+ schema {
+ fields {
+ id = "int"
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+}
+transform {
+ sql {
+ source_table_name = "fake"
+ result_table_name = "fake1"
+ query ="select cast(name as int) as name, id,age from fake"
+ }
+}
+sink {
+ console {
+ source_table_name = "fake1"
+ }
+
+}
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index d2931d0c37..ef2e356c3d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -495,10 +495,11 @@ public class CoordinatorService {
public PassiveCompletableFuture<JobResult> waitForJobComplete(long jobId) {
JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
if (runningJobMaster == null) {
- JobStatus jobStatus =
jobHistoryService.getJobDetailState(jobId).getJobStatus();
+ JobHistoryService.JobState jobState =
jobHistoryService.getJobDetailState(jobId);
CompletableFuture<JobResult> future = new CompletableFuture<>();
- // TODO support history service record job execute error
- future.complete(new JobResult(jobStatus, null));
+ if (jobState == null) future.complete(new
JobResult(JobStatus.FAILED, null));
+ else
+ future.complete(new JobResult(jobState.getJobStatus(),
jobState.getErrorMessage()));
return new PassiveCompletableFuture<>(future);
} else {
return new
PassiveCompletableFuture<>(runningJobMaster.getJobMasterCompleteFuture());
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PipelineLocation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PipelineLocation.java
index 45609e5cef..c7a2c3caae 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PipelineLocation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PipelineLocation.java
@@ -25,6 +25,7 @@ import java.io.Serializable;
@AllArgsConstructor
@Data
public class PipelineLocation implements Serializable {
+ private static final long serialVersionUID = 2510281765212372549L;
private long jobId;
private int pipelineId;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupLocation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupLocation.java
index 83686745a8..6dc7cadad6 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupLocation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupLocation.java
@@ -30,6 +30,7 @@ import java.io.Serializable;
@Data
@AllArgsConstructor
public class TaskGroupLocation implements Serializable {
+ private static final long serialVersionUID = -8321526709920799751L;
private final long jobId;
private final int pipelineId;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
index dda9a2d0f3..20961adf70 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
@@ -168,6 +168,7 @@ public class JobHistoryService {
public void storeFinishedJobState(JobMaster jobMaster) {
JobState jobState = toJobStateMapper(jobMaster, false);
jobState.setFinishTime(System.currentTimeMillis());
+ jobState.setErrorMessage(jobMaster.getErrorMessage());
finishedJobStateImap.put(jobState.jobId, jobState, 14, TimeUnit.DAYS);
}
@@ -230,7 +231,8 @@ public class JobHistoryService {
JobStatus jobStatus = (JobStatus) runningJobStateIMap.get(jobId);
String jobName = jobMaster.getJobImmutableInformation().getJobName();
long submitTime =
jobMaster.getJobImmutableInformation().getCreateTime();
- return new JobState(jobId, jobName, jobStatus, submitTime, null,
pipelineStateMapperMap);
+ return new JobState(
+ jobId, jobName, jobStatus, submitTime, null,
pipelineStateMapperMap, null);
}
public void storeJobInfo(long jobId, JobDAGInfo jobInfo) {
@@ -240,17 +242,20 @@ public class JobHistoryService {
@AllArgsConstructor
@Data
public static final class JobState implements Serializable {
+ private static final long serialVersionUID = -1176348098833918960L;
private Long jobId;
private String jobName;
private JobStatus jobStatus;
private long submitTime;
private Long finishTime;
private Map<PipelineLocation, PipelineStateData>
pipelineStateMapperMap;
+ private String errorMessage;
}
@AllArgsConstructor
@Data
public static final class PipelineStateData implements Serializable {
+ private static final long serialVersionUID = -7875004875757861958L;
private PipelineStatus pipelineStatus;
private Map<TaskGroupLocation, ExecutionState> executionStateMap;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index e14d946c81..404956a7e7 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -147,6 +147,12 @@ public class JobMaster {
private CheckpointConfig jobCheckpointConfig;
+ public String getErrorMessage() {
+ return errorMessage;
+ }
+
+ private String errorMessage;
+
public JobMaster(
@NonNull Data jobImmutableInformationData,
@NonNull NodeEngine nodeEngine,
@@ -290,6 +296,7 @@ public class JobMaster {
if (JobStatus.FAILING.equals(v.getStatus())) {
physicalPlan.updateJobState(JobStatus.FAILING,
JobStatus.FAILED);
}
+ JobMaster.this.errorMessage = v.getError();
JobResult jobResult =
new JobResult(physicalPlan.getJobStatus(),
v.getError());
cleanJob();