This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 25f9590a99 fixed zeta ci error (#5254)
25f9590a99 is described below
commit 25f9590a99ba3ffdc4a7fb1edc199501158160aa
Author: Xiaojian Sun <[email protected]>
AuthorDate: Thu Aug 10 14:17:32 2023 +0800
fixed zeta ci error (#5254)
---
.../seatunnel/engine/e2e/JobExecutionIT.java | 44 ++++++++++++++++++----
1 file changed, 36 insertions(+), 8 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index f8891da740..5fa521eb50 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -19,16 +19,19 @@ package org.apache.seatunnel.engine.e2e;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
+import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
-import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
@@ -42,6 +45,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import static org.awaitility.Awaitility.await;
+
@Slf4j
public class JobExecutionIT {
@@ -86,8 +91,7 @@ public class JobExecutionIT {
return clientJobProxy.waitForJobComplete();
});
- Awaitility.await()
- .atMost(600000, TimeUnit.MILLISECONDS)
+ await().atMost(600000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertTrue(
@@ -121,8 +125,7 @@ public class JobExecutionIT {
Thread.sleep(1000);
clientJobProxy.cancelJob();
- Awaitility.await()
- .atMost(20000, TimeUnit.MILLISECONDS)
+ await().atMost(20000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertTrue(
@@ -145,11 +148,36 @@ public class JobExecutionIT {
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
JobStatus jobStatus = clientJobProxy.getJobStatus();
while (jobStatus == JobStatus.RUNNING) {
- Thread.sleep(1 * 1000L);
+ Thread.sleep(1000L);
jobStatus = clientJobProxy.getJobStatus();
}
- CompletableFuture<JobResult> future = clientJobProxy.doWaitForJobComplete();
- JobResult result = future.get();
+
+ CompletableFuture<JobResult> completableFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return RetryUtils.retryWithException(
+ () -> {
+ PassiveCompletableFuture<JobResult> jobFuture =
+ clientJobProxy.doWaitForJobComplete();
+ return jobFuture.get();
+ },
+ new RetryUtils.RetryMaterial(
+ 100000,
+ true,
+ exception ->
+ ExceptionUtil.isOperationNeedRetryException(
+ exception),
+ Constant.OPERATION_RETRY_SLEEP));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ await().atMost(600000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertTrue(completableFuture.isDone()));
+
+ JobResult result = completableFuture.get();
Assertions.assertEquals(result.getStatus(), JobStatus.FAILED);
Assertions.assertTrue(result.getError().startsWith("java.lang.NumberFormatException"));
}