This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 454c339b9c [Improve][CDC] Close idle subtasks group (reader/writer) in 
increment phase (#6526)
454c339b9c is described below

commit 454c339b9c03da0654e743149e0fc8616577d864
Author: hailin0 <[email protected]>
AuthorDate: Wed May 15 13:49:41 2024 +0800

    [Improve][CDC] Close idle subtasks group (reader/writer) in increment phase 
(#6526)
---
 .../cdc/base/dialect/JdbcDataSourceDialect.java    |  4 +-
 .../source/enumerator/HybridSplitAssigner.java     |  3 +-
 .../enumerator/IncrementalSourceEnumerator.java    | 12 +++-
 .../enumerator/IncrementalSplitAssigner.java       |  4 ++
 .../source/reader/IncrementalSourceReader.java     |  8 ++-
 .../seatunnel/cdc/mysql/source/MySqlDialect.java   | 19 ++---
 .../mysql/source/MysqlPooledDataSourceFactory.java | 36 ----------
 .../reader/fetch/binlog/MySqlBinlogFetchTask.java  | 20 ++++++
 .../seatunnel/cdc/oracle/source/OracleDialect.java |  6 --
 .../source/OraclePooledDataSourceFactory.java      | 43 ------------
 .../cdc/postgres/source/PostgresDialect.java       |  6 --
 .../source/PostgresPooledDataSourceFactory.java    | 35 ---------
 .../sqlserver/source/source/SqlServerDialect.java  |  6 --
 .../source/SqlServerPooledDataSourceFactory.java   | 35 ---------
 .../fetch/SqlServerSourceFetchTaskContext.java     | 21 +++++-
 .../common/source/reader/SourceReaderBase.java     | 19 +++--
 .../seatunnel/http/source/HttpSourceReader.java    |  2 +-
 .../connectors/seatunnel/cdc/mysql/MysqlCDCIT.java | 24 ++++++-
 .../src/test/resources/docker/setup.sql            | 11 +--
 .../src/test/resources/mysqlcdc_to_mysql.conf      |  8 +--
 .../mysqlcdc_to_mysql_with_custom_primary_key.conf |  4 +-
 ...ysqlcdc_to_mysql_with_disable_exactly_once.conf |  8 +--
 ...c_to_mysql_with_multi_table_mode_one_table.conf | 11 +--
 ...c_to_mysql_with_multi_table_mode_two_table.conf | 11 +--
 .../mysqlcdc_to_mysql_with_no_primary_key.conf     |  8 +--
 .../seatunnel/e2e/connector/http/HttpIT.java       |  1 +
 .../server/checkpoint/CheckpointBarrier.java       | 43 +++++++++---
 .../server/checkpoint/CheckpointCoordinator.java   | 82 +++++++++++++++++++++-
 .../server/checkpoint/CheckpointManager.java       |  9 +++
 .../engine/server/dag/physical/PhysicalVertex.java |  8 +++
 .../seatunnel/engine/server/master/JobMaster.java  | 47 +++++++++++++
 .../server/serializable/RecordSerializer.java      |  8 ++-
 .../serializable/TaskDataSerializerHook.java       |  5 ++
 .../engine/server/task/SeaTunnelTask.java          |  3 +-
 .../server/task/SinkAggregatedCommitterTask.java   | 11 ++-
 .../server/task/SourceSplitEnumeratorTask.java     | 28 +++++---
 .../server/task/flow/ShuffleSinkFlowLifeCycle.java |  2 +-
 .../task/flow/ShuffleSourceFlowLifeCycle.java      |  2 +-
 .../engine/server/task/flow/SinkFlowLifeCycle.java |  2 +-
 .../server/task/flow/SourceFlowLifeCycle.java      |  2 +-
 .../server/task/flow/TransformFlowLifeCycle.java   |  2 +-
 .../group/queue/IntermediateBlockingQueue.java     |  2 +-
 .../group/queue/disruptor/RecordEventHandler.java  |  2 +-
 .../group/queue/disruptor/RecordEventProducer.java |  3 +-
 .../operation/source/CloseIdleReaderOperation.java | 72 +++++++++++++++++++
 .../source/SourceNoMoreElementOperation.java       |  2 +-
 .../engine/server/task/record/Barrier.java         | 21 ++++++
 .../engine/server/master/JobMasterTest.java        | 70 ++++++++++++++++++
 .../test/resources/stream_fakesource_to_file.conf  |  3 +-
 49 files changed, 541 insertions(+), 253 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
index 712328b5f9..4a497daf71 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
@@ -71,7 +71,9 @@ public interface JdbcDataSourceDialect extends 
DataSourceDialect<JdbcSourceConfi
     }
 
     /** Get a connection pool factory to create connection pool. */
-    JdbcConnectionPoolFactory getPooledDataSourceFactory();
+    default JdbcConnectionPoolFactory getPooledDataSourceFactory() {
+        throw new UnsupportedOperationException();
+    }
 
     /** Query and build the schema of table. */
     TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId 
tableId);
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
index d6b0bdb96c..4acd092478 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
@@ -114,7 +114,8 @@ public class HybridSplitAssigner<C extends SourceConfig> 
implements SplitAssigne
 
     @Override
     public boolean waitingForCompletedSplits() {
-        return snapshotSplitAssigner.waitingForCompletedSplits();
+        return snapshotSplitAssigner.waitingForCompletedSplits()
+                || incrementalSplitAssigner.waitingForAssignedSplits();
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
index b17b910e5d..87ca7dc507 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
@@ -177,8 +177,16 @@ public class IncrementalSourceEnumerator
                 awaitingReader.remove();
                 LOG.debug("Assign split {} to subtask {}", sourceSplit, 
nextAwaiting);
             } else {
-                // there is no available splits by now, skip assigning
-                break;
+                if (splitAssigner.waitingForCompletedSplits()) {
+                    // there is no available splits by now, skip assigning
+                    break;
+                } else {
+                    LOG.info(
+                            "No more splits available, signal no more splits 
to subtask {}",
+                            nextAwaiting);
+                    context.signalNoMoreSplits(nextAwaiting);
+                    awaitingReader.remove();
+                }
             }
         }
     }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
index 7b45ee1ef6..1accc47af2 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
@@ -278,4 +278,8 @@ public class IncrementalSplitAssigner<C extends 
SourceConfig> implements SplitAs
         return context.getAssignedSnapshotSplit().isEmpty()
                 && context.getSplitCompletedOffsets().isEmpty();
     }
+
+    public boolean waitingForAssignedSplits() {
+        return !(splitAssigned && noMoreSplits());
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
index b5fc443310..abfccdeb75 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
@@ -111,7 +111,13 @@ public class IncrementalSourceReader<T, C extends 
SourceConfig>
             context.sendSplitRequest();
             needSendSplitRequest.compareAndSet(true, false);
         }
-        super.pollNext(output);
+
+        if (isNoMoreSplitsAssignment() && isNoMoreElement()) {
+            log.info("Reader {} send NoMoreElement event", 
context.getIndexOfSubtask());
+            context.signalNoMoreElement();
+        } else {
+            super.pollNext(output);
+        }
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
index c43b819f06..b450ab84ae 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
-import 
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
 import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
 import 
org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
@@ -34,6 +33,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.eumerator.MySq
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.MySqlSourceFetchTaskContext;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.binlog.MySqlBinlogFetchTask;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.scan.MySqlSnapshotFetchTask;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils;
 import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlSchema;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.TableDiscoveryUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
@@ -70,20 +70,24 @@ public class MySqlDialect implements JdbcDataSourceDialect {
     @Override
     public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig 
sourceConfig) {
         try (JdbcConnection jdbcConnection = openJdbcConnection(sourceConfig)) 
{
-            return isTableIdCaseSensitive(jdbcConnection);
+            return isDataCollectionIdCaseSensitive(jdbcConnection);
         } catch (SQLException e) {
             throw new SeaTunnelException("Error reading MySQL variables: " + 
e.getMessage(), e);
         }
     }
 
