This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this
push:
new cfe476043 [Feature][Zeta] Close task when job cancel (#4189)
cfe476043 is described below
commit cfe476043cc6e124bc4944559445a38747efb55a
Author: hailin0 <[email protected]>
AuthorDate: Thu Feb 23 10:20:12 2023 +0800
[Feature][Zeta] Close task when job cancel (#4189)
---
.../seatunnel/engine/server/TaskExecutionService.java | 6 ++++++
.../apache/seatunnel/engine/server/task/SeaTunnelTask.java | 14 ++++++++++----
2 files changed, 16 insertions(+), 4 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 688a38c69..dfb9c9a82 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -70,6 +70,7 @@ import lombok.NonNull;
import lombok.SneakyThrows;
import org.apache.commons.collections4.CollectionUtils;
+import java.io.IOException;
import java.net.URL;
import java.util.Collection;
import java.util.HashMap;
@@ -410,6 +411,11 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
taskGroupExecutionTracker.exception(e);
} finally {
taskGroupExecutionTracker.taskDone(t);
+ try {
+ tracker.task.close();
+ } catch (IOException e) {
+ logger.severe("Close task error", e);
+ }
}
Thread.currentThread().setContextClassLoader(oldClassLoader);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 11691432d..0014eb9f8 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -69,8 +69,7 @@ import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricsCollectionContext;
import lombok.NonNull;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.URL;
@@ -85,8 +84,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
+@Slf4j
public abstract class SeaTunnelTask extends AbstractTask {
- private static final Logger LOG =
LoggerFactory.getLogger(SeaTunnelTask.class);
private static final long serialVersionUID = 2604309561613784425L;
protected volatile SeaTunnelTaskState currState;
@@ -278,7 +277,14 @@ public abstract class SeaTunnelTask extends AbstractTask {
@Override
public void close() throws IOException {
- allCycles.parallelStream().forEach(cycle -> sneaky(cycle::close));
+ allCycles.parallelStream()
+ .forEach(flowLifeCycle -> {
+ try {
+ flowLifeCycle.close();
+ } catch (IOException e) {
+ log.error("Close FlowLifeCycle error.", e);
+ }
+ });
}
public void ack(Barrier barrier) {