This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1f2f3fc5f0 [Test][E2E] Add thread leak check for connector (#5773)
1f2f3fc5f0 is described below

commit 1f2f3fc5f0925f755a4de2a3f1b8e590bc1a88be
Author: Jia Fan <[email protected]>
AuthorDate: Thu Feb 1 10:51:41 2024 +0800

    [Test][E2E] Add thread leak check for connector (#5773)
---
 .../file/hadoop/HadoopFileSystemProxy.java         |   6 +-
 .../seatunnel/file/sink/BaseFileSinkWriter.java    |   6 +-
 .../file/sink/BaseMultipleTableFileSink.java       |  20 +--
 .../file/sink/writer/AbstractWriteStrategy.java    |  10 ++
 .../seatunnel/file/sink/writer/WriteStrategy.java  |   3 +-
 .../file/source/BaseFileSourceReader.java          |   4 +-
 .../file/source/reader/AbstractReadStrategy.java   |  10 ++
 .../reader/MultipleTableFileSourceReader.java      |   3 +
 .../seatunnel/file/source/reader/ReadStrategy.java |   3 +-
 .../seatunnel/file/s3/catalog/S3FileCatalog.java   |  11 +-
 .../seatunnel/file/sftp/system/SFTPFileSystem.java |   6 +
 .../source/enumerator/AbstractSplitEnumerator.java |   3 +
 .../iceberg/source/reader/IcebergSourceReader.java |   3 +
 .../influxdb/sink/InfluxDBSinkWriter.java          |   3 +
 .../source/InfluxDBSourceSplitEnumerator.java      |   4 +-
 .../influxdb/source/InfluxdbSourceReader.java      |   3 +
 .../rocketmq/sink/RocketMqSinkWriter.java          |   3 +
 .../rocketmq/source/RocketMqSourceReader.java      |   3 +
 .../source/RocketMqSourceSplitEnumerator.java      |   3 +
 .../container/seatunnel/SeaTunnelContainer.java    | 166 ++++++++++++++++++++-
 .../seatunnel/e2e/common/util/ContainerUtil.java   |  65 ++++++++
 .../seatunnel/engine/e2e/CheckpointEnableIT.java   |  37 ++---
 .../seatunnel/engine/e2e/JobClientJobProxyIT.java  |   2 +
 .../org/apache/seatunnel/engine/e2e/RestApiIT.java |  19 +++
 .../src/test/resources/jvm_options                 |   3 +
 .../src/test/resources/log4j2.properties           |   4 +-
 .../engine/common/config/SeaTunnelConfig.java      |   1 +
 .../engine/common/loader/ClassLoaderUtil.java      |  35 +++++
 .../core/parse/MultipleTableJobConfigParser.java   |  56 +++----
 .../seatunnel/engine/server/SeaTunnelServer.java   |   6 +
 .../engine/server/TaskExecutionService.java        |  13 +-
 .../server/checkpoint/CheckpointCoordinator.java   |   2 -
 .../seatunnel/engine/server/master/JobMaster.java  |  12 +-
 .../seatunnel/engine/server/rest/RestConstant.java |   3 +
 .../server/rest/RestHttpGetCommandProcessor.java   |  21 +++
 35 files changed, 472 insertions(+), 80 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
index a43baa7c31..61f4520e4b 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
@@ -230,10 +230,14 @@ public class HadoopFileSystemProxy implements Serializable, Closeable {
 
     @Override
     public void close() throws IOException {
-        try (FileSystem closedFileSystem = fileSystem) {
+        try {
             if (userGroupInformation != null && enableKerberos()) {
                 userGroupInformation.logoutUserFromKeytab();
             }
+        } finally {
+            if (fileSystem != null) {
+                fileSystem.close();
+            }
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
index 7d14de2fff..2527d99d56 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
@@ -153,5 +153,9 @@ public class BaseFileSinkWriter
     }
 
     @Override
-    public void close() throws IOException {}
+    public void close() throws IOException {
+        if (writeStrategy != null) {
+            writeStrategy.close();
+        }
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
index fee66ad467..6beb62d7e8 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
@@ -28,7 +28,6 @@ import org.apache.seatunnel.api.sink.SupportMultiTableSink;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
@@ -46,9 +45,8 @@ public abstract class BaseMultipleTableFileSink
                 SupportMultiTableSink {
 
     private final HadoopConf hadoopConf;
-    private final HadoopFileSystemProxy hadoopFileSystemProxy;
+    private final CatalogTable catalogTable;
     private final FileSinkConfig fileSinkConfig;
-    private final WriteStrategy writeStrategy;
     private String jobId;
 
     public abstract String getPluginName();
@@ -58,10 +56,7 @@ public abstract class BaseMultipleTableFileSink
         this.hadoopConf = hadoopConf;
         this.fileSinkConfig =
                 new FileSinkConfig(readonlyConfig.toConfig(), catalogTable.getSeaTunnelRowType());
-        this.writeStrategy =
-                WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig);
-        this.hadoopFileSystemProxy = new HadoopFileSystemProxy(hadoopConf);
-        this.writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
+        this.catalogTable = catalogTable;
     }
 
     @Override
@@ -72,7 +67,7 @@ public abstract class BaseMultipleTableFileSink
     @Override
     public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> restoreWriter(
             SinkWriter.Context context, List<FileSinkState> states) {
-        return new BaseFileSinkWriter(writeStrategy, hadoopConf, context, jobId, states);
+        return new BaseFileSinkWriter(createWriteStrategy(), hadoopConf, context, jobId, states);
     }
 
     @Override
@@ -84,7 +79,7 @@ public abstract class BaseMultipleTableFileSink
     @Override
     public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> createWriter(
             SinkWriter.Context context) {
-        return new BaseFileSinkWriter(writeStrategy, hadoopConf, context, jobId);
+        return new BaseFileSinkWriter(createWriteStrategy(), hadoopConf, context, jobId);
     }
 
     @Override
@@ -101,4 +96,11 @@ public abstract class BaseMultipleTableFileSink
     public Optional<Serializer<FileSinkState>> getWriterStateSerializer() {
         return Optional.of(new DefaultSerializer<>());
     }
+
+    protected WriteStrategy createWriteStrategy() {
+        WriteStrategy writeStrategy =
+                WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig);
+        writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
+        return writeStrategy;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index 7f30e723ae..ab88b2256d 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -414,4 +414,14 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
     public HadoopFileSystemProxy getHadoopFileSystemProxy() {
         return hadoopFileSystemProxy;
     }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            if (hadoopFileSystemProxy != null) {
+                hadoopFileSystemProxy.close();
+            }
+        } catch (Exception ignore) {
+        }
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
index a2fb5c1510..6a1b1840b4 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
@@ -26,11 +26,12 @@ import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig
 
 import org.apache.hadoop.conf.Configuration;
 
+import java.io.Closeable;
 import java.io.Serializable;
 import java.util.LinkedHashMap;
 import java.util.List;
 
-public interface WriteStrategy extends Transaction, Serializable {
+public interface WriteStrategy extends Transaction, Serializable, Closeable {
     /**
      * init hadoop conf
      *
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
index 37b525df67..1119ae9bf3 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
@@ -48,7 +48,9 @@ public class BaseFileSourceReader implements SourceReader<SeaTunnelRow, FileSour
     public void open() throws Exception {}
 
     @Override
-    public void close() throws IOException {}
+    public void close() throws IOException {
+        readStrategy.close();
+    }
 
     @Override
     public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index 071414c9da..33b688af06 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -201,4 +201,14 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
         }
         return true;
     }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            if (hadoopFileSystemProxy != null) {
+                hadoopFileSystemProxy.close();
+            }
+        } catch (Exception ignore) {
+        }
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/MultipleTableFileSourceReader.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/MultipleTableFileSourceReader.java
index 66775dd6d7..661a466e49 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/MultipleTableFileSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/MultipleTableFileSourceReader.java
@@ -123,5 +123,8 @@ public class MultipleTableFileSourceReader implements 
SourceReader<SeaTunnelRow,
     public void close() throws IOException {
         // do nothing
         log.info("Closed the MultipleTableLocalFileSourceReader");
+        for (ReadStrategy strategy : readStrategyMap.values()) {
+            strategy.close();
+        }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
index 462d58b076..a269594e1f 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
@@ -25,11 +25,12 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
 
-public interface ReadStrategy extends Serializable {
+public interface ReadStrategy extends Serializable, Closeable {
     void init(HadoopConf conf);
 
     void read(String path, String tableId, Collector<SeaTunnelRow> output)
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3FileCatalog.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3FileCatalog.java
index 4a527e2a60..0f48a2c4ae 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3FileCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3FileCatalog.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import lombok.AllArgsConstructor;
 import lombok.SneakyThrows;
 
+import java.io.IOException;
 import java.util.List;
 
 @AllArgsConstructor
@@ -47,7 +48,15 @@ public class S3FileCatalog implements Catalog {
     public void open() throws CatalogException {}
 
     @Override
-    public void close() throws CatalogException {}
+    public void close() throws CatalogException {
+        if (hadoopFileSystemProxy != null) {
+            try {
+                hadoopFileSystemProxy.close();
+            } catch (IOException e) {
+                throw new CatalogException(e);
+            }
+        }
+    }
 
     @Override
     public String name() {
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPFileSystem.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPFileSystem.java
index 555fb5bf9d..f49145bc4c 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPFileSystem.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPFileSystem.java
@@ -617,4 +617,10 @@ public class SFTPFileSystem extends FileSystem {
             disconnect(channel);
         }
     }
+
+    @Override
+    public void close() throws IOException {
+        super.close();
+        connectionPool.shutdown();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
index 26a971cc2e..03785b40da 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
@@ -23,6 +23,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFileScanTaskSplit;
 
 import org.apache.iceberg.Table;
+import org.apache.iceberg.util.ThreadPools;
 
 import lombok.Getter;
 import lombok.NonNull;
@@ -74,6 +75,8 @@ public abstract class AbstractSplitEnumerator
     public void close() throws IOException {
         icebergTableLoader.close();
         isOpen = false;
+        // TODO we should remove shutdown logic when supported closed part task
+        ThreadPools.getWorkerPool().shutdownNow();
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.java
index 2242704860..a12fc79b51 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.java
@@ -30,6 +30,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFil
 
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.util.ThreadPools;
 
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
@@ -95,6 +96,8 @@ public class IcebergSourceReader implements 
SourceReader<SeaTunnelRow, IcebergFi
             icebergFileScanTaskSplitReader.close();
         }
         icebergTableLoader.close();
+        // TODO we should remove shutdown logic when supported closed part task
+        ThreadPools.getWorkerPool().shutdownNow();
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
index f2d401db51..c97ab6c2c6 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
@@ -36,6 +36,7 @@ import org.influxdb.dto.Point;
 
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import okhttp3.internal.concurrent.TaskRunner;
 
 import java.io.IOException;
 import java.net.ConnectException;
@@ -88,6 +89,8 @@ public class InfluxDBSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
         if (influxdb != null) {
             influxdb.close();
             influxdb = null;
+            // TODO we should remove shutdown logic when supported closed part task
+            ((TaskRunner.RealBackend) TaskRunner.INSTANCE.getBackend()).shutdown();
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
index be69fac9c0..d810a2192f 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
@@ -26,6 +26,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceSt
 import org.apache.commons.lang3.tuple.Pair;
 
 import lombok.extern.slf4j.Slf4j;
+import okhttp3.internal.concurrent.TaskRunner;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -220,7 +221,8 @@ public class InfluxDBSourceSplitEnumerator
 
     @Override
     public void close() {
-        // nothing to do
+        // TODO we should remove shutdown logic when supported closed part task
+        ((TaskRunner.RealBackend) TaskRunner.INSTANCE.getBackend()).shutdown();
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java
index 16eec22839..184ec3ecaf 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java
@@ -35,6 +35,7 @@ import org.influxdb.dto.Query;
 import org.influxdb.dto.QueryResult;
 
 import lombok.extern.slf4j.Slf4j;
+import okhttp3.internal.concurrent.TaskRunner;
 
 import java.net.ConnectException;
 import java.util.ArrayList;
@@ -93,6 +94,8 @@ public class InfluxdbSourceReader implements 
SourceReader<SeaTunnelRow, InfluxDB
         if (influxdb != null) {
             influxdb.close();
             influxdb = null;
+            // TODO we should remove shutdown logic when supported closed part task
+            ((TaskRunner.RealBackend) TaskRunner.INSTANCE.getBackend()).shutdown();
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSinkWriter.java
 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSinkWriter.java
index a02887fa98..b8cbaf70da 100644
--- 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSinkWriter.java
@@ -49,6 +49,9 @@ public class RocketMqSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
                     new RocketMqNoTransactionSender(
                             producerMetadata.getConfiguration(), producerMetadata.isSync());
         }
+        // Set `rocketmq.client.logUseSlf4j` to `true` to avoid create many
+        // `AsyncAppender-Dispatcher-Thread`
+        System.setProperty("rocketmq.client.logUseSlf4j", "true");
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
index 42c3788e81..fd4f986072 100644
--- 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
@@ -75,6 +75,9 @@ public class RocketMqSourceReader implements 
SourceReader<SeaTunnelRow, RocketMq
         this.executorService =
                 Executors.newCachedThreadPool(r -> new Thread(r, "RocketMq Source Data Consumer"));
         pendingPartitionsQueue = new LinkedBlockingQueue<>();
+        // Set `rocketmq.client.logUseSlf4j` to `true` to avoid create many
+        // `AsyncAppender-Dispatcher-Thread`
+        System.setProperty("rocketmq.client.logUseSlf4j", "true");
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
index 6630d495f9..45ded447f2 100644
--- 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
@@ -67,6 +67,9 @@ public class RocketMqSourceSplitEnumerator
         this.context = context;
         this.assignedSplit = new HashMap<>();
         this.pendingSplit = new HashMap<>();
+        // Set `rocketmq.client.logUseSlf4j` to `true` to avoid create many
+        // `AsyncAppender-Dispatcher-Thread`
+        System.setProperty("rocketmq.client.logUseSlf4j", "true");
     }
 
     public RocketMqSourceSplitEnumerator(
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index dd9b14f1ea..8a98a50e20 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -23,6 +23,14 @@ import 
org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.container.TestContainerId;
 import org.apache.seatunnel.e2e.common.util.ContainerUtil;
 
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Assertions;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -30,15 +38,22 @@ import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.utility.DockerLoggerFactory;
 import org.testcontainers.utility.MountableFile;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.auto.service.AutoService;
+import groovy.lang.Tuple2;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.nio.file.Paths;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import static 
org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;
 
@@ -46,10 +61,12 @@ import static 
org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PA
 @Slf4j
 @AutoService(TestContainer.class)
 public class SeaTunnelContainer extends AbstractTestContainer {
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final String JDK_DOCKER_IMAGE = "openjdk:8";
     private static final String CLIENT_SHELL = "seatunnel.sh";
     private static final String SERVER_SHELL = "seatunnel-cluster.sh";
     protected GenericContainer<?> server;
+    private final AtomicInteger runningCount = new AtomicInteger();
 
     @Override
     public void startUp() throws Exception {
@@ -68,7 +85,7 @@ public class SeaTunnelContainer extends AbstractTestContainer 
{
                                                 "seatunnel-engine:" + 
JDK_DOCKER_IMAGE)))
                         .waitingFor(Wait.forListeningPort());
         copySeaTunnelStarterToContainer(server);
-        server.setExposedPorts(Arrays.asList(5801));
+        server.setPortBindings(Collections.singletonList("5801:5801"));
         server.withCopyFileToContainer(
                 MountableFile.forHostPath(
                         PROJECT_ROOT_PATH
@@ -152,7 +169,145 @@ public class SeaTunnelContainer extends 
AbstractTestContainer {
     public Container.ExecResult executeJob(String confFile)
             throws IOException, InterruptedException {
         log.info("test in container: {}", identifier());
-        return executeJob(server, confFile);
+        // Snapshot the server JVM thread names before the job runs; the snapshot is handed to
+        // removeSystemThread so threads that already existed are not reported as leaked.
+        List<String> beforeThreads = ContainerUtil.getJVMThreadNames(server);
+        runningCount.incrementAndGet();
+        Container.ExecResult result;
+        int stillRunning;
+        try {
+            result = executeJob(server, confFile);
+        } finally {
+            // Always decrement: if the submission throws, the counter would otherwise stay
+            // above zero and the thread check would be skipped for every later job.
+            stillRunning = runningCount.decrementAndGet();
+        }
+        if (stillRunning > 0) {
+            // Only check threads once all concurrently running jobs have finished.
+            return result;
+        }
+        if (removeSystemThread(beforeThreads, ContainerUtil.getJVMThreadNames(server))
+                .isEmpty()) {
+            return result;
+        }
+        // Give remaining threads up to 10s to terminate before failing the test.
+        Awaitility.await()
+                .atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () -> {
+                            List<String> leaked =
+                                    removeSystemThread(
+                                            beforeThreads,
+                                            ContainerUtil.getJVMThreadNames(server));
+                            if (leaked.isEmpty()) {
+                                return;
+                            }
+                            // Collect the stack dump only when the check fails; it needs
+                            // another jstack execution inside the container.
+                            String stacks =
+                                    ContainerUtil.getJVMThreads(server).stream()
+                                            .filter(thread -> leaked.contains(thread.getV1()))
+                                            .map(Tuple2::getV2)
+                                            .map(stack -> stack + "\n")
+                                            .collect(Collectors.joining());
+                            Assertions.fail(
+                                    "There are still threads running in the container: \n"
+                                            + stacks);
+                        });
+        // classLoaderObjectCheck(1) is currently disabled.
+        return result;
+    }
+
+    /**
+     * Reduces a thread-name snapshot to the threads that are treated as leaked.
+     *
+     * <p>Names of well-known JVM/engine threads and names contained in {@code beforeThreads} are
+     * removed from {@code afterThreads} (the list is modified in place). Threads reported by the
+     * server whose class loader is neither an {@code AppClassLoader} nor {@code "null"} are then
+     * put in front of the remaining names, and known, not yet fixed leaks are dropped.
+     *
+     * @param beforeThreads thread names captured before the job was submitted
+     * @param afterThreads thread names captured after the job finished; filtered in place
+     * @return names of the threads that are still considered leaked
+     * @throws IOException if the running-threads endpoint of the server cannot be read
+     */
+    private List<String> removeSystemThread(List<String> beforeThreads, List<String> afterThreads)
+            throws IOException {
+        Pattern defaultPoolThread = Pattern.compile("pool-[0-9]-thread-[0-9]");
+        afterThreads.removeIf(
+                name ->
+                        isSystemThreadName(name, defaultPoolThread)
+                                || beforeThreads.contains(name));
+        List<String> suspects =
+                getThreadClassLoader().entrySet().stream()
+                        .filter(SeaTunnelContainer::hasNonSystemClassLoader)
+                        .map(Map.Entry::getKey)
+                        .collect(Collectors.toList());
+        suspects.addAll(afterThreads);
+        suspects.removeIf(this::isIssueWeAlreadyKnow);
+        return suspects;
+    }
+
+    /** Returns true for thread names that are excluded from the leak check by name. */
+    private static boolean isSystemThreadName(String name, Pattern defaultPoolThread) {
+        String[] prefixes = {
+            "hz.main", "seatunnel-coordinator-service", "GC task thread", "Timer-", "Log4j2-TF-"
+        };
+        for (String prefix : prefixes) {
+            if (name.startsWith(prefix)) {
+                return true;
+            }
+        }
+        String[] fragments = {
+            "CompilerThread",
+            "NioNetworking-closeListenerExecutor",
+            "ForkJoinPool.commonPool",
+            "DestroyJavaVM",
+            "main-query-state-checker",
+            "Keep-Alive-SocketCleaner",
+            "process reaper",
+            "InterruptTimer",
+            "Java2D Disposer",
+            "org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner"
+        };
+        for (String fragment : fragments) {
+            if (name.contains(fragment)) {
+                return true;
+            }
+        }
+        return defaultPoolThread.matcher(name).matches();
+    }
+
+    /** Returns true when the thread's reported class loader is not the application/null one. */
+    private static boolean hasNonSystemClassLoader(Map.Entry<String, String> threadToLoader) {
+        // system thread, ttl 60s
+        if (threadToLoader.getKey().contains("process reaper")) {
+            return false;
+        }
+        String loader = threadToLoader.getValue();
+        return !(loader.contains("AppClassLoader") || loader.equals("null"));
+    }
+
+    private void classLoaderObjectCheck(Integer maxSize) throws IOException, 
InterruptedException {
+        Map<String, Integer> objects = ContainerUtil.getJVMLiveObject(server);
+        String className =
+                
"org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader";
+        if (objects.containsKey(className) && objects.get(className) > 
maxSize) {
+            Awaitility.await()
+                    .atMost(20, TimeUnit.SECONDS)
+                    .untilAsserted(
+                            () -> {
+                                Map<String, Integer> newObjects =
+                                        ContainerUtil.getJVMLiveObject(server);
+                                if (newObjects.containsKey(className)) {
+                                    Assertions.assertTrue(
+                                            newObjects.get(className) <= 
maxSize,
+                                            "There are still 
SeaTunnelChildFirstClassLoader objects in the seatunnel server");
+                                }
+                            });
+        }
+    }
+
+    /**
+     * Fetches the running-threads list from the server's REST endpoint on {@code localhost:5801}
+     * and maps each {@code threadName} to its {@code classLoader}.
+     *
+     * <p>When several entries share a thread name, their class loader values are joined with
+     * {@code " && "}.
+     *
+     * @return thread name to class loader description
+     * @throws IOException if the request fails or the response body cannot be parsed
+     */
+    private Map<String, String> getThreadClassLoader() throws IOException {
+        HttpGet get = new HttpGet("http://localhost:5801/hazelcast/rest/maps/running-threads");
+        // Close the response together with the client so the connection is always released.
+        try (CloseableHttpClient client = HttpClients.createDefault();
+                CloseableHttpResponse response = client.execute(get)) {
+            String threads = EntityUtils.toString(response.getEntity());
+            List<Map<String, String>> value =
+                    OBJECT_MAPPER.readValue(
+                            threads, new TypeReference<List<Map<String, String>>>() {});
+            return value.stream()
+                    .collect(
+                            Collectors.toMap(
+                                    map -> map.get("threadName"),
+                                    map -> map.get("classLoader"),
+                                    (a, b) -> a + " && " + b));
+        }
+    }
+
+    /**
+     * Tells whether a thread name belongs to a leak that is already known: the thread should be
+     * recycled but is not yet, and this should be fixed in the future.
+     */
+    private boolean isIssueWeAlreadyKnow(String threadName) {
+        String[] knownPrefixes = {
+            // ClickHouse com.clickhouse.client.ClickHouseClientBuilder
+            "ClickHouseClientWorker",
+            // InfluxDB okio.AsyncTimeout$Watchdog
+            "Okio Watchdog",
+            // InfluxDB okhttp3.internal.concurrent.TaskRunner.RealBackend
+            "OkHttp TaskRunner",
+            // IOTDB org.apache.iotdb.session.Session
+            "SessionExecutor",
+            // RocketMQ
+            // org.apache.rocketmq.logging.inner.LoggingBuilder$AsyncAppender$Dispatcher
+            "AsyncAppender-Dispatcher-Thread",
+            // MongoDB
+            "BufferPoolPruner",
+            "MaintenanceTimer",
+            "cluster-"
+        };
+        for (String prefix : knownPrefixes) {
+            if (threadName.startsWith(prefix)) {
+                return true;
+            }
+        }
+        // Oracle Driver
+        return threadName.contains(
+                "oracle.jdbc.driver.BlockSource.ThreadedCachingBlockSource.BlockReleaser");
     }
 
     @Override
@@ -164,7 +319,10 @@ public class SeaTunnelContainer extends 
AbstractTestContainer {
     @Override
     public Container.ExecResult restoreJob(String confFile, String jobId)
             throws IOException, InterruptedException {
-        return restoreJob(server, confFile, jobId);
+        // Count the restored job as running so executeJob skips its thread check meanwhile.
+        runningCount.incrementAndGet();
+        try {
+            return restoreJob(server, confFile, jobId);
+        } finally {
+            // Decrement even when the restore throws; otherwise the counter never returns to
+            // zero and later thread checks are skipped.
+            runningCount.decrementAndGet();
+        }
     }
 
     @Override
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
index 722a5c18e3..641c90bf31 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
@@ -28,19 +28,24 @@ import 
org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.commons.lang3.StringUtils;
 
 import org.junit.jupiter.api.Assertions;
+import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.utility.MountableFile;
 
+import groovy.lang.Tuple2;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.File;
+import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.ServiceConfigurationError;
 import java.util.ServiceLoader;
 import java.util.Set;
@@ -256,4 +261,64 @@ public final class ContainerUtil {
             Path path, String targetPath, GenericContainer<?> container) {
         container.copyFileToContainer(MountableFile.forHostPath(path), 
targetPath);
     }
+
+    /**
+     * Returns the thread names found in a {@code jstack} dump of the SeaTunnel server JVM.
+     *
+     * @param container container in which the server JVM runs
+     * @return one name per parsed thread entry, in dump order
+     */
+    public static List<String> getJVMThreadNames(GenericContainer<?> container)
+            throws IOException, InterruptedException {
+        List<Tuple2<String, String>> threads = getJVMThreads(container);
+        List<String> names = new ArrayList<>(threads.size());
+        for (Tuple2<String, String> thread : threads) {
+            names.add(thread.getV1());
+        }
+        return names;
+    }
+
+    public static Map<String, Integer> getJVMLiveObject(GenericContainer<?> 
container)
+            throws IOException, InterruptedException {
+        Container.ExecResult liveObjects =
+                container.execInContainer("jmap", "-histo:live", 
getJVMProcessId(container));
+        Assertions.assertEquals(0, liveObjects.getExitCode());
+        String value = liveObjects.getStdout().trim();
+        return Arrays.stream(value.split("\n"))
+                .skip(2)
+                .map(
+                        str ->
+                                Arrays.stream(str.split(" "))
+                                        .filter(StringUtils::isNotEmpty)
+                                        .collect(Collectors.toList()))
+                .filter(list -> list.size() == 4)
+                .collect(
+                        Collectors.toMap(
+                                list -> list.get(3),
+                                list -> Integer.valueOf(list.get(1)),
+                                (a, b) -> a));
+    }
+
+    /**
+     * Runs {@code jstack} against the SeaTunnel server JVM and splits the dump into thread
+     * entries.
+     *
+     * <p>The dump is split on blank lines and only sections starting with a double quote are
+     * kept. The name is the text between the first character and the last double quote of the
+     * section's first line.
+     *
+     * @param container container in which the server JVM runs
+     * @return pairs of (thread name, full text of the dump section)
+     */
+    public static List<Tuple2<String, String>> getJVMThreads(GenericContainer<?> container)
+            throws IOException, InterruptedException {
+        Container.ExecResult jstack =
+                container.execInContainer("jstack", getJVMProcessId(container));
+        Assertions.assertEquals(0, jstack.getExitCode());
+        // Thread name line example
+        // "hz.main.MetricsRegistry.thread-2" #232 prio=5 os_prio=0 tid=0x0000ffff3c003000 nid=0x5e
+        // waiting on condition [0x0000ffff6cf3a000]
+        List<Tuple2<String, String>> parsed = new ArrayList<>();
+        for (String section : jstack.getStdout().trim().split("\n\n")) {
+            if (!section.startsWith("\"")) {
+                continue;
+            }
+            // The section starts with a quote, so its first line is the thread header line.
+            String header = section.split("\n")[0];
+            String name = header.substring(1, header.lastIndexOf("\""));
+            parsed.add(new Tuple2<>(name, section));
+        }
+        return parsed;
+    }
+
+    /**
+     * Looks up the process id of the SeaTunnel server JVM by running {@code jps} in the
+     * container and taking the first column of the first line containing {@code
+     * SeaTunnelServer}.
+     */
+    private static String getJVMProcessId(GenericContainer<?> container)
+            throws IOException, InterruptedException {
+        Container.ExecResult jps = container.execInContainer("jps");
+        Assertions.assertEquals(0, jps.getExitCode());
+        String serverLine = null;
+        for (String line : jps.getStdout().trim().split("\n")) {
+            if (line.contains("SeaTunnelServer")) {
+                serverLine = line;
+                break;
+            }
+        }
+        Assertions.assertTrue(serverLine != null);
+        return serverLine.trim().split(" ")[0];
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
index 6e02dd6478..013cfd33de 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
@@ -26,6 +26,8 @@ import 
org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.condition.DisabledOnJre;
+import org.junit.jupiter.api.condition.JRE;
 import org.testcontainers.containers.Container;
 
 import lombok.extern.slf4j.Slf4j;
@@ -33,6 +35,7 @@ import lombok.extern.slf4j.Slf4j;
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
@@ -41,6 +44,7 @@ import java.util.regex.Pattern;
 import static org.awaitility.Awaitility.await;
 
 @Slf4j
+@DisabledOnJre(value = JRE.JAVA_11, disabledReason = "slf4j jar conflict, we 
should fix it later")
 public class CheckpointEnableIT extends TestSuiteBase {
 
     @TestTemplate
@@ -89,18 +93,19 @@ public class CheckpointEnableIT extends TestSuiteBase {
             disabledReason =
                     "depending on the engine, the logic for determining 
whether a checkpoint is enabled is different")
     public void testZetaStreamingCheckpointInterval(TestContainer container)
-            throws IOException, InterruptedException {
+            throws IOException, InterruptedException, ExecutionException {
         // start job
-        CompletableFuture.supplyAsync(
-                () -> {
-                    try {
-                        return container.executeJob(
-                                
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf");
-                    } catch (Exception e) {
-                        log.error("Commit task exception :" + e.getMessage());
-                        throw new RuntimeException(e);
-                    }
-                });
+        CompletableFuture<Container.ExecResult> startFuture =
+                CompletableFuture.supplyAsync(
+                        () -> {
+                            try {
+                                return container.executeJob(
+                                        
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf");
+                            } catch (Exception e) {
+                                log.error("Commit task exception :" + 
e.getMessage());
+                                throw new RuntimeException(e);
+                            }
+                        });
 
         // wait obtain job id
         AtomicReference<String> jobId = new AtomicReference<>();
@@ -121,16 +126,14 @@ public class CheckpointEnableIT extends TestSuiteBase {
         Thread.sleep(15000);
         Assertions.assertTrue(container.getServerLogs().contains("checkpoint 
is enabled"));
         Assertions.assertEquals(0, 
container.savepointJob(jobId.get()).getExitCode());
-
+        Assertions.assertEquals(0, startFuture.get().getExitCode());
         // restore job
         CompletableFuture.supplyAsync(
                 () -> {
                     try {
-                        return container
-                                .restoreJob(
-                                        
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf",
-                                        jobId.get())
-                                .getExitCode();
+                        return container.restoreJob(
+                                
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf",
+                                jobId.get());
                     } catch (Exception e) {
                         log.error("Commit task exception :" + e.getMessage());
                         throw new RuntimeException(e);
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
index ce54ba84c2..9f5ddf8c38 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
@@ -34,6 +34,7 @@ import org.testcontainers.utility.MountableFile;
 import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.Arrays;
+import java.util.Collections;
 
 import static 
org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;
 
@@ -59,6 +60,7 @@ public class JobClientJobProxyIT extends SeaTunnelContainer {
                         .waitingFor(Wait.forListeningPort());
         copySeaTunnelStarterToContainer(server);
         server.setExposedPorts(Arrays.asList(5801));
+        server.setPortBindings(Collections.singletonList("5801:5801"));
         server.withCopyFileToContainer(
                 MountableFile.forHostPath(
                         PROJECT_ROOT_PATH
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index 2d2fc5d96e..c6cb429c0a 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -43,6 +43,7 @@ import java.util.concurrent.TimeUnit;
 import static io.restassured.RestAssured.given;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.notNullValue;
 
 @Slf4j
 public class RestApiIT {
@@ -126,6 +127,24 @@ public class RestApiIT {
                         });
     }
 
+    /**
+     * Requests the running-threads endpoint of both nodes and expects HTTP 200 with a first
+     * entry that has a thread name and a class loader.
+     */
+    @Test
+    public void testGetRunningThreads() {
+        Arrays.asList(node2, node1)
+                .forEach(
+                        instance -> {
+                            String url =
+                                    HOST
+                                            + instance.getCluster()
+                                                    .getLocalMember()
+                                                    .getAddress()
+                                                    .getPort()
+                                            + RestConstant.RUNNING_THREADS;
+                            given().get(url)
+                                    .then()
+                                    .statusCode(200)
+                                    .body("[0].threadName", notNullValue())
+                                    .body("[0].classLoader", notNullValue());
+                        });
+    }
+
     @Test
     public void testSystemMonitoringInformation() {
         Arrays.asList(node2, node1)
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/jvm_options
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/jvm_options
index 75facf75c0..f7d00c6eaf 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/jvm_options
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/jvm_options
@@ -22,3 +22,6 @@
 # JVM Dump
 -XX:+HeapDumpOnOutOfMemoryError
 -XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-server
+
+# Test-only setting: make sure soft references are collected as soon as possible.
+-XX:SoftRefLRUPolicyMSPerMB=1
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties
index bfcd94a55a..6b6c6335ec 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties
@@ -16,10 +16,10 @@
 # limitations under the License.
 
################################################################################
 
-rootLogger.level = WARN
+rootLogger.level = INFO
 
 logger.zeta.name=org.apache.seatunnel.engine
-logger.zeta.level=WARN
+logger.zeta.level=INFO
 
 # For print job id
 logger.zetaMaster.name=org.apache.seatunnel.engine.server.master
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfig.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfig.java
index dfbfd2c66b..4c4b7e798e 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfig.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfig.java
@@ -49,6 +49,7 @@ public class SeaTunnelConfig {
         hazelcastConfig
                 .getHotRestartPersistenceConfig()
                 .setBaseDir(new File(seatunnelHome(), 
"recovery").getAbsoluteFile());
+        System.setProperty("hazelcast.compat.classloading.cache.disabled", 
"true");
     }
 
     /**
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/loader/ClassLoaderUtil.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/loader/ClassLoaderUtil.java
new file mode 100644
index 0000000000..cb6e0820f0
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/loader/ClassLoaderUtil.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.loader;
+
+import lombok.extern.slf4j.Slf4j;
+
+/** Helper for detaching a class loader from the threads that still reference it. */
+@Slf4j
+public class ClassLoaderUtil {
+
+    /**
+     * Sets the context class loader to {@code null} on every live thread whose context class
+     * loader is the given instance (compared by identity), logging each affected thread.
+     *
+     * @param classLoader the class loader to detach from threads
+     */
+    public static void recycleClassLoaderFromThread(ClassLoader classLoader) {
+        log.info("recycle classloader {}", classLoader);
+        Thread.getAllStackTraces().keySet().stream()
+                .filter(thread -> thread.getContextClassLoader() == classLoader)
+                .forEach(
+                        thread -> {
+                            log.info("recycle classloader for thread {}", thread.getName());
+                            thread.setContextClassLoader(null);
+                        });
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index cc6cb501cd..137da2e0a1 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -164,37 +164,39 @@ public class MultipleTableJobConfigParser {
         if (!commonPluginJars.isEmpty()) {
             connectorJars.addAll(commonPluginJars);
         }
+        ClassLoader parentClassLoader = 
Thread.currentThread().getContextClassLoader();
         ClassLoader classLoader =
-                new SeaTunnelChildFirstClassLoader(
-                        connectorJars, 
Thread.currentThread().getContextClassLoader());
-        Thread.currentThread().setContextClassLoader(classLoader);
-
-        ConfigParserUtil.checkGraph(sourceConfigs, transformConfigs, 
sinkConfigs);
-
-        this.fillJobConfig();
-
-        LinkedHashMap<String, List<Tuple2<CatalogTable, Action>>> 
tableWithActionMap =
-                new LinkedHashMap<>();
-
-        log.info("start generating all sources.");
-        for (int configIndex = 0; configIndex < sourceConfigs.size(); 
configIndex++) {
-            Config sourceConfig = sourceConfigs.get(configIndex);
-            Tuple2<String, List<Tuple2<CatalogTable, Action>>> tuple2 =
-                    parseSource(configIndex, sourceConfig, classLoader);
-            tableWithActionMap.put(tuple2._1(), tuple2._2());
-        }
+                new SeaTunnelChildFirstClassLoader(connectorJars, 
parentClassLoader);
+        try {
+            Thread.currentThread().setContextClassLoader(classLoader);
+            ConfigParserUtil.checkGraph(sourceConfigs, transformConfigs, 
sinkConfigs);
+            this.fillJobConfig();
+            LinkedHashMap<String, List<Tuple2<CatalogTable, Action>>> 
tableWithActionMap =
+                    new LinkedHashMap<>();
+
+            log.info("start generating all sources.");
+            for (int configIndex = 0; configIndex < sourceConfigs.size(); 
configIndex++) {
+                Config sourceConfig = sourceConfigs.get(configIndex);
+                Tuple2<String, List<Tuple2<CatalogTable, Action>>> tuple2 =
+                        parseSource(configIndex, sourceConfig, classLoader);
+                tableWithActionMap.put(tuple2._1(), tuple2._2());
+            }
 
-        log.info("start generating all transforms.");
-        parseTransforms(transformConfigs, classLoader, tableWithActionMap);
+            log.info("start generating all transforms.");
+            parseTransforms(transformConfigs, classLoader, tableWithActionMap);
 
-        log.info("start generating all sinks.");
-        List<Action> sinkActions = new ArrayList<>();
-        for (int configIndex = 0; configIndex < sinkConfigs.size(); 
configIndex++) {
-            Config sinkConfig = sinkConfigs.get(configIndex);
-            sinkActions.addAll(parseSink(configIndex, sinkConfig, classLoader, 
tableWithActionMap));
+            log.info("start generating all sinks.");
+            List<Action> sinkActions = new ArrayList<>();
+            for (int configIndex = 0; configIndex < sinkConfigs.size(); 
configIndex++) {
+                Config sinkConfig = sinkConfigs.get(configIndex);
+                sinkActions.addAll(
+                        parseSink(configIndex, sinkConfig, classLoader, 
tableWithActionMap));
+            }
+            Set<URL> factoryUrls = getUsedFactoryUrls(sinkActions);
+            return new ImmutablePair<>(sinkActions, factoryUrls);
+        } finally {
+            Thread.currentThread().setContextClassLoader(parentClassLoader);
         }
-        Set<URL> factoryUrls = getUsedFactoryUrls(sinkActions);
-        return new ImmutablePair<>(sinkActions, factoryUrls);
     }
 
     public Set<URL> getUsedFactoryUrls(List<Action> sinkActions) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index ebb0edea85..970177ae90 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -27,6 +27,8 @@ import 
org.apache.seatunnel.engine.server.service.jar.ConnectorPackageService;
 import org.apache.seatunnel.engine.server.service.slot.DefaultSlotService;
 import org.apache.seatunnel.engine.server.service.slot.SlotService;
 
+import org.apache.hadoop.fs.FileSystem;
+
 import com.hazelcast.internal.services.ManagedService;
 import com.hazelcast.internal.services.MembershipAwareService;
 import com.hazelcast.internal.services.MembershipServiceEvent;
@@ -113,6 +115,10 @@ public class SeaTunnelServer
                 TimeUnit.SECONDS);
 
         seaTunnelHealthMonitor = new SeaTunnelHealthMonitor(((NodeEngineImpl) 
engine).getNode());
+
+        // A trick to work around the StatisticsDataReferenceCleaner thread class loader leak.
+        // see https://issues.apache.org/jira/browse/HADOOP-19049
+        FileSystem.Statistics statistics = new 
FileSystem.Statistics("SeaTunnel");
     }
 
     @Override
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 482c4d6712..2c2b38d928 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -25,6 +25,7 @@ import 
org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.common.config.server.ThreadShareMode;
 import org.apache.seatunnel.engine.common.exception.JobNotFoundException;
+import org.apache.seatunnel.engine.common.loader.ClassLoaderUtil;
 import 
org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
@@ -269,7 +270,7 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
             Set<ConnectorJarIdentifier> connectorJarIdentifiers =
                     taskImmutableInfo.getConnectorJarIdentifiers();
             Set<URL> jars = taskImmutableInfo.getJars();
-            ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+            ClassLoader classLoader;
             if (!CollectionUtils.isEmpty(connectorJarIdentifiers)) {
                 // Prioritize obtaining the jar package file required for the 
current task execution
                 // from the local, if it does not exist locally, it will be 
downloaded from the
@@ -292,6 +293,7 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
                                 classLoader,
                                 taskImmutableInfo.getGroup());
             } else {
+                classLoader = new SeaTunnelChildFirstClassLoader(emptyList());
                 taskGroup =
                         
nodeEngine.getSerializationService().toObject(taskImmutableInfo.getGroup());
             }
@@ -886,8 +888,7 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
                             task.getTaskID(), taskGroupLocation));
             Throwable ex = executionException.get();
             if (completionLatch.decrementAndGet() == 0) {
-                // recycle classloader
-                executionContexts.get(taskGroupLocation).setClassLoader(null);
+                recycleClassLoader(taskGroupLocation);
                 finishedExecutionContexts.put(
                         taskGroupLocation, 
executionContexts.remove(taskGroupLocation));
                 cancellationFutures.remove(taskGroupLocation);
@@ -910,6 +911,12 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
             }
         }
 
+        private void recycleClassLoader(TaskGroupLocation taskGroupLocation) {
+            ClassLoader classLoader = 
executionContexts.get(taskGroupLocation).getClassLoader();
+            executionContexts.get(taskGroupLocation).setClassLoader(null);
+            ClassLoaderUtil.recycleClassLoaderFromThread(classLoader);
+        }
+
         boolean executionCompletedExceptionally() {
             return executionException.get() != null;
         }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 9b93bf91d5..d85c23e927 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -174,7 +174,6 @@ public class CheckpointCoordinator {
                         2,
                         runnable -> {
                             Thread thread = new Thread(runnable);
-                            thread.setDaemon(true);
                             thread.setName(
                                     String.format(
                                             "checkpoint-coordinator-%s/%s", 
pipelineId, jobId));
@@ -704,7 +703,6 @@ public class CheckpointCoordinator {
                             2,
                             runnable -> {
                                 Thread thread = new Thread(runnable);
-                                thread.setDaemon(true);
                                 thread.setName(
                                         String.format(
                                                 
"checkpoint-coordinator-%s/%s", pipelineId, jobId));
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index d9a4a0b94a..ca75e185aa 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -30,6 +30,7 @@ import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
 import 
org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.loader.ClassLoaderUtil;
 import 
org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
 import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
@@ -108,8 +109,6 @@ public class JobMaster {
 
     private CompletableFuture<JobResult> jobMasterCompleteFuture;
 
-    private ClassLoader classLoader;
-
     private JobImmutableInformation jobImmutableInformation;
 
     private LogicalDag logicalDag;
@@ -128,8 +127,6 @@ public class JobMaster {
 
     private final IMap<Object, Object> runningJobStateTimestampsIMap;
 
-    private CompletableFuture<Void> scheduleFuture;
-
     // TODO add config to change value
     private boolean isPhysicalDAGIInfo = true;
 
@@ -204,13 +201,14 @@ public class JobMaster {
                         jobImmutableInformation.getJobId(),
                         jobImmutableInformation.getPluginJarsUrls()));
 
-        classLoader =
+        ClassLoader classLoader =
                 new 
SeaTunnelChildFirstClassLoader(jobImmutableInformation.getPluginJarsUrls());
         logicalDag =
                 CustomClassLoadedObject.deserializeWithCustomClassLoader(
                         nodeEngine.getSerializationService(),
                         classLoader,
                         jobImmutableInformation.getLogicalDag());
+        ClassLoaderUtil.recycleClassLoaderFromThread(classLoader);
 
         final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple =
                 PlanUtils.fromLogicalDAG(
@@ -444,10 +442,6 @@ public class JobMaster {
                 "can't find task group address from taskGroupLocation: " + 
taskGroupLocation);
     }
 
-    public ClassLoader getClassLoader() {
-        return classLoader;
-    }
-
     public void cancelJob() {
         physicalPlan.cancelJob();
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
index daa3da6e09..a7e93f2551 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
@@ -51,6 +51,9 @@ public class RestConstant {
     public static final String SUBMIT_JOB_URL = 
"/hazelcast/rest/maps/submit-job";
     public static final String ENCRYPT_CONFIG = 
"/hazelcast/rest/maps/encrypt-config";
 
+    // only for test use
+    public static final String RUNNING_THREADS = 
"/hazelcast/rest/maps/running-threads";
+
     public static final String SYSTEM_MONITORING_INFORMATION =
             "/hazelcast/rest/maps/system-monitoring-information";
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index 6d9619abbf..af351f5278 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -67,6 +67,7 @@ import static 
com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500;
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.FINISHED_JOBS_INFO;
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_JOBS_URL;
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_JOB_URL;
+import static 
org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_THREADS;
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.SYSTEM_MONITORING_INFORMATION;
 
 public class RestHttpGetCommandProcessor extends 
HttpCommandProcessor<HttpGetCommand> {
@@ -104,6 +105,8 @@ public class RestHttpGetCommandProcessor extends 
HttpCommandProcessor<HttpGetCom
                 handleJobInfoById(httpGetCommand, uri);
             } else if (uri.startsWith(SYSTEM_MONITORING_INFORMATION)) {
                 getSystemMonitoringInformation(httpGetCommand);
+            } else if (uri.startsWith(RUNNING_THREADS)) {
+                getRunningThread(httpGetCommand);
             } else {
                 original.handle(httpGetCommand);
             }
@@ -276,6 +279,24 @@ public class RestHttpGetCommandProcessor extends 
HttpCommandProcessor<HttpGetCom
         }
     }
 
+    private void getRunningThread(HttpGetCommand command) {
+        this.prepareResponse(
+                command,
+                Thread.getAllStackTraces().keySet().stream()
+                        .sorted(Comparator.comparing(Thread::getName))
+                        .map(
+                                stackTraceElements -> {
+                                    JsonObject jobInfoJson = new JsonObject();
+                                    jobInfoJson.add("threadName", 
stackTraceElements.getName());
+                                    jobInfoJson.add(
+                                            "classLoader",
+                                            String.valueOf(
+                                                    
stackTraceElements.getContextClassLoader()));
+                                    return jobInfoJson;
+                                })
+                        .collect(JsonArray::new, JsonArray::add, 
JsonArray::add));
+    }
+
     private Map<String, Long> getJobMetrics(String jobMetrics) {
         Map<String, Long> metricsMap = new HashMap<>();
         long sourceReadCount = 0L;

Reply via email to