+    private boolean isDataCollectionIdCaseSensitive(JdbcConnection 
jdbcConnection) {
+        return isTableIdCaseSensitive(jdbcConnection);
+    }
+
     @Override
-    public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) {
-        return new MySqlChunkSplitter(sourceConfig, this);
+    public JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) {
+        return 
MySqlConnectionUtils.createMySqlConnection(sourceConfig.getDbzConfiguration());
     }
 
     @Override
-    public JdbcConnectionPoolFactory getPooledDataSourceFactory() {
-        return new MysqlPooledDataSourceFactory();
+    public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) {
+        return new MySqlChunkSplitter(sourceConfig, this);
     }
 
     @Override
@@ -101,8 +105,7 @@ public class MySqlDialect implements JdbcDataSourceDialect {
     public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, 
TableId tableId) {
         if (mySqlSchema == null) {
             mySqlSchema =
-                    new MySqlSchema(
-                            sourceConfig, 
isDataCollectionIdCaseSensitive(sourceConfig), tableMap);
+                    new MySqlSchema(sourceConfig, 
isDataCollectionIdCaseSensitive(jdbc), tableMap);
         }
         return mySqlSchema.getTableSchema(jdbc, tableId);
     }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MysqlPooledDataSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MysqlPooledDataSourceFactory.java
