This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 454c339b9c [Improve][CDC] Close idle subtasks group(reader/writer) in
increment phase (#6526)
454c339b9c is described below
commit 454c339b9c03da0654e743149e0fc8616577d864
Author: hailin0 <[email protected]>
AuthorDate: Wed May 15 13:49:41 2024 +0800
[Improve][CDC] Close idle subtasks group(reader/writer) in increment phase
(#6526)
---
.../cdc/base/dialect/JdbcDataSourceDialect.java | 4 +-
.../source/enumerator/HybridSplitAssigner.java | 3 +-
.../enumerator/IncrementalSourceEnumerator.java | 12 +++-
.../enumerator/IncrementalSplitAssigner.java | 4 ++
.../source/reader/IncrementalSourceReader.java | 8 ++-
.../seatunnel/cdc/mysql/source/MySqlDialect.java | 19 ++---
.../mysql/source/MysqlPooledDataSourceFactory.java | 36 ----------
.../reader/fetch/binlog/MySqlBinlogFetchTask.java | 20 ++++++
.../seatunnel/cdc/oracle/source/OracleDialect.java | 6 --
.../source/OraclePooledDataSourceFactory.java | 43 ------------
.../cdc/postgres/source/PostgresDialect.java | 6 --
.../source/PostgresPooledDataSourceFactory.java | 35 ---------
.../sqlserver/source/source/SqlServerDialect.java | 6 --
.../source/SqlServerPooledDataSourceFactory.java | 35 ---------
.../fetch/SqlServerSourceFetchTaskContext.java | 21 +++++-
.../common/source/reader/SourceReaderBase.java | 19 +++--
.../seatunnel/http/source/HttpSourceReader.java | 2 +-
.../connectors/seatunnel/cdc/mysql/MysqlCDCIT.java | 24 ++++++-
.../src/test/resources/docker/setup.sql | 11 +--
.../src/test/resources/mysqlcdc_to_mysql.conf | 8 +--
.../mysqlcdc_to_mysql_with_custom_primary_key.conf | 4 +-
...ysqlcdc_to_mysql_with_disable_exactly_once.conf | 8 +--
...c_to_mysql_with_multi_table_mode_one_table.conf | 11 +--
...c_to_mysql_with_multi_table_mode_two_table.conf | 11 +--
.../mysqlcdc_to_mysql_with_no_primary_key.conf | 8 +--
.../seatunnel/e2e/connector/http/HttpIT.java | 1 +
.../server/checkpoint/CheckpointBarrier.java | 43 +++++++++---
.../server/checkpoint/CheckpointCoordinator.java | 82 +++++++++++++++++++++-
.../server/checkpoint/CheckpointManager.java | 9 +++
.../engine/server/dag/physical/PhysicalVertex.java | 8 +++
.../seatunnel/engine/server/master/JobMaster.java | 47 +++++++++++++
.../server/serializable/RecordSerializer.java | 8 ++-
.../serializable/TaskDataSerializerHook.java | 5 ++
.../engine/server/task/SeaTunnelTask.java | 3 +-
.../server/task/SinkAggregatedCommitterTask.java | 11 ++-
.../server/task/SourceSplitEnumeratorTask.java | 28 +++++---
.../server/task/flow/ShuffleSinkFlowLifeCycle.java | 2 +-
.../task/flow/ShuffleSourceFlowLifeCycle.java | 2 +-
.../engine/server/task/flow/SinkFlowLifeCycle.java | 2 +-
.../server/task/flow/SourceFlowLifeCycle.java | 2 +-
.../server/task/flow/TransformFlowLifeCycle.java | 2 +-
.../group/queue/IntermediateBlockingQueue.java | 2 +-
.../group/queue/disruptor/RecordEventHandler.java | 2 +-
.../group/queue/disruptor/RecordEventProducer.java | 3 +-
.../operation/source/CloseIdleReaderOperation.java | 72 +++++++++++++++++++
.../source/SourceNoMoreElementOperation.java | 2 +-
.../engine/server/task/record/Barrier.java | 21 ++++++
.../engine/server/master/JobMasterTest.java | 70 ++++++++++++++++++
.../test/resources/stream_fakesource_to_file.conf | 3 +-
49 files changed, 541 insertions(+), 253 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
index 712328b5f9..4a497daf71 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
@@ -71,7 +71,9 @@ public interface JdbcDataSourceDialect extends
DataSourceDialect<JdbcSourceConfi
}
/** Get a connection pool factory to create connection pool. */
- JdbcConnectionPoolFactory getPooledDataSourceFactory();
+ default JdbcConnectionPoolFactory getPooledDataSourceFactory() {
+ throw new UnsupportedOperationException();
+ }
/** Query and build the schema of table. */
TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId
tableId);
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
index d6b0bdb96c..4acd092478 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
@@ -114,7 +114,8 @@ public class HybridSplitAssigner<C extends SourceConfig>
implements SplitAssigne
@Override
public boolean waitingForCompletedSplits() {
- return snapshotSplitAssigner.waitingForCompletedSplits();
+ return snapshotSplitAssigner.waitingForCompletedSplits()
+ || incrementalSplitAssigner.waitingForAssignedSplits();
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
index b17b910e5d..87ca7dc507 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
@@ -177,8 +177,16 @@ public class IncrementalSourceEnumerator
awaitingReader.remove();
LOG.debug("Assign split {} to subtask {}", sourceSplit,
nextAwaiting);
} else {
- // there is no available splits by now, skip assigning
- break;
+ if (splitAssigner.waitingForCompletedSplits()) {
+ // there is no available splits by now, skip assigning
+ break;
+ } else {
+ LOG.info(
+ "No more splits available, signal no more splits
to subtask {}",
+ nextAwaiting);
+ context.signalNoMoreSplits(nextAwaiting);
+ awaitingReader.remove();
+ }
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
index 7b45ee1ef6..1accc47af2 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
@@ -278,4 +278,8 @@ public class IncrementalSplitAssigner<C extends
SourceConfig> implements SplitAs
return context.getAssignedSnapshotSplit().isEmpty()
&& context.getSplitCompletedOffsets().isEmpty();
}
+
+ public boolean waitingForAssignedSplits() {
+ return !(splitAssigned && noMoreSplits());
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
index b5fc443310..abfccdeb75 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
@@ -111,7 +111,13 @@ public class IncrementalSourceReader<T, C extends
SourceConfig>
context.sendSplitRequest();
needSendSplitRequest.compareAndSet(true, false);
}
- super.pollNext(output);
+
+ if (isNoMoreSplitsAssignment() && isNoMoreElement()) {
+ log.info("Reader {} send NoMoreElement event",
context.getIndexOfSubtask());
+ context.signalNoMoreElement();
+ } else {
+ super.pollNext(output);
+ }
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
index c43b819f06..b450ab84ae 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
-import
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import
org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
@@ -34,6 +33,7 @@ import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.eumerator.MySq
import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.MySqlSourceFetchTaskContext;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.binlog.MySqlBinlogFetchTask;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.scan.MySqlSnapshotFetchTask;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlSchema;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.TableDiscoveryUtils;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
@@ -70,20 +70,24 @@ public class MySqlDialect implements JdbcDataSourceDialect {
@Override
public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig
sourceConfig) {
try (JdbcConnection jdbcConnection = openJdbcConnection(sourceConfig))
{
- return isTableIdCaseSensitive(jdbcConnection);
+ return isDataCollectionIdCaseSensitive(jdbcConnection);
} catch (SQLException e) {
throw new SeaTunnelException("Error reading MySQL variables: " +
e.getMessage(), e);
}
}
+ private boolean isDataCollectionIdCaseSensitive(JdbcConnection
jdbcConnection) {
+ return isTableIdCaseSensitive(jdbcConnection);
+ }
+
@Override
- public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) {
- return new MySqlChunkSplitter(sourceConfig, this);
+ public JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) {
+ return
MySqlConnectionUtils.createMySqlConnection(sourceConfig.getDbzConfiguration());
}
@Override
- public JdbcConnectionPoolFactory getPooledDataSourceFactory() {
- return new MysqlPooledDataSourceFactory();
+ public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) {
+ return new MySqlChunkSplitter(sourceConfig, this);
}
@Override
@@ -101,8 +105,7 @@ public class MySqlDialect implements JdbcDataSourceDialect {
public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc,
TableId tableId) {
if (mySqlSchema == null) {
mySqlSchema =
- new MySqlSchema(
- sourceConfig,
isDataCollectionIdCaseSensitive(sourceConfig), tableMap);
+ new MySqlSchema(sourceConfig,
isDataCollectionIdCaseSensitive(jdbc), tableMap);
}
return mySqlSchema.getTableSchema(jdbc, tableId);
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MysqlPooledDataSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MysqlPooledDataSourceFactory.java
deleted file mode 100644
index 9a0ae21b20..0000000000
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MysqlPooledDataSourceFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
-
-import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
-import
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
-
-/** A MySQL datasource factory. */
-public class MysqlPooledDataSourceFactory extends JdbcConnectionPoolFactory {
-
- public static final String JDBC_URL_PATTERN =
-
"jdbc:mysql://%s:%s/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL";
-
- @Override
- public String getJdbcUrl(JdbcSourceConfig sourceConfig) {
- String hostName = sourceConfig.getHostname();
- int port = sourceConfig.getPort();
-
- return String.format(JDBC_URL_PATTERN, hostName, port);
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
index f858864e78..c61cef79af 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
@@ -29,6 +29,7 @@ import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.s
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import io.debezium.DebeziumException;
import io.debezium.connector.mysql.MySqlConnection;
@@ -40,12 +41,15 @@ import io.debezium.connector.mysql.MySqlTaskContext;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.util.Clock;
+import lombok.extern.slf4j.Slf4j;
+import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset.NO_STOPPING_OFFSET;
+@Slf4j
public class MySqlBinlogFetchTask implements FetchTask<SourceSplitBase> {
private final IncrementalSplit split;
private volatile boolean taskRunning = false;
@@ -72,6 +76,22 @@ public class MySqlBinlogFetchTask implements
FetchTask<SourceSplitBase> {
BinlogSplitChangeEventSourceContext changeEventSourceContext =
new BinlogSplitChangeEventSourceContext();
+ sourceFetchContext
+ .getBinaryLogClient()
+ .registerLifecycleListener(
+ new BinaryLogClient.AbstractLifecycleListener() {
+ @Override
+ public void onConnect(BinaryLogClient client) {
+ try {
+ sourceFetchContext.getConnection().close();
+ log.info(
+ "Binlog client connected, closed
idle jdbc connection.");
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
mySqlStreamingChangeEventSource.execute(
changeEventSourceContext,
sourceFetchContext.getOffsetContext());
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleDialect.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleDialect.java
index 5ffef4cc3c..d1908badc0 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleDialect.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleDialect.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
-import
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import
org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
@@ -90,11 +89,6 @@ public class OracleDialect implements JdbcDataSourceDialect {
return new OracleChunkSplitter(sourceConfig, this);
}
- @Override
- public JdbcConnectionPoolFactory getPooledDataSourceFactory() {
- return new OraclePooledDataSourceFactory();
- }
-
@Override
public List<TableId> discoverDataCollections(JdbcSourceConfig
sourceConfig) {
OracleSourceConfig oracleSourceConfig = (OracleSourceConfig)
sourceConfig;
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OraclePooledDataSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OraclePooledDataSourceFactory.java
deleted file mode 100644
index a17ac42349..0000000000
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OraclePooledDataSourceFactory.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source;
-
-import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
-import
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
-import
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfig;
-
-import org.apache.commons.lang3.StringUtils;
-
-/** A Oracle datasource factory. */
-public class OraclePooledDataSourceFactory extends JdbcConnectionPoolFactory {
-
- public static final String JDBC_URL_PATTERN = "jdbc:oracle:thin:@%s:%s:%s";
-
- @Override
- public String getJdbcUrl(JdbcSourceConfig sourceConfig) {
- OracleSourceConfig oracleSourceConfig = (OracleSourceConfig)
sourceConfig;
- if (StringUtils.isNotBlank(oracleSourceConfig.getOriginUrl())) {
- return oracleSourceConfig.getOriginUrl();
- } else {
- String hostName = sourceConfig.getHostname();
- int port = sourceConfig.getPort();
- String database = sourceConfig.getDatabaseList().get(0);
- return String.format(JDBC_URL_PATTERN, hostName, port, database);
- }
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresDialect.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresDialect.java
index cf6b624db9..72e8f6724e 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresDialect.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresDialect.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
-import
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import
org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
@@ -98,11 +97,6 @@ public class PostgresDialect implements
JdbcDataSourceDialect {
return new PostgresChunkSplitter(sourceConfig, this);
}
- @Override
- public JdbcConnectionPoolFactory getPooledDataSourceFactory() {
- return new PostgresPooledDataSourceFactory();
- }
-
@Override
public List<TableId> discoverDataCollections(JdbcSourceConfig
sourceConfig) {
PostgresSourceConfig postgresSourceConfig = (PostgresSourceConfig)
sourceConfig;
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresPooledDataSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresPooledDataSourceFactory.java
deleted file mode 100644
index e1cfa4e912..0000000000
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresPooledDataSourceFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source;
-
-import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
-import
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
-
-/** Factory to create {@link JdbcConnectionPoolFactory} for Postgre SQL. */
-public class PostgresPooledDataSourceFactory extends JdbcConnectionPoolFactory
{
-
- private static final String URL_PATTERN = "jdbc:postgresql://%s:%s/%s";
-
- @Override
- public String getJdbcUrl(JdbcSourceConfig sourceConfig) {
- String hostName = sourceConfig.getHostname();
- int port = sourceConfig.getPort();
- String database = sourceConfig.getDatabaseList().get(0);
- return String.format(URL_PATTERN, hostName, port, database);
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
index e667412378..aa82024b71 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
-import
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import
org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
@@ -85,11 +84,6 @@ public class SqlServerDialect implements
JdbcDataSourceDialect {
return new SqlServerChunkSplitter(sourceConfig, this);
}
- @Override
- public JdbcConnectionPoolFactory getPooledDataSourceFactory() {
- return new SqlServerPooledDataSourceFactory();
- }
-
@Override
public List<TableId> discoverDataCollections(JdbcSourceConfig
sourceConfig) {
SqlServerSourceConfig sqlServerSourceConfig = (SqlServerSourceConfig)
sourceConfig;
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerPooledDataSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerPooledDataSourceFactory.java
deleted file mode 100644
index 911b9af147..0000000000
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerPooledDataSourceFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source;
-
-import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
-import
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
-
-/** Factory to create {@link JdbcConnectionPoolFactory} for SQL Server. */
-public class SqlServerPooledDataSourceFactory extends
JdbcConnectionPoolFactory {
-
- private static final String URL_PATTERN =
"jdbc:sqlserver://%s:%s;databaseName=%s";
-
- @Override
- public String getJdbcUrl(JdbcSourceConfig sourceConfig) {
- String hostName = sourceConfig.getHostname();
- int port = sourceConfig.getPort();
- String database = sourceConfig.getDatabaseList().get(0);
- return String.format(URL_PATTERN, hostName, port, database);
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java
index 8178c9f30a..6aadf1aca9 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java
@@ -74,7 +74,7 @@ public class SqlServerSourceFetchTaskContext extends
JdbcSourceFetchTaskContext
private final SqlServerConnection dataConnection;
- private final SqlServerConnection metadataConnection;
+ private SqlServerConnection metadataConnection;
private final SqlServerEventMetadataProvider metadataProvider;
private SqlServerDatabaseSchema databaseSchema;
@@ -92,7 +92,6 @@ public class SqlServerSourceFetchTaskContext extends
JdbcSourceFetchTaskContext
super(sourceConfig, dataSourceDialect);
this.dataConnection =
createSqlServerConnection(sourceConfig.getDbzConfiguration());
- this.metadataConnection =
createSqlServerConnection(sourceConfig.getDbzConfiguration());
this.metadataProvider = new SqlServerEventMetadataProvider();
}
@@ -162,13 +161,29 @@ public class SqlServerSourceFetchTaskContext extends
JdbcSourceFetchTaskContext
taskContext, queue, metadataProvider);
this.errorHandler = new
SqlServerErrorHandler(connectorConfig.getLogicalName(), queue);
+ if (sourceSplitBase.isIncrementalSplit() || isExactlyOnce()) {
+ initMetadataConnection();
+ }
+ }
+
+ private void initMetadataConnection() {
+ if (this.metadataConnection == null) {
+ synchronized (this) {
+ if (this.metadataConnection == null) {
+ this.metadataConnection =
+
createSqlServerConnection(sourceConfig.getDbzConfiguration());
+ }
+ }
+ }
}
@Override
public void close() {
try {
this.dataConnection.close();
- this.metadataConnection.close();
+ if (this.metadataConnection != null) {
+ this.metadataConnection.close();
+ }
} catch (SQLException e) {
log.warn("Failed to close connection", e);
}
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
index 29dd2ff6f5..7ec6cc83e9 100644
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
@@ -65,7 +65,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends
SourceSplit, SplitSt
private RecordsWithSplitIds<E> currentFetch;
protected SplitContext<T, SplitStateT> currentSplitContext;
private Collector<T> currentSplitOutput;
- private boolean noMoreSplitsAssignment;
+ @Getter private volatile boolean noMoreSplitsAssignment;
public SourceReaderBase(
BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
@@ -94,10 +94,11 @@ public abstract class SourceReaderBase<E, T, SplitT extends
SourceSplit, SplitSt
if (recordsWithSplitId == null) {
if (Boundedness.BOUNDED.equals(context.getBoundedness())
&& noMoreSplitsAssignment
- && splitFetcherManager.maybeShutdownFinishedFetchers()
- && elementsQueue.isEmpty()) {
+ && isNoMoreElement()) {
context.signalNoMoreElement();
- log.info("Send NoMoreElement event");
+ log.info(
+ "Reader {} into idle state, send NoMoreElement
event",
+ context.getIndexOfSubtask());
}
return;
}
@@ -137,7 +138,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends
SourceSplit, SplitSt
@Override
public void handleNoMoreSplits() {
- log.info("Reader received NoMoreSplits event.");
+ log.info("Reader {} received NoMoreSplits event.",
context.getIndexOfSubtask());
noMoreSplitsAssignment = true;
}
@@ -146,9 +147,15 @@ public abstract class SourceReaderBase<E, T, SplitT
extends SourceSplit, SplitSt
log.info("Received unhandled source event: {}", sourceEvent);
}
+ protected boolean isNoMoreElement() {
+ return splitFetcherManager.maybeShutdownFinishedFetchers()
+ && elementsQueue.isEmpty()
+ && currentFetch == null;
+ }
+
@Override
public void close() {
- log.info("Closing Source Reader.");
+ log.info("Closing Source Reader {}.", context.getIndexOfSubtask());
try {
splitFetcherManager.close(options.getSourceReaderCloseTimeout());
} catch (Exception e) {
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
index c2c3da6968..104d769ef5 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
@@ -130,7 +130,7 @@ public class HttpSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
collect(output, content);
}
}
- log.info(
+ log.debug(
"http client execute success request param:[{}], http
response status code:[{}], content:[{}]",
httpParameter.getParams(),
response.getCode(),
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
index 62dc3f077d..7fab60f9fc 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
@@ -63,8 +63,8 @@ public class MysqlCDCIT extends TestSuiteBase implements
TestResource {
// mysql
private static final String MYSQL_HOST = "mysql_cdc_e2e";
- private static final String MYSQL_USER_NAME = "st_user";
- private static final String MYSQL_USER_PASSWORD = "seatunnel";
+ private static final String MYSQL_USER_NAME = "mysqluser";
+ private static final String MYSQL_USER_PASSWORD = "mysqlpw";
private static final String MYSQL_DATABASE = "mysql_cdc";
private static final String MYSQL_DATABASE2 = "mysql_cdc2";
private static final MySqlContainer MYSQL_CONTAINER =
createMySqlContainer(MySqlVersion.V8_0);
@@ -357,6 +357,12 @@ public class MysqlCDCIT extends TestSuiteBase implements
TestResource {
getSourceQuerySQL(
MYSQL_DATABASE2,
SOURCE_TABLE_1)))));
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .pollInterval(1000, TimeUnit.MILLISECONDS)
+ .until(() -> getConnectionStatus("st_user_source").size() ==
1);
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .pollInterval(1000, TimeUnit.MILLISECONDS)
+ .until(() -> getConnectionStatus("st_user_sink").size() == 1);
Pattern jobIdPattern =
Pattern.compile(
@@ -413,6 +419,13 @@ public class MysqlCDCIT extends TestSuiteBase implements
TestResource {
MYSQL_DATABASE2,
SOURCE_TABLE_2)))));
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .pollInterval(1000, TimeUnit.MILLISECONDS)
+ .until(() -> getConnectionStatus("st_user_source").size() ==
1);
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .pollInterval(1000, TimeUnit.MILLISECONDS)
+ .until(() -> getConnectionStatus("st_user_sink").size() == 1);
+
log.info("****************** container logs start ******************");
String containerLogs = container.getServerLogs();
log.info(containerLogs);
@@ -479,6 +492,13 @@ public class MysqlCDCIT extends TestSuiteBase implements
TestResource {
MYSQL_CONTAINER.getPassword());
}
+ private List<List<Object>> getConnectionStatus(String user) {
+ return query(
+ "select USER,HOST,DB,COMMAND,TIME,STATE from
information_schema.processlist where USER = '"
+ + user
+ + "'");
+ }
+
private List<List<Object>> query(String sql) {
try (Connection connection = getJdbcConnection()) {
ResultSet resultSet =
connection.createStatement().executeQuery(sql);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/docker/setup.sql
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/docker/setup.sql
index 0a54c2c282..079b8f1d95 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/docker/setup.sql
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/docker/setup.sql
@@ -19,13 +19,16 @@
-- to prevent other clients accessing the log from other machines. For
example, 'replicator'@'follower.acme.com'.
-- However, in this database we'll grant 2 users different privileges:
--
--- 1) 'st_user' - all privileges required by the snapshot reader AND binlog
reader (used for testing)
--- 2) 'mysqluser' - all privileges
+-- 1) 'mysqluser' - all privileges
+-- 2) 'st_user_source' - all privileges required by the snapshot reader AND
binlog reader (used for testing)
+-- 3) 'st_user_sink' - all privileges required by the write data (used for
testing)
--
-GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT,
DROP, LOCK TABLES ON *.* TO 'st_user'@'%';
-CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw';
GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
+CREATE USER 'st_user_source' IDENTIFIED BY 'mysqlpw';
+GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT,
DROP, LOCK TABLES ON *.* TO 'st_user_source'@'%';
+CREATE USER 'st_user_sink' IDENTIFIED BY 'mysqlpw';
+GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, INDEX, ALTER ON *.* TO
'st_user_sink'@'%';
--
----------------------------------------------------------------------------------------------------------------
-- DATABASE: emptydb
--
----------------------------------------------------------------------------------------------------------------
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf
index a5874db62d..4a352455d9 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf
@@ -31,8 +31,8 @@ source {
MySQL-CDC {
result_table_name = "customers_mysql_cdc"
server-id = 5652
- username = "st_user"
- password = "seatunnel"
+ username = "st_user_source"
+ password = "mysqlpw"
table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
}
@@ -55,8 +55,8 @@ sink {
source_table_name = "trans_mysql_cdc"
url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
driver = "com.mysql.cj.jdbc.Driver"
- user = "st_user"
- password = "seatunnel"
+ user = "st_user_sink"
+ password = "mysqlpw"
generate_sink_sql = true
# You need to configure both database and table
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_custom_primary_key.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_custom_primary_key.conf
index ba3e94855f..1d1c1c80c7 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_custom_primary_key.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_custom_primary_key.conf
@@ -30,7 +30,7 @@ source {
result_table_name = "customers_mysql_cdc"
server-id = 5652
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
- username = "mysqluser"
+ username = "st_user_source"
password = "mysqlpw"
exactly_once = true
table-names =
["mysql_cdc.mysql_cdc_e2e_source_table_1_custom_primary_key",
"mysql_cdc.mysql_cdc_e2e_source_table_2_custom_primary_key"]
@@ -52,7 +52,7 @@ sink {
source_table_name = "customers_mysql_cdc"
url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc2"
driver = "com.mysql.cj.jdbc.Driver"
- user = "mysqluser"
+ user = "st_user_sink"
password = "mysqlpw"
database = "mysql_cdc2"
generate_sink_sql = true
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_disable_exactly_once.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_disable_exactly_once.conf
index 4b91a877d4..fdc4302662 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_disable_exactly_once.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_disable_exactly_once.conf
@@ -32,8 +32,8 @@ source {
MySQL-CDC {
result_table_name = "customers_mysql_cdc"
server-id = 5652
- username = "st_user"
- password = "seatunnel"
+ username = "st_user_source"
+ password = "mysqlpw"
table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
@@ -48,8 +48,8 @@ sink {
source_table_name = "customers_mysql_cdc"
url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
driver = "com.mysql.cj.jdbc.Driver"
- user = "st_user"
- password = "seatunnel"
+ user = "st_user_sink"
+ password = "mysqlpw"
generate_sink_sql = true
# You need to configure both database and table
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf
index 180cedcda8..c382c1c586 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf
@@ -20,7 +20,7 @@
env {
# You can set engine configuration here
- parallelism = 1
+ parallelism = 3
job.mode = "STREAMING"
checkpoint.interval = 5000
}
@@ -28,11 +28,14 @@ env {
source {
MySQL-CDC {
result_table_name = "customers_mysql_cdc"
- server-id = 5652
- username = "mysqluser"
+ server-id = 5652-5660
+ username = "st_user_source"
password = "mysqlpw"
table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
+
+ snapshot.split.size = 1
+ snapshot.fetch.size = 1
}
}
@@ -44,7 +47,7 @@ sink {
source_table_name = "customers_mysql_cdc"
url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc2"
driver = "com.mysql.cj.jdbc.Driver"
- user = "mysqluser"
+ user = "st_user_sink"
password = "mysqlpw"
database = "mysql_cdc2"
generate_sink_sql = true
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf
index f9bc9b1e01..cb10cf2644 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf
@@ -20,7 +20,7 @@
env {
# You can set engine configuration here
- parallelism = 1
+ parallelism = 3
job.mode = "STREAMING"
checkpoint.interval = 5000
}
@@ -28,11 +28,14 @@ env {
source {
MySQL-CDC {
result_table_name = "customers_mysql_cdc"
- server-id = 5652
- username = "mysqluser"
+ server-id = 5652-5660
+ username = "st_user_source"
password = "mysqlpw"
table-names = ["mysql_cdc.mysql_cdc_e2e_source_table",
"mysql_cdc.mysql_cdc_e2e_source_table2"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
+
+ snapshot.split.size = 1
+ snapshot.fetch.size = 1
}
}
@@ -44,7 +47,7 @@ sink {
source_table_name = "customers_mysql_cdc"
url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc2"
driver = "com.mysql.cj.jdbc.Driver"
- user = "mysqluser"
+ user = "st_user_sink"
password = "mysqlpw"
database = "mysql_cdc2"
generate_sink_sql = true
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_no_primary_key.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_no_primary_key.conf
index 46df806ae7..e35403ba23 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_no_primary_key.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_no_primary_key.conf
@@ -29,8 +29,8 @@ source {
MySQL-CDC {
result_table_name = "customers_mysql_cdc"
server-id = 5652
- username = "st_user"
- password = "seatunnel"
+ username = "st_user_source"
+ password = "mysqlpw"
table-names = ["mysql_cdc.mysql_cdc_e2e_source_table_no_primary_key"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
@@ -43,8 +43,8 @@ sink {
source_table_name = "customers_mysql_cdc"
url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
driver = "com.mysql.cj.jdbc.Driver"
- user = "st_user"
- password = "seatunnel"
+ user = "st_user_sink"
+ password = "mysqlpw"
generate_sink_sql = true
# You need to configure both database and table
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
index 9dc38cbd1c..f53d8c1d45 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
@@ -75,6 +75,7 @@ public class HttpIT extends TestSuiteBase implements
TestResource {
.withEnv(
"MOCKSERVER_INITIALIZATION_JSON_PATH",
TMP_DIR + getMockServerConfig())
+ .withEnv("MOCKSERVER_LOG_LEVEL", "WARN")
.withLogConsumer(new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)))
.waitingFor(new
HttpWaitStrategy().forPath("/").forStatusCode(404));
Startables.deepStart(Stream.of(mockserverContainer)).join();
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointBarrier.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointBarrier.java
index 7179cc8cb3..1cf1f8bdfd 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointBarrier.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointBarrier.java
@@ -18,27 +18,46 @@
package org.apache.seatunnel.engine.server.checkpoint;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.task.record.Barrier;
import com.google.common.base.Objects;
+import lombok.Getter;
import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
import static com.google.common.base.Preconditions.checkNotNull;
+@Getter
public class CheckpointBarrier implements Barrier, Serializable {
private final long id;
private final long timestamp;
private final CheckpointType checkpointType;
+ private final Set<TaskLocation> prepareCloseTasks;
+ private final Set<TaskLocation> closedTasks;
public CheckpointBarrier(long id, long timestamp, CheckpointType
checkpointType) {
+ this(id, timestamp, checkpointType, Collections.emptySet(),
Collections.emptySet());
+ }
+
+ public CheckpointBarrier(
+ long id,
+ long timestamp,
+ CheckpointType checkpointType,
+ Set<TaskLocation> prepareCloseTasks,
+ Set<TaskLocation> closedTasks) {
this.id = id;
this.timestamp = timestamp;
this.checkpointType = checkNotNull(checkpointType);
- }
-
- public long getId() {
- return id;
+ this.prepareCloseTasks = prepareCloseTasks;
+ this.closedTasks = closedTasks;
+ if (new HashSet(prepareCloseTasks).removeAll(closedTasks)) {
+ throw new IllegalArgumentException(
+ "The prepareCloseTasks collection should not contain
elements of the closedTasks collection");
+ }
}
@Override
@@ -51,12 +70,17 @@ public class CheckpointBarrier implements Barrier,
Serializable {
return checkpointType.isFinalCheckpoint();
}
- public long getTimestamp() {
- return timestamp;
+ @Override
+ public boolean prepareClose(TaskLocation task) {
+ if (prepareClose()) {
+ return true;
+ }
+ return prepareCloseTasks.contains(task);
}
- public CheckpointType getCheckpointType() {
- return checkpointType;
+ @Override
+ public Set<TaskLocation> closedTasks() {
+ return Collections.unmodifiableSet(closedTasks);
}
@Override
@@ -81,7 +105,8 @@ public class CheckpointBarrier implements Barrier,
Serializable {
@Override
public String toString() {
return String.format(
- "CheckpointBarrier %d @ %d Options: %s", id, timestamp,
checkpointType);
+ "CheckpointBarrier %d @ %d type: %s, prepareClose: %s, closed:
%s",
+ id, timestamp, checkpointType, prepareCloseTasks, closedTasks);
}
public boolean isAuto() {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 47392f7a31..12f5acd597 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -55,6 +55,7 @@ import lombok.SneakyThrows;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -111,6 +112,8 @@ public class CheckpointCoordinator {
private final CheckpointPlan plan;
private final Set<TaskLocation> readyToCloseStartingTask;
+ private final Set<TaskLocation> readyToCloseIdleTask;
+ @Getter private final Set<TaskLocation> closedIdleTask;
private final ConcurrentHashMap<Long, PendingCheckpoint>
pendingCheckpoints;
private final ArrayDeque<String> completedCheckpointIds;
@@ -189,6 +192,8 @@ public class CheckpointCoordinator {
this.pipelineTaskStatus = new ConcurrentHashMap<>();
this.checkpointIdCounter = checkpointIdCounter;
this.readyToCloseStartingTask = new CopyOnWriteArraySet<>();
+ this.readyToCloseIdleTask = new CopyOnWriteArraySet<>();
+ this.closedIdleTask = new CopyOnWriteArraySet<>();
LOG.info(
"Create CheckpointCoordinator for job({}@{}) with plan({})",
@@ -309,7 +314,11 @@ public class CheckpointCoordinator {
for (int i = tuple.f1();
i < actionState.getParallelism();
i += currentParallelism) {
-
states.add(actionState.getSubtaskStates().get(i));
+ ActionSubtaskState subtaskState =
+
actionState.getSubtaskStates().get(i);
+ if (subtaskState != null) {
+ states.add(subtaskState);
+ }
}
});
}
@@ -397,6 +406,64 @@ public class CheckpointCoordinator {
}
}
+ protected void readyToCloseIdleTask(TaskLocation taskLocation) {
+ if (plan.getStartingSubtasks().contains(taskLocation)) {
+ throw new UnsupportedOperationException("Unsupported close
starting task");
+ }
+
+ LOG.info(
+ "Received close idle task[{}]({}/{}). {}",
+ taskLocation.getTaskID(),
+ taskLocation.getPipelineId(),
+ taskLocation.getJobId(),
+ taskLocation);
+ synchronized (readyToCloseIdleTask) {
+ if (readyToCloseIdleTask.contains(taskLocation)
+ || closedIdleTask.contains(taskLocation)) {
+ LOG.warn(
+ "task[{}]({}/{}) already in closed. {}",
+ taskLocation.getTaskID(),
+ taskLocation.getPipelineId(),
+ taskLocation.getJobId(),
+ taskLocation);
+ return;
+ }
+
+ List<TaskLocation> subTaskList = new ArrayList<>();
+ for (TaskLocation subTask : plan.getPipelineSubtasks()) {
+ if
(subTask.getTaskGroupLocation().equals(taskLocation.getTaskGroupLocation())) {
+ // close all subtask in the same task group
+ subTaskList.add(subTask);
+ LOG.info(
+ "Add task[{}]({}/{}) to prepare close list",
+ subTask.getTaskID(),
+ subTask.getPipelineId(),
+ subTask.getJobId());
+ }
+ }
+ if (subTaskList.size() != 2) {
+ throw new UnsupportedOperationException(
+ "Unsupported close not reader/writer task group: " +
subTaskList);
+ }
+ readyToCloseIdleTask.addAll(subTaskList);
+ tryTriggerPendingCheckpoint(CheckpointType.CHECKPOINT_TYPE);
+ }
+ }
+
+ protected void completedCloseIdleTask(TaskLocation taskLocation) {
+ synchronized (readyToCloseIdleTask) {
+ if (readyToCloseIdleTask.contains(taskLocation)) {
+ readyToCloseIdleTask.remove(taskLocation);
+ closedIdleTask.add(taskLocation);
+ LOG.info(
+ "Completed close task[{}]({}/{})",
+ taskLocation.getTaskID(),
+ taskLocation.getPipelineId(),
+ taskLocation.getJobId());
+ }
+ }
+ }
+
protected void restoreCoordinator(boolean alreadyStarted) {
LOG.info("received restore CheckpointCoordinator with alreadyStarted=
" + alreadyStarted);
errorByPhysicalVertex = new AtomicReference<>();
@@ -553,7 +620,9 @@ public class CheckpointCoordinator {
pendingCheckpoint.getCheckpointId(),
pendingCheckpoint
.getCheckpointTimestamp(),
-
pendingCheckpoint.getCheckpointType()),
+
pendingCheckpoint.getCheckpointType(),
+ new
HashSet<>(readyToCloseIdleTask),
+ new
HashSet<>(closedIdleTask)),
executorService)
.thenApplyAsync(this::triggerCheckpoint,
executorService);
@@ -664,8 +733,8 @@ public class CheckpointCoordinator {
}
private Set<Long> getNotYetAcknowledgedTasks() {
- // TODO: some tasks have completed and don't need to be ack
return plan.getPipelineSubtasks().stream()
+ .filter(e -> !closedIdleTask.contains(e))
.map(TaskLocation::getTaskID)
.collect(Collectors.toCollection(CopyOnWriteArraySet::new));
}
@@ -715,6 +784,8 @@ public class CheckpointCoordinator {
}
pipelineTaskStatus.clear();
readyToCloseStartingTask.clear();
+ readyToCloseIdleTask.clear();
+ closedIdleTask.clear();
pendingCounter.set(0);
schemaChanging.set(false);
scheduler.shutdownNow();
@@ -752,6 +823,11 @@ public class CheckpointCoordinator {
pendingCheckpoint.getCheckpointType().isSavepoint()
? SubtaskStatus.SAVEPOINT_PREPARE_CLOSE
: SubtaskStatus.RUNNING);
+
+ if (ackOperation.getBarrier().getCheckpointType().notFinalCheckpoint()
+ && ackOperation.getBarrier().prepareClose(location)) {
+ completedCloseIdleTask(location);
+ }
}
public synchronized void completePendingCheckpoint(CompletedCheckpoint
completedCheckpoint) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 9ef2e6623b..21c2b90df5 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -216,6 +216,15 @@ public class CheckpointManager {
getCheckpointCoordinator(taskLocation).readyToClose(taskLocation);
}
+ /**
+ * Called by the {@link SourceSplitEnumeratorTask}. <br>
+ * used by SourceSplitEnumeratorTask to tell CheckpointCoordinator
pipeline will trigger close
+ * barrier of idle task by SourceSplitEnumeratorTask.
+ */
+ public void readyToCloseIdleTask(TaskLocation taskLocation) {
+
getCheckpointCoordinator(taskLocation).readyToCloseIdleTask(taskLocation);
+ }
+
/**
* Called by the JobMaster. <br>
* Listen to the {@link PipelineStatus} of the {@link Pipeline}, which is
used to shut down the
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index f16b0cc6e4..4fbcfa4fa3 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.engine.server.dag.physical;
+import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.Constant;
@@ -31,6 +33,7 @@ import
org.apache.seatunnel.engine.server.dag.execution.ExecutionVertex;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskDeployState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
+import org.apache.seatunnel.engine.server.execution.TaskGroup;
import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.master.JobMaster;
@@ -350,6 +353,11 @@ public class PhysicalVertex {
this.connectorJarIdentifiers);
}
+ @VisibleForTesting
+ public TaskGroup getTaskGroup() {
+ return taskGroup;
+ }
+
public synchronized void updateTaskState(@NonNull ExecutionState
targetState) {
try {
ExecutionState current = (ExecutionState)
runningJobStateIMap.get(taskGroupLocation);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 01b2af48a8..ece0c18c93 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -389,6 +389,46 @@ public class JobMaster {
return jobDAGInfo;
}
+ public void releaseTaskGroupResource(
+ PipelineLocation pipelineLocation, TaskGroupLocation
taskGroupLocation) {
+ Map<TaskGroupLocation, SlotProfile> taskGroupLocationSlotProfileMap =
+ ownedSlotProfilesIMap.get(pipelineLocation);
+ if (taskGroupLocationSlotProfileMap == null) {
+ return;
+ }
+ SlotProfile taskGroupSlotProfile =
taskGroupLocationSlotProfileMap.get(taskGroupLocation);
+ if (taskGroupSlotProfile == null) {
+ return;
+ }
+
+ try {
+ RetryUtils.retryWithException(
+ () -> {
+ LOGGER.info(
+ String.format(
+ "release the task group resource %s",
taskGroupLocation));
+
+ resourceManager
+ .releaseResources(
+ jobImmutableInformation.getJobId(),
+
Collections.singletonList(taskGroupSlotProfile))
+ .join();
+
+ return null;
+ },
+ new RetryUtils.RetryMaterial(
+ Constant.OPERATION_RETRY_TIME,
+ true,
+ exception ->
ExceptionUtil.isOperationNeedRetryException(exception),
+ Constant.OPERATION_RETRY_SLEEP));
+ } catch (Exception e) {
+ LOGGER.warning(
+ String.format(
+ "release the task group resource failed %s, with
exception: %s ",
+ taskGroupLocation, ExceptionUtils.getMessage(e)));
+ }
+ }
+
public void releasePipelineResource(SubPlan subPlan) {
try {
Map<TaskGroupLocation, SlotProfile>
taskGroupLocationSlotProfileMap =
@@ -663,6 +703,13 @@ public class JobMaster {
task.updateStateByExecutionService(
taskExecutionState);
+ if (taskExecutionState
+ .getExecutionState()
+ .isEndState()) {
+ releaseTaskGroupResource(
+
pipeline.getPipelineLocation(),
+
task.getTaskGroupLocation());
+ }
});
});
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java
index 90caab3c18..975810693d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java
@@ -45,6 +45,8 @@ public class RecordSerializer implements
StreamSerializer<Record> {
out.writeLong(checkpointBarrier.getId());
out.writeLong(checkpointBarrier.getTimestamp());
out.writeString(checkpointBarrier.getCheckpointType().getName());
+ out.writeObject(checkpointBarrier.getPrepareCloseTasks());
+ out.writeObject(checkpointBarrier.getClosedTasks());
} else if (data instanceof SeaTunnelRow) {
SeaTunnelRow row = (SeaTunnelRow) data;
out.writeByte(RecordDataType.SEATUNNEL_ROW.ordinal());
@@ -67,7 +69,11 @@ public class RecordSerializer implements
StreamSerializer<Record> {
if (dataType == RecordDataType.CHECKPOINT_BARRIER.ordinal()) {
data =
new CheckpointBarrier(
- in.readLong(), in.readLong(),
CheckpointType.fromName(in.readString()));
+ in.readLong(),
+ in.readLong(),
+ CheckpointType.fromName(in.readString()),
+ in.readObject(),
+ in.readObject());
} else if (dataType == RecordDataType.SEATUNNEL_ROW.ordinal()) {
String tableId = in.readString();
byte rowKind = in.readByte();
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
index 7c298272c7..a926767576 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
@@ -37,6 +37,7 @@ import
org.apache.seatunnel.engine.server.task.operation.checkpoint.CloseRequest
import
org.apache.seatunnel.engine.server.task.operation.sink.SinkPrepareCommitOperation;
import
org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation;
import
org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation;
+import
org.apache.seatunnel.engine.server.task.operation.source.CloseIdleReaderOperation;
import
org.apache.seatunnel.engine.server.task.operation.source.LastCheckpointNotifyOperation;
import
org.apache.seatunnel.engine.server.task.operation.source.RequestSplitOperation;
import
org.apache.seatunnel.engine.server.task.operation.source.RestoredSplitOperation;
@@ -101,6 +102,8 @@ public class TaskDataSerializerHook implements
DataSerializerHook {
public static final int REPORT_JOB_EVENT = 25;
+ public static final int CLOSE_READER_OPERATION = 26;
+
public static final int FACTORY_ID =
FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY,
@@ -171,6 +174,8 @@ public class TaskDataSerializerHook implements
DataSerializerHook {
return new DeleteConnectorJarInExecutionNode();
case REPORT_JOB_EVENT:
return new JobEventReportOperation();
+ case CLOSE_READER_OPERATION:
+ return new CloseIdleReaderOperation();
default:
throw new IllegalArgumentException("Unknown type id " +
typeId);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 3ee636bbc5..d66921c742 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -339,7 +339,8 @@ public abstract class SeaTunnelTask extends AbstractTask {
Integer ackSize =
cycleAcks.compute(barrier.getId(), (id, count) -> count ==
null ? 1 : ++count);
if (ackSize == allCycles.size()) {
- if (barrier.prepareClose()) {
+ cycleAcks.remove(barrier.getId());
+ if (barrier.prepareClose(this.taskLocation)) {
this.prepareCloseStatus = true;
this.prepareCloseBarrierId.set(barrier.getId());
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index 48c3ca197d..34a14cf585 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -216,6 +216,12 @@ public class SinkAggregatedCommitterTask<CommandInfoT,
AggregatedCommitInfoT>
}
}
+ private long getClosedWriters(Barrier barrier) {
+ return barrier.closedTasks().stream()
+ .filter(task -> writerAddressMap.containsKey(task.getTaskID()))
+ .count();
+ }
+
@Override
public void triggerBarrier(Barrier barrier) throws Exception {
long startTime = System.currentTimeMillis();
@@ -224,10 +230,11 @@ public class SinkAggregatedCommitterTask<CommandInfoT,
AggregatedCommitInfoT>
Integer count =
checkpointBarrierCounter.compute(
barrier.getId(), (id, num) -> num == null ? 1 : ++num);
- if (count != maxWriterSize) {
+
+ if (count != (maxWriterSize - getClosedWriters(barrier))) {
return;
}
- if (barrier.prepareClose()) {
+ if (barrier.prepareClose(this.taskLocation)) {
this.prepareCloseStatus = true;
this.prepareCloseBarrierId.set(barrier.getId());
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index e95684c1c5..2672dab382 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.task;
import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
@@ -32,12 +33,12 @@ import
org.apache.seatunnel.engine.server.execution.ProgressState;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import
org.apache.seatunnel.engine.server.task.context.SeaTunnelSplitEnumeratorContext;
import
org.apache.seatunnel.engine.server.task.operation.checkpoint.BarrierFlowOperation;
+import
org.apache.seatunnel.engine.server.task.operation.source.CloseIdleReaderOperation;
import
org.apache.seatunnel.engine.server.task.operation.source.LastCheckpointNotifyOperation;
import org.apache.seatunnel.engine.server.task.record.Barrier;
import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
import com.hazelcast.cluster.Address;
-import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@@ -57,7 +58,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
-import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky;
@@ -144,7 +144,7 @@ public class SourceSplitEnumeratorTask<SplitT extends
SourceSplit> extends Coord
long startTime = System.currentTimeMillis();
log.debug("split enumer trigger barrier [{}]", barrier);
- if (barrier.prepareClose()) {
+ if (barrier.prepareClose(this.taskLocation)) {
this.prepareCloseTriggered = true;
this.prepareCloseBarrierId.set(barrier.getId());
}
@@ -158,7 +158,7 @@ public class SourceSplitEnumeratorTask<SplitT extends
SourceSplit> extends Coord
serialize = enumeratorStateSerializer.serialize(snapshotState);
}
log.debug("source split enumerator send state [{}] to master",
snapshotState);
- sendToAllReader(location -> new BarrierFlowOperation(barrier,
location));
+ sendToActiveReader(barrier);
}
if (barrier.snapshot()) {
this.getExecutionContext()
@@ -276,10 +276,18 @@ public class SourceSplitEnumeratorTask<SplitT extends
SourceSplit> extends Coord
return enumerator;
}
- public void readerFinished(long taskID) {
- unfinishedReaders.remove(taskID);
+ public void readerFinished(TaskLocation taskLocation) {
+ unfinishedReaders.remove(taskLocation.getTaskID());
if (unfinishedReaders.isEmpty()) {
prepareCloseStatus = true;
+ } else if
(Boundedness.UNBOUNDED.equals(this.source.getSource().getBoundedness())) {
+ log.info(
+ "Send close idle reader {} operation of unbounded job. {}",
+ taskLocation.getTaskIndex(),
+ taskLocation);
+ this.getExecutionContext()
+ .sendToMaster(new CloseIdleReaderOperation(jobID,
taskLocation))
+ .join();
}
}
@@ -348,10 +356,13 @@ public class SourceSplitEnumeratorTask<SplitT extends
SourceSplit> extends Coord
.collect(Collectors.toSet());
}
- private void sendToAllReader(Function<TaskLocation, Operation> function) {
+ private void sendToActiveReader(Barrier barrier) {
List<InvocationFuture<?>> futures = new ArrayList<>();
taskMemberMapping.forEach(
(location, address) -> {
+ if (barrier.closedTasks().contains(location)) {
+ return;
+ }
log.debug(
"split enumerator send to read--size: {},
location: {}, address: {}",
taskMemberMapping.size(),
@@ -359,7 +370,8 @@ public class SourceSplitEnumeratorTask<SplitT extends
SourceSplit> extends Coord
address.toString());
futures.add(
this.getExecutionContext()
- .sendToMember(function.apply(location),
address));
+ .sendToMember(
+ new BarrierFlowOperation(barrier,
location), address));
});
futures.forEach(InvocationFuture::join);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
index 32ec5cb8f4..9bdbd37f80 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
@@ -78,7 +78,7 @@ public class ShuffleSinkFlowLifeCycle extends
AbstractFlowLifeCycle
shuffleFlush();
Barrier barrier = (Barrier) record.getData();
- if (barrier.prepareClose()) {
+ if (barrier.prepareClose(runningTask.getTaskLocation())) {
prepareClose = true;
}
if (barrier.snapshot()) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
index 2f14c67701..87b29eab21 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
@@ -107,7 +107,7 @@ public class ShuffleSourceFlowLifeCycle<T> extends
AbstractFlowLifeCycle
// publish barrier
if (alignedBarriersCounter == shuffles.length) {
- if (barrier.prepareClose()) {
+ if
(barrier.prepareClose(runningTask.getTaskLocation())) {
prepareClose = true;
}
if (barrier.snapshot()) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 1a2143b027..48c530a0c3 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -183,7 +183,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
long startTime = System.currentTimeMillis();
Barrier barrier = (Barrier) record.getData();
- if (barrier.prepareClose()) {
+ if (barrier.prepareClose(this.taskLocation)) {
prepareClose = true;
}
if (barrier.snapshot()) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index 64e5bfd22b..83675575b4 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -268,7 +268,7 @@ public class SourceFlowLifeCycle<T, SplitT extends
SourceSplit> extends ActionFl
// Block the reader from adding barrier to the collector.
synchronized (collector.getCheckpointLock()) {
- if (barrier.prepareClose()) {
+ if (barrier.prepareClose(this.currentTaskLocation)) {
this.prepareClose = true;
}
if (barrier.snapshot()) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
index 187aa3659b..0447513b5f 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
@@ -75,7 +75,7 @@ public class TransformFlowLifeCycle<T> extends
ActionFlowLifeCycle
public void received(Record<?> record) {
if (record.getData() instanceof Barrier) {
CheckpointBarrier barrier = (CheckpointBarrier) record.getData();
- if (barrier.prepareClose()) {
+ if (barrier.prepareClose(this.runningTask.getTaskLocation())) {
prepareClose = true;
}
if (barrier.snapshot()) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java
index f7cd7b0c23..b8e53faabd 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java
@@ -64,7 +64,7 @@ public class IntermediateBlockingQueue extends
AbstractIntermediateQueue<Blockin
if (record.getData() instanceof Barrier) {
CheckpointBarrier barrier = (CheckpointBarrier) record.getData();
getRunningTask().ack(barrier);
- if (barrier.prepareClose()) {
+ if (barrier.prepareClose(this.getRunningTask().getTaskLocation()))
{
getIntermediateQueueFlowLifeCycle().setPrepareClose(true);
}
consumer.accept(record);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventHandler.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventHandler.java
index d525a0b257..6cc5195de5 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventHandler.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventHandler.java
@@ -54,7 +54,7 @@ public class RecordEventHandler implements
EventHandler<RecordEvent> {
if (record.getData() instanceof Barrier) {
CheckpointBarrier barrier = (CheckpointBarrier)
record.getData();
runningTask.ack(barrier);
- if (barrier.prepareClose()) {
+ if (barrier.prepareClose(this.runningTask.getTaskLocation())) {
this.intermediateQueueFlowLifeCycle.setPrepareClose(true);
}
} else {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventProducer.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventProducer.java
index 021bb8d2f0..ea47f83a79 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventProducer.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventProducer.java
@@ -34,7 +34,8 @@ public class RecordEventProducer {
if (record.getData() instanceof Barrier) {
CheckpointBarrier barrier = (CheckpointBarrier) record.getData();
intermediateQueueFlowLifeCycle.getRunningTask().ack(barrier);
- if (barrier.prepareClose()) {
+ if (barrier.prepareClose(
+
intermediateQueueFlowLifeCycle.getRunningTask().getTaskLocation())) {
intermediateQueueFlowLifeCycle.setPrepareClose(true);
}
} else {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseIdleReaderOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseIdleReaderOperation.java
new file mode 100644
index 0000000000..abedf1a499
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseIdleReaderOperation.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.operation.source;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+import java.io.IOException;
+
+public class CloseIdleReaderOperation extends Operation implements
IdentifiedDataSerializable {
+ private long jobId;
+ private TaskLocation taskLocation;
+
+ public CloseIdleReaderOperation() {}
+
+ public CloseIdleReaderOperation(long jobId, TaskLocation taskLocation) {
+ this.jobId = jobId;
+ this.taskLocation = taskLocation;
+ }
+
+ @Override
+ public void run() throws Exception {
+ SeaTunnelServer server = getService();
+ server.getCoordinatorService()
+ .getJobMaster(jobId)
+ .getCheckpointManager()
+ .readyToCloseIdleTask(taskLocation);
+ }
+
+ @Override
+ protected void writeInternal(ObjectDataOutput out) throws IOException {
+ out.writeLong(jobId);
+ out.writeObject(taskLocation);
+ }
+
+ @Override
+ protected void readInternal(ObjectDataInput in) throws IOException {
+ jobId = in.readLong();
+ taskLocation = in.readObject();
+ }
+
+ @Override
+ public int getFactoryId() {
+ return TaskDataSerializerHook.FACTORY_ID;
+ }
+
+ @Override
+ public int getClassId() {
+ return TaskDataSerializerHook.CLOSE_READER_OPERATION;
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
index fec6afcb91..60b67c0c83 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
@@ -57,7 +57,7 @@ public class SourceNoMoreElementOperation extends Operation
implements Identifie
Thread.currentThread().setContextClassLoader(classLoader);
SourceSplitEnumeratorTask<?> task =
server.getTaskExecutionService().getTask(enumeratorTaskID);
- task.readerFinished(currentTaskID.getTaskID());
+ task.readerFinished(currentTaskID);
Thread.currentThread().setContextClassLoader(oldClassLoader);
return null;
},
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java
index 67042247b6..4350c63372 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java
@@ -17,6 +17,10 @@
package org.apache.seatunnel.engine.server.task.record;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
+
+import java.util.Set;
+
/** barrier flowing in data flow */
public interface Barrier {
Long PREPARE_CLOSE_BARRIER_ID = Long.MAX_VALUE;
@@ -32,4 +36,21 @@ public interface Barrier {
/** Barrier indicating that the task should prepare to close. */
boolean prepareClose();
+
+ /**
+ * Barrier indicating that the task should prepare to close.
+ *
+ * @param task task location
+ * @return true if the task is included
+ */
+ default boolean prepareClose(TaskLocation task) {
+ return prepareClose();
+ }
+
+ /**
+ * Returns the set of tasks that have been closed.
+ *
+ * @return the locations of the closed tasks
+ */
+ Set<TaskLocation> closedTasks();
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
index dd886d923e..cbfde91b37 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
@@ -28,9 +28,15 @@ import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import org.apache.seatunnel.engine.server.TestUtils;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointCloseReason;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
+import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.service.slot.SlotService;
+import org.apache.seatunnel.engine.server.task.CoordinatorTask;
+import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
@@ -219,7 +225,70 @@ public class JobMasterTest extends
AbstractSeaTunnelServerTest {
Assertions.assertTrue(jobMaster.isNeedRestore());
}
+ @Test
+ public void testCloseIdleTask() throws InterruptedException {
+ long jobId =
instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME).newId();
+ JobMaster jobMaster = newJobInstanceWithRunningState(jobId);
+ Assertions.assertEquals(JobStatus.RUNNING, jobMaster.getJobStatus());
+
+ assertCloseIdleTask(jobMaster);
+
+ server.getCoordinatorService().savePoint(jobId);
+ server.getCoordinatorService().getJobStatus(jobId);
+ await().atMost(60, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ JobStatus jobStatus =
+
server.getCoordinatorService().getJobStatus(jobId);
+ Assertions.assertEquals(JobStatus.SAVEPOINT_DONE,
jobStatus);
+ });
+ jobMaster = newJobInstanceWithRunningState(jobId, true);
+ Assertions.assertEquals(JobStatus.RUNNING, jobMaster.getJobStatus());
+
+ assertCloseIdleTask(jobMaster);
+ }
+
+ private void assertCloseIdleTask(JobMaster jobMaster) {
+ SlotService slotService = server.getSlotService();
+ Assertions.assertEquals(4,
slotService.getWorkerProfile().getAssignedSlots().length);
+
+ Assertions.assertEquals(1,
jobMaster.getPhysicalPlan().getPipelineList().size());
+ SubPlan subPlan = jobMaster.getPhysicalPlan().getPipelineList().get(0);
+ try {
+ PhysicalVertex coordinatorVertex1 =
subPlan.getCoordinatorVertexList().get(0);
+ CoordinatorTask coordinatorTask =
+ (CoordinatorTask)
+
coordinatorVertex1.getTaskGroup().getTasks().stream().findFirst().get();
+ jobMaster
+ .getCheckpointManager()
+ .readyToCloseIdleTask(coordinatorTask.getTaskLocation());
+ Assertions.fail("should throw UnsupportedOperationException");
+ } catch (UnsupportedOperationException e) {
+ // expected: a coordinator task cannot be closed as an idle task
+ }
+
+ Assertions.assertEquals(2, subPlan.getPhysicalVertexList().size());
+ PhysicalVertex taskGroup1 = subPlan.getPhysicalVertexList().get(0);
+ SeaTunnelTask seaTunnelTask =
+ (SeaTunnelTask)
taskGroup1.getTaskGroup().getTasks().stream().findFirst().get();
+
jobMaster.getCheckpointManager().readyToCloseIdleTask(seaTunnelTask.getTaskLocation());
+
+ CheckpointCoordinator checkpointCoordinator =
+ jobMaster
+ .getCheckpointManager()
+
.getCheckpointCoordinator(seaTunnelTask.getTaskLocation().getPipelineId());
+ await().atMost(60, TimeUnit.SECONDS)
+ .until(() -> checkpointCoordinator.getClosedIdleTask().size()
== 2);
+ await().atMost(60, TimeUnit.SECONDS)
+ .until(() ->
slotService.getWorkerProfile().getAssignedSlots().length == 3);
+ }
+
private JobMaster newJobInstanceWithRunningState(long jobId) throws
InterruptedException {
+ return newJobInstanceWithRunningState(jobId, false);
+ }
+
+ private JobMaster newJobInstanceWithRunningState(long jobId, boolean
restore)
+ throws InterruptedException {
LogicalDag testLogicalDag =
TestUtils.createTestLogicalPlan(
"stream_fakesource_to_file.conf",
"test_clear_coordinator_service", jobId);
@@ -228,6 +297,7 @@ public class JobMasterTest extends
AbstractSeaTunnelServerTest {
new JobImmutableInformation(
jobId,
"Test",
+ restore,
nodeEngine.getSerializationService().toData(testLogicalDag),
testLogicalDag.getJobConfig(),
Collections.emptyList(),
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fakesource_to_file.conf
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fakesource_to_file.conf
index 7a2b07a2a3..2cbcf14bd9 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fakesource_to_file.conf
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fakesource_to_file.conf
@@ -19,7 +19,7 @@
######
env {
- parallelism = 1
+ parallelism = 2
job.mode = "STREAMING"
checkpoint.interval = 5000
}
@@ -34,7 +34,6 @@ source {
age = "int"
}
}
- parallelism = 1
}
}