This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ff122fe405 [improve] tdengine options (#9399)
ff122fe405 is described below
commit ff122fe4054e44a5fcdbdcf7936fb8e9fd370545
Author: Jarvis <[email protected]>
AuthorDate: Sat Jun 28 23:02:03 2025 +0800
[improve] tdengine options (#9399)
---
.../tdengine/config/TDengineCommonOptions.java | 1 +
.../tdengine/config/TDengineSourceConfig.java | 54 ++++------------------
.../tdengine/config/TDengineSourceOptions.java | 9 +---
.../seatunnel/tdengine/source/TDengineSource.java | 51 ++++++++------------
.../tdengine/source/TDengineSourceFactory.java | 27 +++++++----
5 files changed, 49 insertions(+), 93 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineCommonOptions.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineCommonOptions.java
index 2f9ae2d051..91344062c2 100644
---
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineCommonOptions.java
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineCommonOptions.java
@@ -24,6 +24,7 @@ import lombok.Data;
@Data
public abstract class TDengineCommonOptions {
+
public static final Option<String> URL =
Options.key("url")
.stringType()
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
index 4eabb754cf..5ea375f558 100644
---
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
@@ -17,21 +17,13 @@
package org.apache.seatunnel.connectors.seatunnel.tdengine.config;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
-import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.DATABASE;
-import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.LOWER_BOUND;
-import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.STABLE;
-import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.TIMEZONE;
-import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.UPPER_BOUND;
-import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.USERNAME;
-
@Data
public class TDengineSourceConfig implements Serializable {
@@ -42,48 +34,20 @@ public class TDengineSourceConfig implements Serializable {
private String password;
private String database;
private String stable;
- // param of timezone in 'jdbc:TAOS-RS' just effect on taosadapter side,
other than the JDBC
- // client side
- // so this param represent the server-side timezone setting up
- private String timezone;
private String lowerBound;
private String upperBound;
private List<String> fields;
private List<String> tags;
- public static TDengineSourceConfig buildSourceConfig(Config pluginConfig) {
+ public static TDengineSourceConfig buildSourceConfig(ReadonlyConfig
pluginConfig) {
TDengineSourceConfig tdengineSourceConfig = new TDengineSourceConfig();
- tdengineSourceConfig.setUrl(
- pluginConfig.hasPath(ConfigNames.URL)
- ? pluginConfig.getString(ConfigNames.URL)
- : null);
- tdengineSourceConfig.setDatabase(
- pluginConfig.hasPath(DATABASE) ?
pluginConfig.getString(DATABASE) : null);
- tdengineSourceConfig.setStable(
- pluginConfig.hasPath(STABLE) ? pluginConfig.getString(STABLE)
: null);
- tdengineSourceConfig.setUsername(
- pluginConfig.hasPath(USERNAME) ?
pluginConfig.getString(USERNAME) : null);
- tdengineSourceConfig.setPassword(
- pluginConfig.hasPath(PASSWORD) ?
pluginConfig.getString(PASSWORD) : null);
- tdengineSourceConfig.setUpperBound(
- pluginConfig.hasPath(UPPER_BOUND) ?
pluginConfig.getString(UPPER_BOUND) : null);
- tdengineSourceConfig.setLowerBound(
- pluginConfig.hasPath(LOWER_BOUND) ?
pluginConfig.getString(LOWER_BOUND) : null);
- tdengineSourceConfig.setTimezone(
- pluginConfig.hasPath(TIMEZONE) ?
pluginConfig.getString(TIMEZONE) : "UTC");
-
+
tdengineSourceConfig.setUrl(pluginConfig.get(TDengineSourceOptions.URL));
+
tdengineSourceConfig.setDatabase(pluginConfig.get(TDengineSourceOptions.DATABASE));
+
tdengineSourceConfig.setStable(pluginConfig.get(TDengineSourceOptions.STABLE));
+
tdengineSourceConfig.setUsername(pluginConfig.get(TDengineSourceOptions.USERNAME));
+
tdengineSourceConfig.setPassword(pluginConfig.get(TDengineSourceOptions.PASSWORD));
+
tdengineSourceConfig.setUpperBound(pluginConfig.get(TDengineSourceOptions.UPPER_BOUND));
+
tdengineSourceConfig.setLowerBound(pluginConfig.get(TDengineSourceOptions.LOWER_BOUND));
return tdengineSourceConfig;
}
-
- public static class ConfigNames {
-
- public static String URL = "url";
- public static String USERNAME = "username";
- public static String PASSWORD = "password";
- public static String DATABASE = "database";
- public static String STABLE = "stable";
- public static String TIMEZONE = "timezone";
- public static String LOWER_BOUND = "lower_bound";
- public static String UPPER_BOUND = "upper_bound";
- }
}
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceOptions.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceOptions.java
index 58ce50f92f..4d6bb0789d 100644
---
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceOptions.java
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceOptions.java
@@ -20,21 +20,16 @@ package
org.apache.seatunnel.connectors.seatunnel.tdengine.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
-@Data
-@AllArgsConstructor
public class TDengineSourceOptions extends TDengineCommonOptions {
public static final Option<String> LOWER_BOUND =
- Options.key("lowerBound")
+ Options.key("lower_bound")
.stringType()
.noDefaultValue()
.withDescription("The lower bound for data query range");
public static final Option<String> UPPER_BOUND =
- Options.key("upperBound")
+ Options.key("upper_bound")
.stringType()
.noDefaultValue()
.withDescription("The upper bound for data query range");
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java
index e72773781a..7e396fa0f9 100644
---
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java
@@ -17,29 +17,24 @@
package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceReader.Context;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
import
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
-import
org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.tdengine.state.TDengineSourceState;
import
org.apache.seatunnel.connectors.seatunnel.tdengine.typemapper.TDengineTypeMapper;
import org.apache.commons.lang3.ArrayUtils;
-import com.google.auto.service.AutoService;
import com.taosdata.jdbc.TSDBDriver;
import lombok.SneakyThrows;
@@ -49,14 +44,10 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Properties;
-import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.DATABASE;
-import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.STABLE;
-import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.URL;
-import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.USERNAME;
import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.buildSourceConfig;
import static
org.apache.seatunnel.connectors.seatunnel.tdengine.utils.TDengineUtil.checkDriverExist;
@@ -66,31 +57,25 @@ import static
org.apache.seatunnel.connectors.seatunnel.tdengine.utils.TDengineU
* <p>TODO: wait for optimization 1. batch -> batch + stream 2. one item of
data writing -> a batch
* of data writing
*/
-@AutoService(SeaTunnelSource.class)
public class TDengineSource
implements SeaTunnelSource<SeaTunnelRow, TDengineSourceSplit,
TDengineSourceState> {
- private StableMetadata stableMetadata;
- private TDengineSourceConfig tdengineSourceConfig;
+ private final StableMetadata stableMetadata;
+ private final TDengineSourceConfig tdengineSourceConfig;
+ private final CatalogTable catalogTable;
- @Override
- public String getPluginName() {
- return "TDengine";
+ @SneakyThrows
+ public TDengineSource(ReadonlyConfig pluginConfig) {
+ this.tdengineSourceConfig = buildSourceConfig(pluginConfig);
+ this.stableMetadata = getStableMetadata(tdengineSourceConfig);
+ this.catalogTable =
+ CatalogTableUtil.getCatalogTable(
+ tdengineSourceConfig.getStable(),
stableMetadata.getRowType());
}
- @SneakyThrows
@Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult result =
- CheckConfigUtil.checkAllExists(
- pluginConfig, URL, DATABASE, STABLE, USERNAME,
PASSWORD);
- if (!result.isSuccess()) {
- throw new TDengineConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- "TDengine connection require
url/database/stable/username/password. All of these must not be empty.");
- }
- tdengineSourceConfig = buildSourceConfig(pluginConfig);
- stableMetadata = getStableMetadata(tdengineSourceConfig);
+ public String getPluginName() {
+ return "TDengine";
}
@Override
@@ -99,8 +84,8 @@ public class TDengineSource
}
@Override
- public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
- return stableMetadata.getRowType();
+ public List<CatalogTable> getProducedCatalogTables() {
+ return Collections.singletonList(catalogTable);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceFactory.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceFactory.java
index 921e4d0d70..c875593ef0 100644
---
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceFactory.java
@@ -19,18 +19,16 @@ package
org.apache.seatunnel.connectors.seatunnel.tdengine.source;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions;
import com.google.auto.service.AutoService;
-import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions.DATABASE;
-import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions.LOWER_BOUND;
-import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions.PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions.STABLE;
-import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions.UPPER_BOUND;
-import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions.URL;
-import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions.USERNAME;
+import java.io.Serializable;
@AutoService(Factory.class)
public class TDengineSourceFactory implements TableSourceFactory {
@@ -43,10 +41,23 @@ public class TDengineSourceFactory implements
TableSourceFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(URL, USERNAME, PASSWORD, DATABASE, STABLE,
LOWER_BOUND, UPPER_BOUND)
+ .required(
+ TDengineSourceOptions.URL,
+ TDengineSourceOptions.USERNAME,
+ TDengineSourceOptions.PASSWORD,
+ TDengineSourceOptions.DATABASE,
+ TDengineSourceOptions.STABLE,
+ TDengineSourceOptions.LOWER_BOUND,
+ TDengineSourceOptions.UPPER_BOUND)
.build();
}
+ @Override
+ public <T, SplitT extends SourceSplit, StateT extends Serializable>
+ TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
+ return () -> (SeaTunnelSource<T, SplitT, StateT>) new
TDengineSource(context.getOptions());
+ }
+
@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return TDengineSource.class;