This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 9f498b8133 [Improve] influxdb options (#8966)
9f498b8133 is described below
commit 9f498b81333d2bf82be088be5b1306afed39b11a
Author: Jarvis <[email protected]>
AuthorDate: Tue Mar 18 13:26:54 2025 +0800
[Improve] influxdb options (#8966)
---
.../seatunnel/api/ConnectorOptionCheckTest.java | 2 -
...luxDBConfig.java => InfluxDBCommonOptions.java} | 48 +---------
.../seatunnel/influxdb/config/InfluxDBConfig.java | 83 +++-------------
.../{SinkConfig.java => InfluxDBSinkOptions.java} | 57 +----------
...ourceConfig.java => InfluxDBSourceOptions.java} | 56 ++---------
.../seatunnel/influxdb/config/SinkConfig.java | 105 ++++-----------------
.../seatunnel/influxdb/config/SourceConfig.java | 67 +++----------
.../influxdb/sink/InfluxDBSinkFactory.java | 44 ++++-----
.../seatunnel/influxdb/source/InfluxDBSource.java | 69 ++++----------
.../influxdb/source/InfluxDBSourceFactory.java | 49 ++++++----
.../source/InfluxDBSourceSplitEnumerator.java | 8 +-
11 files changed, 130 insertions(+), 458 deletions(-)
diff --git a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index de24681ac5..243e06922e 100644
--- a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -195,8 +195,6 @@ public class ConnectorOptionCheckTest {
whiteList.add("MilvusSourceOptions");
whiteList.add("RocketMqSinkOptions");
whiteList.add("MaxcomputeSourceOptions");
- whiteList.add("InfluxDBSourceOptions");
- whiteList.add("InfluxDBSinkOptions");
whiteList.add("KuduSourceOptions");
whiteList.add("SocketSinkOptions");
whiteList.add("SelectDBSinkOptions");
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBCommonOptions.java
similarity index 62%
copy from seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
copy to seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBCommonOptions.java
index eb05f53671..362ff09681 100644
--- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBCommonOptions.java
@@ -17,18 +17,10 @@
package org.apache.seatunnel.connectors.seatunnel.influxdb.config;
-import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-import lombok.Data;
-
-import java.io.Serializable;
-
-@Data
-public class InfluxDBConfig implements Serializable {
+public class InfluxDBCommonOptions {
public static final Option<String> USERNAME =
Options.key("username")
@@ -71,42 +63,4 @@ public class InfluxDBConfig implements Serializable {
.stringType()
.defaultValue("n")
.withDescription("the influxdb server query epoch");
-
- private static final String DEFAULT_FORMAT = "MSGPACK";
- private String url;
- private String username;
- private String password;
- private String database;
- private String format = DEFAULT_FORMAT;
- private int queryTimeOut = QUERY_TIMEOUT_SEC.defaultValue();
- private long connectTimeOut = CONNECT_TIMEOUT_MS.defaultValue();
- private String epoch = EPOCH.defaultValue();
-
- public InfluxDBConfig(Config config) {
- this.url = config.getString(URL.key());
-
- if (config.hasPath(USERNAME.key())) {
- this.username = config.getString(USERNAME.key());
- }
- if (config.hasPath(PASSWORD.key())) {
- this.password = config.getString(PASSWORD.key());
- }
- if (config.hasPath(DATABASES.key())) {
- this.database = config.getString(DATABASES.key());
- }
- if (config.hasPath(EPOCH.key())) {
- this.epoch = config.getString(EPOCH.key());
- }
- if (config.hasPath(CONNECT_TIMEOUT_MS.key())) {
- this.connectTimeOut = config.getLong(CONNECT_TIMEOUT_MS.key());
- }
- if (config.hasPath(QUERY_TIMEOUT_SEC.key())) {
- this.queryTimeOut = config.getInt(QUERY_TIMEOUT_SEC.key());
- }
- }
-
- @VisibleForTesting
- public InfluxDBConfig(String url) {
- this.url = url;
- }
}
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
index eb05f53671..c7d1ddc5f5 100644
--- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
@@ -18,10 +18,8 @@
package org.apache.seatunnel.connectors.seatunnel.influxdb.config;
import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import lombok.Data;
@@ -30,79 +28,24 @@ import java.io.Serializable;
@Data
public class InfluxDBConfig implements Serializable {
- public static final Option<String> USERNAME =
- Options.key("username")
- .stringType()
- .noDefaultValue()
- .withDescription("the influxdb server username");
-
- public static final Option<String> PASSWORD =
- Options.key("password")
- .stringType()
- .noDefaultValue()
- .withDescription("the influxdb server password");
-
- public static final Option<String> URL =
- Options.key("url")
- .stringType()
- .noDefaultValue()
- .withDescription("the influxdb server url");
-
- public static final Option<Long> CONNECT_TIMEOUT_MS =
- Options.key("connect_timeout_ms")
- .longType()
- .defaultValue(15000L)
- .withDescription("the influxdb client connect timeout ms");
-
- public static final Option<Integer> QUERY_TIMEOUT_SEC =
- Options.key("query_timeout_sec")
- .intType()
- .defaultValue(3)
- .withDescription("the influxdb client query timeout ms");
-
- public static final Option<String> DATABASES =
- Options.key("database")
- .stringType()
- .noDefaultValue()
- .withDescription("the influxdb server database");
-
- public static final Option<String> EPOCH =
- Options.key("epoch")
- .stringType()
- .defaultValue("n")
- .withDescription("the influxdb server query epoch");
-
private static final String DEFAULT_FORMAT = "MSGPACK";
private String url;
private String username;
private String password;
private String database;
private String format = DEFAULT_FORMAT;
- private int queryTimeOut = QUERY_TIMEOUT_SEC.defaultValue();
- private long connectTimeOut = CONNECT_TIMEOUT_MS.defaultValue();
- private String epoch = EPOCH.defaultValue();
-
- public InfluxDBConfig(Config config) {
- this.url = config.getString(URL.key());
-
- if (config.hasPath(USERNAME.key())) {
- this.username = config.getString(USERNAME.key());
- }
- if (config.hasPath(PASSWORD.key())) {
- this.password = config.getString(PASSWORD.key());
- }
- if (config.hasPath(DATABASES.key())) {
- this.database = config.getString(DATABASES.key());
- }
- if (config.hasPath(EPOCH.key())) {
- this.epoch = config.getString(EPOCH.key());
- }
- if (config.hasPath(CONNECT_TIMEOUT_MS.key())) {
- this.connectTimeOut = config.getLong(CONNECT_TIMEOUT_MS.key());
- }
- if (config.hasPath(QUERY_TIMEOUT_SEC.key())) {
- this.queryTimeOut = config.getInt(QUERY_TIMEOUT_SEC.key());
- }
+ private int queryTimeOut;
+ private long connectTimeOut;
+ private String epoch;
+
+ public InfluxDBConfig(ReadonlyConfig config) {
+ this.url = config.get(InfluxDBCommonOptions.URL);
+ this.username = config.get(InfluxDBCommonOptions.USERNAME);
+ this.password = config.get(InfluxDBCommonOptions.PASSWORD);
+ this.database = config.get(InfluxDBCommonOptions.DATABASES);
+ this.epoch = config.get(InfluxDBCommonOptions.EPOCH);
+ this.connectTimeOut =
config.get(InfluxDBCommonOptions.CONNECT_TIMEOUT_MS);
+ this.queryTimeOut =
config.get(InfluxDBCommonOptions.QUERY_TIMEOUT_SEC);
}
@VisibleForTesting
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBSinkOptions.java
similarity index 62%
copy from seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
copy to seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBSinkOptions.java
index 071e3c235f..34bb61160a 100644
--- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBSinkOptions.java
@@ -17,25 +17,12 @@
package org.apache.seatunnel.connectors.seatunnel.influxdb.config;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-
import java.util.List;
-@Setter
-@Getter
-@ToString
-public class SinkConfig extends InfluxDBConfig {
- public SinkConfig(Config config) {
- super(config);
- loadConfig(config);
- }
+public class InfluxDBSinkOptions extends InfluxDBCommonOptions {
public static final Option<String> KEY_TIME =
Options.key("key_time")
@@ -90,46 +77,4 @@ public class SinkConfig extends InfluxDBConfig {
.stringType()
.noDefaultValue()
.withDescription("the influxdb client retention policy");
-
- private static final TimePrecision DEFAULT_TIME_PRECISION =
TimePrecision.NS;
-
- private String rp;
- private String measurement;
- private int writeTimeout = WRITE_TIMEOUT.defaultValue();
- private String keyTime;
- private List<String> keyTags;
- private int batchSize = BATCH_SIZE.defaultValue();
- private int maxRetries;
- private int retryBackoffMultiplierMs;
- private int maxRetryBackoffMs;
- private TimePrecision precision = DEFAULT_TIME_PRECISION;
-
- public void loadConfig(Config config) {
-
- if (config.hasPath(KEY_TIME.key())) {
- setKeyTime(config.getString(KEY_TIME.key()));
- }
- if (config.hasPath(KEY_TAGS.key())) {
- setKeyTags(config.getStringList(KEY_TAGS.key()));
- }
- if (config.hasPath(MAX_RETRIES.key())) {
- setMaxRetries(config.getInt(MAX_RETRIES.key()));
- }
- if (config.hasPath(RETRY_BACKOFF_MULTIPLIER_MS.key())) {
-
setRetryBackoffMultiplierMs(config.getInt(RETRY_BACKOFF_MULTIPLIER_MS.key()));
- }
- if (config.hasPath(MAX_RETRY_BACKOFF_MS.key())) {
- setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS.key()));
- }
- if (config.hasPath(WRITE_TIMEOUT.key())) {
- setWriteTimeout(config.getInt(WRITE_TIMEOUT.key()));
- }
- if (config.hasPath(RETENTION_POLICY.key())) {
- setRp(config.getString(RETENTION_POLICY.key()));
- }
- if (config.hasPath(EPOCH.key())) {
-
setPrecision(TimePrecision.getPrecision(config.getString(EPOCH.key())));
- }
- setMeasurement(config.getString(KEY_MEASUREMENT.key()));
- }
}
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBSourceOptions.java
similarity index 57%
copy from seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java
copy to seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBSourceOptions.java
index 50ca08ec55..bdca8df284 100644
--- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBSourceOptions.java
@@ -17,17 +17,10 @@
package org.apache.seatunnel.connectors.seatunnel.influxdb.config;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-import lombok.Getter;
-
-import java.util.List;
-
-@Getter
-public class SourceConfig extends InfluxDBConfig {
+public class InfluxDBSourceOptions extends InfluxDBCommonOptions {
public static final Option<String> SQL =
Options.key("sql")
@@ -47,54 +40,21 @@ public class SourceConfig extends InfluxDBConfig {
.noDefaultValue()
.withDescription("the influxdb column which is used as
split key");
- public static final Option<String> PARTITION_NUM =
+ public static final Option<Integer> PARTITION_NUM =
Options.key("partition_num")
- .stringType()
- .defaultValue("0")
+ .intType()
+ .defaultValue(0)
.withDescription("the influxdb server partition num");
- public static final Option<String> UPPER_BOUND =
+ public static final Option<Integer> UPPER_BOUND =
Options.key("upper_bound")
- .stringType()
+ .intType()
.noDefaultValue()
.withDescription("the influxdb server upper bound");
- public static final Option<String> LOWER_BOUND =
+ public static final Option<Integer> LOWER_BOUND =
Options.key("lower_bound")
- .stringType()
+ .intType()
.noDefaultValue()
.withDescription("the influxdb server lower bound");
-
- public static final String DEFAULT_PARTITIONS =
PARTITION_NUM.defaultValue();
- private String sql;
- private int partitionNum = 0;
- private String splitKey;
- private long lowerBound;
- private long upperBound;
-
- List<Integer> columnsIndex;
-
- public SourceConfig(Config config) {
- super(config);
- }
-
- public static SourceConfig loadConfig(Config config) {
- SourceConfig sourceConfig = new SourceConfig(config);
-
- sourceConfig.sql = config.getString(SQL.key());
-
- if (config.hasPath(PARTITION_NUM.key())) {
- sourceConfig.partitionNum = config.getInt(PARTITION_NUM.key());
- }
- if (config.hasPath(UPPER_BOUND.key())) {
- sourceConfig.upperBound = config.getInt(UPPER_BOUND.key());
- }
- if (config.hasPath(LOWER_BOUND.key())) {
- sourceConfig.lowerBound = config.getInt(LOWER_BOUND.key());
- }
- if (config.hasPath(SPLIT_COLUMN.key())) {
- sourceConfig.splitKey = config.getString(SPLIT_COLUMN.key());
- }
- return sourceConfig;
- }
}
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
index 071e3c235f..b1eaabfe2a 100644
--- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
@@ -17,10 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.influxdb.config;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import lombok.Getter;
import lombok.Setter;
@@ -32,104 +29,42 @@ import java.util.List;
@Getter
@ToString
public class SinkConfig extends InfluxDBConfig {
- public SinkConfig(Config config) {
+
+ public SinkConfig(ReadonlyConfig config) {
super(config);
loadConfig(config);
}
- public static final Option<String> KEY_TIME =
- Options.key("key_time")
- .stringType()
- .noDefaultValue()
- .withDescription("the influxdb server key time");
-
- public static final Option<List<String>> KEY_TAGS =
- Options.key("key_tags")
- .listType()
- .noDefaultValue()
- .withDescription("the influxdb server key tags");
-
- public static final Option<String> KEY_MEASUREMENT =
- Options.key("measurement")
- .stringType()
- .noDefaultValue()
- .withDescription("the influxdb server measurement");
-
- public static final Option<Integer> BATCH_SIZE =
- Options.key("batch_size")
- .intType()
- .defaultValue(1024)
- .withDescription("batch size of the influxdb client");
-
- public static final Option<Integer> MAX_RETRIES =
- Options.key("max_retries")
- .intType()
- .noDefaultValue()
- .withDescription("max retries of the influxdb client");
-
- public static final Option<Integer> WRITE_TIMEOUT =
- Options.key("write_timeout")
- .intType()
- .defaultValue(5)
- .withDescription("the influxdb client write data timeout");
-
- public static final Option<Integer> RETRY_BACKOFF_MULTIPLIER_MS =
- Options.key("retry_backoff_multiplier_ms")
- .intType()
- .noDefaultValue()
- .withDescription("the influxdb client retry backoff
multiplier ms");
-
- public static final Option<Integer> MAX_RETRY_BACKOFF_MS =
- Options.key("max_retry_backoff_ms")
- .intType()
- .noDefaultValue()
- .withDescription("the influxdb client max retry backoff
ms");
-
- public static final Option<String> RETENTION_POLICY =
- Options.key("rp")
- .stringType()
- .noDefaultValue()
- .withDescription("the influxdb client retention policy");
-
private static final TimePrecision DEFAULT_TIME_PRECISION =
TimePrecision.NS;
private String rp;
private String measurement;
- private int writeTimeout = WRITE_TIMEOUT.defaultValue();
+ private int writeTimeout;
private String keyTime;
private List<String> keyTags;
- private int batchSize = BATCH_SIZE.defaultValue();
+ private int batchSize;
private int maxRetries;
private int retryBackoffMultiplierMs;
private int maxRetryBackoffMs;
private TimePrecision precision = DEFAULT_TIME_PRECISION;
- public void loadConfig(Config config) {
-
- if (config.hasPath(KEY_TIME.key())) {
- setKeyTime(config.getString(KEY_TIME.key()));
- }
- if (config.hasPath(KEY_TAGS.key())) {
- setKeyTags(config.getStringList(KEY_TAGS.key()));
- }
- if (config.hasPath(MAX_RETRIES.key())) {
- setMaxRetries(config.getInt(MAX_RETRIES.key()));
- }
- if (config.hasPath(RETRY_BACKOFF_MULTIPLIER_MS.key())) {
-
setRetryBackoffMultiplierMs(config.getInt(RETRY_BACKOFF_MULTIPLIER_MS.key()));
- }
- if (config.hasPath(MAX_RETRY_BACKOFF_MS.key())) {
- setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS.key()));
- }
- if (config.hasPath(WRITE_TIMEOUT.key())) {
- setWriteTimeout(config.getInt(WRITE_TIMEOUT.key()));
+ public void loadConfig(ReadonlyConfig config) {
+ setKeyTime(config.get(InfluxDBSinkOptions.KEY_TIME));
+ setKeyTags(config.get(InfluxDBSinkOptions.KEY_TAGS));
+ setBatchSize(config.get(InfluxDBSinkOptions.BATCH_SIZE));
+ if (config.getOptional(InfluxDBSinkOptions.MAX_RETRIES).isPresent()) {
+ setMaxRetries(config.get(InfluxDBSinkOptions.MAX_RETRIES));
}
- if (config.hasPath(RETENTION_POLICY.key())) {
- setRp(config.getString(RETENTION_POLICY.key()));
+ if
(config.getOptional(InfluxDBSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS).isPresent())
{
+ setRetryBackoffMultiplierMs(
+
config.get(InfluxDBSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS));
}
- if (config.hasPath(EPOCH.key())) {
-
setPrecision(TimePrecision.getPrecision(config.getString(EPOCH.key())));
+ if
(config.getOptional(InfluxDBSinkOptions.MAX_RETRY_BACKOFF_MS).isPresent()) {
+
setMaxRetryBackoffMs(config.get(InfluxDBSinkOptions.MAX_RETRY_BACKOFF_MS));
}
- setMeasurement(config.getString(KEY_MEASUREMENT.key()));
+ setWriteTimeout(config.get(InfluxDBSinkOptions.WRITE_TIMEOUT));
+ setRp(config.get(InfluxDBSinkOptions.RETENTION_POLICY));
+
setPrecision(TimePrecision.getPrecision(config.get(InfluxDBSinkOptions.EPOCH)));
+ setMeasurement(config.get(InfluxDBSinkOptions.KEY_MEASUREMENT));
}
}
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java
index 50ca08ec55..67cd62052c 100644
--- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java
@@ -17,10 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.influxdb.config;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import lombok.Getter;
@@ -29,43 +26,7 @@ import java.util.List;
@Getter
public class SourceConfig extends InfluxDBConfig {
- public static final Option<String> SQL =
- Options.key("sql")
- .stringType()
- .noDefaultValue()
- .withDescription("the influxdb server query sql");
-
- public static final Option<String> SQL_WHERE =
- Options.key("where")
- .stringType()
- .noDefaultValue()
- .withDescription("the influxdb server query sql where
condition");
-
- public static final Option<String> SPLIT_COLUMN =
- Options.key("split_column")
- .stringType()
- .noDefaultValue()
- .withDescription("the influxdb column which is used as
split key");
-
- public static final Option<String> PARTITION_NUM =
- Options.key("partition_num")
- .stringType()
- .defaultValue("0")
- .withDescription("the influxdb server partition num");
-
- public static final Option<String> UPPER_BOUND =
- Options.key("upper_bound")
- .stringType()
- .noDefaultValue()
- .withDescription("the influxdb server upper bound");
-
- public static final Option<String> LOWER_BOUND =
- Options.key("lower_bound")
- .stringType()
- .noDefaultValue()
- .withDescription("the influxdb server lower bound");
-
- public static final String DEFAULT_PARTITIONS =
PARTITION_NUM.defaultValue();
+ public static final int DEFAULT_PARTITIONS =
InfluxDBSourceOptions.PARTITION_NUM.defaultValue();
private String sql;
private int partitionNum = 0;
private String splitKey;
@@ -74,26 +35,22 @@ public class SourceConfig extends InfluxDBConfig {
List<Integer> columnsIndex;
- public SourceConfig(Config config) {
+ public SourceConfig(ReadonlyConfig config) {
super(config);
}
- public static SourceConfig loadConfig(Config config) {
+ public static SourceConfig loadConfig(ReadonlyConfig config) {
SourceConfig sourceConfig = new SourceConfig(config);
-
- sourceConfig.sql = config.getString(SQL.key());
-
- if (config.hasPath(PARTITION_NUM.key())) {
- sourceConfig.partitionNum = config.getInt(PARTITION_NUM.key());
- }
- if (config.hasPath(UPPER_BOUND.key())) {
- sourceConfig.upperBound = config.getInt(UPPER_BOUND.key());
+ sourceConfig.sql = config.get(InfluxDBSourceOptions.SQL);
+ sourceConfig.partitionNum =
config.get(InfluxDBSourceOptions.PARTITION_NUM);
+ if (config.getOptional(InfluxDBSourceOptions.UPPER_BOUND).isPresent())
{
+ sourceConfig.upperBound =
config.get(InfluxDBSourceOptions.UPPER_BOUND);
}
- if (config.hasPath(LOWER_BOUND.key())) {
- sourceConfig.lowerBound = config.getInt(LOWER_BOUND.key());
+ if (config.getOptional(InfluxDBSourceOptions.LOWER_BOUND).isPresent())
{
+ sourceConfig.lowerBound =
config.get(InfluxDBSourceOptions.LOWER_BOUND);
}
- if (config.hasPath(SPLIT_COLUMN.key())) {
- sourceConfig.splitKey = config.getString(SPLIT_COLUMN.key());
+ if
(config.getOptional(InfluxDBSourceOptions.SPLIT_COLUMN).isPresent()) {
+ sourceConfig.splitKey =
config.get(InfluxDBSourceOptions.SPLIT_COLUMN);
}
return sourceConfig;
}
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java
index 6206d7b8d5..ad2b726155 100644
--- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;
import com.google.auto.service.AutoService;
@@ -33,18 +34,6 @@ import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.CONNECT_TIMEOUT_MS;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.DATABASES;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.USERNAME;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.BATCH_SIZE;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_MEASUREMENT;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_TAGS;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_TIME;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.MAX_RETRIES;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS;
-
@AutoService(Factory.class)
@Slf4j
public class InfluxDBSinkFactory implements TableSinkFactory {
@@ -57,16 +46,21 @@ public class InfluxDBSinkFactory implements
TableSinkFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(URL, DATABASES)
- .bundled(USERNAME, PASSWORD)
+ .required(InfluxDBSinkOptions.URL,
InfluxDBSinkOptions.DATABASES)
+ .bundled(InfluxDBSinkOptions.USERNAME,
InfluxDBSinkOptions.PASSWORD)
.optional(
- CONNECT_TIMEOUT_MS,
- KEY_MEASUREMENT,
- KEY_TAGS,
- KEY_TIME,
- BATCH_SIZE,
- MAX_RETRIES,
- RETRY_BACKOFF_MULTIPLIER_MS,
+ InfluxDBSinkOptions.CONNECT_TIMEOUT_MS,
+ InfluxDBSinkOptions.KEY_MEASUREMENT,
+ InfluxDBSinkOptions.KEY_TAGS,
+ InfluxDBSinkOptions.KEY_TIME,
+ InfluxDBSinkOptions.BATCH_SIZE,
+ InfluxDBSinkOptions.MAX_RETRIES,
+ InfluxDBSinkOptions.WRITE_TIMEOUT,
+ InfluxDBSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS,
+ InfluxDBSinkOptions.MAX_RETRY_BACKOFF_MS,
+ InfluxDBSinkOptions.RETENTION_POLICY,
+ InfluxDBSinkOptions.QUERY_TIMEOUT_SEC,
+ InfluxDBSinkOptions.EPOCH,
SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}
@@ -75,12 +69,14 @@ public class InfluxDBSinkFactory implements
TableSinkFactory {
public TableSink createSink(TableSinkFactoryContext context) {
ReadonlyConfig config = context.getOptions();
CatalogTable catalogTable = context.getCatalogTable();
- if (!config.getOptional(KEY_MEASUREMENT).isPresent()) {
+ if
(!config.getOptional(InfluxDBSinkOptions.KEY_MEASUREMENT).isPresent()) {
Map<String, String> map = config.toMap();
- map.put(KEY_MEASUREMENT.key(),
catalogTable.getTableId().toTablePath().getFullName());
+ map.put(
+ InfluxDBSinkOptions.KEY_MEASUREMENT.key(),
+ catalogTable.getTableId().toTablePath().getFullName());
config = ReadonlyConfig.fromMap(new HashMap<>(map));
}
- SinkConfig sinkConfig = new SinkConfig(config.toConfig());
+ SinkConfig sinkConfig = new SinkConfig(config);
return () -> new InfluxDBSink(sinkConfig, catalogTable);
}
}
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
index 23bc69a440..282880482e 100644
--- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
@@ -17,25 +17,14 @@
package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.options.ConnectorCommonOptions;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.source.SupportParallelism;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.common.utils.ExceptionUtils;
import
org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorErrorCode;
@@ -46,59 +35,35 @@ import org.influxdb.InfluxDB;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
-import com.google.auto.service.AutoService;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SQL;
-
@Slf4j
-@AutoService(SeaTunnelSource.class)
public class InfluxDBSource
implements SeaTunnelSource<SeaTunnelRow, InfluxDBSourceSplit,
InfluxDBSourceState>,
SupportParallelism,
SupportColumnProjection {
- private SeaTunnelRowType typeInfo;
- private SourceConfig sourceConfig;
- private List<Integer> columnsIndexList;
+ private final CatalogTable catalogTable;
+ private final SourceConfig sourceConfig;
private static final String QUERY_LIMIT = " limit 1";
- @Override
- public String getPluginName() {
- return "InfluxDB";
+ public InfluxDBSource(CatalogTable catalogTable, SourceConfig
sourceConfig) {
+ this.catalogTable = catalogTable;
+ this.sourceConfig = sourceConfig;
}
@Override
- public void prepare(Config config) throws PrepareFailException {
- CheckResult result =
- CheckConfigUtil.checkAllExists(
- config, SQL.key(),
ConnectorCommonOptions.SCHEMA.key());
- if (!result.isSuccess()) {
- throw new InfluxdbConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SOURCE,
result.getMsg()));
- }
- try {
- this.sourceConfig = SourceConfig.loadConfig(config);
- this.typeInfo =
CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
- this.columnsIndexList =
initColumnsIndex(InfluxDBClient.getInfluxDB(sourceConfig));
- } catch (Exception e) {
- throw new InfluxdbConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SOURCE,
ExceptionUtils.getMessage(e)));
- }
+ public String getPluginName() {
+ return "InfluxDB";
}
@Override
@@ -106,14 +71,11 @@ public class InfluxDBSource
return Boundedness.BOUNDED;
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
- return typeInfo;
- }
-
@Override
public SourceReader createReader(SourceReader.Context readerContext)
throws Exception {
- return new InfluxdbSourceReader(sourceConfig, readerContext, typeInfo,
columnsIndexList);
+ List<Integer> columnsIndexList =
initColumnsIndex(InfluxDBClient.getInfluxDB(sourceConfig));
+ return new InfluxdbSourceReader(
+ sourceConfig, readerContext,
catalogTable.getSeaTunnelRowType(), columnsIndexList);
}
@Override
@@ -130,6 +92,11 @@ public class InfluxDBSource
return new InfluxDBSourceSplitEnumerator(enumeratorContext,
checkpointState, sourceConfig);
}
+ @Override
+ public List<CatalogTable> getProducedCatalogTables() {
+ return Collections.singletonList(catalogTable);
+ }
+
private List<Integer> initColumnsIndex(InfluxDB influxdb) {
// query one row to get column info
String sql = sourceConfig.getSql();
@@ -148,7 +115,7 @@ public class InfluxDBSource
List<QueryResult.Series> serieList =
queryResult.getResults().get(0).getSeries();
List<String> fieldNames = new
ArrayList<>(serieList.get(0).getColumns());
- return Arrays.stream(typeInfo.getFieldNames())
+ return
Arrays.stream(catalogTable.getSeaTunnelRowType().getFieldNames())
.map(fieldNames::indexOf)
.collect(Collectors.toList());
} catch (Exception e) {
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceFactory.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceFactory.java
index 0c03aa3223..9836c4ef48 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceFactory.java
@@ -20,23 +20,18 @@ package
org.apache.seatunnel.connectors.seatunnel.influxdb.source;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.options.ConnectorCommonOptions;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig;
import com.google.auto.service.AutoService;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.CONNECT_TIMEOUT_MS;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.DATABASES;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.EPOCH;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.QUERY_TIMEOUT_SEC;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.USERNAME;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.LOWER_BOUND;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.PARTITION_NUM;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SPLIT_COLUMN;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SQL;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.UPPER_BOUND;
+import java.io.Serializable;
@AutoService(Factory.class)
public class InfluxDBSourceFactory implements TableSourceFactory {
@@ -48,13 +43,35 @@ public class InfluxDBSourceFactory implements
TableSourceFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(URL, SQL, DATABASES, ConnectorCommonOptions.SCHEMA)
- .bundled(USERNAME, PASSWORD)
- .bundled(LOWER_BOUND, UPPER_BOUND, PARTITION_NUM, SPLIT_COLUMN)
- .optional(EPOCH, CONNECT_TIMEOUT_MS, QUERY_TIMEOUT_SEC)
+ .required(
+ InfluxDBSourceOptions.URL,
+ InfluxDBSourceOptions.SQL,
+ InfluxDBSourceOptions.DATABASES,
+ ConnectorCommonOptions.SCHEMA)
+ .bundled(InfluxDBSourceOptions.USERNAME,
InfluxDBSourceOptions.PASSWORD)
+ .bundled(
+ InfluxDBSourceOptions.LOWER_BOUND,
+ InfluxDBSourceOptions.UPPER_BOUND,
+ InfluxDBSourceOptions.PARTITION_NUM,
+ InfluxDBSourceOptions.SPLIT_COLUMN)
+ .optional(
+ InfluxDBSourceOptions.EPOCH,
+ InfluxDBSourceOptions.SQL_WHERE,
+ InfluxDBSourceOptions.CONNECT_TIMEOUT_MS,
+ InfluxDBSourceOptions.QUERY_TIMEOUT_SEC)
.build();
}
+ @Override
+ public <T, SplitT extends SourceSplit, StateT extends Serializable>
+ TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
+ return () ->
+ (SeaTunnelSource<T, SplitT, StateT>)
+ new InfluxDBSource(
+
CatalogTableUtil.buildWithConfig(context.getOptions()),
+ SourceConfig.loadConfig(context.getOptions()));
+ }
+
@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return InfluxDBSource.class;
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
index 690ea29181..98acc30e9f 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.influxdb.source;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState;
@@ -36,8 +37,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SQL_WHERE;
-
@Slf4j
public class InfluxDBSourceSplitEnumerator
implements SourceSplitEnumerator<InfluxDBSourceSplit,
InfluxDBSourceState> {
@@ -119,7 +118,8 @@ public class InfluxDBSourceSplitEnumerator
Set<InfluxDBSourceSplit> influxDBSourceSplits = new HashSet<>();
// no need numPartitions, use one partition
if (config.getPartitionNum() == 0) {
- influxDBSourceSplits.add(new
InfluxDBSourceSplit(SourceConfig.DEFAULT_PARTITIONS, sql));
+ influxDBSourceSplits.add(
+ new
InfluxDBSourceSplit(String.valueOf(SourceConfig.DEFAULT_PARTITIONS), sql));
return influxDBSourceSplits;
}
// calculate numRange base on (lowerBound upperBound partitionNum)
@@ -127,7 +127,7 @@ public class InfluxDBSourceSplitEnumerator
genSplitNumRange(
config.getLowerBound(), config.getUpperBound(),
config.getPartitionNum());
- String[] sqls = sql.split(SQL_WHERE.key());
+ String[] sqls = sql.split(InfluxDBSourceOptions.SQL_WHERE.key());
if (sqls.length > 2) {
throw new InfluxdbConnectorException(
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,