This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a80a6a727 [feature][ST-Engine] Add SeaTunnel client cancelJob method 
(#3661)
a80a6a727 is described below

commit a80a6a727fe947d9cd76397c9f360dfadf61c6be
Author: ic4y <[email protected]>
AuthorDate: Wed Dec 7 16:13:07 2022 +0800

    [feature][ST-Engine] Add SeaTunnel client cancelJob method (#3661)
---
 .../seatunnel/engine/client/SeaTunnelClient.java   | 11 ++++-
 .../engine/client/SeaTunnelClientTest.java         | 22 ++++++++++
 .../test/resources/streaming_fake_to_console.conf  | 48 ++++++++++++++++++++++
 3 files changed, 79 insertions(+), 2 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index 438a74f46..691f25c6f 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -20,8 +20,10 @@ package org.apache.seatunnel.engine.client;
 import org.apache.seatunnel.engine.client.job.JobClient;
 import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
 import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobDAGInfo;
 import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelCancelJobCodec;
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobDetailStatusCodec;
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobInfoCodec;
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobMetricsCodec;
@@ -86,7 +88,6 @@ public class SeaTunnelClient implements 
SeaTunnelClientInstance {
 
     /**
      * list all jobId and job status
-     *
      */
     public String listJobStatus() {
         return hazelcastClient.requestOnMasterAndDecodeResponse(
@@ -97,7 +98,6 @@ public class SeaTunnelClient implements 
SeaTunnelClientInstance {
 
     /**
      * get one job status
-     *
      * @param jobId jobId
      */
     public String getJobStatus(Long jobId) {
@@ -114,6 +114,13 @@ public class SeaTunnelClient implements 
SeaTunnelClientInstance {
         );
     }
 
+    public void cancelJob(Long jobId) {
+        PassiveCompletableFuture<Void> cancelFuture = 
hazelcastClient.requestOnMasterAndGetCompletableFuture(
+            SeaTunnelCancelJobCodec.encodeRequest(jobId));
+
+        cancelFuture.join();
+    }
+
     public JobDAGInfo getJobInfo(Long jobId) {
         return 
hazelcastClient.getSerializationService().toObject(hazelcastClient.requestOnMasterAndDecodeResponse(
             SeaTunnelGetJobInfoCodec.encodeRequest(jobId),
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index e94d697be..f9114419a 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -167,6 +167,28 @@ public class SeaTunnelClientTest {
         }
     }
 
+    @Test
+    public void testCancelJob() throws ExecutionException, 
InterruptedException {
+        Common.setDeployMode(DeployMode.CLIENT);
+        String filePath = 
TestUtils.getResource("/streaming_fake_to_console.conf");
+        JobConfig jobConfig = new JobConfig();
+        jobConfig.setName("streaming_fake_to_console");
+
+        JobExecutionEnvironment jobExecutionEnv = 
CLIENT.createExecutionContext(filePath, jobConfig);
+
+        final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+        long jobId = clientJobProxy.getJobId();
+
+        await().atMost(30000, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> Assertions.assertEquals("RUNNING", 
CLIENT.getJobStatus(jobId)));
+
+        CLIENT.cancelJob(jobId);
+
+        await().atMost(30000, TimeUnit.MILLISECONDS)
+            .untilAsserted(() -> Assertions.assertEquals("CANCELED", 
CLIENT.getJobStatus(jobId)));
+    }
+
     @Test
     public void testGetJobInfo() {
         Common.setDeployMode(DeployMode.CLIENT);
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/streaming_fake_to_console.conf
 
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/streaming_fake_to_console.conf
new file mode 100644
index 000000000..91ff60a9e
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/streaming_fake_to_console.conf
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  execution.parallelism = 1
+  job.mode = "STREAMING"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the 
source plugin feature**
+  FakeSource {
+    result_table_name = "fake"
+    parallelism = 1
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  console {
+    source_table_name="fake"
+  }
+}
\ No newline at end of file

Reply via email to