This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 192d90415 [Hotfix][Zeta][Client] Fix Client Not Throw Exception When
Error Msg Is Null (#4107)
192d90415 is described below
commit 192d90415401bc75e2d996e126636bcb635aca4e
Author: Hisoka <[email protected]>
AuthorDate: Mon Feb 13 14:17:14 2023 +0800
[Hotfix][Zeta][Client] Fix Client Not Throw Exception When Error Msg Is
Null (#4107)
* [Improve] [Zeta] [Client] Fix Client Not Throw Exception When Error Msg
Is Null
* [Improve][Zeta][Client] Reformat code with spotless
---------
Co-authored-by: tyrantlucifer <[email protected]>
---
.../core/starter/seatunnel/command/ClientExecuteCommand.java | 3 ++-
.../org/apache/seatunnel/engine/client/job/ClientJobProxy.java | 9 +++++++--
2 files changed, 9 insertions(+), 3 deletions(-)
diff --git
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index b94c6d9bd..245bf6bfe 100644
---
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
+++
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -211,7 +211,8 @@ public class ClientExecuteCommand implements
Command<ClientCommandArgs> {
private void shutdownHook(ClientJobProxy clientJobProxy) {
if (clientCommandArgs.isCloseJob()) {
- if (jobStatus == null || !jobStatus.isEndState()) {
+ if (clientJobProxy.getJobResultCache() == null
+ && (jobStatus == null || !jobStatus.isEndState())) {
log.warn("Task will be closed due to client shutdown.");
clientJobProxy.cancelJob();
}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
index 3bfbbdd5c..f42e856e1 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
@@ -43,6 +43,7 @@ public class ClientJobProxy implements Job {
private static final ILogger LOGGER =
Logger.getLogger(ClientJobProxy.class);
private final SeaTunnelHazelcastClient seaTunnelHazelcastClient;
private final JobImmutableInformation jobImmutableInformation;
+ private JobResult jobResult;
public ClientJobProxy(
@NonNull SeaTunnelHazelcastClient seaTunnelHazelcastClient,
@@ -81,7 +82,6 @@ public class ClientJobProxy implements Job {
*/
@Override
public JobStatus waitForJobComplete() {
- JobResult jobResult;
try {
jobResult =
RetryUtils.retryWithException(
@@ -113,12 +113,17 @@ public class ClientJobProxy implements Job {
jobImmutableInformation.getJobConfig().getName(),
jobImmutableInformation.getJobId(),
jobResult.getStatus()));
- if (StringUtils.isNotEmpty(jobResult.getError())) {
+ if (StringUtils.isNotEmpty(jobResult.getError())
+ || jobResult.getStatus().equals(JobStatus.FAILED)) {
throw new SeaTunnelEngineException(jobResult.getError());
}
return jobResult.getStatus();
}
+ public JobResult getJobResultCache() {
+ return jobResult;
+ }
+
@Override
public PassiveCompletableFuture<JobResult> doWaitForJobComplete() {
return new PassiveCompletableFuture<>(