This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 17482c81aa [Feature][zeta] Add the UNKNOWABLE job status. (#5303)
17482c81aa is described below

commit 17482c81aa7baddad4153ecd77d8a95971608133
Author: ic4y <[email protected]>
AuthorDate: Thu Aug 24 18:03:37 2023 +0800

    [Feature][zeta] Add the UNKNOWABLE job status. (#5303)
    
    
    ---------
    
    Co-authored-by: Jia Fan <[email protected]>
---
 .../apache/seatunnel/common/utils/RetryUtils.java  |  2 +-
 .../e2e/ClusterFaultToleranceTwoPipelineIT.java    | 12 ++++++----
 .../seatunnel/engine/e2e/JobExecutionIT.java       | 26 ++++++++++++++++++++--
 .../seatunnel/engine/core/job/JobStatus.java       |  5 ++++-
 .../engine/server/CoordinatorService.java          | 19 +++++++++++++---
 .../seatunnel/engine/server/SeaTunnelServer.java   |  2 +-
 6 files changed, 54 insertions(+), 12 deletions(-)

diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java
index aa1bbd5934..e8ee03a501 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java
@@ -66,7 +66,7 @@ public class RetryUtils {
                                 backoff);
                         Thread.sleep(backoff);
                     } else {
-                        log.debug(attemptMessage, 
ExceptionUtils.getMessage(e), i, retryTimes, 0);
+                        log.info(attemptMessage, ExceptionUtils.getMessage(e), 
i, retryTimes, 0);
                     }
                 }
             }
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
index 608871dd56..3c677b45f3 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
@@ -543,9 +543,11 @@ public class ClusterFaultToleranceTwoPipelineIT {
     @Test
     public void testTwoPipelineBatchJobRestoreIn2NodeMasterDown()
             throws ExecutionException, InterruptedException {
-        String testCaseName = 
"testTwoPipelineBatchJobRestoreIn2NodeMasterDown";
+        String testCaseName =
+                "testTwoPipelineBatchJobRestoreIn2NodeMasterDown" + 
System.currentTimeMillis();
         String testClusterName =
-                
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRestoreIn2NodeMasterDown";
+                
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRestoreIn2NodeMasterDown"
+                        + System.currentTimeMillis();
         long testRowNumber = 1000;
         int testParallelism = 6;
         HazelcastInstanceImpl node1 = null;
@@ -651,9 +653,11 @@ public class ClusterFaultToleranceTwoPipelineIT {
     @Test
     public void testTwoPipelineStreamJobRestoreIn2NodeMasterDown()
             throws ExecutionException, InterruptedException {
-        String testCaseName = 
"testTwoPipelineStreamJobRestoreIn2NodeMasterDown";
+        String testCaseName =
+                "testTwoPipelineStreamJobRestoreIn2NodeMasterDown" + 
System.currentTimeMillis();
         String testClusterName =
-                
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRestoreIn2NodeMasterDown";
+                
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRestoreIn2NodeMasterDown"
+                        + System.currentTimeMillis();
         long testRowNumber = 1000;
         int testParallelism = 6;
         HazelcastInstanceImpl node1 = null;
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index cba498e999..4ecee663ae 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -145,6 +145,28 @@ public class JobExecutionIT {
         
Assertions.assertTrue(result.getError().startsWith("java.lang.NumberFormatException"));
     }
 
+    @Test
+    public void testGetUnKnownJobID() {
+
+        ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+        
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
+        SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
+
+        ClientJobProxy newClientJobProxy =
+                
engineClient.createJobClient().getJobProxy(System.currentTimeMillis());
+        CompletableFuture<JobStatus> waitForJobCompleteFuture =
+                
CompletableFuture.supplyAsync(newClientJobProxy::waitForJobComplete);
+
+        await().atMost(20000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        JobStatus.UNKNOWABLE, 
waitForJobCompleteFuture.get()));
+
+        Assertions.assertEquals(
+                "UNKNOWABLE", 
engineClient.getJobClient().getJobStatus(System.currentTimeMillis()));
+    }
+
     @Test
     public void testExpiredJobWasDeleted() throws Exception {
         Common.setDeployMode(DeployMode.CLIENT);
@@ -164,8 +186,8 @@ public class JobExecutionIT {
         await().atMost(65, TimeUnit.SECONDS)
                 .untilAsserted(
                         () ->
-                                Assertions.assertThrowsExactly(
-                                        NullPointerException.class, 
clientJobProxy::getJobStatus));
+                                Assertions.assertEquals(
+                                        JobStatus.UNKNOWABLE, 
clientJobProxy.getJobStatus()));
     }
 
     @AfterEach
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
index f9dbfb4c6c..7c50744dba 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
@@ -60,7 +60,10 @@ public enum JobStatus {
     SUSPENDED(EndState.LOCALLY),
 
     /** The job is currently reconciling and waits for task execution report 
to recover state. */
-    RECONCILING(EndState.NOT_END);
+    RECONCILING(EndState.NOT_END),
+
+    /** Cannot find the JobID or the job status has already been cleared. */
+    UNKNOWABLE(EndState.GLOBALLY);
 
     // 
--------------------------------------------------------------------------------------------
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 5293fe8bf9..89a2258ce2 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -505,9 +505,22 @@ public class CoordinatorService {
     public PassiveCompletableFuture<JobResult> waitForJobComplete(long jobId) {
         JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
         if (runningJobMaster == null) {
-            JobHistoryService.JobState jobState = 
jobHistoryService.getJobDetailState(jobId);
+            // Because operations on Imap cannot be performed within Operation.
+            CompletableFuture<JobHistoryService.JobState> jobStateFuture =
+                    CompletableFuture.supplyAsync(
+                            () -> {
+                                return 
jobHistoryService.getJobDetailState(jobId);
+                            },
+                            executorService);
+            JobHistoryService.JobState jobState = null;
+            try {
+                jobState = jobStateFuture.get();
+            } catch (Exception e) {
+                throw new SeaTunnelEngineException("get job state error", e);
+            }
+
             CompletableFuture<JobResult> future = new CompletableFuture<>();
-            if (jobState == null) future.complete(new 
JobResult(JobStatus.FAILED, null));
+            if (jobState == null) future.complete(new 
JobResult(JobStatus.UNKNOWABLE, null));
             else
                 future.complete(new JobResult(jobState.getJobStatus(), 
jobState.getErrorMessage()));
             return new PassiveCompletableFuture<>(future);
@@ -537,7 +550,7 @@ public class CoordinatorService {
         JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
         if (runningJobMaster == null) {
             JobHistoryService.JobState jobDetailState = 
jobHistoryService.getJobDetailState(jobId);
-            return null == jobDetailState ? null : 
jobDetailState.getJobStatus();
+            return null == jobDetailState ? JobStatus.UNKNOWABLE : 
jobDetailState.getJobStatus();
         }
         return runningJobMaster.getJobStatus();
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 56b0e5ec00..88ee1afc9d 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -230,7 +230,7 @@ public class SeaTunnelServer
         // must retry until the cluster have master node
         try {
             return RetryUtils.retryWithException(
-                    () -> 
nodeEngine.getMasterAddress().equals(nodeEngine.getThisAddress()),
+                    () -> 
nodeEngine.getThisAddress().equals(nodeEngine.getMasterAddress()),
                     new RetryUtils.RetryMaterial(
                             Constant.OPERATION_RETRY_TIME,
                             true,

Reply via email to