This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this 
push:
     new cfe476043 [Feature][Zeta] Close task when job cancel (#4189)
cfe476043 is described below

commit cfe476043cc6e124bc4944559445a38747efb55a
Author: hailin0 <[email protected]>
AuthorDate: Thu Feb 23 10:20:12 2023 +0800

    [Feature][Zeta] Close task when job cancel (#4189)
---
 .../seatunnel/engine/server/TaskExecutionService.java      |  6 ++++++
 .../apache/seatunnel/engine/server/task/SeaTunnelTask.java | 14 ++++++++++----
 2 files changed, 16 insertions(+), 4 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 688a38c69..dfb9c9a82 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -70,6 +70,7 @@ import lombok.NonNull;
 import lombok.SneakyThrows;
 import org.apache.commons.collections4.CollectionUtils;
 
+import java.io.IOException;
 import java.net.URL;
 import java.util.Collection;
 import java.util.HashMap;
@@ -410,6 +411,11 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
                 taskGroupExecutionTracker.exception(e);
             } finally {
                 taskGroupExecutionTracker.taskDone(t);
+                try {
+                    tracker.task.close();
+                } catch (IOException e) {
+                    logger.severe("Close task error", e);
+                }
             }
             Thread.currentThread().setContextClassLoader(oldClassLoader);
         }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 11691432d..0014eb9f8 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -69,8 +69,7 @@ import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.internal.metrics.MetricDescriptor;
 import com.hazelcast.internal.metrics.MetricsCollectionContext;
 import lombok.NonNull;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.net.URL;
@@ -85,8 +84,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
+@Slf4j
 public abstract class SeaTunnelTask extends AbstractTask {
-    private static final Logger LOG = 
LoggerFactory.getLogger(SeaTunnelTask.class);
     private static final long serialVersionUID = 2604309561613784425L;
 
     protected volatile SeaTunnelTaskState currState;
@@ -278,7 +277,14 @@ public abstract class SeaTunnelTask extends AbstractTask {
 
     @Override
     public void close() throws IOException {
-        allCycles.parallelStream().forEach(cycle -> sneaky(cycle::close));
+        allCycles.parallelStream()
+                .forEach(flowLifeCycle -> {
+                    try {
+                        flowLifeCycle.close();
+                    } catch (IOException e) {
+                        log.error("Close FlowLifeCycle error.", e);
+                    }
+                });
     }
 
     public void ack(Barrier barrier) {

Reply via email to