This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b136a0dc43 [Feature][connector-tdengine] Support subtable and 
fieldNames in tdengine source (#9593)
b136a0dc43 is described below

commit b136a0dc438c668931f81553f01fcc880155da6e
Author: jiachuan.zhu <[email protected]>
AuthorDate: Mon Jul 28 15:29:29 2025 +0800

    [Feature][connector-tdengine] Support subtable and fieldNames in tdengine 
source (#9593)
---
 docs/en/connector-v2/sink/TDengine.md              |  21 +++--
 docs/en/connector-v2/source/TDengine.md            |  31 +++++--
 docs/zh/connector-v2/sink/TDengine.md              |   9 +-
 docs/zh/connector-v2/source/TDengine.md            | 102 +++++++++++++++++++++
 .../tdengine/config/TDengineSinkConfig.java        |   8 +-
 .../tdengine/config/TDengineSinkOptions.java       |  11 +++
 .../tdengine/config/TDengineSourceConfig.java      |  17 +++-
 .../tdengine/config/TDengineSourceOptions.java     |  19 ++++
 .../tdengine/sink/TDengineSinkWriter.java          |   6 +-
 .../seatunnel/tdengine/source/TDengineSource.java  |  14 ++-
 .../tdengine/source/TDengineSourceReaderTest.java  |  76 +++++++++++++++
 .../e2e/connector/tdengine/TDengineIT.java         |  27 +++++-
 ...engine_source_to_sink_filter_by_fieldNames.conf |  53 +++++++++++
 13 files changed, 365 insertions(+), 29 deletions(-)

diff --git a/docs/en/connector-v2/sink/TDengine.md 
b/docs/en/connector-v2/sink/TDengine.md
index 8e7fac69c0..55755c1696 100644
--- a/docs/en/connector-v2/sink/TDengine.md
+++ b/docs/en/connector-v2/sink/TDengine.md
@@ -15,14 +15,15 @@ Used to write data to TDengine. You need to create stable 
before running seatunn
 
 ## Options
 
-|   name   |  type  | required | default value |
-|----------|--------|----------|---------------|
-| url      | string | yes      | -             |
-| username | string | yes      | -             |
-| password | string | yes      | -             |
-| database | string | yes      |               |
-| stable   | string | yes      | -             |
-| timezone | string | no       | UTC           |
+| name         | type   | required | default value |
+|--------------|--------|----------|---------------|
+| url          | string | yes      | -             |
+| username     | string | yes      | -             |
+| password     | string | yes      | -             |
+| database     | string | yes      |               |
+| stable       | string | yes      | -             |
+| timezone     | string | no       | UTC           |
+| write_columns | list   | no       | -             |
 
 ### url [string]
 
@@ -54,6 +55,9 @@ the stable of the TDengine when you select
 
 the timeznoe of the TDengine sever, it's important to the ts field
 
+### write_columns [list]
+The field names to be inserted into TDengine. If not set, all fields will be 
written. The plugin will automatically append TAGS columns, so please do not 
include TAGS columns in this option.
+
 ## Example
 
 ### sink
@@ -67,6 +71,7 @@ sink {
           database : "power2"
           stable : "meters2"
           timezone: UTC
+          write_columns: ["ts", "voltage", "current", "power"]
         }
 }
 ```
diff --git a/docs/en/connector-v2/source/TDengine.md 
b/docs/en/connector-v2/source/TDengine.md
index 2f82882706..d5e4ee1d50 100644
--- a/docs/en/connector-v2/source/TDengine.md
+++ b/docs/en/connector-v2/source/TDengine.md
@@ -22,15 +22,17 @@ supports query SQL and can achieve projection effect.
 
 ## Options
 
-|    name     |  type  | required | default value |
-|-------------|--------|----------|---------------|
-| url         | string | yes      | -             |
-| username    | string | yes      | -             |
-| password    | string | yes      | -             |
-| database    | string | yes      |               |
-| stable      | string | yes      | -             |
-| lower_bound | long   | yes      | -             |
-| upper_bound | long   | yes      | -             |
+| name         | type   | required | default value |
+|--------------|--------|----------|---------------|
+| url          | string | yes      | -             |
+| username     | string | yes      | -             |
+| password     | string | yes      | -             |
+| database     | string | yes      |               |
+| stable       | string | yes      | -             |
+| sub_tables   | list   | no       | -             |
+| lower_bound  | long   | yes      | -             |
+| upper_bound  | long   | yes      | -             |
+| read_columns | list   | no       | -             |
 
 ### url [string]
 
@@ -58,6 +60,9 @@ the database of the TDengine when you select
 
 the stable of the TDengine when you select
 
+### sub_tables [list]
+A list of sub_table names. If not specified, all sub-tables will be selected. 
If specified, only the specified sub-tables will be selected.
+
 ### lower_bound [long]
 
 the lower_bound of the migration period
@@ -66,6 +71,10 @@ the lower_bound of the migration period
 
 the upper_bound of the migration period
 
+### read_columns [list]
+A list of column names to read. If not specified, all columns will be 
selected. 
+When reading from a super table, please make sure to put the TAGS columns at 
the end of the list.
+
 ## Example
 
 ### source
@@ -78,9 +87,11 @@ source {
           password : "taosdata"
           database : "power"
           stable : "meters"
+          sub_tables : ["meter_1","meter_2"]
           lower_bound : "2018-10-03 14:38:05.000"
           upper_bound : "2018-10-03 14:38:16.800"
-          plugin_output = "tdengine_result"
+          plugin_output : "tdengine_result"
+          read_columns : ["ts","voltage","current","power"]
         }
 }
 ```
diff --git a/docs/zh/connector-v2/sink/TDengine.md 
b/docs/zh/connector-v2/sink/TDengine.md
index 98a03d54f1..bf12253de9 100644
--- a/docs/zh/connector-v2/sink/TDengine.md
+++ b/docs/zh/connector-v2/sink/TDengine.md
@@ -15,7 +15,7 @@ import ChangeLog from '../changelog/connector-tdengine.md';
 
 ## 选项
 
-|   名称   |  类型  | 是否必传 | 默认值 |
+|   名称   | 类型     | 是否必传 | 默认值 |
 |----------|--------|----------|---------------|
 | url      | string | 是      | -             |
 | username | string | 是      | -             |
@@ -23,6 +23,7 @@ import ChangeLog from '../changelog/connector-tdengine.md';
 | database | string | 是      |               |
 | stable   | string | 是      | -             |
 | timezone | string | 否       | UTC           |
+| write_columns | list   | 否       | -             |
 
 ### url [string]
 
@@ -54,6 +55,10 @@ TDengine的超级表
 
 TDengine服务器的时间,对ts字段很重要
 
+### write_columns [list]
+TDengine的写入列,默认为所有列。无需包含 TAGS 字段,插件会自动处理 TAGS 字段的写入。
+
+
 ## 示例
 
 ### sink
@@ -67,10 +72,12 @@ sink {
           database : "power2"
           stable : "meters2"
           timezone: UTC
+          write_columns: ["ts", "voltage", "current", "power"]
         }
 }
 ```
 
+
 ## 变更日志
 
 <ChangeLog />
\ No newline at end of file
diff --git a/docs/zh/connector-v2/source/TDengine.md 
b/docs/zh/connector-v2/source/TDengine.md
new file mode 100644
index 0000000000..2bd543a60c
--- /dev/null
+++ b/docs/zh/connector-v2/source/TDengine.md
@@ -0,0 +1,102 @@
+import ChangeLog from '../changelog/connector-tdengine.md';
+
+# TDengine
+
+> TDengine 源端连接器
+
+## 描述
+
+通过 TDengine 读取外部数据源的数据。
+
+## 主要特性
+
+- [x] [批处理](../../concept/connector-v2-features.md)
+- [ ] [流式](../../concept/connector-v2-features.md)
+- [x] [精确一次](../../concept/connector-v2-features.md)
+- [ ] [列投影](../../concept/connector-v2-features.md)
+
+支持查询 SQL,并可实现投影效果。
+
+- [x] [并行度](../../concept/connector-v2-features.md)
+- [ ] [支持用户自定义分片](../../concept/connector-v2-features.md)
+
+## 配置项
+
+| 名称           | 类型   | 必填 | 默认值         |
+|----------------|--------|------|----------------|
+| url            | string | 是   | -              |
+| username       | string | 是   | -              |
+| password       | string | 是   | -              |
+| database       | string | 是   |                |
+| stable         | string | 是   | -              |
+| sub_tables     | list   | 否   | -              |
+| lower_bound    | long   | 是   | -              |
+| upper_bound    | long   | 是   | -              |
+| read_columns   | list   | 否   | -              |
+
+### url [string]
+
+选择 TDengine 时的连接 URL
+
+例如:
+
+```
+jdbc:TAOS-RS://localhost:6041/
+```
+
+### username [string]
+
+选择 TDengine 时的用户名
+
+### password [string]
+
+选择 TDengine 时的密码
+
+### database [string]
+
+选择 TDengine 时的数据库名
+
+### stable [string]
+
+选择 TDengine 时的超级表名
+
+### sub_tables [list]
+
+TDengine 的子表名。如果不指定,则会选择所有子表;如果指定,则只选择指定的子表。
+
+### lower_bound [long]
+
+迁移时间段的下界
+
+### upper_bound [long]
+
+迁移时间段的上界
+
+### read_columns [list]
+
+选择 TDengine 时的列名。如果不指定,则选择所有字段;如果指定,则只选择指定的字段。读取超级表时,请包含 TAGS 字段,并放在末尾。
+
+## 示例
+
+### source 配置示例
+
+```hocon
+source {
+        TDengine {
+          url : "jdbc:TAOS-RS://localhost:6041/"
+          username : "root"
+          password : "taosdata"
+          database : "power"
+          stable : "meters"
+          sub_tables : ["meter_1","meter_2"]
+          lower_bound : "2018-10-03 14:38:05.000"
+          upper_bound : "2018-10-03 14:38:16.800"
+          plugin_output : "tdengine_result"
+          read_columns : ["ts","voltage","current","power"]
+        }
+}
+```
+
+## 变更日志
+
+<ChangeLog />
\ No newline at end of file
diff --git 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkConfig.java
 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkConfig.java
index b2892c61d6..cac4d5fe78 100644
--- 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkConfig.java
@@ -23,6 +23,7 @@ import lombok.Builder;
 import lombok.Data;
 
 import java.io.Serializable;
+import java.util.List;
 import java.util.Optional;
 
 @Data
@@ -37,6 +38,7 @@ public class TDengineSinkConfig implements Serializable {
     private String database;
     private String stable;
     private String timezone;
+    private String writeColumns;
 
     public static TDengineSinkConfig of(ReadonlyConfig config) {
         Builder builder = TDengineSinkConfig.builder();
@@ -50,7 +52,11 @@ public class TDengineSinkConfig implements Serializable {
         Optional<String> optionalTimezone = 
config.getOptional(TDengineSinkOptions.TIMEZONE);
 
         
builder.timezone(optionalTimezone.orElseGet(TDengineSinkOptions.TIMEZONE::defaultValue));
-
+        Optional<List<String>> optionalWriteColumns =
+                config.getOptional(TDengineSinkOptions.WRITE_COLUMNS);
+        if (optionalWriteColumns.isPresent()) {
+            builder.writeColumns(String.join(",", optionalWriteColumns.get()));
+        }
         return builder.build();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkOptions.java
 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkOptions.java
index 7f64177656..2d112bf644 100644
--- 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkOptions.java
+++ 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkOptions.java
@@ -23,6 +23,8 @@ import org.apache.seatunnel.api.configuration.Options;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 
+import java.util.List;
+
 @Data
 @AllArgsConstructor
 public class TDengineSinkOptions extends TDengineCommonOptions {
@@ -32,4 +34,13 @@ public class TDengineSinkOptions extends 
TDengineCommonOptions {
                     .stringType()
                     .defaultValue("UTC")
                     .withDescription("The timezone used for timestamp 
conversion, default is UTC");
+
+    public static final Option<List<String>> WRITE_COLUMNS =
+            Options.key("write_columns")
+                    .listType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The field names to be written to TDengine "
+                                    + "If not specified, all fields will be 
written. "
+                                    + "This option is useful when the source 
schema does not match the TDengine table schema.");
 }
diff --git 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
index 5ea375f558..f283bbf72e 100644
--- 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
@@ -23,6 +23,8 @@ import lombok.Data;
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 @Data
 public class TDengineSourceConfig implements Serializable {
@@ -36,8 +38,9 @@ public class TDengineSourceConfig implements Serializable {
     private String stable;
     private String lowerBound;
     private String upperBound;
-    private List<String> fields;
     private List<String> tags;
+    private Set<String> subTables;
+    private Set<String> readColumns;
 
     public static TDengineSourceConfig buildSourceConfig(ReadonlyConfig 
pluginConfig) {
         TDengineSourceConfig tdengineSourceConfig = new TDengineSourceConfig();
@@ -48,6 +51,18 @@ public class TDengineSourceConfig implements Serializable {
         
tdengineSourceConfig.setPassword(pluginConfig.get(TDengineSourceOptions.PASSWORD));
         
tdengineSourceConfig.setUpperBound(pluginConfig.get(TDengineSourceOptions.UPPER_BOUND));
         
tdengineSourceConfig.setLowerBound(pluginConfig.get(TDengineSourceOptions.LOWER_BOUND));
+        if 
(pluginConfig.getOptional(TDengineSourceOptions.SUB_TABLES).isPresent()) {
+            tdengineSourceConfig.setSubTables(
+                    pluginConfig.get(TDengineSourceOptions.SUB_TABLES).stream()
+                            .collect(Collectors.toSet()));
+        }
+        if 
(pluginConfig.getOptional(TDengineSourceOptions.READ_COLUMNS).isPresent()) {
+            tdengineSourceConfig.setReadColumns(
+                    
pluginConfig.get(TDengineSourceOptions.READ_COLUMNS).stream()
+                            .collect(Collectors.toSet()));
+        } else {
+            tdengineSourceConfig.setReadColumns(null);
+        }
         return tdengineSourceConfig;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceOptions.java
 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceOptions.java
index 4d6bb0789d..69fb15e9db 100644
--- 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceOptions.java
@@ -20,6 +20,8 @@ package 
org.apache.seatunnel.connectors.seatunnel.tdengine.config;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
+import java.util.List;
+
 public class TDengineSourceOptions extends TDengineCommonOptions {
 
     public static final Option<String> LOWER_BOUND =
@@ -33,4 +35,21 @@ public class TDengineSourceOptions extends 
TDengineCommonOptions {
                     .stringType()
                     .noDefaultValue()
                     .withDescription("The upper bound for data query range");
+
+    public static final Option<List<String>> SUB_TABLES =
+            Options.key("sub_tables")
+                    .listType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The sub table names to query data from, separated 
by comma , "
+                                    + "if not specified, all sub tables will 
be queried");
+
+    public static final Option<List<String>> READ_COLUMNS =
+            Options.key("read_columns")
+                    .listType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The field names to be read from TDengine "
+                                    + "If not specified, all columns will be 
read. "
+                                    + "This option is useful for selecting 
specific columns when querying data from TDengine.");
 }
diff --git 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
index 4137becdb9..d76c78264e 100644
--- 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
@@ -103,10 +103,13 @@ public class TDengineSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
                 conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, 
ResultSet.CONCUR_READ_ONLY)) {
             String sql =
                     String.format(
-                            "INSERT INTO %s using %s tags ( %s ) VALUES ( %s 
);",
+                            "INSERT INTO %s using %s tags ( %s ) %s VALUES ( 
%s );",
                             element.getField(0),
                             config.getStable(),
                             tagValues,
+                            StringUtils.isEmpty(config.getWriteColumns())
+                                    ? ""
+                                    : "( " + config.getWriteColumns() + " )",
                             StringUtils.join(convertDataType(metrics), ","));
             final int rowCount = statement.executeUpdate(sql);
             if (rowCount == 0) {
@@ -140,6 +143,7 @@ public class TDengineSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
                             if (object == null) {
                                 return null;
                             }
+
                             if (LocalDateTime.class.equals(object.getClass())) 
{
                                 // transform timezone according to the config
                                 return "'"
diff --git 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java
 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java
index 7e396fa0f9..3cbaee29e8 100644
--- 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java
+++ 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java
@@ -36,6 +36,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.tdengine.typemapper.TDengineTyp
 import org.apache.commons.lang3.ArrayUtils;
 
 import com.taosdata.jdbc.TSDBDriver;
+import lombok.Getter;
 import lombok.SneakyThrows;
 
 import java.sql.Connection;
@@ -59,8 +60,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.tdengine.utils.TDengineU
  */
 public class TDengineSource
         implements SeaTunnelSource<SeaTunnelRow, TDengineSourceSplit, 
TDengineSourceState> {
-
-    private final StableMetadata stableMetadata;
+    @Getter private final StableMetadata stableMetadata;
     private final TDengineSourceConfig tdengineSourceConfig;
     private final CatalogTable catalogTable;
 
@@ -137,12 +137,22 @@ public class TDengineSource
                 if (timestampFieldName == null) {
                     timestampFieldName = metaResultSet.getString(1);
                 }
+                if (config.getReadColumns() != null
+                        && !config.getReadColumns().isEmpty()
+                        && 
!config.getReadColumns().contains(metaResultSet.getString(1))) {
+                    continue;
+                }
                 fieldNames.add(metaResultSet.getString(1));
                 
fieldTypes.add(TDengineTypeMapper.mapping(metaResultSet.getString(2)));
             }
 
             while (subTableNameResultSet.next()) {
                 String subTableName = subTableNameResultSet.getString(1);
+                if (config.getSubTables() != null
+                        && !config.getSubTables().isEmpty()
+                        && !config.getSubTables().contains(subTableName)) {
+                    continue;
+                }
                 subTableNames.add(subTableName);
             }
         }
diff --git 
a/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReaderTest.java
 
b/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReaderTest.java
index abd42fefe1..3a99670329 100644
--- 
a/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReaderTest.java
+++ 
b/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReaderTest.java
@@ -18,22 +18,42 @@
 
 package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import 
org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
 
+import org.apache.commons.lang3.StringUtils;
+
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
 
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.Random;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.logging.Logger;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
 class TDengineSourceReaderTest {
     Logger logger;
     TDengineSourceReader tDengineSourceReader;
@@ -103,6 +123,62 @@ class TDengineSourceReaderTest {
         pool.awaitTermination(3, TimeUnit.SECONDS);
     }
 
+    @Test
+    public void testGetStableMetadata() throws SQLException {
+
+        try (MockedStatic<DriverManager> dm = mockStatic(DriverManager.class)) 
{
+
+            Connection mockConn = mock(Connection.class);
+            Statement mockStatement = mock(Statement.class);
+            ResultSet metadataResultSet = mock(ResultSet.class);
+            ResultSet tableResultSet = mock(ResultSet.class);
+
+            dm.when(() -> DriverManager.getConnection(anyString(), 
any(Properties.class)))
+                    .thenReturn(mockConn);
+
+            when(mockConn.createStatement()).thenReturn(mockStatement);
+
+            when(mockStatement.executeQuery(
+                            argThat(
+                                    sql ->
+                                            StringUtils.isNotEmpty(sql)
+                                                    && sql.trim()
+                                                            .toLowerCase()
+                                                            
.startsWith("desc"))))
+                    .thenReturn(metadataResultSet);
+            when(metadataResultSet.next()).thenReturn(true, true, false);
+            when(metadataResultSet.getString(1)).thenReturn("ts", "col1", 
"col1", "col2");
+            when(metadataResultSet.getString(2)).thenReturn("INT", 
"VARCHAR(20)");
+
+            when(mockStatement.executeQuery(
+                            argThat(
+                                    sql ->
+                                            sql.trim()
+                                                    .toLowerCase()
+                                                    .startsWith(
+                                                            "select table_name 
from information_schema.ins_tables"))))
+                    .thenReturn(tableResultSet);
+            when(tableResultSet.next()).thenReturn(true, true, false);
+            when(tableResultSet.getString(1)).thenReturn("sub_table_1", 
"sub_table_2");
+            Map<String, Object> map = new HashMap<>();
+            map.put("url", "jdbc:TAOS-RS://localhost:6041/");
+            map.put("database", "test_db");
+            map.put("username", "root");
+            map.put("password", "taosdata");
+            map.put("stable", "stable");
+            map.put("sub_tables", "sub_table_1");
+            map.put("read_columns", "col1");
+
+            ReadonlyConfig config = ReadonlyConfig.fromMap(map);
+            TDengineSource source = new TDengineSource(config);
+            StableMetadata stableMetadata = source.getStableMetadata();
+            Assertions.assertEquals(1, 
stableMetadata.getSubTableNames().size());
+            Assertions.assertEquals("sub_table_1", 
stableMetadata.getSubTableNames().get(0));
+            Assertions.assertEquals(2, 
stableMetadata.getRowType().getFieldNames().length);
+            Assertions.assertEquals("col1", 
stableMetadata.getRowType().getFieldNames()[1]);
+        }
+    }
+
     private static class TestCollector implements Collector<SeaTunnelRow> {
 
         private final List<SeaTunnelRow> rows = new ArrayList<>();
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java
index 6199da00aa..12935b378d 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java
@@ -134,6 +134,12 @@ public class TDengineIT extends TestSuiteBase implements 
TestResource {
                     "CREATE STABLE power2.meters4 (ts TIMESTAMP, current 
FLOAT, voltage INT, phase FLOAT, off BOOL, nc NCHAR(10)) "
                             + "TAGS (location BINARY(64), groupId INT)");
         }
+        try (Statement stmt = connection2.createStatement()) {
+            stmt.execute("CREATE DATABASE power3 KEEP 3650");
+            stmt.execute(
+                    "CREATE STABLE power3.meters5 (ts TIMESTAMP, current 
FLOAT, voltage INT, phase FLOAT, off BOOL, nc NCHAR(10)) "
+                            + "TAGS (location BINARY(64), groupId INT)");
+        }
         return rowCount;
     }
 
@@ -143,7 +149,7 @@ public class TDengineIT extends TestSuiteBase implements 
TestResource {
                 container.executeJob("/tdengine/tdengine_source_to_sink.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
 
-        long rowCountInserted = readSinkDataset("meters2");
+        long rowCountInserted = readSinkDataset("power2", "meters2");
         Assertions.assertEquals(rowCountInserted, testDataCount);
     }
 
@@ -153,21 +159,32 @@ public class TDengineIT extends TestSuiteBase implements 
TestResource {
                 
container.executeJob("/tdengine/tdengine_fake_to_sink_multitable.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
 
-        long rowCountInserted = readSinkDataset("meters3");
-        long rowCountInserted2 = readSinkDataset("meters4");
+        long rowCountInserted = readSinkDataset("power2", "meters3");
+        long rowCountInserted2 = readSinkDataset("power2", "meters4");
         Assertions.assertEquals(rowCountInserted, testDataCountMulti_Table1);
         Assertions.assertEquals(rowCountInserted2, testDataCountMulti_Table2);
     }
 
+    @TestTemplate
+    public void testTDEngineSourceToSinkFilterByFieldName(TestContainer 
container)
+            throws Exception {
+        Container.ExecResult execResult =
+                
container.executeJob("/tdengine/tdengine_source_to_sink_filter_by_fieldNames.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        long rowCountInserted = readSinkDataset("power3", "meters5");
+        Assertions.assertEquals(4, rowCountInserted);
+    }
+
     @SneakyThrows
-    private long readSinkDataset(String stableName) {
+    private long readSinkDataset(String database, String stableName) {
         // Validate table name
         if (stableName == null || !stableName.matches("^[a-zA-Z0-9_]+$")) {
             throw new IllegalArgumentException("Invalid table name provided: " 
+ stableName);
         }
 
         long rowCount;
-        String sql = String.format("SELECT COUNT(1) FROM power2.%s;", 
stableName);
+        String sql = String.format("SELECT COUNT(1) FROM %s.%s;", database, 
stableName);
         try (Statement stmt = connection2.createStatement();
                 ResultSet resultSet = stmt.executeQuery(sql); ) {
             resultSet.next();
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink_filter_by_fieldNames.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink_filter_by_fieldNames.conf
new file mode 100644
index 0000000000..c70452a29a
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink_filter_by_fieldNames.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in 
seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  TDengine {
+      url: "jdbc:TAOS-RS://flink_e2e_tdengine_src:6041/"
+      username: "root"
+      password: "taosdata"
+      database: "power"
+      stable: "meters"
+      lower_bound: "2018-10-03 14:38:05.000"
+      upper_bound: "2018-10-03 14:38:16.801"
+      sub_tables: ["d1001","d1002"]
+      read_columns: 
["ts","current","voltage","phase","off","nc","location","groupid"]
+  }
+}
+
+transform {
+}
+
+sink {
+  TDengine {
+    url: "jdbc:TAOS-RS://flink_e2e_tdengine_sink:6041/"
+    username: "root"
+    password: "taosdata"
+    database: "power3"
+    stable: "meters5"
+    timezone: "UTC"
+    write_columns: ["ts","current","voltage","phase","off","nc"]
+  }
+}
\ No newline at end of file

Reply via email to