This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 25f9590a99 fixed zeta ci error (#5254)
25f9590a99 is described below

commit 25f9590a99ba3ffdc4a7fb1edc199501158160aa
Author: Xiaojian Sun <[email protected]>
AuthorDate: Thu Aug 10 14:17:32 2023 +0800

    fixed zeta ci error (#5254)
---
 .../seatunnel/engine/e2e/JobExecutionIT.java       | 44 ++++++++++++++++++----
 1 file changed, 36 insertions(+), 8 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index f8891da740..5fa521eb50 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -19,16 +19,19 @@ package org.apache.seatunnel.engine.e2e;
 
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.client.SeaTunnelClient;
 import org.apache.seatunnel.engine.client.job.ClientJobProxy;
 import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
+import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobResult;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
 
-import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
@@ -42,6 +45,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import static org.awaitility.Awaitility.await;
+
 @Slf4j
 public class JobExecutionIT {
 
@@ -86,8 +91,7 @@ public class JobExecutionIT {
                             return clientJobProxy.waitForJobComplete();
                         });
 
-        Awaitility.await()
-                .atMost(600000, TimeUnit.MILLISECONDS)
+        await().atMost(600000, TimeUnit.MILLISECONDS)
                 .untilAsserted(
                         () ->
                                 Assertions.assertTrue(
@@ -121,8 +125,7 @@ public class JobExecutionIT {
         Thread.sleep(1000);
         clientJobProxy.cancelJob();
 
-        Awaitility.await()
-                .atMost(20000, TimeUnit.MILLISECONDS)
+        await().atMost(20000, TimeUnit.MILLISECONDS)
                 .untilAsserted(
                         () ->
                                 Assertions.assertTrue(
@@ -145,11 +148,36 @@ public class JobExecutionIT {
         final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
         JobStatus jobStatus = clientJobProxy.getJobStatus();
         while (jobStatus == JobStatus.RUNNING) {
-            Thread.sleep(1 * 1000L);
+            Thread.sleep(1000L);
             jobStatus = clientJobProxy.getJobStatus();
         }
-        CompletableFuture<JobResult> future = clientJobProxy.doWaitForJobComplete();
-        JobResult result = future.get();
+
+        CompletableFuture<JobResult> completableFuture =
+                CompletableFuture.supplyAsync(
+                        () -> {
+                            try {
+                                return RetryUtils.retryWithException(
+                                        () -> {
+                                            PassiveCompletableFuture<JobResult> jobFuture =
+                                                    clientJobProxy.doWaitForJobComplete();
+                                            return jobFuture.get();
+                                        },
+                                        new RetryUtils.RetryMaterial(
+                                                100000,
+                                                true,
+                                                exception ->
+                                                        ExceptionUtil.isOperationNeedRetryException(
+                                                                exception),
+                                                Constant.OPERATION_RETRY_SLEEP));
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
+        await().atMost(600000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> Assertions.assertTrue(completableFuture.isDone()));
+
+        JobResult result = completableFuture.get();
         Assertions.assertEquals(result.getStatus(), JobStatus.FAILED);
         Assertions.assertTrue(result.getError().startsWith("java.lang.NumberFormatException"));
     }

Reply via email to