This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 17482c81aa [Feature][zeta] Add the UNKNOWABLE job status. (#5303)
17482c81aa is described below
commit 17482c81aa7baddad4153ecd77d8a95971608133
Author: ic4y <[email protected]>
AuthorDate: Thu Aug 24 18:03:37 2023 +0800
[Feature][zeta] Add the UNKNOWABLE job status. (#5303)
---------
Co-authored-by: Jia Fan <[email protected]>
---
.../apache/seatunnel/common/utils/RetryUtils.java | 2 +-
.../e2e/ClusterFaultToleranceTwoPipelineIT.java | 12 ++++++----
.../seatunnel/engine/e2e/JobExecutionIT.java | 26 ++++++++++++++++++++--
.../seatunnel/engine/core/job/JobStatus.java | 5 ++++-
.../engine/server/CoordinatorService.java | 19 +++++++++++++---
.../seatunnel/engine/server/SeaTunnelServer.java | 2 +-
6 files changed, 54 insertions(+), 12 deletions(-)
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java
index aa1bbd5934..e8ee03a501 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java
@@ -66,7 +66,7 @@ public class RetryUtils {
backoff);
Thread.sleep(backoff);
} else {
- log.debug(attemptMessage,
ExceptionUtils.getMessage(e), i, retryTimes, 0);
+ log.info(attemptMessage, ExceptionUtils.getMessage(e),
i, retryTimes, 0);
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
index 608871dd56..3c677b45f3 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
@@ -543,9 +543,11 @@ public class ClusterFaultToleranceTwoPipelineIT {
@Test
public void testTwoPipelineBatchJobRestoreIn2NodeMasterDown()
throws ExecutionException, InterruptedException {
- String testCaseName =
"testTwoPipelineBatchJobRestoreIn2NodeMasterDown";
+ String testCaseName =
+ "testTwoPipelineBatchJobRestoreIn2NodeMasterDown" +
System.currentTimeMillis();
String testClusterName =
-
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRestoreIn2NodeMasterDown";
+
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRestoreIn2NodeMasterDown"
+ + System.currentTimeMillis();
long testRowNumber = 1000;
int testParallelism = 6;
HazelcastInstanceImpl node1 = null;
@@ -651,9 +653,11 @@ public class ClusterFaultToleranceTwoPipelineIT {
@Test
public void testTwoPipelineStreamJobRestoreIn2NodeMasterDown()
throws ExecutionException, InterruptedException {
- String testCaseName =
"testTwoPipelineStreamJobRestoreIn2NodeMasterDown";
+ String testCaseName =
+ "testTwoPipelineStreamJobRestoreIn2NodeMasterDown" +
System.currentTimeMillis();
String testClusterName =
-
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRestoreIn2NodeMasterDown";
+
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRestoreIn2NodeMasterDown"
+ + System.currentTimeMillis();
long testRowNumber = 1000;
int testParallelism = 6;
HazelcastInstanceImpl node1 = null;
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index cba498e999..4ecee663ae 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -145,6 +145,28 @@ public class JobExecutionIT {
Assertions.assertTrue(result.getError().startsWith("java.lang.NumberFormatException"));
}
+ @Test
+ public void testGetUnKnownJobID() {
+
+ ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
+ SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
+
+ ClientJobProxy newClientJobProxy =
+
engineClient.createJobClient().getJobProxy(System.currentTimeMillis());
+ CompletableFuture<JobStatus> waitForJobCompleteFuture =
+
CompletableFuture.supplyAsync(newClientJobProxy::waitForJobComplete);
+
+ await().atMost(20000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ JobStatus.UNKNOWABLE,
waitForJobCompleteFuture.get()));
+
+ Assertions.assertEquals(
+ "UNKNOWABLE",
engineClient.getJobClient().getJobStatus(System.currentTimeMillis()));
+ }
+
@Test
public void testExpiredJobWasDeleted() throws Exception {
Common.setDeployMode(DeployMode.CLIENT);
@@ -164,8 +186,8 @@ public class JobExecutionIT {
await().atMost(65, TimeUnit.SECONDS)
.untilAsserted(
() ->
- Assertions.assertThrowsExactly(
- NullPointerException.class,
clientJobProxy::getJobStatus));
+ Assertions.assertEquals(
+ JobStatus.UNKNOWABLE,
clientJobProxy.getJobStatus()));
}
@AfterEach
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
index f9dbfb4c6c..7c50744dba 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
@@ -60,7 +60,10 @@ public enum JobStatus {
SUSPENDED(EndState.LOCALLY),
/** The job is currently reconciling and waits for task execution report
to recover state. */
- RECONCILING(EndState.NOT_END);
+ RECONCILING(EndState.NOT_END),
+
+ /** Cannot find the JobID or the job status has already been cleared. */
+ UNKNOWABLE(EndState.GLOBALLY);
//
--------------------------------------------------------------------------------------------
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 5293fe8bf9..89a2258ce2 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -505,9 +505,22 @@ public class CoordinatorService {
public PassiveCompletableFuture<JobResult> waitForJobComplete(long jobId) {
JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
if (runningJobMaster == null) {
- JobHistoryService.JobState jobState =
jobHistoryService.getJobDetailState(jobId);
+ // Because operations on Imap cannot be performed within Operation.
+ CompletableFuture<JobHistoryService.JobState> jobStateFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ return
jobHistoryService.getJobDetailState(jobId);
+ },
+ executorService);
+ JobHistoryService.JobState jobState = null;
+ try {
+ jobState = jobStateFuture.get();
+ } catch (Exception e) {
+ throw new SeaTunnelEngineException("get job state error", e);
+ }
+
CompletableFuture<JobResult> future = new CompletableFuture<>();
- if (jobState == null) future.complete(new
JobResult(JobStatus.FAILED, null));
+ if (jobState == null) future.complete(new
JobResult(JobStatus.UNKNOWABLE, null));
else
future.complete(new JobResult(jobState.getJobStatus(),
jobState.getErrorMessage()));
return new PassiveCompletableFuture<>(future);
@@ -537,7 +550,7 @@ public class CoordinatorService {
JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
if (runningJobMaster == null) {
JobHistoryService.JobState jobDetailState =
jobHistoryService.getJobDetailState(jobId);
- return null == jobDetailState ? null :
jobDetailState.getJobStatus();
+ return null == jobDetailState ? JobStatus.UNKNOWABLE :
jobDetailState.getJobStatus();
}
return runningJobMaster.getJobStatus();
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 56b0e5ec00..88ee1afc9d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -230,7 +230,7 @@ public class SeaTunnelServer
// must retry until the cluster have master node
try {
return RetryUtils.retryWithException(
- () ->
nodeEngine.getMasterAddress().equals(nodeEngine.getThisAddress()),
+ () ->
nodeEngine.getThisAddress().equals(nodeEngine.getMasterAddress()),
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
true,