This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b247ff0ae [Feature][Connector][influx] Expose configurable options in
influx db (#3392)
b247ff0ae is described below
commit b247ff0aefd7f26ac9225929265a143564b74271
Author: FWLamb <[email protected]>
AuthorDate: Mon Nov 21 12:29:51 2022 +0800
[Feature][Connector][influx] Expose configurable options in influx db
(#3392)
* expose configurable options in InfluxDB
Co-authored-by: yangbinbin <[email protected]>
---
docs/en/connector-v2/sink/InfluxDB.md | 33 +++---
docs/en/connector-v2/source/InfluxDB.md | 7 +-
.../seatunnel/influxdb/config/InfluxDBConfig.java | 84 ++++++++++------
.../seatunnel/influxdb/config/SinkConfig.java | 111 ++++++++++++++-------
.../seatunnel/influxdb/config/SourceConfig.java | 62 ++++++++----
.../seatunnel/influxdb/sink/InfluxDBSink.java | 2 +-
.../influxdb/sink/InfluxDBSinkFactory.java | 67 +++++++++++++
.../seatunnel/influxdb/source/InfluxDBSource.java | 2 +-
.../influxdb/source/InfluxDBSourceFactory.java | 63 ++++++++++++
.../source/InfluxDBSourceSplitEnumerator.java | 2 +-
10 files changed, 334 insertions(+), 99 deletions(-)
diff --git a/docs/en/connector-v2/sink/InfluxDB.md
b/docs/en/connector-v2/sink/InfluxDB.md
index cff9787f9..f2cb27406 100644
--- a/docs/en/connector-v2/sink/InfluxDB.md
+++ b/docs/en/connector-v2/sink/InfluxDB.md
@@ -13,20 +13,21 @@ Write data to InfluxDB.
## Options
-| name | type | required | default value
|
-|-----------------------------|----------|----------|-------------------------------|
-| url | string | yes | -
|
-| database | string | yes |
|
-| measurement | string | yes |
|
-| username | string | no | -
|
-| password | string | no | -
|
-| key_time | string | yes | processing time
|
-| key_tags | array | no | exclude `field` &
`key_time` |
-| batch_size | int | no | 1024
|
-| batch_interval_ms | int | no | -
|
-| max_retries | int | no | -
|
-| retry_backoff_multiplier_ms | int | no | -
|
-| connect_timeout_ms | long | no | 15000
|
+| name | type | required | default value
|
+|-----------------------------|--------|----------|------------------------------|
+| url | string | yes | -
|
+| database | string | yes |
|
+| measurement | string | yes |
|
+| username | string | no | -
|
+| password | string | no | -
|
+| key_time | string | no | processing time
|
+| key_tags | array | no | exclude `field` &
`key_time` |
+| batch_size | int | no | 1024
|
+| batch_interval_ms | int | no | -
|
+| max_retries | int | no | -
|
+| retry_backoff_multiplier_ms | int | no | -
|
+| connect_timeout_ms | long | no | 15000
|
+| common-options | config | no | -
|
### url
the url to connect to influxDB e.g.
@@ -82,6 +83,10 @@ The amount of time to wait before attempting to retry a
request to `influxDB`
### connect_timeout_ms [long]
the timeout for connecting to InfluxDB, in milliseconds
+### common options
+
+Sink plugin common parameters. Please refer to [Sink Common
Options](common-options.md) for details.
+
## Examples
```hocon
sink {
diff --git a/docs/en/connector-v2/source/InfluxDB.md
b/docs/en/connector-v2/source/InfluxDB.md
index 368e93731..501b932a5 100644
--- a/docs/en/connector-v2/source/InfluxDB.md
+++ b/docs/en/connector-v2/source/InfluxDB.md
@@ -36,6 +36,7 @@ supports query SQL and can achieve projection effect.
| epoch | string | no | n |
| connect_timeout_ms | long | no | 15000 |
| query_timeout_sec | int | no | 3 |
+| common-options | config | no | - |
### url
the url to connect to influxDB e.g.
@@ -124,7 +125,11 @@ returned time precision
the `query_timeout` of the InfluxDB when you select, in seconds
### connect_timeout_ms [long]
-the timeout for connecting to InfluxDB, in milliseconds
+the timeout for connecting to InfluxDB, in milliseconds
+
+### common options
+
+Source plugin common parameters. Please refer to [Source Common
Options](common-options.md) for details.
## Examples
Example of multi parallelism and multi partition scanning
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
index 1332c5cab..ee309c2d9 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.influxdb.config;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.common.annotations.VisibleForTesting;
@@ -25,51 +28,74 @@ import lombok.Data;
import java.io.Serializable;
@Data
+@SuppressWarnings("checkstyle:MagicNumber")
public class InfluxDBConfig implements Serializable {
- public static final String USERNAME = "username";
- public static final String PASSWORD = "password";
- public static final String URL = "url";
- private static final String CONNECT_TIMEOUT_MS = "connect_timeout_ms";
- private static final String QUERY_TIMEOUT_SEC = "query_timeout_sec";
- public static final String DATABASES = "database";
- private static final String DEFAULT_FORMAT = "MSGPACK";
- protected static final String EPOCH = "epoch";
- private static final int DEFAULT_QUERY_TIMEOUT_SEC = 3;
- private static final long DEFAULT_CONNECT_TIMEOUT_MS = 15000;
- private static final String DEFAULT_EPOCH = "n";
+ public static final Option<String> USERNAME = Options.key("username")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the influxdb server username");
+
+ public static final Option<String> PASSWORD = Options.key("password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the influxdb server password");
+
+ public static final Option<String> URL = Options.key("url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the influxdb server url");
+
+ public static final Option<Long> CONNECT_TIMEOUT_MS =
Options.key("connect_timeout_ms")
+ .longType()
+ .defaultValue(15000L)
+ .withDescription("the influxdb client connect timeout ms");
+
+ /** Query timeout of the InfluxDB client; the key and the connector docs express it in seconds. */
+ public static final Option<Integer> QUERY_TIMEOUT_SEC = Options.key("query_timeout_sec")
+ .intType()
+ .defaultValue(3)
+ .withDescription("the influxdb client query timeout in seconds");
+ public static final Option<String> DATABASES = Options.key("database")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the influxdb server database");
+
+ public static final Option<String> EPOCH = Options.key("epoch")
+ .stringType()
+ .defaultValue("n")
+ .withDescription("the influxdb server query epoch");
+
+ private static final String DEFAULT_FORMAT = "MSGPACK";
private String url;
private String username;
private String password;
private String database;
-
private String format = DEFAULT_FORMAT;
- private int queryTimeOut = DEFAULT_QUERY_TIMEOUT_SEC;
- private long connectTimeOut = DEFAULT_CONNECT_TIMEOUT_MS;
-
- private String epoch = DEFAULT_EPOCH;
+ private int queryTimeOut = QUERY_TIMEOUT_SEC.defaultValue();
+ private long connectTimeOut = CONNECT_TIMEOUT_MS.defaultValue();
+ private String epoch = EPOCH.defaultValue();
public InfluxDBConfig(Config config) {
- this.url = config.getString(URL);
+ this.url = config.getString(URL.key());
- if (config.hasPath(USERNAME)) {
- this.username = config.getString(USERNAME);
+ if (config.hasPath(USERNAME.key())) {
+ this.username = config.getString(USERNAME.key());
}
- if (config.hasPath(PASSWORD)) {
- this.password = config.getString(PASSWORD);
+ if (config.hasPath(PASSWORD.key())) {
+ this.password = config.getString(PASSWORD.key());
}
- if (config.hasPath(DATABASES)) {
- this.database = config.getString(DATABASES);
+ if (config.hasPath(DATABASES.key())) {
+ this.database = config.getString(DATABASES.key());
}
- if (config.hasPath(EPOCH)) {
- this.epoch = config.getString(EPOCH);
+ if (config.hasPath(EPOCH.key())) {
+ this.epoch = config.getString(EPOCH.key());
}
- if (config.hasPath(CONNECT_TIMEOUT_MS)) {
- this.connectTimeOut = config.getLong(CONNECT_TIMEOUT_MS);
+ if (config.hasPath(CONNECT_TIMEOUT_MS.key())) {
+ this.connectTimeOut = config.getLong(CONNECT_TIMEOUT_MS.key());
}
- if (config.hasPath(QUERY_TIMEOUT_SEC)) {
- this.queryTimeOut = config.getInt(QUERY_TIMEOUT_SEC);
+ if (config.hasPath(QUERY_TIMEOUT_SEC.key())) {
+ this.queryTimeOut = config.getInt(QUERY_TIMEOUT_SEC.key());
}
}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
index c97d807fa..84ac2f58c 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.influxdb.config;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import lombok.Getter;
@@ -28,32 +31,70 @@ import java.util.List;
@Setter
@Getter
@ToString
-public class SinkConfig extends InfluxDBConfig{
+@SuppressWarnings("checkstyle:MagicNumber")
+public class SinkConfig extends InfluxDBConfig {
public SinkConfig(Config config) {
super(config);
}
- private static final String KEY_TIME = "key_time";
- private static final String KEY_TAGS = "key_tags";
- public static final String KEY_MEASUREMENT = "measurement";
-
- private static final String BATCH_SIZE = "batch_size";
- private static final String BATCH_INTERVAL_MS = "batch_interval_ms";
- private static final String MAX_RETRIES = "max_retries";
- private static final String WRITE_TIMEOUT = "write_timeout";
- private static final String RETRY_BACKOFF_MULTIPLIER_MS =
"retry_backoff_multiplier_ms";
- private static final String MAX_RETRY_BACKOFF_MS = "max_retry_backoff_ms";
- private static final String RETENTION_POLICY = "rp";
- private static final int DEFAULT_BATCH_SIZE = 1024;
- private static final int DEFAULT_WRITE_TIMEOUT = 5;
+ public static final Option<String> KEY_TIME = Options.key("key_time")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the influxdb server key time");
+
+ public static final Option<List<String>> KEY_TAGS = Options.key("key_tags")
+ .listType()
+ .noDefaultValue()
+ .withDescription("the influxdb server key tags");
+
+ public static final Option<String> KEY_MEASUREMENT =
Options.key("measurement")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the influxdb server measurement");
+
+ public static final Option<Integer> BATCH_SIZE = Options.key("batch_size")
+ .intType()
+ .defaultValue(1024)
+ .withDescription("batch size of the influxdb client");
+
+ public static final Option<Integer> BATCH_INTERVAL_MS =
Options.key("batch_interval_ms")
+ .intType()
+ .noDefaultValue()
+ .withDescription("batch interval ms of the influxdb client");
+
+ public static final Option<Integer> MAX_RETRIES =
Options.key("max_retries")
+ .intType()
+ .noDefaultValue()
+ .withDescription("max retries of the influxdb client");
+
+ public static final Option<Integer> WRITE_TIMEOUT =
Options.key("write_timeout")
+ .intType()
+ .defaultValue(5)
+ .withDescription("the influxdb client write data timeout");
+
+ public static final Option<Integer> RETRY_BACKOFF_MULTIPLIER_MS =
Options.key("retry_backoff_multiplier_ms")
+ .intType()
+ .noDefaultValue()
+ .withDescription("the influxdb client retry backoff multiplier ms");
+
+ public static final Option<Integer> MAX_RETRY_BACKOFF_MS =
Options.key("max_retry_backoff_ms")
+ .intType()
+ .noDefaultValue()
+ .withDescription("the influxdb client max retry backoff ms");
+
+ public static final Option<String> RETENTION_POLICY = Options.key("rp")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the influxdb client retention policy");
+
private static final TimePrecision DEFAULT_TIME_PRECISION =
TimePrecision.NS;
private String rp;
private String measurement;
- private int writeTimeout = DEFAULT_WRITE_TIMEOUT;
+ private int writeTimeout = WRITE_TIMEOUT.defaultValue();
private String keyTime;
private List<String> keyTags;
- private int batchSize = DEFAULT_BATCH_SIZE;
+ private int batchSize = BATCH_SIZE.defaultValue();
private Integer batchIntervalMs;
private int maxRetries;
private int retryBackoffMultiplierMs;
@@ -63,34 +104,34 @@ public class SinkConfig extends InfluxDBConfig{
public static SinkConfig loadConfig(Config config) {
SinkConfig sinkConfig = new SinkConfig(config);
- if (config.hasPath(KEY_TIME)) {
- sinkConfig.setKeyTime(config.getString(KEY_TIME));
+ if (config.hasPath(KEY_TIME.key())) {
+ sinkConfig.setKeyTime(config.getString(KEY_TIME.key()));
}
- if (config.hasPath(KEY_TAGS)) {
- sinkConfig.setKeyTags(config.getStringList(KEY_TAGS));
+ if (config.hasPath(KEY_TAGS.key())) {
+ sinkConfig.setKeyTags(config.getStringList(KEY_TAGS.key()));
}
- if (config.hasPath(BATCH_INTERVAL_MS)) {
- sinkConfig.setBatchIntervalMs(config.getInt(BATCH_INTERVAL_MS));
+ if (config.hasPath(BATCH_INTERVAL_MS.key())) {
+
sinkConfig.setBatchIntervalMs(config.getInt(BATCH_INTERVAL_MS.key()));
}
- if (config.hasPath(MAX_RETRIES)) {
- sinkConfig.setMaxRetries(config.getInt(MAX_RETRIES));
+ if (config.hasPath(MAX_RETRIES.key())) {
+ sinkConfig.setMaxRetries(config.getInt(MAX_RETRIES.key()));
}
- if (config.hasPath(RETRY_BACKOFF_MULTIPLIER_MS)) {
-
sinkConfig.setRetryBackoffMultiplierMs(config.getInt(RETRY_BACKOFF_MULTIPLIER_MS));
+ if (config.hasPath(RETRY_BACKOFF_MULTIPLIER_MS.key())) {
+
sinkConfig.setRetryBackoffMultiplierMs(config.getInt(RETRY_BACKOFF_MULTIPLIER_MS.key()));
}
- if (config.hasPath(MAX_RETRY_BACKOFF_MS)) {
-
sinkConfig.setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS));
+ if (config.hasPath(MAX_RETRY_BACKOFF_MS.key())) {
+
sinkConfig.setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS.key()));
}
- if (config.hasPath(WRITE_TIMEOUT)) {
- sinkConfig.setWriteTimeout(config.getInt(WRITE_TIMEOUT));
+ if (config.hasPath(WRITE_TIMEOUT.key())) {
+ sinkConfig.setWriteTimeout(config.getInt(WRITE_TIMEOUT.key()));
}
- if (config.hasPath(RETENTION_POLICY)) {
- sinkConfig.setRp(config.getString(RETENTION_POLICY));
+ if (config.hasPath(RETENTION_POLICY.key())) {
+ sinkConfig.setRp(config.getString(RETENTION_POLICY.key()));
}
- if (config.hasPath(EPOCH)) {
-
sinkConfig.setPrecision(TimePrecision.getPrecision(config.getString(EPOCH)));
+ if (config.hasPath(EPOCH.key())) {
+
sinkConfig.setPrecision(TimePrecision.getPrecision(config.getString(EPOCH.key())));
}
- sinkConfig.setMeasurement(config.getString(KEY_MEASUREMENT));
+ sinkConfig.setMeasurement(config.getString(KEY_MEASUREMENT.key()));
return sinkConfig;
}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java
index d0c3fe65e..0cb123524 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.influxdb.config;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import lombok.Getter;
@@ -24,14 +27,39 @@ import lombok.Getter;
import java.util.List;
@Getter
-public class SourceConfig extends InfluxDBConfig{
- public static final String SQL = "sql";
- public static final String SQL_WHERE = "where";
- public static final String SPLIT_COLUMN = "split_column";
- private static final String PARTITION_NUM = "partition_num";
- private static final String UPPER_BOUND = "upper_bound";
- private static final String LOWER_BOUND = "lower_bound";
- public static final String DEFAULT_PARTITIONS = "0";
+public class SourceConfig extends InfluxDBConfig {
+
+ public static final Option<String> SQL = Options.key("sql")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the influxdb server query sql");
+
+ public static final Option<String> SQL_WHERE = Options.key("where")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the influxdb server query sql where condition");
+
+ public static final Option<String> SPLIT_COLUMN =
Options.key("split_column")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the influxdb column which is used as split key");
+
+ public static final Option<String> PARTITION_NUM =
Options.key("partition_num")
+ .stringType()
+ .defaultValue("0")
+ .withDescription("the influxdb server partition num");
+
+ public static final Option<String> UPPER_BOUND = Options.key("upper_bound")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the influxdb server upper bound");
+
+ public static final Option<String> LOWER_BOUND = Options.key("lower_bound")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the influxdb server lower bound");
+
+ public static final String DEFAULT_PARTITIONS =
PARTITION_NUM.defaultValue();
private String sql;
private int partitionNum = 0;
private String splitKey;
@@ -47,19 +75,19 @@ public class SourceConfig extends InfluxDBConfig{
public static SourceConfig loadConfig(Config config) {
SourceConfig sourceConfig = new SourceConfig(config);
- sourceConfig.sql = config.getString(SQL);
+ sourceConfig.sql = config.getString(SQL.key());
- if (config.hasPath(PARTITION_NUM)) {
- sourceConfig.partitionNum = config.getInt(PARTITION_NUM);
+ if (config.hasPath(PARTITION_NUM.key())) {
+ sourceConfig.partitionNum = config.getInt(PARTITION_NUM.key());
}
- if (config.hasPath(UPPER_BOUND)) {
- sourceConfig.upperBound = config.getInt(UPPER_BOUND);
+ if (config.hasPath(UPPER_BOUND.key())) {
+ sourceConfig.upperBound = config.getInt(UPPER_BOUND.key());
}
- if (config.hasPath(LOWER_BOUND)) {
- sourceConfig.lowerBound = config.getInt(LOWER_BOUND);
+ if (config.hasPath(LOWER_BOUND.key())) {
+ sourceConfig.lowerBound = config.getInt(LOWER_BOUND.key());
}
- if (config.hasPath(SPLIT_COLUMN)) {
- sourceConfig.splitKey = config.getString(SPLIT_COLUMN);
+ if (config.hasPath(SPLIT_COLUMN.key())) {
+ sourceConfig.splitKey = config.getString(SPLIT_COLUMN.key());
}
return sourceConfig;
}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
index 8d3eb7290..074e5a518 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
@@ -51,7 +51,7 @@ public class InfluxDBSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
@Override
public void prepare(Config config) throws PrepareFailException {
- CheckResult result = CheckConfigUtil.checkAllExists(config, URL,
KEY_MEASUREMENT);
+ CheckResult result = CheckConfigUtil.checkAllExists(config, URL.key(),
KEY_MEASUREMENT.key());
if (!result.isSuccess()) {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
result.getMsg());
}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java
new file mode 100644
index 000000000..01d53841e
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.sink;
+
+import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.CONNECT_TIMEOUT_MS;
+import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.DATABASES;
+import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL;
+import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.USERNAME;
+import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.BATCH_INTERVAL_MS;
+import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.BATCH_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_MEASUREMENT;
+import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_TAGS;
+import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_TIME;
+import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.MAX_RETRIES;
+import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Factory describing the InfluxDB <em>sink</em> connector and the options it accepts.
+ *
+ * <p>This class implements {@link TableSinkFactory} because it describes the sink side of the
+ * connector. The source side is described by {@code InfluxDBSourceFactory}, which reports the
+ * same identifier, so the two must not implement the same factory interface.
+ */
+@AutoService(Factory.class)
+public class InfluxDBSinkFactory implements TableSinkFactory {
+
+    /** Returns the identifier under which this connector is registered. */
+    @Override
+    public String factoryIdentifier() {
+        return "InfluxDB";
+    }
+
+    /**
+     * Declares the options of the InfluxDB sink.
+     *
+     * <p>{@code url}, {@code database} and {@code measurement} are required; {@code username}
+     * and {@code password} are declared as one bundle; the remaining options are optional.
+     *
+     * @return the option rule for the sink
+     */
+    @Override
+    public OptionRule optionRule() {
+        // NOTE(review): SinkConfig.loadConfig also reads write_timeout, max_retry_backoff_ms,
+        // rp and epoch, which are not declared here - confirm whether they should be exposed.
+        return OptionRule.builder()
+            .required(
+                URL,
+                DATABASES,
+                KEY_MEASUREMENT
+            )
+            .bundled(USERNAME, PASSWORD)
+            .optional(
+                CONNECT_TIMEOUT_MS,
+                KEY_TAGS,
+                KEY_TIME,
+                BATCH_SIZE,
+                BATCH_INTERVAL_MS,
+                MAX_RETRIES,
+                RETRY_BACKOFF_MULTIPLIER_MS
+            )
+            .build();
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
index 804e804f5..26230f0ae 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
@@ -65,7 +65,7 @@ public class InfluxDBSource implements
SeaTunnelSource<SeaTunnelRow, InfluxDBSou
@Override
public void prepare(Config config) throws PrepareFailException {
- CheckResult result = CheckConfigUtil.checkAllExists(config, SQL);
+ CheckResult result = CheckConfigUtil.checkAllExists(config, SQL.key());
if (!result.isSuccess()) {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
result.getMsg());
}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceFactory.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceFactory.java
new file mode 100644
index 000000000..c5cb3ccd9
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
+
+import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.CONNECT_TIMEOUT_MS;
+import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.DATABASES;
+import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.EPOCH;
+import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.QUERY_TIMEOUT_SEC;
+import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL;
+import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.USERNAME;
+import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.LOWER_BOUND;
+import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.PARTITION_NUM;
+import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SPLIT_COLUMN;
+import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SQL;
+import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.UPPER_BOUND;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Factory describing the InfluxDB source connector and the options it accepts.
+ */
+@AutoService(Factory.class)
+public class InfluxDBSourceFactory implements TableSourceFactory {
+
+    /** Identifier reported by {@link #factoryIdentifier()}. */
+    private static final String IDENTIFIER = "InfluxDB";
+
+    /** Returns the identifier under which this connector is registered. */
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    /**
+     * Declares the options of the InfluxDB source.
+     *
+     * <p>{@code url}, {@code sql} and {@code database} are required. Credentials form one
+     * bundle and the split settings (bounds, partition number, split column) form another.
+     * {@code epoch} and the two timeouts are optional.
+     *
+     * @return the option rule for the source
+     */
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+            .required(URL, SQL, DATABASES)
+            .bundled(USERNAME, PASSWORD)
+            .bundled(LOWER_BOUND, UPPER_BOUND, PARTITION_NUM, SPLIT_COLUMN)
+            .optional(EPOCH, CONNECT_TIMEOUT_MS, QUERY_TIMEOUT_SEC)
+            .build();
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
index 139a6e3ad..aad78e6fc 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
@@ -119,7 +119,7 @@ public class InfluxDBSourceSplitEnumerator implements
SourceSplitEnumerator<Infl
//calculate numRange base on (lowerBound upperBound partitionNum)
List<Pair<Long, Long>> rangePairs =
genSplitNumRange(config.getLowerBound(), config.getUpperBound(),
config.getPartitionNum());
- String[] sqls = sql.split(SQL_WHERE);
+ String[] sqls = sql.split(SQL_WHERE.key());
if (sqls.length > 2) {
throw new IllegalArgumentException("sql should not contain more
than one where");
}