This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a18fca8006 [Fix][Connector-tdengine] Fix SQL exception and
ConcurrentModificationException when connecting to taos and reading data
a18fca8006 is described below
commit a18fca80061925ba3a2c9bd152b450dcf3d9308b
Author: Alex Ting <[email protected]>
AuthorDate: Thu Aug 8 11:22:41 2024 +0800
[Fix][Connector-tdengine] Fix SQL exception and ConcurrentModificationException
when connecting to taos and reading data
---
.../tdengine/config/TDengineSourceConfig.java | 7 +-
.../seatunnel/tdengine/source/TDengineSource.java | 56 +++++-----
.../tdengine/source/TDengineSourceReader.java | 95 +++++++---------
.../source/TDengineSourceSplitEnumerator.java | 122 +++++++++++---------
.../tdengine/state/TDengineSourceState.java | 20 ++--
.../tdengine/source/TDengineSourceReaderTest.java | 124 +++++++++++++++++++++
6 files changed, 279 insertions(+), 145 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
index 0908c73387..4eabb754cf 100644
---
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
@@ -30,7 +30,6 @@ import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengine
import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.STABLE;
import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.TIMEZONE;
import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.UPPER_BOUND;
-import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.URL;
import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.USERNAME;
@Data
@@ -54,7 +53,10 @@ public class TDengineSourceConfig implements Serializable {
public static TDengineSourceConfig buildSourceConfig(Config pluginConfig) {
TDengineSourceConfig tdengineSourceConfig = new TDengineSourceConfig();
- tdengineSourceConfig.setUrl(pluginConfig.hasPath(URL) ?
pluginConfig.getString(URL) : null);
+ tdengineSourceConfig.setUrl(
+ pluginConfig.hasPath(ConfigNames.URL)
+ ? pluginConfig.getString(ConfigNames.URL)
+ : null);
tdengineSourceConfig.setDatabase(
pluginConfig.hasPath(DATABASE) ?
pluginConfig.getString(DATABASE) : null);
tdengineSourceConfig.setStable(
@@ -69,6 +71,7 @@ public class TDengineSourceConfig implements Serializable {
pluginConfig.hasPath(LOWER_BOUND) ?
pluginConfig.getString(LOWER_BOUND) : null);
tdengineSourceConfig.setTimezone(
pluginConfig.hasPath(TIMEZONE) ?
pluginConfig.getString(TIMEZONE) : "UTC");
+
return tdengineSourceConfig;
}
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java
index 2f2e6a3f98..e72773781a 100644
---
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java
@@ -40,6 +40,7 @@ import
org.apache.seatunnel.connectors.seatunnel.tdengine.typemapper.TDengineTyp
import org.apache.commons.lang3.ArrayUtils;
import com.google.auto.service.AutoService;
+import com.taosdata.jdbc.TSDBDriver;
import lombok.SneakyThrows;
import java.sql.Connection;
@@ -49,6 +50,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.DATABASE;
import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.PASSWORD;
@@ -127,42 +129,36 @@ public class TDengineSource
List<String> fieldNames = new ArrayList<>();
List<SeaTunnelDataType<?>> fieldTypes = new ArrayList<>();
- String jdbcUrl =
- String.join(
- "",
- config.getUrl(),
- config.getDatabase(),
- "?user=",
- config.getUsername(),
- "&password=",
- config.getPassword());
+ String jdbcUrl = String.join("", config.getUrl(),
config.getDatabase());
+
// check td driver whether exist and if not, try to register
checkDriverExist(jdbcUrl);
- try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
- try (Statement statement = conn.createStatement()) {
+
+ Properties properties = new Properties();
+ properties.put(TSDBDriver.PROPERTY_KEY_USER, config.getUsername());
+ properties.put(TSDBDriver.PROPERTY_KEY_PASSWORD, config.getPassword());
+ String metaSQL =
+ String.format(
+ "select table_name from information_schema.ins_tables
where db_name = '%s' and stable_name='%s'",
+ config.getDatabase(), config.getStable());
+ try (Connection conn = DriverManager.getConnection(jdbcUrl,
properties);
+ Statement statement = conn.createStatement();
ResultSet metaResultSet =
statement.executeQuery(
- "desc " + config.getDatabase() + "." +
config.getStable());
- while (metaResultSet.next()) {
- if (timestampFieldName == null) {
- timestampFieldName = metaResultSet.getString(1);
- }
- fieldNames.add(metaResultSet.getString(1));
-
fieldTypes.add(TDengineTypeMapper.mapping(metaResultSet.getString(2)));
+ String.format(
+ "desc %s.%s", config.getDatabase(),
config.getStable()));
+ ResultSet subTableNameResultSet =
statement.executeQuery(metaSQL)) {
+ while (metaResultSet.next()) {
+ if (timestampFieldName == null) {
+ timestampFieldName = metaResultSet.getString(1);
}
+ fieldNames.add(metaResultSet.getString(1));
+
fieldTypes.add(TDengineTypeMapper.mapping(metaResultSet.getString(2)));
}
- try (Statement statement = conn.createStatement()) {
- String metaSQL =
- "select table_name from information_schema.ins_tables
where db_name = '"
- + config.getDatabase()
- + "' and stable_name='"
- + config.getStable()
- + "';";
- ResultSet subTableNameResultSet =
statement.executeQuery(metaSQL);
- while (subTableNameResultSet.next()) {
- String subTableName = subTableNameResultSet.getString(1);
- subTableNames.add(subTableName);
- }
+
+ while (subTableNameResultSet.next()) {
+ String subTableName = subTableNameResultSet.getString(1);
+ subTableNames.add(subTableName);
}
}
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java
index 6782f085bd..bb4184702d 100644
---
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java
@@ -17,7 +17,6 @@
package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
-import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -25,9 +24,6 @@ import
org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
-import org.apache.commons.lang3.StringUtils;
-
-import com.google.common.collect.Sets;
import com.taosdata.jdbc.TSDBDriver;
import lombok.extern.slf4j.Slf4j;
@@ -39,84 +35,76 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.ArrayList;
+import java.util.Deque;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
-import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedDeque;
import static
org.apache.seatunnel.connectors.seatunnel.tdengine.utils.TDengineUtil.checkDriverExist;
@Slf4j
public class TDengineSourceReader implements SourceReader<SeaTunnelRow,
TDengineSourceSplit> {
-
- private static final long THREAD_WAIT_TIME = 500L;
-
private final TDengineSourceConfig config;
- private final Set<TDengineSourceSplit> sourceSplits;
+ private final Deque<TDengineSourceSplit> sourceSplits;
private final Context context;
private Connection conn;
+ private volatile boolean noMoreSplit;
+
public TDengineSourceReader(TDengineSourceConfig config,
SourceReader.Context readerContext) {
this.config = config;
- this.sourceSplits = Sets.newHashSet();
+ this.sourceSplits = new ConcurrentLinkedDeque<>();
this.context = readerContext;
}
@Override
public void pollNext(Collector<SeaTunnelRow> collector) throws
InterruptedException {
- if (sourceSplits.isEmpty()) {
- Thread.sleep(THREAD_WAIT_TIME);
- return;
- }
synchronized (collector.getCheckpointLock()) {
- sourceSplits.forEach(
- split -> {
- try {
- read(split, collector);
- } catch (Exception e) {
- throw new TDengineConnectorException(
-
CommonErrorCodeDeprecated.READER_OPERATION_FAILED,
- "TDengine split read error",
- e);
- }
- });
- }
-
- if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
- // signal to the source that we have reached the end of the data.
- log.info("Closed the bounded TDengine source");
- context.signalNoMoreElement();
+ log.info("polling new split from queue!");
+ TDengineSourceSplit split = sourceSplits.poll();
+ if (Objects.nonNull(split)) {
+ log.info(
+ "starting run new split {}, query sql: {}!",
+ split.splitId(),
+ split.getQuery());
+ try {
+ read(split, collector);
+ } catch (Exception e) {
+ throw new TDengineConnectorException(
+ CommonErrorCodeDeprecated.READER_OPERATION_FAILED,
+ "TDengine split read error",
+ e);
+ }
+ } else if (noMoreSplit && sourceSplits.isEmpty()) {
+ // signal to the source that we have reached the end of the
data.
+ log.info("Closed the bounded TDengine source");
+ context.signalNoMoreElement();
+ } else {
+ Thread.sleep(1000L);
+ }
}
}
@Override
public void open() {
- String jdbcUrl =
- StringUtils.join(
- config.getUrl(),
- config.getDatabase(),
- "?user=",
- config.getUsername(),
- "&password=",
- config.getPassword());
- Properties connProps = new Properties();
- // todo: when TSDBDriver.PROPERTY_KEY_BATCH_LOAD set to "true",
- // there is a exception : Caused by: java.sql.SQLException: can't
create connection with
- // server
- // under docker network env
- // @bobo (tdengine)
- connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "false");
+ String jdbcUrl = config.getUrl();
+
+ Properties properties = new Properties();
+ properties.put(TSDBDriver.PROPERTY_KEY_USER, config.getUsername());
+ properties.put(TSDBDriver.PROPERTY_KEY_PASSWORD, config.getPassword());
+
try {
- // check td driver whether exist and if not, try to register
checkDriverExist(jdbcUrl);
- conn = DriverManager.getConnection(jdbcUrl, connProps);
+ conn = DriverManager.getConnection(jdbcUrl, properties);
} catch (SQLException e) {
throw new TDengineConnectorException(
CommonErrorCodeDeprecated.READER_OPERATION_FAILED,
- "get TDengine connection failed:" + jdbcUrl);
+ "get TDengine connection failed:" + jdbcUrl,
+ e);
}
}
@@ -135,8 +123,8 @@ public class TDengineSourceReader implements
SourceReader<SeaTunnelRow, TDengine
}
private void read(TDengineSourceSplit split, Collector<SeaTunnelRow>
output) throws Exception {
- try (Statement statement = conn.createStatement()) {
- final ResultSet resultSet =
statement.executeQuery(split.getQuery());
+ try (Statement statement = conn.createStatement();
+ ResultSet resultSet =
statement.executeQuery(split.getQuery())) {
ResultSetMetaData meta = resultSet.getMetaData();
while (resultSet.next()) {
@@ -151,6 +139,8 @@ public class TDengineSourceReader implements
SourceReader<SeaTunnelRow, TDengine
}
private Object convertDataType(Object object) {
+ if (Objects.isNull(object)) return null;
+
if (Timestamp.class.equals(object.getClass())) {
return ((Timestamp) object).toLocalDateTime();
} else if (byte[].class.equals(object.getClass())) {
@@ -171,7 +161,8 @@ public class TDengineSourceReader implements
SourceReader<SeaTunnelRow, TDengine
@Override
public void handleNoMoreSplits() {
- // do nothing
+ log.info("no more split accepted!");
+ noMoreSplit = true;
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java
index d5787ba557..911a9a6ec1 100644
---
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java
@@ -17,28 +17,34 @@
package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
-import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.tdengine.state.TDengineSourceState;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
-import java.util.Objects;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
+@Slf4j
public class TDengineSourceSplitEnumerator
implements SourceSplitEnumerator<TDengineSourceSplit,
TDengineSourceState> {
private final SourceSplitEnumerator.Context<TDengineSourceSplit> context;
private final TDengineSourceConfig config;
private final StableMetadata stableMetadata;
- private Set<TDengineSourceSplit> pendingSplit = new HashSet<>();
- private Set<TDengineSourceSplit> assignedSplit = new HashSet<>();
+ private volatile boolean shouldEnumerate;
+ private final Object stateLock = new Object();
+ private final Map<Integer, List<TDengineSourceSplit>> pendingSplits = new
ConcurrentHashMap<>();
public TDengineSourceSplitEnumerator(
StableMetadata stableMetadata,
@@ -55,8 +61,10 @@ public class TDengineSourceSplitEnumerator
this.config = config;
this.context = context;
this.stableMetadata = stableMetadata;
+ this.shouldEnumerate = sourceState == null;
if (sourceState != null) {
- this.assignedSplit = sourceState.getAssignedSplit();
+ this.shouldEnumerate = sourceState.isShouldEnumerate();
+ this.pendingSplits.putAll(sourceState.getPendingSplits());
}
}
@@ -69,16 +77,33 @@ public class TDengineSourceSplitEnumerator
@Override
public void run() {
- pendingSplit = getAllSplits();
- assignSplit(context.registeredReaders());
+ Set<Integer> readers = context.registeredReaders();
+ if (shouldEnumerate) {
+ List<TDengineSourceSplit> newSplits = discoverySplits();
+
+ synchronized (stateLock) {
+ addPendingSplit(newSplits);
+ shouldEnumerate = false;
+ }
+
+ assignSplit(readers);
+ }
+
+ log.info("No more splits to assign." + " Sending NoMoreSplitsEvent to
reader {}.", readers);
+ readers.forEach(context::signalNoMoreSplits);
}
- /*
- * each split has one sub table
- */
- private Set<TDengineSourceSplit> getAllSplits() {
+ private void addPendingSplit(List<TDengineSourceSplit> newSplits) {
+ int readerCount = context.currentParallelism();
+ for (TDengineSourceSplit split : newSplits) {
+ int ownerReader = getSplitOwner(split.splitId(), readerCount);
+ pendingSplits.computeIfAbsent(ownerReader, r -> new
ArrayList<>()).add(split);
+ }
+ }
+
+ private List<TDengineSourceSplit> discoverySplits() {
final String timestampFieldName =
stableMetadata.getTimestampFieldName();
- final Set<TDengineSourceSplit> splits = new HashSet<>();
+ final List<TDengineSourceSplit> splits = new ArrayList<>();
for (String subTableName : stableMetadata.getSubTableNames()) {
TDengineSourceSplit splitBySubTable =
createSplitBySubTable(subTableName, timestampFieldName);
@@ -92,9 +117,11 @@ public class TDengineSourceSplitEnumerator
String selectFields =
Arrays.stream(stableMetadata.getRowType().getFieldNames())
.skip(1)
+ .map(name -> String.format("`%s`", name))
.collect(Collectors.joining(","));
String subTableSQL =
- "select " + selectFields + " from " + config.getDatabase() +
"." + subTableName;
+ String.format(
+ "select %s from %s.`%s`", selectFields,
config.getDatabase(), subTableName);
String start = config.getLowerBound();
String end = config.getUpperBound();
if (start != null || end != null) {
@@ -116,69 +143,64 @@ public class TDengineSourceSplitEnumerator
@Override
public void addSplitsBack(List<TDengineSourceSplit> splits, int subtaskId)
{
+ log.info("Add back splits {} to TDengineSourceSplitEnumerator.",
splits);
if (!splits.isEmpty()) {
- pendingSplit.addAll(splits);
+ addPendingSplit(splits);
assignSplit(Collections.singletonList(subtaskId));
}
}
@Override
public int currentUnassignedSplitSize() {
- return pendingSplit.size();
+ return pendingSplits.size();
}
@Override
public void registerReader(int subtaskId) {
- if (!pendingSplit.isEmpty()) {
+ log.info("Register reader {} to TDengineSourceSplitEnumerator.",
subtaskId);
+ if (!pendingSplits.isEmpty()) {
assignSplit(Collections.singletonList(subtaskId));
}
}
- private void assignSplit(Collection<Integer> taskIDList) {
- assignedSplit =
- pendingSplit.stream()
- .map(
- split -> {
- int splitOwner =
- getSplitOwner(
- split.splitId(),
context.currentParallelism());
- if (taskIDList.contains(splitOwner)) {
- context.assignSplit(splitOwner, split);
- return split;
- } else {
- return null;
- }
- })
- .filter(Objects::nonNull)
- .collect(Collectors.toSet());
- pendingSplit.clear();
+ private void assignSplit(Collection<Integer> readers) {
+ log.info("Assign pendingSplits to readers {}", readers);
+
+ for (int reader : readers) {
+ List<TDengineSourceSplit> assignmentForReader =
pendingSplits.remove(reader);
+ if (assignmentForReader != null && !assignmentForReader.isEmpty())
{
+ log.info("Assign splits {} to reader {}", assignmentForReader,
reader);
+ try {
+ context.assignSplit(reader, assignmentForReader);
+ } catch (Exception e) {
+ log.error(
+ "Failed to assign splits {} to reader {}",
+ assignmentForReader,
+ reader,
+ e);
+ pendingSplits.put(reader, assignmentForReader);
+ }
+ }
+ }
}
@Override
public TDengineSourceState snapshotState(long checkpointId) {
- return new TDengineSourceState(assignedSplit);
- }
-
- @Override
- public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
- SourceSplitEnumerator.super.handleSourceEvent(subtaskId, sourceEvent);
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) {
- // nothing to do
+ synchronized (stateLock) {
+ return new TDengineSourceState(shouldEnumerate, pendingSplits);
+ }
}
@Override
- public void notifyCheckpointAborted(long checkpointId) throws Exception {
- SourceSplitEnumerator.super.notifyCheckpointAborted(checkpointId);
- }
+ public void notifyCheckpointComplete(long checkpointId) {}
@Override
public void close() {}
@Override
public void handleSplitRequest(int subtaskId) {
- // nothing to do
+ throw new TDengineConnectorException(
+ CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+ String.format("Unsupported handleSplitRequest: %d",
subtaskId));
}
}
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/state/TDengineSourceState.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/state/TDengineSourceState.java
index fc839682a9..4832cd398f 100644
---
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/state/TDengineSourceState.java
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/state/TDengineSourceState.java
@@ -19,18 +19,16 @@ package
org.apache.seatunnel.connectors.seatunnel.tdengine.state;
import
org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceSplit;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
import java.io.Serializable;
-import java.util.Set;
+import java.util.List;
+import java.util.Map;
+@AllArgsConstructor
+@Getter
public class TDengineSourceState implements Serializable {
-
- private final Set<TDengineSourceSplit> assignedSplit;
-
- public TDengineSourceState(Set<TDengineSourceSplit> assignedSplit) {
- this.assignedSplit = assignedSplit;
- }
-
- public Set<TDengineSourceSplit> getAssignedSplit() {
- return assignedSplit;
- }
+ private boolean shouldEnumerate;
+ private final Map<Integer, List<TDengineSourceSplit>> pendingSplits;
}
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReaderTest.java
b/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReaderTest.java
new file mode 100644
index 0000000000..abd42fefe1
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReaderTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+class TDengineSourceReaderTest {
+ Logger logger;
+ TDengineSourceReader tDengineSourceReader;
+
+ @BeforeEach
+ void setup() {
+ tDengineSourceReader = new TDengineSourceReader(null, null);
+
+ List<TDengineSourceSplit> sourceSplits = new ArrayList<>();
+ int splitCnt = 100;
+ for (int i = 0; i < splitCnt; i++) {
+ sourceSplits.add(new TDengineSourceSplit(Integer.toString(i),
"select sever_status()"));
+ }
+
+ tDengineSourceReader.addSplits(sourceSplits);
+
+ logger = Logger.getLogger("TDengineSourceReaderTest");
+ }
+
+ @Test
+ void testPoll() throws InterruptedException {
+ TestCollector testCollector = new TestCollector();
+
+ int totalSplitCnt = 150;
+ ThreadPoolExecutor pool =
+ new ThreadPoolExecutor(8, 8, 60, TimeUnit.SECONDS, new
LinkedBlockingQueue<>());
+ pool.execute(
+ () -> {
+ for (int i = 0; i < totalSplitCnt; i++) {
+ try {
+ tDengineSourceReader.pollNext(testCollector);
+ Thread.sleep(new Random().nextInt(5));
+ } catch (TDengineConnectorException e) {
+ logger.info("skip create connection!");
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ int newSplitCnt = 50;
+ int threadCnt = 3;
+ for (int i = 0; i < threadCnt; i++) {
+ pool.execute(
+ () -> {
+ for (int idx = 0; idx < newSplitCnt; idx++) {
+ logger.info(
+ String.format(
+ "%s receive new split",
+ Thread.currentThread().getName()));
+ tDengineSourceReader.addSplits(
+ Collections.singletonList(
+ new TDengineSourceSplit(
+ String.format(
+ "new_%s",
+
Thread.currentThread().getName() + idx),
+ "select
server_status()")));
+ try {
+ Thread.sleep(new Random().nextInt(5));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ }
+
+ pool.awaitTermination(3, TimeUnit.SECONDS);
+ }
+
+ private static class TestCollector implements Collector<SeaTunnelRow> {
+
+ private final List<SeaTunnelRow> rows = new ArrayList<>();
+
+ public List<SeaTunnelRow> getRows() {
+ return rows;
+ }
+
+ @Override
+ public void collect(SeaTunnelRow record) {
+ rows.add(record);
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return new Object();
+ }
+ }
+}