EricJoy2048 commented on code in PR #2527:
URL: 
https://github.com/apache/incubator-seatunnel/pull/2527#discussion_r956529234


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:
##########
@@ -134,7 +134,7 @@ public void run() {
     }
 
     public void cleanJob() {

Review Comment:
   done



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java:
##########
@@ -167,4 +167,27 @@ public PassiveCompletableFuture<JobStatus> 
waitForJobComplete(long jobId) {
             return runningJobMaster.getJobMasterCompleteFuture();
         }
     }
+
+    public PassiveCompletableFuture<Void> cancelJob(long jodId) {
+        JobMaster runningJobMaster = runningJobMasterMap.get(jodId);
+        if (runningJobMaster == null) {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            future.complete(null);
+            return new PassiveCompletableFuture<>(future);
+        } else {
+            return new 
PassiveCompletableFuture<>(CompletableFuture.supplyAsync(() -> {
+                runningJobMaster.cleanJob();

Review Comment:
   done



##########
seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java:
##########
@@ -67,20 +67,51 @@ public void testSayHello() {
     public void testExecuteJob() {
         TestUtils.initPluginDir();
         Common.setDeployMode(DeployMode.CLIENT);
-        String filePath = 
TestUtils.getResource("/fakesource_to_file_complex.conf");
+        String filePath = 
TestUtils.getResource("/batch_fakesource_to_file.conf");
         JobConfig jobConfig = new JobConfig();
         jobConfig.setName("fake_to_file");
 
         ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
         SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
         JobExecutionEnvironment jobExecutionEnv = 
engineClient.createExecutionContext(filePath, jobConfig);
 
-        JobProxy jobProxy = null;
+        ClientJobProxy clientJobProxy = null;
         try {
-            jobProxy = jobExecutionEnv.execute();
-            JobStatus jobStatus = jobProxy.waitForJobComplete();
+            clientJobProxy = jobExecutionEnv.execute();
+            JobStatus jobStatus = clientJobProxy.waitForJobComplete();
             Assert.assertEquals(JobStatus.FINISHED, jobStatus);
-        } catch (ExecutionException | InterruptedException e) {
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void cancelJobTest() {
+        TestUtils.initPluginDir();
+        Common.setDeployMode(DeployMode.CLIENT);
+        String filePath = 
TestUtils.getResource("/streaming_fakesource_to_file_complex.conf");
+        JobConfig jobConfig = new JobConfig();
+        jobConfig.setName("fake_to_file");
+
+        ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+        SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
+        JobExecutionEnvironment jobExecutionEnv = 
engineClient.createExecutionContext(filePath, jobConfig);
+
+        ClientJobProxy clientJobProxy = null;
+        try {
+            clientJobProxy = jobExecutionEnv.execute();
+            JobStatus jobStatus1 = clientJobProxy.getJobStatus();
+            Assert.assertFalse(jobStatus1.isEndState());
+            ClientJobProxy finalClientJobProxy = clientJobProxy;
+            CompletableFuture<Object> objectCompletableFuture = 
CompletableFuture.supplyAsync(() -> {
+                JobStatus jobStatus = finalClientJobProxy.waitForJobComplete();
+                Assert.assertEquals(JobStatus.CANCELED, jobStatus);
+                return null;
+            });
+            Thread.sleep(500);
+            clientJobProxy.cancelJob();
+            objectCompletableFuture.join();

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to