This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ff122fe405 [improve] tdengine options (#9399)
ff122fe405 is described below

commit ff122fe4054e44a5fcdbdcf7936fb8e9fd370545
Author: Jarvis <[email protected]>
AuthorDate: Sat Jun 28 23:02:03 2025 +0800

    [improve] tdengine options (#9399)
---
 .../tdengine/config/TDengineCommonOptions.java     |  1 +
 .../tdengine/config/TDengineSourceConfig.java      | 54 ++++------------------
 .../tdengine/config/TDengineSourceOptions.java     |  9 +---
 .../seatunnel/tdengine/source/TDengineSource.java  | 51 ++++++++------------
 .../tdengine/source/TDengineSourceFactory.java     | 27 +++++++----
 5 files changed, 49 insertions(+), 93 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineCommonOptions.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineCommonOptions.java
index 2f9ae2d051..91344062c2 100644
--- a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineCommonOptions.java
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineCommonOptions.java
@@ -24,6 +24,7 @@ import lombok.Data;
 
 @Data
 public abstract class TDengineCommonOptions {
+
     public static final Option<String> URL =
             Options.key("url")
                     .stringType()
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
index 4eabb754cf..5ea375f558 100644
--- a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
@@ -17,21 +17,13 @@
 
 package org.apache.seatunnel.connectors.seatunnel.tdengine.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 
 import lombok.Data;
 
 import java.io.Serializable;
 import java.util.List;
 
-import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.LOWER_BOUND;
-import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.PASSWORD;
-import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.STABLE;
-import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.TIMEZONE;
-import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.UPPER_BOUND;
-import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.USERNAME;
-
 @Data
 public class TDengineSourceConfig implements Serializable {
 
@@ -42,48 +34,20 @@ public class TDengineSourceConfig implements Serializable {
     private String password;
     private String database;
     private String stable;
-    // param of timezone in 'jdbc:TAOS-RS' just effect on taosadapter side, other than the JDBC
-    // client side
-    // so this param represent the server-side timezone setting up
-    private String timezone;
     private String lowerBound;
     private String upperBound;
     private List<String> fields;
     private List<String> tags;
 
-    public static TDengineSourceConfig buildSourceConfig(Config pluginConfig) {
+    public static TDengineSourceConfig buildSourceConfig(ReadonlyConfig pluginConfig) {
         TDengineSourceConfig tdengineSourceConfig = new TDengineSourceConfig();
-        tdengineSourceConfig.setUrl(
-                pluginConfig.hasPath(ConfigNames.URL)
-                        ? pluginConfig.getString(ConfigNames.URL)
-                        : null);
-        tdengineSourceConfig.setDatabase(
-                pluginConfig.hasPath(DATABASE) ? pluginConfig.getString(DATABASE) : null);
-        tdengineSourceConfig.setStable(
-                pluginConfig.hasPath(STABLE) ? pluginConfig.getString(STABLE) : null);
-        tdengineSourceConfig.setUsername(
-                pluginConfig.hasPath(USERNAME) ? pluginConfig.getString(USERNAME) : null);
-        tdengineSourceConfig.setPassword(
-                pluginConfig.hasPath(PASSWORD) ? pluginConfig.getString(PASSWORD) : null);
-        tdengineSourceConfig.setUpperBound(
-                pluginConfig.hasPath(UPPER_BOUND) ? pluginConfig.getString(UPPER_BOUND) : null);
-        tdengineSourceConfig.setLowerBound(
-                pluginConfig.hasPath(LOWER_BOUND) ? pluginConfig.getString(LOWER_BOUND) : null);
-        tdengineSourceConfig.setTimezone(
-                pluginConfig.hasPath(TIMEZONE) ? pluginConfig.getString(TIMEZONE) : "UTC");
-
+        tdengineSourceConfig.setUrl(pluginConfig.get(TDengineSourceOptions.URL));
+        tdengineSourceConfig.setDatabase(pluginConfig.get(TDengineSourceOptions.DATABASE));
+        tdengineSourceConfig.setStable(pluginConfig.get(TDengineSourceOptions.STABLE));
+        tdengineSourceConfig.setUsername(pluginConfig.get(TDengineSourceOptions.USERNAME));
+        tdengineSourceConfig.setPassword(pluginConfig.get(TDengineSourceOptions.PASSWORD));
+        tdengineSourceConfig.setUpperBound(pluginConfig.get(TDengineSourceOptions.UPPER_BOUND));
+        tdengineSourceConfig.setLowerBound(pluginConfig.get(TDengineSourceOptions.LOWER_BOUND));
         return tdengineSourceConfig;
     }
-
-    public static class ConfigNames {
-
-        public static String URL = "url";
-        public static String USERNAME = "username";
-        public static String PASSWORD = "password";
-        public static String DATABASE = "database";
-        public static String STABLE = "stable";
-        public static String TIMEZONE = "timezone";
-        public static String LOWER_BOUND = "lower_bound";
-        public static String UPPER_BOUND = "upper_bound";
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceOptions.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceOptions.java
index 58ce50f92f..4d6bb0789d 100644
--- a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceOptions.java
@@ -20,21 +20,16 @@ package org.apache.seatunnel.connectors.seatunnel.tdengine.config;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
-@Data
-@AllArgsConstructor
 public class TDengineSourceOptions extends TDengineCommonOptions {
 
     public static final Option<String> LOWER_BOUND =
-            Options.key("lowerBound")
+            Options.key("lower_bound")
                     .stringType()
                     .noDefaultValue()
                     .withDescription("The lower bound for data query range");
 
     public static final Option<String> UPPER_BOUND =
-            Options.key("upperBound")
+            Options.key("upper_bound")
                     .stringType()
                     .noDefaultValue()
                     .withDescription("The upper bound for data query range");
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java
index e72773781a..7e396fa0f9 100644
--- a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java
@@ -17,29 +17,24 @@
 
 package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceReader.Context;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
-import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.tdengine.state.TDengineSourceState;
 import org.apache.seatunnel.connectors.seatunnel.tdengine.typemapper.TDengineTypeMapper;
 
 import org.apache.commons.lang3.ArrayUtils;
 
-import com.google.auto.service.AutoService;
 import com.taosdata.jdbc.TSDBDriver;
 import lombok.SneakyThrows;
 
@@ -49,14 +44,10 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
-import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.PASSWORD;
-import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.STABLE;
-import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.URL;
-import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.USERNAME;
 import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.buildSourceConfig;
 import static org.apache.seatunnel.connectors.seatunnel.tdengine.utils.TDengineUtil.checkDriverExist;
 
@@ -66,31 +57,25 @@ import static org.apache.seatunnel.connectors.seatunnel.tdengine.utils.TDengineU
  * <p>TODO: wait for optimization 1. batch -> batch + stream 2. one item of data writing -> a batch
  * of data writing
  */
-@AutoService(SeaTunnelSource.class)
 public class TDengineSource
         implements SeaTunnelSource<SeaTunnelRow, TDengineSourceSplit, TDengineSourceState> {
 
-    private StableMetadata stableMetadata;
-    private TDengineSourceConfig tdengineSourceConfig;
+    private final StableMetadata stableMetadata;
+    private final TDengineSourceConfig tdengineSourceConfig;
+    private final CatalogTable catalogTable;
 
-    @Override
-    public String getPluginName() {
-        return "TDengine";
+    @SneakyThrows
+    public TDengineSource(ReadonlyConfig pluginConfig) {
+        this.tdengineSourceConfig = buildSourceConfig(pluginConfig);
+        this.stableMetadata = getStableMetadata(tdengineSourceConfig);
+        this.catalogTable =
+                CatalogTableUtil.getCatalogTable(
+                        tdengineSourceConfig.getStable(), stableMetadata.getRowType());
     }
 
-    @SneakyThrows
     @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig, URL, DATABASE, STABLE, USERNAME, PASSWORD);
-        if (!result.isSuccess()) {
-            throw new TDengineConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    "TDengine connection require url/database/stable/username/password. All of these must not be empty.");
-        }
-        tdengineSourceConfig = buildSourceConfig(pluginConfig);
-        stableMetadata = getStableMetadata(tdengineSourceConfig);
+    public String getPluginName() {
+        return "TDengine";
     }
 
     @Override
@@ -99,8 +84,8 @@ public class TDengineSource
     }
 
     @Override
-    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
-        return stableMetadata.getRowType();
+    public List<CatalogTable> getProducedCatalogTables() {
+        return Collections.singletonList(catalogTable);
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceFactory.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceFactory.java
index 921e4d0d70..c875593ef0 100644
--- a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceFactory.java
@@ -19,18 +19,16 @@ package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions;
 
 import com.google.auto.service.AutoService;
 
-import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions.LOWER_BOUND;
-import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions.PASSWORD;
-import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions.STABLE;
-import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions.UPPER_BOUND;
-import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions.URL;
-import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions.USERNAME;
+import java.io.Serializable;
 
 @AutoService(Factory.class)
 public class TDengineSourceFactory implements TableSourceFactory {
@@ -43,10 +41,23 @@ public class TDengineSourceFactory implements TableSourceFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(URL, USERNAME, PASSWORD, DATABASE, STABLE, LOWER_BOUND, UPPER_BOUND)
+                .required(
+                        TDengineSourceOptions.URL,
+                        TDengineSourceOptions.USERNAME,
+                        TDengineSourceOptions.PASSWORD,
+                        TDengineSourceOptions.DATABASE,
+                        TDengineSourceOptions.STABLE,
+                        TDengineSourceOptions.LOWER_BOUND,
+                        TDengineSourceOptions.UPPER_BOUND)
                 .build();
     }
 
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
+        return () -> (SeaTunnelSource<T, SplitT, StateT>) new TDengineSource(context.getOptions());
+    }
+
     @Override
     public Class<? extends SeaTunnelSource> getSourceClass() {
         return TDengineSource.class;

Reply via email to