deleted file mode 100644
index 9a0ae21b20..0000000000
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MysqlPooledDataSourceFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
-
-import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
-import 
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
-
-/** A MySQL datasource factory. */
-public class MysqlPooledDataSourceFactory extends JdbcConnectionPoolFactory {
-
-    public static final String JDBC_URL_PATTERN =
-            
"jdbc:mysql://%s:%s/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL";
-
-    @Override
-    public String getJdbcUrl(JdbcSourceConfig sourceConfig) {
-        String hostName = sourceConfig.getHostname();
-        int port = sourceConfig.getPort();
-
-        return String.format(JDBC_URL_PATTERN, hostName, port);
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
index f858864e78..c61cef79af 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
@@ -29,6 +29,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.s
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
 import com.github.shyiko.mysql.binlog.event.Event;
 import io.debezium.DebeziumException;
 import io.debezium.connector.mysql.MySqlConnection;
@@ -40,12 +41,15 @@ import io.debezium.connector.mysql.MySqlTaskContext;
 import io.debezium.pipeline.ErrorHandler;
 import io.debezium.pipeline.source.spi.ChangeEventSource;
 import io.debezium.util.Clock;
+import lombok.extern.slf4j.Slf4j;
 
+import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Map;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset.NO_STOPPING_OFFSET;
 
+@Slf4j
 public class MySqlBinlogFetchTask implements FetchTask<SourceSplitBase> {
     private final IncrementalSplit split;
     private volatile boolean taskRunning = false;
@@ -72,6 +76,22 @@ public class MySqlBinlogFetchTask implements 
FetchTask<SourceSplitBase> {
         BinlogSplitChangeEventSourceContext changeEventSourceContext =
                 new BinlogSplitChangeEventSourceContext();
 
+        sourceFetchContext
+                .getBinaryLogClient()
+                .registerLifecycleListener(
+                        new BinaryLogClient.AbstractLifecycleListener() {
+                            @Override
+                            public void onConnect(BinaryLogClient client) {
+                                try {
+                                    sourceFetchContext.getConnection().close();
+                                    log.info(
+                                            "Binlog client connected, closed 
idle jdbc connection.");
+                                } catch (SQLException e) {
+                                    throw new RuntimeException(e);
+                                }
+                            }
+                        });
+
         mySqlStreamingChangeEventSource.execute(
                 changeEventSourceContext, 
sourceFetchContext.getOffsetContext());
     }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleDialect.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleDialect.java
index 5ffef4cc3c..d1908badc0 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleDialect.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleDialect.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
-import 
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
 import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
 import 
org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
@@ -90,11 +89,6 @@ public class OracleDialect implements JdbcDataSourceDialect {
         return new OracleChunkSplitter(sourceConfig, this);
     }
 
-    @Override
-    public JdbcConnectionPoolFactory getPooledDataSourceFactory() {
-        return new OraclePooledDataSourceFactory();
-    }
-
     @Override
     public List<TableId> discoverDataCollections(JdbcSourceConfig 
sourceConfig) {
         OracleSourceConfig oracleSourceConfig = (OracleSourceConfig) 
sourceConfig;
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OraclePooledDataSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OraclePooledDataSourceFactory.java
deleted file mode 100644
index a17ac42349..0000000000
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OraclePooledDataSourceFactory.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source;
-
-import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
-import 
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
-import 
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfig;
-
-import org.apache.commons.lang3.StringUtils;
-
-/** A Oracle datasource factory. */
-public class OraclePooledDataSourceFactory extends JdbcConnectionPoolFactory {
-
-    public static final String JDBC_URL_PATTERN = "jdbc:oracle:thin:@%s:%s:%s";
-
-    @Override
-    public String getJdbcUrl(JdbcSourceConfig sourceConfig) {
-        OracleSourceConfig oracleSourceConfig = (OracleSourceConfig) 
sourceConfig;
-        if (StringUtils.isNotBlank(oracleSourceConfig.getOriginUrl())) {
-            return oracleSourceConfig.getOriginUrl();
-        } else {
-            String hostName = sourceConfig.getHostname();
-            int port = sourceConfig.getPort();
-            String database = sourceConfig.getDatabaseList().get(0);
-            return String.format(JDBC_URL_PATTERN, hostName, port, database);
-        }
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresDialect.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresDialect.java
index cf6b624db9..72e8f6724e 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresDialect.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresDialect.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
-import 
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
 import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
 import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
 import 
org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
@@ -98,11 +97,6 @@ public class PostgresDialect implements 
JdbcDataSourceDialect {
         return new PostgresChunkSplitter(sourceConfig, this);
     }
 
-    @Override
-    public JdbcConnectionPoolFactory getPooledDataSourceFactory() {
-        return new PostgresPooledDataSourceFactory();
-    }
-
     @Override
     public List<TableId> discoverDataCollections(JdbcSourceConfig 
sourceConfig) {
         PostgresSourceConfig postgresSourceConfig = (PostgresSourceConfig) 
sourceConfig;
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresPooledDataSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresPooledDataSourceFactory.java
deleted file mode 100644
index e1cfa4e912..0000000000
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresPooledDataSourceFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source;
-
-import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
-import 
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
-
-/** Factory to create {@link JdbcConnectionPoolFactory} for Postgre SQL. */
-public class PostgresPooledDataSourceFactory extends JdbcConnectionPoolFactory 
{
-
-    private static final String URL_PATTERN = "jdbc:postgresql://%s:%s/%s";
-
-    @Override
-    public String getJdbcUrl(JdbcSourceConfig sourceConfig) {
-        String hostName = sourceConfig.getHostname();
-        int port = sourceConfig.getPort();
-        String database = sourceConfig.getDatabaseList().get(0);
-        return String.format(URL_PATTERN, hostName, port, database);
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
index e667412378..aa82024b71 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
-import 
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
 import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
 import 
org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
@@ -85,11 +84,6 @@ public class SqlServerDialect implements 
JdbcDataSourceDialect {
         return new SqlServerChunkSplitter(sourceConfig, this);
     }
 
-    @Override
-    public JdbcConnectionPoolFactory getPooledDataSourceFactory() {
-        return new SqlServerPooledDataSourceFactory();
-    }
-
     @Override
     public List<TableId> discoverDataCollections(JdbcSourceConfig 
sourceConfig) {
         SqlServerSourceConfig sqlServerSourceConfig = (SqlServerSourceConfig) 
sourceConfig;
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerPooledDataSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerPooledDataSourceFactory.java
deleted file mode 100644
index 911b9af147..0000000000
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerPooledDataSourceFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source;
-
-import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
-import 
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
-
-/** Factory to create {@link JdbcConnectionPoolFactory} for SQL Server. */
-public class SqlServerPooledDataSourceFactory extends 
JdbcConnectionPoolFactory {
-
-    private static final String URL_PATTERN = 
"jdbc:sqlserver://%s:%s;databaseName=%s";
-
-    @Override
-    public String getJdbcUrl(JdbcSourceConfig sourceConfig) {
-        String hostName = sourceConfig.getHostname();
-        int port = sourceConfig.getPort();
-        String database = sourceConfig.getDatabaseList().get(0);
-        return String.format(URL_PATTERN, hostName, port, database);
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java
index 8178c9f30a..6aadf1aca9 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java
@@ -74,7 +74,7 @@ public class SqlServerSourceFetchTaskContext extends 
JdbcSourceFetchTaskContext
 
     private final SqlServerConnection dataConnection;
 
-    private final SqlServerConnection metadataConnection;
+    private SqlServerConnection metadataConnection;
 
     private final SqlServerEventMetadataProvider metadataProvider;
     private SqlServerDatabaseSchema databaseSchema;
@@ -92,7 +92,6 @@ public class SqlServerSourceFetchTaskContext extends 
JdbcSourceFetchTaskContext
         super(sourceConfig, dataSourceDialect);
 
         this.dataConnection = 
createSqlServerConnection(sourceConfig.getDbzConfiguration());
-        this.metadataConnection = 
createSqlServerConnection(sourceConfig.getDbzConfiguration());
         this.metadataProvider = new SqlServerEventMetadataProvider();
     }
 
@@ -162,13 +161,29 @@ public class SqlServerSourceFetchTaskContext extends 
JdbcSourceFetchTaskContext
                         taskContext, queue, metadataProvider);
 
         this.errorHandler = new 
SqlServerErrorHandler(connectorConfig.getLogicalName(), queue);
+        if (sourceSplitBase.isIncrementalSplit() || isExactlyOnce()) {
+            initMetadataConnection();
+        }
+    }
+
+    private void initMetadataConnection() {
+        if (this.metadataConnection == null) {
+            synchronized (this) {
+                if (this.metadataConnection == null) {
+                    this.metadataConnection =
+                            
createSqlServerConnection(sourceConfig.getDbzConfiguration());
+                }
+            }
+        }
     }
 
     @Override
     public void close() {
         try {
             this.dataConnection.close();
-            this.metadataConnection.close();
+            if (this.metadataConnection != null) {
+                this.metadataConnection.close();
+            }
         } catch (SQLException e) {
             log.warn("Failed to close connection", e);
         }
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
index 29dd2ff6f5..7ec6cc83e9 100644
--- 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
@@ -65,7 +65,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
     private RecordsWithSplitIds<E> currentFetch;
     protected SplitContext<T, SplitStateT> currentSplitContext;
     private Collector<T> currentSplitOutput;
-    private boolean noMoreSplitsAssignment;
+    @Getter private volatile boolean noMoreSplitsAssignment;
 
     public SourceReaderBase(
             BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
@@ -94,10 +94,11 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
             if (recordsWithSplitId == null) {
                 if (Boundedness.BOUNDED.equals(context.getBoundedness())
                         && noMoreSplitsAssignment
-                        && splitFetcherManager.maybeShutdownFinishedFetchers()
-                        && elementsQueue.isEmpty()) {
+                        && isNoMoreElement()) {
                     context.signalNoMoreElement();
-                    log.info("Send NoMoreElement event");
+                    log.info(
+                            "Reader {} into idle state, send NoMoreElement 
event",
+                            context.getIndexOfSubtask());
                 }
                 return;
             }
@@ -137,7 +138,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
 
     @Override
     public void handleNoMoreSplits() {
-        log.info("Reader received NoMoreSplits event.");
+        log.info("Reader {} received NoMoreSplits event.", 
context.getIndexOfSubtask());
         noMoreSplitsAssignment = true;
     }
 
@@ -146,9 +147,15 @@ public abstract class SourceReaderBase<E, T, SplitT 
extends SourceSplit, SplitSt
         log.info("Received unhandled source event: {}", sourceEvent);
     }
 
+    protected boolean isNoMoreElement() {
+        return splitFetcherManager.maybeShutdownFinishedFetchers()
+                && elementsQueue.isEmpty()
+                && currentFetch == null;
+    }
+
     @Override
     public void close() {
-        log.info("Closing Source Reader.");
+        log.info("Closing Source Reader {}.", context.getIndexOfSubtask());
         try {
             splitFetcherManager.close(options.getSourceReaderCloseTimeout());
         } catch (Exception e) {
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
index c2c3da6968..104d769ef5 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
@@ -130,7 +130,7 @@ public class HttpSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
                     collect(output, content);
                 }
             }
-            log.info(
+            log.debug(
                     "http client execute success request param:[{}], http 
response status code:[{}], content:[{}]",
                     httpParameter.getParams(),
                     response.getCode(),
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
index 62dc3f077d..7fab60f9fc 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
@@ -63,8 +63,8 @@ public class MysqlCDCIT extends TestSuiteBase implements 
TestResource {
 
     // mysql
     private static final String MYSQL_HOST = "mysql_cdc_e2e";
-    private static final String MYSQL_USER_NAME = "st_user";
-    private static final String MYSQL_USER_PASSWORD = "seatunnel";
+    private static final String MYSQL_USER_NAME = "mysqluser";
+    private static final String MYSQL_USER_PASSWORD = "mysqlpw";
     private static final String MYSQL_DATABASE = "mysql_cdc";
     private static final String MYSQL_DATABASE2 = "mysql_cdc2";
     private static final MySqlContainer MYSQL_CONTAINER = 
createMySqlContainer(MySqlVersion.V8_0);
@@ -357,6 +357,12 @@ public class MysqlCDCIT extends TestSuiteBase implements 
TestResource {
                                                                 
getSourceQuerySQL(
                                                                         
MYSQL_DATABASE2,
                                                                         
SOURCE_TABLE_1)))));
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .pollInterval(1000, TimeUnit.MILLISECONDS)
+                .until(() -> getConnectionStatus("st_user_source").size() == 
1);
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .pollInterval(1000, TimeUnit.MILLISECONDS)
+                .until(() -> getConnectionStatus("st_user_sink").size() == 1);
 
         Pattern jobIdPattern =
                 Pattern.compile(
@@ -413,6 +419,13 @@ public class MysqlCDCIT extends TestSuiteBase implements 
TestResource {
                                                                         
MYSQL_DATABASE2,
                                                                         
SOURCE_TABLE_2)))));
 
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .pollInterval(1000, TimeUnit.MILLISECONDS)
+                .until(() -> getConnectionStatus("st_user_source").size() == 
1);
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .pollInterval(1000, TimeUnit.MILLISECONDS)
+                .until(() -> getConnectionStatus("st_user_sink").size() == 1);
+
         log.info("****************** container logs start ******************");
         String containerLogs = container.getServerLogs();
         log.info(containerLogs);
@@ -479,6 +492,13 @@ public class MysqlCDCIT extends TestSuiteBase implements 
TestResource {
                 MYSQL_CONTAINER.getPassword());
     }
 
+    private List<List<Object>> getConnectionStatus(String user) {
+        return query(
+                "select USER,HOST,DB,COMMAND,TIME,STATE from 
information_schema.processlist where USER = '"
+                        + user
+                        + "'");
+    }
+
     private List<List<Object>> query(String sql) {
         try (Connection connection = getJdbcConnection()) {
             ResultSet resultSet = 
connection.createStatement().executeQuery(sql);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/docker/setup.sql
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/docker/setup.sql
index 0a54c2c282..079b8f1d95 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/docker/setup.sql
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/docker/setup.sql
@@ -19,13 +19,16 @@
 -- to prevent other clients accessing the log from other machines. For 
example, 'replicator'@'follower.acme.com'.
 -- However, in this database we'll grant 2 users different privileges:
 --
--- 1) 'st_user' - all privileges required by the snapshot reader AND binlog 
reader (used for testing)
--- 2) 'mysqluser' - all privileges
+-- 1) 'mysqluser' - all privileges
+-- 2) 'st_user_source' - all privileges required by the snapshot reader AND 
binlog reader (used for testing)
+-- 3) 'st_user_sink' - all privileges required by the sink to write data (used for 
testing)
 --
-GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, 
DROP, LOCK TABLES  ON *.* TO 'st_user'@'%';
-CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw';
 GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
 
+CREATE USER 'st_user_source' IDENTIFIED BY 'mysqlpw';
+GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, 
DROP, LOCK TABLES  ON *.* TO 'st_user_source'@'%';
+CREATE USER 'st_user_sink' IDENTIFIED BY 'mysqlpw';
+GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, INDEX, ALTER ON *.* TO 
'st_user_sink'@'%';
 -- 
----------------------------------------------------------------------------------------------------------------
 -- DATABASE:  emptydb
 -- 
----------------------------------------------------------------------------------------------------------------
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf
index a5874db62d..4a352455d9 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf
@@ -31,8 +31,8 @@ source {
   MySQL-CDC {
     result_table_name = "customers_mysql_cdc"
     server-id = 5652
-    username = "st_user"
-    password = "seatunnel"
+    username = "st_user_source"
+    password = "mysqlpw"
     table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
     base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
   }
@@ -55,8 +55,8 @@ sink {
     source_table_name = "trans_mysql_cdc"
     url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
     driver = "com.mysql.cj.jdbc.Driver"
-    user = "st_user"
-    password = "seatunnel"
+    user = "st_user_sink"
+    password = "mysqlpw"
 
     generate_sink_sql = true
     # You need to configure both database and table
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_custom_primary_key.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_custom_primary_key.conf
index ba3e94855f..1d1c1c80c7 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_custom_primary_key.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_custom_primary_key.conf
@@ -30,7 +30,7 @@ source {
     result_table_name = "customers_mysql_cdc"
     server-id = 5652
     base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
-    username = "mysqluser"
+    username = "st_user_source"
     password = "mysqlpw"
     exactly_once = true
     table-names = 
["mysql_cdc.mysql_cdc_e2e_source_table_1_custom_primary_key", 
"mysql_cdc.mysql_cdc_e2e_source_table_2_custom_primary_key"]
@@ -52,7 +52,7 @@ sink {
     source_table_name = "customers_mysql_cdc"
     url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc2"
     driver = "com.mysql.cj.jdbc.Driver"
-    user = "mysqluser"
+    user = "st_user_sink"
     password = "mysqlpw"
     database = "mysql_cdc2"
     generate_sink_sql = true
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_disable_exactly_once.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_disable_exactly_once.conf
index 4b91a877d4..fdc4302662 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_disable_exactly_once.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_disable_exactly_once.conf
@@ -32,8 +32,8 @@ source {
   MySQL-CDC {
     result_table_name = "customers_mysql_cdc"
     server-id = 5652
-    username = "st_user"
-    password = "seatunnel"
+    username = "st_user_source"
+    password = "mysqlpw"
     table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
     base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
 
@@ -48,8 +48,8 @@ sink {
     source_table_name = "customers_mysql_cdc"
     url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
     driver = "com.mysql.cj.jdbc.Driver"
-    user = "st_user"
-    password = "seatunnel"
+    user = "st_user_sink"
+    password = "mysqlpw"
 
     generate_sink_sql = true
     # You need to configure both database and table
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf
index 180cedcda8..c382c1c586 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf
@@ -20,7 +20,7 @@
 
 env {
   # You can set engine configuration here
-  parallelism = 1
+  parallelism = 3
   job.mode = "STREAMING"
   checkpoint.interval = 5000
 }
@@ -28,11 +28,14 @@ env {
 source {
   MySQL-CDC {
     result_table_name = "customers_mysql_cdc"
-    server-id = 5652
-    username = "mysqluser"
+    server-id = 5652-5660
+    username = "st_user_source"
     password = "mysqlpw"
     table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
     base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
+
+    snapshot.split.size = 1
+    snapshot.fetch.size = 1
   }
 }
 
@@ -44,7 +47,7 @@ sink {
     source_table_name = "customers_mysql_cdc"
     url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc2"
     driver = "com.mysql.cj.jdbc.Driver"
-    user = "mysqluser"
+    user = "st_user_sink"
     password = "mysqlpw"
     database = "mysql_cdc2"
     generate_sink_sql = true
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf
index f9bc9b1e01..cb10cf2644 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf
@@ -20,7 +20,7 @@
 
 env {
   # You can set engine configuration here
-  parallelism = 1
+  parallelism = 3
   job.mode = "STREAMING"
   checkpoint.interval = 5000
 }
@@ -28,11 +28,14 @@ env {
 source {
   MySQL-CDC {
     result_table_name = "customers_mysql_cdc"
-    server-id = 5652
-    username = "mysqluser"
+    server-id = 5652-5660
+    username = "st_user_source"
     password = "mysqlpw"
     table-names = ["mysql_cdc.mysql_cdc_e2e_source_table", 
"mysql_cdc.mysql_cdc_e2e_source_table2"]
     base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
+
+    snapshot.split.size = 1
+    snapshot.fetch.size = 1
   }
 }
 
@@ -44,7 +47,7 @@ sink {
     source_table_name = "customers_mysql_cdc"
     url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc2"
     driver = "com.mysql.cj.jdbc.Driver"
-    user = "mysqluser"
+    user = "st_user_sink"
     password = "mysqlpw"
     database = "mysql_cdc2"
     generate_sink_sql = true
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_no_primary_key.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_no_primary_key.conf
index 46df806ae7..e35403ba23 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_no_primary_key.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_no_primary_key.conf
@@ -29,8 +29,8 @@ source {
   MySQL-CDC {
     result_table_name = "customers_mysql_cdc"
     server-id = 5652
-    username = "st_user"
-    password = "seatunnel"
+    username = "st_user_source"
+    password = "mysqlpw"
     table-names = ["mysql_cdc.mysql_cdc_e2e_source_table_no_primary_key"]
     base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
 
@@ -43,8 +43,8 @@ sink {
     source_table_name = "customers_mysql_cdc"
     url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
     driver = "com.mysql.cj.jdbc.Driver"
-    user = "st_user"
-    password = "seatunnel"
+    user = "st_user_sink"
+    password = "mysqlpw"
 
     generate_sink_sql = true
     # You need to configure both database and table
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
index 9dc38cbd1c..f53d8c1d45 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
@@ -75,6 +75,7 @@ public class HttpIT extends TestSuiteBase implements 
TestResource {
                         .withEnv(
                                 "MOCKSERVER_INITIALIZATION_JSON_PATH",
                                 TMP_DIR + getMockServerConfig())
+                        .withEnv("MOCKSERVER_LOG_LEVEL", "WARN")
                         .withLogConsumer(new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)))
                         .waitingFor(new 
HttpWaitStrategy().forPath("/").forStatusCode(404));
         Startables.deepStart(Stream.of(mockserverContainer)).join();
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointBarrier.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointBarrier.java
index 7179cc8cb3..1cf1f8bdfd 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointBarrier.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointBarrier.java
@@ -18,27 +18,46 @@
 package org.apache.seatunnel.engine.server.checkpoint;
 
 import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.task.record.Barrier;
 
 import com.google.common.base.Objects;
+import lombok.Getter;
 
 import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+@Getter
 public class CheckpointBarrier implements Barrier, Serializable {
     private final long id;
     private final long timestamp;
     private final CheckpointType checkpointType;
+    private final Set<TaskLocation> prepareCloseTasks;
+    private final Set<TaskLocation> closedTasks;
 
     public CheckpointBarrier(long id, long timestamp, CheckpointType 
checkpointType) {
+        this(id, timestamp, checkpointType, Collections.emptySet(), 
Collections.emptySet());
+    }
+
+    public CheckpointBarrier(
+            long id,
+            long timestamp,
+            CheckpointType checkpointType,
+            Set<TaskLocation> prepareCloseTasks,
+            Set<TaskLocation> closedTasks) {
         this.id = id;
         this.timestamp = timestamp;
         this.checkpointType = checkNotNull(checkpointType);
-    }
-
-    public long getId() {
-        return id;
+        this.prepareCloseTasks = prepareCloseTasks;
+        this.closedTasks = closedTasks;
+        if (new HashSet(prepareCloseTasks).removeAll(closedTasks)) {
+            throw new IllegalArgumentException(
+                    "The prepareCloseTasks collection should not contain 
elements of the closedTasks collection");
+        }
     }
 
     @Override
@@ -51,12 +70,17 @@ public class CheckpointBarrier implements Barrier, 
Serializable {
         return checkpointType.isFinalCheckpoint();
     }
 
-    public long getTimestamp() {
-        return timestamp;
+    @Override
+    public boolean prepareClose(TaskLocation task) {
+        if (prepareClose()) {
+            return true;
+        }
+        return prepareCloseTasks.contains(task);
     }
 
-    public CheckpointType getCheckpointType() {
-        return checkpointType;
+    @Override
+    public Set<TaskLocation> closedTasks() {
+        return Collections.unmodifiableSet(closedTasks);
     }
 
     @Override
@@ -81,7 +105,8 @@ public class CheckpointBarrier implements Barrier, 
Serializable {
     @Override
     public String toString() {
         return String.format(
-                "CheckpointBarrier %d @ %d Options: %s", id, timestamp, 
checkpointType);
+                "CheckpointBarrier %d @ %d type: %s, prepareClose: %s, closed: 
%s",
+                id, timestamp, checkpointType, prepareCloseTasks, closedTasks);
     }
 
     public boolean isAuto() {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 47392f7a31..12f5acd597 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -55,6 +55,7 @@ import lombok.SneakyThrows;
 import java.time.Instant;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -111,6 +112,8 @@ public class CheckpointCoordinator {
     private final CheckpointPlan plan;
 
     private final Set<TaskLocation> readyToCloseStartingTask;
+    private final Set<TaskLocation> readyToCloseIdleTask;
+    @Getter private final Set<TaskLocation> closedIdleTask;
     private final ConcurrentHashMap<Long, PendingCheckpoint> 
pendingCheckpoints;
 
     private final ArrayDeque<String> completedCheckpointIds;
@@ -189,6 +192,8 @@ public class CheckpointCoordinator {
         this.pipelineTaskStatus = new ConcurrentHashMap<>();
         this.checkpointIdCounter = checkpointIdCounter;
         this.readyToCloseStartingTask = new CopyOnWriteArraySet<>();
+        this.readyToCloseIdleTask = new CopyOnWriteArraySet<>();
+        this.closedIdleTask = new CopyOnWriteArraySet<>();
 
         LOG.info(
                 "Create CheckpointCoordinator for job({}@{}) with plan({})",
@@ -309,7 +314,11 @@ public class CheckpointCoordinator {
                                 for (int i = tuple.f1();
                                         i < actionState.getParallelism();
                                         i += currentParallelism) {
-                                    
states.add(actionState.getSubtaskStates().get(i));
+                                    ActionSubtaskState subtaskState =
+                                            
actionState.getSubtaskStates().get(i);
+                                    if (subtaskState != null) {
+                                        states.add(subtaskState);
+                                    }
                                 }
                             });
         }
@@ -397,6 +406,64 @@ public class CheckpointCoordinator {
         }
     }
 
+    protected void readyToCloseIdleTask(TaskLocation taskLocation) {
+        if (plan.getStartingSubtasks().contains(taskLocation)) {
+            throw new UnsupportedOperationException("Unsupported close 
starting task");
+        }
+
+        LOG.info(
+                "Received close idle task[{}]({}/{}). {}",
+                taskLocation.getTaskID(),
+                taskLocation.getPipelineId(),
+                taskLocation.getJobId(),
+                taskLocation);
+        synchronized (readyToCloseIdleTask) {
+            if (readyToCloseIdleTask.contains(taskLocation)
+                    || closedIdleTask.contains(taskLocation)) {
+                LOG.warn(
+                        "task[{}]({}/{}) already in closed. {}",
+                        taskLocation.getTaskID(),
+                        taskLocation.getPipelineId(),
+                        taskLocation.getJobId(),
+                        taskLocation);
+                return;
+            }
+
+            List<TaskLocation> subTaskList = new ArrayList<>();
+            for (TaskLocation subTask : plan.getPipelineSubtasks()) {
+                if 
(subTask.getTaskGroupLocation().equals(taskLocation.getTaskGroupLocation())) {
+                    // close all subtasks in the same task group
+                    subTaskList.add(subTask);
+                    LOG.info(
+                            "Add task[{}]({}/{}) to prepare close list",
+                            subTask.getTaskID(),
+                            subTask.getPipelineId(),
+                            subTask.getJobId());
+                }
+            }
+            if (subTaskList.size() != 2) {
+                throw new UnsupportedOperationException(
+                        "Unsupported close not reader/writer task group: " + 
subTaskList);
+            }
+            readyToCloseIdleTask.addAll(subTaskList);
+            tryTriggerPendingCheckpoint(CheckpointType.CHECKPOINT_TYPE);
+        }
+    }
+
+    protected void completedCloseIdleTask(TaskLocation taskLocation) {
+        synchronized (readyToCloseIdleTask) {
+            if (readyToCloseIdleTask.contains(taskLocation)) {
+                readyToCloseIdleTask.remove(taskLocation);
+                closedIdleTask.add(taskLocation);
+                LOG.info(
+                        "Completed close task[{}]({}/{})",
+                        taskLocation.getTaskID(),
+                        taskLocation.getPipelineId(),
+                        taskLocation.getJobId());
+            }
+        }
+    }
+
     protected void restoreCoordinator(boolean alreadyStarted) {
         LOG.info("received restore CheckpointCoordinator with alreadyStarted= 
" + alreadyStarted);
         errorByPhysicalVertex = new AtomicReference<>();
@@ -553,7 +620,9 @@ public class CheckpointCoordinator {
                                                             
pendingCheckpoint.getCheckpointId(),
                                                             pendingCheckpoint
                                                                     
.getCheckpointTimestamp(),
-                                                            
pendingCheckpoint.getCheckpointType()),
+                                                            
pendingCheckpoint.getCheckpointType(),
+                                                            new 
HashSet<>(readyToCloseIdleTask),
+                                                            new 
HashSet<>(closedIdleTask)),
                                             executorService)
                                     .thenApplyAsync(this::triggerCheckpoint, 
executorService);
 
@@ -664,8 +733,8 @@ public class CheckpointCoordinator {
     }
 
     private Set<Long> getNotYetAcknowledgedTasks() {
-        // TODO: some tasks have completed and don't need to be ack
         return plan.getPipelineSubtasks().stream()
+                .filter(e -> !closedIdleTask.contains(e))
                 .map(TaskLocation::getTaskID)
                 .collect(Collectors.toCollection(CopyOnWriteArraySet::new));
     }
@@ -715,6 +784,8 @@ public class CheckpointCoordinator {
             }
             pipelineTaskStatus.clear();
             readyToCloseStartingTask.clear();
+            readyToCloseIdleTask.clear();
+            closedIdleTask.clear();
             pendingCounter.set(0);
             schemaChanging.set(false);
             scheduler.shutdownNow();
@@ -752,6 +823,11 @@ public class CheckpointCoordinator {
                 pendingCheckpoint.getCheckpointType().isSavepoint()
                         ? SubtaskStatus.SAVEPOINT_PREPARE_CLOSE
                         : SubtaskStatus.RUNNING);
+
+        if (ackOperation.getBarrier().getCheckpointType().notFinalCheckpoint()
+                && ackOperation.getBarrier().prepareClose(location)) {
+            completedCloseIdleTask(location);
+        }
     }
 
     public synchronized void completePendingCheckpoint(CompletedCheckpoint 
completedCheckpoint) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 9ef2e6623b..21c2b90df5 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -216,6 +216,15 @@ public class CheckpointManager {
         getCheckpointCoordinator(taskLocation).readyToClose(taskLocation);
     }
 
+    /**
+     * Called by the {@link SourceSplitEnumeratorTask}. <br>
+     * Used by SourceSplitEnumeratorTask to tell the CheckpointCoordinator that the pipeline will
+     * trigger the close barrier of an idle task.
+     */
+    public void readyToCloseIdleTask(TaskLocation taskLocation) {
+        
getCheckpointCoordinator(taskLocation).readyToCloseIdleTask(taskLocation);
+    }
+
     /**
      * Called by the JobMaster. <br>
      * Listen to the {@link PipelineStatus} of the {@link Pipeline}, which is 
used to shut down the
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index f16b0cc6e4..4fbcfa4fa3 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.engine.server.dag.physical;
 
+import 
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.common.Constant;
@@ -31,6 +33,7 @@ import 
org.apache.seatunnel.engine.server.dag.execution.ExecutionVertex;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskDeployState;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
+import org.apache.seatunnel.engine.server.execution.TaskGroup;
 import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.master.JobMaster;
@@ -350,6 +353,11 @@ public class PhysicalVertex {
                 this.connectorJarIdentifiers);
     }
 
+    @VisibleForTesting
+    public TaskGroup getTaskGroup() {
+        return taskGroup;
+    }
+
     public synchronized void updateTaskState(@NonNull ExecutionState 
targetState) {
         try {
             ExecutionState current = (ExecutionState) 
runningJobStateIMap.get(taskGroupLocation);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 01b2af48a8..ece0c18c93 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -389,6 +389,46 @@ public class JobMaster {
         return jobDAGInfo;
     }
 
+    public void releaseTaskGroupResource(
+            PipelineLocation pipelineLocation, TaskGroupLocation 
taskGroupLocation) {
+        Map<TaskGroupLocation, SlotProfile> taskGroupLocationSlotProfileMap =
+                ownedSlotProfilesIMap.get(pipelineLocation);
+        if (taskGroupLocationSlotProfileMap == null) {
+            return;
+        }
+        SlotProfile taskGroupSlotProfile = 
taskGroupLocationSlotProfileMap.get(taskGroupLocation);
+        if (taskGroupSlotProfile == null) {
+            return;
+        }
+
+        try {
+            RetryUtils.retryWithException(
+                    () -> {
+                        LOGGER.info(
+                                String.format(
+                                        "release the task group resource %s", 
taskGroupLocation));
+
+                        resourceManager
+                                .releaseResources(
+                                        jobImmutableInformation.getJobId(),
+                                        
Collections.singletonList(taskGroupSlotProfile))
+                                .join();
+
+                        return null;
+                    },
+                    new RetryUtils.RetryMaterial(
+                            Constant.OPERATION_RETRY_TIME,
+                            true,
+                            exception -> 
ExceptionUtil.isOperationNeedRetryException(exception),
+                            Constant.OPERATION_RETRY_SLEEP));
+        } catch (Exception e) {
+            LOGGER.warning(
+                    String.format(
+                            "release the task group resource failed %s, with 
exception: %s ",
+                            taskGroupLocation, ExceptionUtils.getMessage(e)));
+        }
+    }
+
     public void releasePipelineResource(SubPlan subPlan) {
         try {
             Map<TaskGroupLocation, SlotProfile> 
taskGroupLocationSlotProfileMap =
@@ -663,6 +703,13 @@ public class JobMaster {
 
                                                 
task.updateStateByExecutionService(
                                                         taskExecutionState);
+                                                if (taskExecutionState
+                                                        .getExecutionState()
+                                                        .isEndState()) {
+                                                    releaseTaskGroupResource(
+                                                            
pipeline.getPipelineLocation(),
+                                                            
task.getTaskGroupLocation());
+                                                }
                                             });
                         });
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java
index 90caab3c18..975810693d 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java
@@ -45,6 +45,8 @@ public class RecordSerializer implements 
StreamSerializer<Record> {
             out.writeLong(checkpointBarrier.getId());
             out.writeLong(checkpointBarrier.getTimestamp());
             out.writeString(checkpointBarrier.getCheckpointType().getName());
+            out.writeObject(checkpointBarrier.getPrepareCloseTasks());
+            out.writeObject(checkpointBarrier.getClosedTasks());
         } else if (data instanceof SeaTunnelRow) {
             SeaTunnelRow row = (SeaTunnelRow) data;
             out.writeByte(RecordDataType.SEATUNNEL_ROW.ordinal());
@@ -67,7 +69,11 @@ public class RecordSerializer implements 
StreamSerializer<Record> {
         if (dataType == RecordDataType.CHECKPOINT_BARRIER.ordinal()) {
             data =
                     new CheckpointBarrier(
-                            in.readLong(), in.readLong(), 
CheckpointType.fromName(in.readString()));
+                            in.readLong(),
+                            in.readLong(),
+                            CheckpointType.fromName(in.readString()),
+                            in.readObject(),
+                            in.readObject());
         } else if (dataType == RecordDataType.SEATUNNEL_ROW.ordinal()) {
             String tableId = in.readString();
             byte rowKind = in.readByte();
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
index 7c298272c7..a926767576 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
@@ -37,6 +37,7 @@ import 
org.apache.seatunnel.engine.server.task.operation.checkpoint.CloseRequest
 import 
org.apache.seatunnel.engine.server.task.operation.sink.SinkPrepareCommitOperation;
 import 
org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation;
 import 
org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation;
+import 
org.apache.seatunnel.engine.server.task.operation.source.CloseIdleReaderOperation;
 import 
org.apache.seatunnel.engine.server.task.operation.source.LastCheckpointNotifyOperation;
 import 
org.apache.seatunnel.engine.server.task.operation.source.RequestSplitOperation;
 import 
org.apache.seatunnel.engine.server.task.operation.source.RestoredSplitOperation;
@@ -101,6 +102,8 @@ public class TaskDataSerializerHook implements 
DataSerializerHook {
 
     public static final int REPORT_JOB_EVENT = 25;
 
+    public static final int CLOSE_READER_OPERATION = 26;
+
     public static final int FACTORY_ID =
             FactoryIdHelper.getFactoryId(
                     
SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY,
@@ -171,6 +174,8 @@ public class TaskDataSerializerHook implements 
DataSerializerHook {
                     return new DeleteConnectorJarInExecutionNode();
                 case REPORT_JOB_EVENT:
                     return new JobEventReportOperation();
+                case CLOSE_READER_OPERATION:
+                    return new CloseIdleReaderOperation();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + 
typeId);
             }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 3ee636bbc5..d66921c742 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -339,7 +339,8 @@ public abstract class SeaTunnelTask extends AbstractTask {
         Integer ackSize =
                 cycleAcks.compute(barrier.getId(), (id, count) -> count == 
null ? 1 : ++count);
         if (ackSize == allCycles.size()) {
-            if (barrier.prepareClose()) {
+            cycleAcks.remove(barrier.getId());
+            if (barrier.prepareClose(this.taskLocation)) {
                 this.prepareCloseStatus = true;
                 this.prepareCloseBarrierId.set(barrier.getId());
             }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index 48c3ca197d..34a14cf585 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -216,6 +216,12 @@ public class SinkAggregatedCommitterTask<CommandInfoT, 
AggregatedCommitInfoT>
         }
     }
 
+    private long getClosedWriters(Barrier barrier) {
+        return barrier.closedTasks().stream()
+                .filter(task -> writerAddressMap.containsKey(task.getTaskID()))
+                .count();
+    }
+
     @Override
     public void triggerBarrier(Barrier barrier) throws Exception {
         long startTime = System.currentTimeMillis();
@@ -224,10 +230,11 @@ public class SinkAggregatedCommitterTask<CommandInfoT, 
AggregatedCommitInfoT>
         Integer count =
                 checkpointBarrierCounter.compute(
                         barrier.getId(), (id, num) -> num == null ? 1 : ++num);
-        if (count != maxWriterSize) {
+
+        if (count != (maxWriterSize - getClosedWriters(barrier))) {
             return;
         }
-        if (barrier.prepareClose()) {
+        if (barrier.prepareClose(this.taskLocation)) {
             this.prepareCloseStatus = true;
             this.prepareCloseBarrierId.set(barrier.getId());
         }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index e95684c1c5..2672dab382 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.task;
 
 import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SourceEvent;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
@@ -32,12 +33,12 @@ import 
org.apache.seatunnel.engine.server.execution.ProgressState;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import 
org.apache.seatunnel.engine.server.task.context.SeaTunnelSplitEnumeratorContext;
 import 
org.apache.seatunnel.engine.server.task.operation.checkpoint.BarrierFlowOperation;
+import 
org.apache.seatunnel.engine.server.task.operation.source.CloseIdleReaderOperation;
 import 
org.apache.seatunnel.engine.server.task.operation.source.LastCheckpointNotifyOperation;
 import org.apache.seatunnel.engine.server.task.record.Barrier;
 import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
 
 import com.hazelcast.cluster.Address;
-import com.hazelcast.spi.impl.operationservice.Operation;
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
@@ -57,7 +58,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutionException;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky;
@@ -144,7 +144,7 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
         long startTime = System.currentTimeMillis();
 
         log.debug("split enumer trigger barrier [{}]", barrier);
-        if (barrier.prepareClose()) {
+        if (barrier.prepareClose(this.taskLocation)) {
             this.prepareCloseTriggered = true;
             this.prepareCloseBarrierId.set(barrier.getId());
         }
@@ -158,7 +158,7 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
                 serialize = enumeratorStateSerializer.serialize(snapshotState);
             }
             log.debug("source split enumerator send state [{}] to master", 
snapshotState);
-            sendToAllReader(location -> new BarrierFlowOperation(barrier, 
location));
+            sendToActiveReader(barrier);
         }
         if (barrier.snapshot()) {
             this.getExecutionContext()
@@ -276,10 +276,18 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
         return enumerator;
     }
 
-    public void readerFinished(long taskID) {
-        unfinishedReaders.remove(taskID);
+    public void readerFinished(TaskLocation taskLocation) {
+        unfinishedReaders.remove(taskLocation.getTaskID());
         if (unfinishedReaders.isEmpty()) {
             prepareCloseStatus = true;
+        } else if 
(Boundedness.UNBOUNDED.equals(this.source.getSource().getBoundedness())) {
+            log.info(
+                    "Send close idle reader {} operation of unbounded job. {}",
+                    taskLocation.getTaskIndex(),
+                    taskLocation);
+            this.getExecutionContext()
+                    .sendToMaster(new CloseIdleReaderOperation(jobID, 
taskLocation))
+                    .join();
         }
     }
 
@@ -348,10 +356,13 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
                 .collect(Collectors.toSet());
     }
 
-    private void sendToAllReader(Function<TaskLocation, Operation> function) {
+    private void sendToActiveReader(Barrier barrier) {
         List<InvocationFuture<?>> futures = new ArrayList<>();
         taskMemberMapping.forEach(
                 (location, address) -> {
+                    if (barrier.closedTasks().contains(location)) {
+                        return;
+                    }
                     log.debug(
                             "split enumerator send to read--size: {}, 
location: {}, address: {}",
                             taskMemberMapping.size(),
@@ -359,7 +370,8 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
                             address.toString());
                     futures.add(
                             this.getExecutionContext()
-                                    .sendToMember(function.apply(location), 
address));
+                                    .sendToMember(
+                                            new BarrierFlowOperation(barrier, 
location), address));
                 });
         futures.forEach(InvocationFuture::join);
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
index 32ec5cb8f4..9bdbd37f80 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
@@ -78,7 +78,7 @@ public class ShuffleSinkFlowLifeCycle extends 
AbstractFlowLifeCycle
             shuffleFlush();
 
             Barrier barrier = (Barrier) record.getData();
-            if (barrier.prepareClose()) {
+            if (barrier.prepareClose(runningTask.getTaskLocation())) {
                 prepareClose = true;
             }
             if (barrier.snapshot()) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
index 2f14c67701..87b29eab21 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
@@ -107,7 +107,7 @@ public class ShuffleSourceFlowLifeCycle<T> extends 
AbstractFlowLifeCycle
 
                     // publish barrier
                     if (alignedBarriersCounter == shuffles.length) {
-                        if (barrier.prepareClose()) {
+                        if 
(barrier.prepareClose(runningTask.getTaskLocation())) {
                             prepareClose = true;
                         }
                         if (barrier.snapshot()) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 1a2143b027..48c530a0c3 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -183,7 +183,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
                 long startTime = System.currentTimeMillis();
 
                 Barrier barrier = (Barrier) record.getData();
-                if (barrier.prepareClose()) {
+                if (barrier.prepareClose(this.taskLocation)) {
                     prepareClose = true;
                 }
                 if (barrier.snapshot()) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index 64e5bfd22b..83675575b4 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -268,7 +268,7 @@ public class SourceFlowLifeCycle<T, SplitT extends 
SourceSplit> extends ActionFl
 
         // Block the reader from adding barrier to the collector.
         synchronized (collector.getCheckpointLock()) {
-            if (barrier.prepareClose()) {
+            if (barrier.prepareClose(this.currentTaskLocation)) {
                 this.prepareClose = true;
             }
             if (barrier.snapshot()) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
index 187aa3659b..0447513b5f 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
@@ -75,7 +75,7 @@ public class TransformFlowLifeCycle<T> extends 
ActionFlowLifeCycle
     public void received(Record<?> record) {
         if (record.getData() instanceof Barrier) {
             CheckpointBarrier barrier = (CheckpointBarrier) record.getData();
-            if (barrier.prepareClose()) {
+            if (barrier.prepareClose(this.runningTask.getTaskLocation())) {
                 prepareClose = true;
             }
             if (barrier.snapshot()) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java
index f7cd7b0c23..b8e53faabd 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java
@@ -64,7 +64,7 @@ public class IntermediateBlockingQueue extends 
AbstractIntermediateQueue<Blockin
         if (record.getData() instanceof Barrier) {
             CheckpointBarrier barrier = (CheckpointBarrier) record.getData();
             getRunningTask().ack(barrier);
-            if (barrier.prepareClose()) {
+            if (barrier.prepareClose(this.getRunningTask().getTaskLocation())) 
{
                 getIntermediateQueueFlowLifeCycle().setPrepareClose(true);
             }
             consumer.accept(record);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventHandler.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventHandler.java
index d525a0b257..6cc5195de5 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventHandler.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventHandler.java
@@ -54,7 +54,7 @@ public class RecordEventHandler implements 
EventHandler<RecordEvent> {
             if (record.getData() instanceof Barrier) {
                 CheckpointBarrier barrier = (CheckpointBarrier) 
record.getData();
                 runningTask.ack(barrier);
-                if (barrier.prepareClose()) {
+                if (barrier.prepareClose(this.runningTask.getTaskLocation())) {
                     this.intermediateQueueFlowLifeCycle.setPrepareClose(true);
                 }
             } else {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventProducer.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventProducer.java
index 021bb8d2f0..ea47f83a79 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventProducer.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventProducer.java
@@ -34,7 +34,8 @@ public class RecordEventProducer {
         if (record.getData() instanceof Barrier) {
             CheckpointBarrier barrier = (CheckpointBarrier) record.getData();
             intermediateQueueFlowLifeCycle.getRunningTask().ack(barrier);
-            if (barrier.prepareClose()) {
+            if (barrier.prepareClose(
+                    
intermediateQueueFlowLifeCycle.getRunningTask().getTaskLocation())) {
                 intermediateQueueFlowLifeCycle.setPrepareClose(true);
             }
         } else {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseIdleReaderOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseIdleReaderOperation.java
new file mode 100644
index 0000000000..abedf1a499
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseIdleReaderOperation.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.operation.source;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+import java.io.IOException;
+
+public class CloseIdleReaderOperation extends Operation implements 
IdentifiedDataSerializable {
+    private long jobId;
+    private TaskLocation taskLocation;
+
+    public CloseIdleReaderOperation() {}
+
+    public CloseIdleReaderOperation(long jobId, TaskLocation taskLocation) {
+        this.jobId = jobId;
+        this.taskLocation = taskLocation;
+    }
+
+    @Override
+    public void run() throws Exception {
+        SeaTunnelServer server = getService();
+        server.getCoordinatorService()
+                .getJobMaster(jobId)
+                .getCheckpointManager()
+                .readyToCloseIdleTask(taskLocation);
+    }
+
+    @Override
+    protected void writeInternal(ObjectDataOutput out) throws IOException {
+        out.writeLong(jobId);
+        out.writeObject(taskLocation);
+    }
+
+    @Override
+    protected void readInternal(ObjectDataInput in) throws IOException {
+        jobId = in.readLong();
+        taskLocation = in.readObject();
+    }
+
+    @Override
+    public int getFactoryId() {
+        return TaskDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return TaskDataSerializerHook.CLOSE_READER_OPERATION;
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
index fec6afcb91..60b67c0c83 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
@@ -57,7 +57,7 @@ public class SourceNoMoreElementOperation extends Operation 
implements Identifie
                     Thread.currentThread().setContextClassLoader(classLoader);
                     SourceSplitEnumeratorTask<?> task =
                             
server.getTaskExecutionService().getTask(enumeratorTaskID);
-                    task.readerFinished(currentTaskID.getTaskID());
+                    task.readerFinished(currentTaskID);
                     
Thread.currentThread().setContextClassLoader(oldClassLoader);
                     return null;
                 },
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java
index 67042247b6..4350c63372 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java
@@ -17,6 +17,10 @@
 
 package org.apache.seatunnel.engine.server.task.record;
 
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
+
+import java.util.Set;
+
 /** barrier flowing in data flow */
 public interface Barrier {
     Long PREPARE_CLOSE_BARRIER_ID = Long.MAX_VALUE;
@@ -32,4 +36,21 @@ public interface Barrier {
 
     /** Barrier indicating that the task should prepare to close. */
     boolean prepareClose();
+
+    /**
+     * Task-specific variant of {@link #prepareClose()}, asking about a single task.
+     *
+     * @param task location of the task being asked about
+     * @return {@code true} if the task should prepare to close; the default ignores {@code task}
+     */
+    default boolean prepareClose(TaskLocation task) {
+        return prepareClose();
+    }
+
+    /**
+     * Returns the set of tasks that have been closed.
+     *
+     * @return locations of the closed tasks
+     */
+    Set<TaskLocation> closedTasks();
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
index dd886d923e..cbfde91b37 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
@@ -28,9 +28,15 @@ import org.apache.seatunnel.engine.core.job.PipelineStatus;
 import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
 import org.apache.seatunnel.engine.server.TestUtils;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointCloseReason;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
 import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
+import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.service.slot.SlotService;
+import org.apache.seatunnel.engine.server.task.CoordinatorTask;
+import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
@@ -219,7 +225,70 @@ public class JobMasterTest extends 
AbstractSeaTunnelServerTest {
         Assertions.assertTrue(jobMaster.isNeedRestore());
     }
 
+    @Test
+    public void testCloseIdleTask() throws InterruptedException {
+        long jobId = 
instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME).newId();
+        JobMaster jobMaster = newJobInstanceWithRunningState(jobId);
+        Assertions.assertEquals(JobStatus.RUNNING, jobMaster.getJobStatus());
+
+        assertCloseIdleTask(jobMaster); // first pass: job started with restore = false
+
+        server.getCoordinatorService().savePoint(jobId); // request a savepoint for the job
+        server.getCoordinatorService().getJobStatus(jobId); // NOTE(review): result is discarded; looks redundant - confirm
+        await().atMost(60, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () -> {
+                            JobStatus jobStatus =
+                                    
server.getCoordinatorService().getJobStatus(jobId);
+                            Assertions.assertEquals(JobStatus.SAVEPOINT_DONE, 
jobStatus);
+                        });
+        jobMaster = newJobInstanceWithRunningState(jobId, true); // same job id, restore = true
+        Assertions.assertEquals(JobStatus.RUNNING, jobMaster.getJobStatus());
+
+        assertCloseIdleTask(jobMaster); // second pass: same checks on the restored job
+    }
+
+    private void assertCloseIdleTask(JobMaster jobMaster) {
+        SlotService slotService = server.getSlotService();
+        Assertions.assertEquals(4, 
slotService.getWorkerProfile().getAssignedSlots().length); // baseline: 4 assigned slots
+
+        Assertions.assertEquals(1, 
jobMaster.getPhysicalPlan().getPipelineList().size());
+        SubPlan subPlan = jobMaster.getPhysicalPlan().getPipelineList().get(0);
+        try {
+            PhysicalVertex coordinatorVertex1 = 
subPlan.getCoordinatorVertexList().get(0);
+            CoordinatorTask coordinatorTask =
+                    (CoordinatorTask)
+                            
coordinatorVertex1.getTaskGroup().getTasks().stream().findFirst().get();
+            jobMaster
+                    .getCheckpointManager()
+                    .readyToCloseIdleTask(coordinatorTask.getTaskLocation());
+            Assertions.fail("should throw UnsupportedOperationException");
+        } catch (UnsupportedOperationException e) {
+            // expected: readyToCloseIdleTask must reject a coordinator task location
+        }
+
+        Assertions.assertEquals(2, subPlan.getPhysicalVertexList().size());
+        PhysicalVertex taskGroup1 = subPlan.getPhysicalVertexList().get(0);
+        SeaTunnelTask seaTunnelTask =
+                (SeaTunnelTask) 
taskGroup1.getTaskGroup().getTasks().stream().findFirst().get();
+        
jobMaster.getCheckpointManager().readyToCloseIdleTask(seaTunnelTask.getTaskLocation()); // non-coordinator task: no exception expected
+
+        CheckpointCoordinator checkpointCoordinator =
+                jobMaster
+                        .getCheckpointManager()
+                        
.getCheckpointCoordinator(seaTunnelTask.getTaskLocation().getPipelineId());
+        await().atMost(60, TimeUnit.SECONDS)
+                .until(() -> checkpointCoordinator.getClosedIdleTask().size() 
== 2); // wait until the coordinator reports two closed idle tasks
+        await().atMost(60, TimeUnit.SECONDS)
+                .until(() -> 
slotService.getWorkerProfile().getAssignedSlots().length == 3); // assigned slots drop from 4 to 3
+    }
+
     private JobMaster newJobInstanceWithRunningState(long jobId) throws 
InterruptedException {
+        return newJobInstanceWithRunningState(jobId, false); // delegate with restore = false
+    }
+
+    private JobMaster newJobInstanceWithRunningState(long jobId, boolean 
restore)
+            throws InterruptedException {
         LogicalDag testLogicalDag =
                 TestUtils.createTestLogicalPlan(
                         "stream_fakesource_to_file.conf", 
"test_clear_coordinator_service", jobId);
@@ -228,6 +297,7 @@ public class JobMasterTest extends 
AbstractSeaTunnelServerTest {
                 new JobImmutableInformation(
                         jobId,
                         "Test",
+                        restore,
                         
nodeEngine.getSerializationService().toData(testLogicalDag),
                         testLogicalDag.getJobConfig(),
                         Collections.emptyList(),
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fakesource_to_file.conf
 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fakesource_to_file.conf
index 7a2b07a2a3..2cbcf14bd9 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fakesource_to_file.conf
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fakesource_to_file.conf
@@ -19,7 +19,7 @@
 ######
 
 env {
-  parallelism = 1
+  parallelism = 2
   job.mode = "STREAMING"
   checkpoint.interval = 5000
 }
@@ -34,7 +34,6 @@ source {
           age = "int"
         }
       }
-      parallelism = 1
     }
 }
 

Reply via email to