This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b136a0dc43 [Feature][connector-tdengine] Support subtable and
fieldNames in tdengine source (#9593)
b136a0dc43 is described below
commit b136a0dc438c668931f81553f01fcc880155da6e
Author: jiachuan.zhu <[email protected]>
AuthorDate: Mon Jul 28 15:29:29 2025 +0800
[Feature][connector-tdengine] Support subtable and fieldNames in tdengine
source (#9593)
---
docs/en/connector-v2/sink/TDengine.md | 21 +++--
docs/en/connector-v2/source/TDengine.md | 31 +++++--
docs/zh/connector-v2/sink/TDengine.md | 9 +-
docs/zh/connector-v2/source/TDengine.md | 102 +++++++++++++++++++++
.../tdengine/config/TDengineSinkConfig.java | 8 +-
.../tdengine/config/TDengineSinkOptions.java | 11 +++
.../tdengine/config/TDengineSourceConfig.java | 17 +++-
.../tdengine/config/TDengineSourceOptions.java | 19 ++++
.../tdengine/sink/TDengineSinkWriter.java | 6 +-
.../seatunnel/tdengine/source/TDengineSource.java | 14 ++-
.../tdengine/source/TDengineSourceReaderTest.java | 76 +++++++++++++++
.../e2e/connector/tdengine/TDengineIT.java | 27 +++++-
...engine_source_to_sink_filter_by_fieldNames.conf | 53 +++++++++++
13 files changed, 365 insertions(+), 29 deletions(-)
diff --git a/docs/en/connector-v2/sink/TDengine.md
b/docs/en/connector-v2/sink/TDengine.md
index 8e7fac69c0..55755c1696 100644
--- a/docs/en/connector-v2/sink/TDengine.md
+++ b/docs/en/connector-v2/sink/TDengine.md
@@ -15,14 +15,15 @@ Used to write data to TDengine. You need to create stable
before running seatunn
## Options
-| name | type | required | default value |
-|----------|--------|----------|---------------|
-| url | string | yes | - |
-| username | string | yes | - |
-| password | string | yes | - |
-| database | string | yes | |
-| stable | string | yes | - |
-| timezone | string | no | UTC |
+| name | type | required | default value |
+|--------------|--------|----------|---------------|
+| url | string | yes | - |
+| username | string | yes | - |
+| password | string | yes | - |
+| database | string | yes | |
+| stable | string | yes | - |
+| timezone | string | no | UTC |
+| write_columns| list | no | - |
### url [string]
@@ -54,6 +55,9 @@ the stable of the TDengine when you select
the timezone of the TDengine server, it's important to the ts field
+### write_columns [list]
+The field names to be inserted into TDengine. If not set, all fields will be
written. The plugin will automatically append TAGS columns, so please do not
include TAGS columns in this option.
+
## Example
### sink
@@ -67,6 +71,7 @@ sink {
database : "power2"
stable : "meters2"
timezone: UTC
+ write_columns: ["ts", "voltage", "current", "power"]
}
}
```
diff --git a/docs/en/connector-v2/source/TDengine.md
b/docs/en/connector-v2/source/TDengine.md
index 2f82882706..d5e4ee1d50 100644
--- a/docs/en/connector-v2/source/TDengine.md
+++ b/docs/en/connector-v2/source/TDengine.md
@@ -22,15 +22,17 @@ supports query SQL and can achieve projection effect.
## Options
-| name | type | required | default value |
-|-------------|--------|----------|---------------|
-| url | string | yes | - |
-| username | string | yes | - |
-| password | string | yes | - |
-| database | string | yes | |
-| stable | string | yes | - |
-| lower_bound | long | yes | - |
-| upper_bound | long | yes | - |
+| name | type | required | default value |
+|--------------|--------|----------|---------------|
+| url | string | yes | - |
+| username | string | yes | - |
+| password | string | yes | - |
+| database | string | yes | |
+| stable | string | yes | - |
+| sub_tables | list | no | - |
+| lower_bound | long | yes | - |
+| upper_bound | long | yes | - |
+| read_columns | list | no | - |
### url [string]
@@ -58,6 +60,9 @@ the database of the TDengine when you select
the stable of the TDengine when you select
+### sub_tables [list]
+A list of sub_table names. If not specified, all sub-tables will be selected.
If specified, only the specified sub-tables will be selected.
+
### lower_bound [long]
the lower_bound of the migration period
@@ -66,6 +71,10 @@ the lower_bound of the migration period
the upper_bound of the migration period
+### read_columns [list]
+A list of column names to read. If not specified, all columns will be
selected.
+When reading from a super table, please make sure to put the TAGS columns at
the end of the list.
+
## Example
### source
@@ -78,9 +87,11 @@ source {
password : "taosdata"
database : "power"
stable : "meters"
+ sub_tables : ["meter_1","meter_2"]
lower_bound : "2018-10-03 14:38:05.000"
upper_bound : "2018-10-03 14:38:16.800"
- plugin_output = "tdengine_result"
+ plugin_output : "tdengine_result"
+ read_columns : ["ts","voltage","current","power"]
}
}
```
diff --git a/docs/zh/connector-v2/sink/TDengine.md
b/docs/zh/connector-v2/sink/TDengine.md
index 98a03d54f1..bf12253de9 100644
--- a/docs/zh/connector-v2/sink/TDengine.md
+++ b/docs/zh/connector-v2/sink/TDengine.md
@@ -15,7 +15,7 @@ import ChangeLog from '../changelog/connector-tdengine.md';
## 选项
-| 名称 | 类型 | 是否必传 | 默认值 |
+| 名称 | 类型 | 是否必传 | 默认值 |
|----------|--------|----------|---------------|
| url | string | 是 | - |
| username | string | 是 | - |
@@ -23,6 +23,7 @@ import ChangeLog from '../changelog/connector-tdengine.md';
| database | string | 是 | |
| stable | string | 是 | - |
| timezone | string | 否 | UTC |
+| write_columns | list | 否 | - |
### url [string]
@@ -54,6 +55,10 @@ TDengine的超级表
TDengine服务器的时区,对ts字段很重要
+### write_columns [list]
+TDengine的写入列,默认为所有列。无需包含 TAGS 字段,插件会自动处理 TAGS 字段的写入。
+
+
## 示例
### sink
@@ -67,10 +72,12 @@ sink {
database : "power2"
stable : "meters2"
timezone: UTC
+ write_columns: ["ts", "voltage", "current", "power"]
}
}
```
+
## 变更日志
<ChangeLog />
\ No newline at end of file
diff --git a/docs/zh/connector-v2/source/TDengine.md
b/docs/zh/connector-v2/source/TDengine.md
new file mode 100644
index 0000000000..2bd543a60c
--- /dev/null
+++ b/docs/zh/connector-v2/source/TDengine.md
@@ -0,0 +1,102 @@
+import ChangeLog from '../changelog/connector-tdengine.md';
+
+# TDengine
+
+> TDengine 源端连接器
+
+## 描述
+
+通过 TDengine 读取外部数据源的数据。
+
+## 主要特性
+
+- [x] [批处理](../../concept/connector-v2-features.md)
+- [ ] [流式](../../concept/connector-v2-features.md)
+- [x] [精确一次](../../concept/connector-v2-features.md)
+- [ ] [列投影](../../concept/connector-v2-features.md)
+
+支持查询 SQL,并可实现投影效果。
+
+- [x] [并行度](../../concept/connector-v2-features.md)
+- [ ] [支持用户自定义分片](../../concept/connector-v2-features.md)
+
+## 配置项
+
+| 名称 | 类型 | 必填 | 默认值 |
+|----------------|--------|------|----------------|
+| url | string | 是 | - |
+| username | string | 是 | - |
+| password | string | 是 | - |
+| database | string | 是 | |
+| stable | string | 是 | - |
+| sub_tables | list | 否 | - |
+| lower_bound | long | 是 | - |
+| upper_bound | long | 是 | - |
+| read_columns | list | 否 | - |
+
+### url [string]
+
+选择 TDengine 时的连接 URL
+
+例如:
+
+```
+jdbc:TAOS-RS://localhost:6041/
+```
+
+### username [string]
+
+选择 TDengine 时的用户名
+
+### password [string]
+
+选择 TDengine 时的密码
+
+### database [string]
+
+选择 TDengine 时的数据库名
+
+### stable [string]
+
+选择 TDengine 时的超级表名
+
+### sub_tables [list]
+
+TDengine 的子表名。如果不指定,则会选择所有子表;如果指定,则只选择指定的子表。
+
+### lower_bound [long]
+
+迁移时间段的下界
+
+### upper_bound [long]
+
+迁移时间段的上界
+
+### read_columns [list]
+
+选择 TDengine 时的列名。如果不指定,则选择所有字段;如果指定,则只选择指定的字段。读取超级表时,请包含TAGS 字段,并放在末尾。
+
+## 示例
+
+### source 配置示例
+
+```hocon
+source {
+ TDengine {
+ url : "jdbc:TAOS-RS://localhost:6041/"
+ username : "root"
+ password : "taosdata"
+ database : "power"
+ stable : "meters"
+ sub_tables : ["meter_1","meter_2"]
+ lower_bound : "2018-10-03 14:38:05.000"
+ upper_bound : "2018-10-03 14:38:16.800"
+ plugin_output = "tdengine_result"
+ read_columns : ["ts","voltage","current","power"]
+ }
+}
+```
+
+## 变更日志
+
+<ChangeLog />
\ No newline at end of file
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkConfig.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkConfig.java
index b2892c61d6..cac4d5fe78 100644
---
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkConfig.java
@@ -23,6 +23,7 @@ import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
+import java.util.List;
import java.util.Optional;
@Data
@@ -37,6 +38,7 @@ public class TDengineSinkConfig implements Serializable {
private String database;
private String stable;
private String timezone;
+ private String writeColumns;
public static TDengineSinkConfig of(ReadonlyConfig config) {
Builder builder = TDengineSinkConfig.builder();
@@ -50,7 +52,11 @@ public class TDengineSinkConfig implements Serializable {
Optional<String> optionalTimezone =
config.getOptional(TDengineSinkOptions.TIMEZONE);
builder.timezone(optionalTimezone.orElseGet(TDengineSinkOptions.TIMEZONE::defaultValue));
-
+ Optional<List<String>> optionalWriteColumns =
+ config.getOptional(TDengineSinkOptions.WRITE_COLUMNS);
+ if (optionalWriteColumns.isPresent()) {
+ builder.writeColumns(String.join(",", optionalWriteColumns.get()));
+ }
return builder.build();
}
}
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkOptions.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkOptions.java
index 7f64177656..2d112bf644 100644
---
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkOptions.java
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkOptions.java
@@ -23,6 +23,8 @@ import org.apache.seatunnel.api.configuration.Options;
import lombok.AllArgsConstructor;
import lombok.Data;
+import java.util.List;
+
@Data
@AllArgsConstructor
public class TDengineSinkOptions extends TDengineCommonOptions {
@@ -32,4 +34,13 @@ public class TDengineSinkOptions extends
TDengineCommonOptions {
.stringType()
.defaultValue("UTC")
.withDescription("The timezone used for timestamp
conversion, default is UTC");
+
+ public static final Option<List<String>> WRITE_COLUMNS =
+ Options.key("write_columns")
+ .listType()
+ .noDefaultValue()
+ .withDescription(
+ "The field names to be written to TDengine "
+ + "If not specified, all fields will be
written. "
+ + "This option is useful when the source
schema does not match the TDengine table schema.");
}
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
index 5ea375f558..f283bbf72e 100644
---
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
@@ -23,6 +23,8 @@ import lombok.Data;
import java.io.Serializable;
import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
@Data
public class TDengineSourceConfig implements Serializable {
@@ -36,8 +38,9 @@ public class TDengineSourceConfig implements Serializable {
private String stable;
private String lowerBound;
private String upperBound;
- private List<String> fields;
private List<String> tags;
+ private Set<String> subTables;
+ private Set<String> readColumns;
public static TDengineSourceConfig buildSourceConfig(ReadonlyConfig
pluginConfig) {
TDengineSourceConfig tdengineSourceConfig = new TDengineSourceConfig();
@@ -48,6 +51,18 @@ public class TDengineSourceConfig implements Serializable {
tdengineSourceConfig.setPassword(pluginConfig.get(TDengineSourceOptions.PASSWORD));
tdengineSourceConfig.setUpperBound(pluginConfig.get(TDengineSourceOptions.UPPER_BOUND));
tdengineSourceConfig.setLowerBound(pluginConfig.get(TDengineSourceOptions.LOWER_BOUND));
+ if
(pluginConfig.getOptional(TDengineSourceOptions.SUB_TABLES).isPresent()) {
+ tdengineSourceConfig.setSubTables(
+ pluginConfig.get(TDengineSourceOptions.SUB_TABLES).stream()
+ .collect(Collectors.toSet()));
+ }
+ if
(pluginConfig.getOptional(TDengineSourceOptions.READ_COLUMNS).isPresent()) {
+ tdengineSourceConfig.setReadColumns(
+
pluginConfig.get(TDengineSourceOptions.READ_COLUMNS).stream()
+ .collect(Collectors.toSet()));
+ } else {
+ tdengineSourceConfig.setReadColumns(null);
+ }
return tdengineSourceConfig;
}
}
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceOptions.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceOptions.java
index 4d6bb0789d..69fb15e9db 100644
---
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceOptions.java
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceOptions.java
@@ -20,6 +20,8 @@ package
org.apache.seatunnel.connectors.seatunnel.tdengine.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import java.util.List;
+
public class TDengineSourceOptions extends TDengineCommonOptions {
public static final Option<String> LOWER_BOUND =
@@ -33,4 +35,21 @@ public class TDengineSourceOptions extends
TDengineCommonOptions {
.stringType()
.noDefaultValue()
.withDescription("The upper bound for data query range");
+
+ public static final Option<List<String>> SUB_TABLES =
+ Options.key("sub_tables")
+ .listType()
+ .noDefaultValue()
+ .withDescription(
+ "The sub table names to query data from, separated
by comma , "
+ + "if not specified, all sub tables will
be queried");
+
+ public static final Option<List<String>> READ_COLUMNS =
+ Options.key("read_columns")
+ .listType()
+ .noDefaultValue()
+ .withDescription(
+ "The field names to be read from TDengine "
+ + "If not specified, all columns will be
read. "
+ + "This option is useful for selecting
specific columns when querying data from TDengine.");
}
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
index 4137becdb9..d76c78264e 100644
---
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
@@ -103,10 +103,13 @@ public class TDengineSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
conn.createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY)) {
String sql =
String.format(
- "INSERT INTO %s using %s tags ( %s ) VALUES ( %s
);",
+ "INSERT INTO %s using %s tags ( %s ) %s VALUES (
%s );",
element.getField(0),
config.getStable(),
tagValues,
+ StringUtils.isEmpty(config.getWriteColumns())
+ ? ""
+ : "( " + config.getWriteColumns() + " )",
StringUtils.join(convertDataType(metrics), ","));
final int rowCount = statement.executeUpdate(sql);
if (rowCount == 0) {
@@ -140,6 +143,7 @@ public class TDengineSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
if (object == null) {
return null;
}
+
if (LocalDateTime.class.equals(object.getClass()))
{
// transform timezone according to the config
return "'"
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java
index 7e396fa0f9..3cbaee29e8 100644
---
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java
@@ -36,6 +36,7 @@ import
org.apache.seatunnel.connectors.seatunnel.tdengine.typemapper.TDengineTyp
import org.apache.commons.lang3.ArrayUtils;
import com.taosdata.jdbc.TSDBDriver;
+import lombok.Getter;
import lombok.SneakyThrows;
import java.sql.Connection;
@@ -59,8 +60,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.tdengine.utils.TDengineU
*/
public class TDengineSource
implements SeaTunnelSource<SeaTunnelRow, TDengineSourceSplit,
TDengineSourceState> {
-
- private final StableMetadata stableMetadata;
+ @Getter private final StableMetadata stableMetadata;
private final TDengineSourceConfig tdengineSourceConfig;
private final CatalogTable catalogTable;
@@ -137,12 +137,22 @@ public class TDengineSource
if (timestampFieldName == null) {
timestampFieldName = metaResultSet.getString(1);
}
+ if (config.getReadColumns() != null
+ && !config.getReadColumns().isEmpty()
+ &&
!config.getReadColumns().contains(metaResultSet.getString(1))) {
+ continue;
+ }
fieldNames.add(metaResultSet.getString(1));
fieldTypes.add(TDengineTypeMapper.mapping(metaResultSet.getString(2)));
}
while (subTableNameResultSet.next()) {
String subTableName = subTableNameResultSet.getString(1);
+ if (config.getSubTables() != null
+ && !config.getSubTables().isEmpty()
+ && !config.getSubTables().contains(subTableName)) {
+ continue;
+ }
subTableNames.add(subTableName);
}
}
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReaderTest.java
b/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReaderTest.java
index abd42fefe1..3a99670329 100644
---
a/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReaderTest.java
+++
b/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReaderTest.java
@@ -18,22 +18,42 @@
package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import
org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
+import org.apache.commons.lang3.StringUtils;
+
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
class TDengineSourceReaderTest {
Logger logger;
TDengineSourceReader tDengineSourceReader;
@@ -103,6 +123,62 @@ class TDengineSourceReaderTest {
pool.awaitTermination(3, TimeUnit.SECONDS);
}
+ @Test
+ public void testGetStableMetadata() throws SQLException {
+
+ try (MockedStatic<DriverManager> dm = mockStatic(DriverManager.class))
{
+
+ Connection mockConn = mock(Connection.class);
+ Statement mockStatement = mock(Statement.class);
+ ResultSet metadataResultSet = mock(ResultSet.class);
+ ResultSet tableResultSet = mock(ResultSet.class);
+
+ dm.when(() -> DriverManager.getConnection(anyString(),
any(Properties.class)))
+ .thenReturn(mockConn);
+
+ when(mockConn.createStatement()).thenReturn(mockStatement);
+
+ when(mockStatement.executeQuery(
+ argThat(
+ sql ->
+ StringUtils.isNotEmpty(sql)
+ && sql.trim()
+ .toLowerCase()
+
.startsWith("desc"))))
+ .thenReturn(metadataResultSet);
+ when(metadataResultSet.next()).thenReturn(true, true, false);
+ when(metadataResultSet.getString(1)).thenReturn("ts", "col1",
"col1", "col2");
+ when(metadataResultSet.getString(2)).thenReturn("INT",
"VARCHAR(20)");
+
+ when(mockStatement.executeQuery(
+ argThat(
+ sql ->
+ sql.trim()
+ .toLowerCase()
+ .startsWith(
+ "select table_name
from information_schema.ins_tables"))))
+ .thenReturn(tableResultSet);
+ when(tableResultSet.next()).thenReturn(true, true, false);
+ when(tableResultSet.getString(1)).thenReturn("sub_table_1",
"sub_table_2");
+ Map<String, Object> map = new HashMap<>();
+ map.put("url", "jdbc:TAOS-RS://localhost:6041/");
+ map.put("database", "test_db");
+ map.put("username", "root");
+ map.put("password", "taosdata");
+ map.put("stable", "stable");
+ map.put("sub_tables", "sub_table_1");
+ map.put("read_columns", "col1");
+
+ ReadonlyConfig config = ReadonlyConfig.fromMap(map);
+ TDengineSource source = new TDengineSource(config);
+ StableMetadata stableMetadata = source.getStableMetadata();
+ Assertions.assertEquals(1,
stableMetadata.getSubTableNames().size());
+ Assertions.assertEquals("sub_table_1",
stableMetadata.getSubTableNames().get(0));
+ Assertions.assertEquals(2,
stableMetadata.getRowType().getFieldNames().length);
+ Assertions.assertEquals("col1",
stableMetadata.getRowType().getFieldNames()[1]);
+ }
+ }
+
private static class TestCollector implements Collector<SeaTunnelRow> {
private final List<SeaTunnelRow> rows = new ArrayList<>();
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java
index 6199da00aa..12935b378d 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java
@@ -134,6 +134,12 @@ public class TDengineIT extends TestSuiteBase implements
TestResource {
"CREATE STABLE power2.meters4 (ts TIMESTAMP, current
FLOAT, voltage INT, phase FLOAT, off BOOL, nc NCHAR(10)) "
+ "TAGS (location BINARY(64), groupId INT)");
}
+ try (Statement stmt = connection2.createStatement()) {
+ stmt.execute("CREATE DATABASE power3 KEEP 3650");
+ stmt.execute(
+ "CREATE STABLE power3.meters5 (ts TIMESTAMP, current
FLOAT, voltage INT, phase FLOAT, off BOOL, nc NCHAR(10)) "
+ + "TAGS (location BINARY(64), groupId INT)");
+ }
return rowCount;
}
@@ -143,7 +149,7 @@ public class TDengineIT extends TestSuiteBase implements
TestResource {
container.executeJob("/tdengine/tdengine_source_to_sink.conf");
Assertions.assertEquals(0, execResult.getExitCode());
- long rowCountInserted = readSinkDataset("meters2");
+ long rowCountInserted = readSinkDataset("power2", "meters2");
Assertions.assertEquals(rowCountInserted, testDataCount);
}
@@ -153,21 +159,32 @@ public class TDengineIT extends TestSuiteBase implements
TestResource {
container.executeJob("/tdengine/tdengine_fake_to_sink_multitable.conf");
Assertions.assertEquals(0, execResult.getExitCode());
- long rowCountInserted = readSinkDataset("meters3");
- long rowCountInserted2 = readSinkDataset("meters4");
+ long rowCountInserted = readSinkDataset("power2", "meters3");
+ long rowCountInserted2 = readSinkDataset("power2", "meters4");
Assertions.assertEquals(rowCountInserted, testDataCountMulti_Table1);
Assertions.assertEquals(rowCountInserted2, testDataCountMulti_Table2);
}
+ @TestTemplate
+ public void testTDEngineSourceToSinkFilterByFieldName(TestContainer
container)
+ throws Exception {
+ Container.ExecResult execResult =
+
container.executeJob("/tdengine/tdengine_source_to_sink_filter_by_fieldNames.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ long rowCountInserted = readSinkDataset("power3", "meters5");
+ Assertions.assertEquals(4, rowCountInserted);
+ }
+
@SneakyThrows
- private long readSinkDataset(String stableName) {
+ private long readSinkDataset(String database, String stableName) {
// Validate table name
if (stableName == null || !stableName.matches("^[a-zA-Z0-9_]+$")) {
throw new IllegalArgumentException("Invalid table name provided: "
+ stableName);
}
long rowCount;
- String sql = String.format("SELECT COUNT(1) FROM power2.%s;",
stableName);
+ String sql = String.format("SELECT COUNT(1) FROM %s.%s;", database,
stableName);
try (Statement stmt = connection2.createStatement();
ResultSet resultSet = stmt.executeQuery(sql); ) {
resultSet.next();
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink_filter_by_fieldNames.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink_filter_by_fieldNames.conf
new file mode 100644
index 0000000000..c70452a29a
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink_filter_by_fieldNames.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in
seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ TDengine {
+ url: "jdbc:TAOS-RS://flink_e2e_tdengine_src:6041/"
+ username: "root"
+ password: "taosdata"
+ database: "power"
+ stable: "meters"
+ lower_bound: "2018-10-03 14:38:05.000"
+ upper_bound: "2018-10-03 14:38:16.801"
+ sub_tables: ["d1001","d1002"]
+ read_columns:
["ts","current","voltage","phase","off","nc","location","groupid"]
+ }
+}
+
+transform {
+}
+
+sink {
+ TDengine {
+ url: "jdbc:TAOS-RS://flink_e2e_tdengine_sink:6041/"
+ username: "root"
+ password: "taosdata"
+ database: "power3"
+ stable: "meters5"
+ timezone: "UTC"
+ write_columns: ["ts","current","voltage","phase","off","nc"]
+ }
+}
\ No newline at end of file