This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b247ff0ae [Feature][Connector][influx] Expose configurable options in 
influx db (#3392)
b247ff0ae is described below

commit b247ff0aefd7f26ac9225929265a143564b74271
Author: FWLamb <[email protected]>
AuthorDate: Mon Nov 21 12:29:51 2022 +0800

    [Feature][Connector][influx] Expose configurable options in influx db 
(#3392)
    
    * expose configurable options in InfluxDB
    
    Co-authored-by: yangbinbin <[email protected]>
---
 docs/en/connector-v2/sink/InfluxDB.md              |  33 +++---
 docs/en/connector-v2/source/InfluxDB.md            |   7 +-
 .../seatunnel/influxdb/config/InfluxDBConfig.java  |  84 ++++++++++------
 .../seatunnel/influxdb/config/SinkConfig.java      | 111 ++++++++++++++-------
 .../seatunnel/influxdb/config/SourceConfig.java    |  62 ++++++++----
 .../seatunnel/influxdb/sink/InfluxDBSink.java      |   2 +-
 .../influxdb/sink/InfluxDBSinkFactory.java         |  67 +++++++++++++
 .../seatunnel/influxdb/source/InfluxDBSource.java  |   2 +-
 .../influxdb/source/InfluxDBSourceFactory.java     |  63 ++++++++++++
 .../source/InfluxDBSourceSplitEnumerator.java      |   2 +-
 10 files changed, 334 insertions(+), 99 deletions(-)

diff --git a/docs/en/connector-v2/sink/InfluxDB.md 
b/docs/en/connector-v2/sink/InfluxDB.md
index cff9787f9..f2cb27406 100644
--- a/docs/en/connector-v2/sink/InfluxDB.md
+++ b/docs/en/connector-v2/sink/InfluxDB.md
@@ -13,20 +13,21 @@ Write data to InfluxDB.
 
 ## Options
 
-| name                        | type     | required | default value            
     |
-|-----------------------------|----------|----------|-------------------------------|
-| url                         | string   | yes      | -                        
     |
-| database                    | string   | yes      |                          
     |
-| measurement                 | string   | yes      |                          
     |
-| username                    | string   | no       | -                        
     |
-| password                    | string   | no       | -                        
     |
-| key_time                    | string   | yes      | processing time          
     |
-| key_tags                    | array    | no       | exclude `field` & 
`key_time`  |
-| batch_size                  | int      | no       | 1024                     
     |
-| batch_interval_ms           | int      | no       | -                        
     |
-| max_retries                 | int      | no       | -                        
     |
-| retry_backoff_multiplier_ms | int      | no       | -                        
     |
-| connect_timeout_ms          | long     | no       | 15000                    
     |
+| name                        | type   | required | default value              
  |
+|-----------------------------|--------|----------|------------------------------|
+| url                         | string | yes      | -                          
  |
+| database                    | string | yes      |                            
  |
+| measurement                 | string | yes      |                            
  |
+| username                    | string | no       | -                          
  |
+| password                    | string | no       | -                          
  |
+| key_time                    | string | no       | processing time            
  |
+| key_tags                    | array  | no       | exclude `field` & 
`key_time` |
+| batch_size                  | int    | no       | 1024                       
  |
+| batch_interval_ms           | int    | no       | -                          
  |
+| max_retries                 | int    | no       | -                          
  |
+| retry_backoff_multiplier_ms | int    | no       | -                          
  |
+| connect_timeout_ms          | long   | no       | 15000                      
  |
+| common-options              | config | no       | -                          
  |
 
 ### url
 the url to connect to influxDB e.g.
@@ -82,6 +83,10 @@ The amount of time to wait before attempting to retry a 
request to `influxDB`
 ### connect_timeout_ms [long]
 the timeout for connecting to InfluxDB, in milliseconds 
 
+### common options
+
+Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details
+
 ## Examples
 ```hocon
 sink {
diff --git a/docs/en/connector-v2/source/InfluxDB.md 
b/docs/en/connector-v2/source/InfluxDB.md
index 368e93731..501b932a5 100644
--- a/docs/en/connector-v2/source/InfluxDB.md
+++ b/docs/en/connector-v2/source/InfluxDB.md
@@ -36,6 +36,7 @@ supports query SQL and can achieve projection effect.
 | epoch              | string | no       | n             |
 | connect_timeout_ms | long   | no       | 15000         |
 | query_timeout_sec  | int    | no       | 3             |
+| common-options     | config | no       | -             |
 
 ### url
 the url to connect to influxDB e.g.
@@ -124,7 +125,11 @@ returned time precision
 the `query_timeout` of the InfluxDB when you select, in seconds
 
 ### connect_timeout_ms [long]
-the timeout for connecting to InfluxDB, in milliseconds 
+the timeout for connecting to InfluxDB, in milliseconds
+
+### common options
+
+Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details
 
 ## Examples
 Example of multi parallelism and multi partition scanning 
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
index 1332c5cab..ee309c2d9 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.influxdb.config;
 
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -25,51 +28,74 @@ import lombok.Data;
 import java.io.Serializable;
 
 @Data
+@SuppressWarnings("checkstyle:MagicNumber")
 public class InfluxDBConfig implements Serializable {
 
-    public static final String USERNAME = "username";
-    public static final String PASSWORD = "password";
-    public static final String URL = "url";
-    private static final String CONNECT_TIMEOUT_MS = "connect_timeout_ms";
-    private static final String QUERY_TIMEOUT_SEC = "query_timeout_sec";
-    public static final String DATABASES = "database";
-    private static final String DEFAULT_FORMAT = "MSGPACK";
-    protected static final String EPOCH = "epoch";
-    private static final int DEFAULT_QUERY_TIMEOUT_SEC = 3;
-    private static final long DEFAULT_CONNECT_TIMEOUT_MS = 15000;
-    private static final String DEFAULT_EPOCH = "n";
+    public static final Option<String> USERNAME = Options.key("username")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("the influxdb server username");
+
+    public static final Option<String> PASSWORD = Options.key("password")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("the influxdb server password");
+
+    public static final Option<String> URL = Options.key("url")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("the influxdb server url");
+
+    public static final Option<Long> CONNECT_TIMEOUT_MS = 
Options.key("connect_timeout_ms")
+        .longType()
+        .defaultValue(15000L)
+        .withDescription("the influxdb client connect timeout ms");
+
+    public static final Option<Integer> QUERY_TIMEOUT_SEC = 
Options.key("query_timeout_sec")
+        .intType()
+        .defaultValue(3)
+        .withDescription("the influxdb client query timeout ms");
 
+    public static final Option<String> DATABASES = Options.key("database")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("the influxdb server database");
+
+    public static final Option<String> EPOCH = Options.key("epoch")
+        .stringType()
+        .defaultValue("n")
+        .withDescription("the influxdb server query epoch");
+
+    private static final String DEFAULT_FORMAT = "MSGPACK";
     private String url;
     private String username;
     private String password;
     private String database;
-
     private String format = DEFAULT_FORMAT;
-    private int queryTimeOut = DEFAULT_QUERY_TIMEOUT_SEC;
-    private long connectTimeOut = DEFAULT_CONNECT_TIMEOUT_MS;
-
-    private String epoch = DEFAULT_EPOCH;
+    private int queryTimeOut = QUERY_TIMEOUT_SEC.defaultValue();
+    private long connectTimeOut = CONNECT_TIMEOUT_MS.defaultValue();
+    private String epoch = EPOCH.defaultValue();
 
     public InfluxDBConfig(Config config) {
-        this.url = config.getString(URL);
+        this.url = config.getString(URL.key());
 
-        if (config.hasPath(USERNAME)) {
-            this.username = config.getString(USERNAME);
+        if (config.hasPath(USERNAME.key())) {
+            this.username = config.getString(USERNAME.key());
         }
-        if (config.hasPath(PASSWORD)) {
-            this.password = config.getString(PASSWORD);
+        if (config.hasPath(PASSWORD.key())) {
+            this.password = config.getString(PASSWORD.key());
         }
-        if (config.hasPath(DATABASES)) {
-            this.database = config.getString(DATABASES);
+        if (config.hasPath(DATABASES.key())) {
+            this.database = config.getString(DATABASES.key());
         }
-        if (config.hasPath(EPOCH)) {
-            this.epoch = config.getString(EPOCH);
+        if (config.hasPath(EPOCH.key())) {
+            this.epoch = config.getString(EPOCH.key());
         }
-        if (config.hasPath(CONNECT_TIMEOUT_MS)) {
-            this.connectTimeOut = config.getLong(CONNECT_TIMEOUT_MS);
+        if (config.hasPath(CONNECT_TIMEOUT_MS.key())) {
+            this.connectTimeOut = config.getLong(CONNECT_TIMEOUT_MS.key());
         }
-        if (config.hasPath(QUERY_TIMEOUT_SEC)) {
-            this.queryTimeOut = config.getInt(QUERY_TIMEOUT_SEC);
+        if (config.hasPath(QUERY_TIMEOUT_SEC.key())) {
+            this.queryTimeOut = config.getInt(QUERY_TIMEOUT_SEC.key());
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
index c97d807fa..84ac2f58c 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.influxdb.config;
 
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import lombok.Getter;
@@ -28,32 +31,70 @@ import java.util.List;
 @Setter
 @Getter
 @ToString
-public class SinkConfig extends InfluxDBConfig{
+@SuppressWarnings("checkstyle:MagicNumber")
+public class SinkConfig extends InfluxDBConfig {
     public SinkConfig(Config config) {
         super(config);
     }
 
-    private static final String KEY_TIME = "key_time";
-    private static final String KEY_TAGS = "key_tags";
-    public static final String KEY_MEASUREMENT = "measurement";
-
-    private static final String BATCH_SIZE = "batch_size";
-    private static final String BATCH_INTERVAL_MS = "batch_interval_ms";
-    private static final String MAX_RETRIES = "max_retries";
-    private static final String WRITE_TIMEOUT = "write_timeout";
-    private static final String RETRY_BACKOFF_MULTIPLIER_MS = 
"retry_backoff_multiplier_ms";
-    private static final String MAX_RETRY_BACKOFF_MS = "max_retry_backoff_ms";
-    private static final String RETENTION_POLICY = "rp";
-    private static final int DEFAULT_BATCH_SIZE = 1024;
-    private static final int DEFAULT_WRITE_TIMEOUT = 5;
+    public static final Option<String> KEY_TIME = Options.key("key_time")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("the influxdb server key time");
+
+    public static final Option<List<String>> KEY_TAGS = Options.key("key_tags")
+        .listType()
+        .noDefaultValue()
+        .withDescription("the influxdb server key tags");
+
+    public static final Option<String> KEY_MEASUREMENT = 
Options.key("measurement")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("the influxdb server measurement");
+
+    public static final Option<Integer> BATCH_SIZE = Options.key("batch_size")
+        .intType()
+        .defaultValue(1024)
+        .withDescription("batch size of the influxdb client");
+
+    public static final Option<Integer> BATCH_INTERVAL_MS = 
Options.key("batch_interval_ms")
+        .intType()
+        .noDefaultValue()
+        .withDescription("batch interval ms of the influxdb client");
+
+    public static final Option<Integer> MAX_RETRIES = 
Options.key("max_retries")
+        .intType()
+        .noDefaultValue()
+        .withDescription("max retries of the influxdb client");
+
+    public static final Option<Integer> WRITE_TIMEOUT = 
Options.key("write_timeout")
+        .intType()
+        .defaultValue(5)
+        .withDescription("the influxdb client write data timeout");
+
+    public static final Option<Integer> RETRY_BACKOFF_MULTIPLIER_MS = 
Options.key("retry_backoff_multiplier_ms")
+        .intType()
+        .noDefaultValue()
+        .withDescription("the influxdb client retry backoff multiplier ms");
+
+    public static final Option<Integer> MAX_RETRY_BACKOFF_MS = 
Options.key("max_retry_backoff_ms")
+        .intType()
+        .noDefaultValue()
+        .withDescription("the influxdb client max retry backoff ms");
+
+    public static final Option<String> RETENTION_POLICY = Options.key("rp")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("the influxdb client retention policy");
+
     private static final TimePrecision DEFAULT_TIME_PRECISION = 
TimePrecision.NS;
 
     private String rp;
     private String measurement;
-    private int writeTimeout = DEFAULT_WRITE_TIMEOUT;
+    private int writeTimeout = WRITE_TIMEOUT.defaultValue();
     private String keyTime;
     private List<String> keyTags;
-    private int batchSize = DEFAULT_BATCH_SIZE;
+    private int batchSize = BATCH_SIZE.defaultValue();
     private Integer batchIntervalMs;
     private int maxRetries;
     private int retryBackoffMultiplierMs;
@@ -63,34 +104,34 @@ public class SinkConfig extends InfluxDBConfig{
     public static SinkConfig loadConfig(Config config) {
         SinkConfig sinkConfig = new SinkConfig(config);
 
-        if (config.hasPath(KEY_TIME)) {
-            sinkConfig.setKeyTime(config.getString(KEY_TIME));
+        if (config.hasPath(KEY_TIME.key())) {
+            sinkConfig.setKeyTime(config.getString(KEY_TIME.key()));
         }
-        if (config.hasPath(KEY_TAGS)) {
-            sinkConfig.setKeyTags(config.getStringList(KEY_TAGS));
+        if (config.hasPath(KEY_TAGS.key())) {
+            sinkConfig.setKeyTags(config.getStringList(KEY_TAGS.key()));
         }
-        if (config.hasPath(BATCH_INTERVAL_MS)) {
-            sinkConfig.setBatchIntervalMs(config.getInt(BATCH_INTERVAL_MS));
+        if (config.hasPath(BATCH_INTERVAL_MS.key())) {
+            
sinkConfig.setBatchIntervalMs(config.getInt(BATCH_INTERVAL_MS.key()));
         }
-        if (config.hasPath(MAX_RETRIES)) {
-            sinkConfig.setMaxRetries(config.getInt(MAX_RETRIES));
+        if (config.hasPath(MAX_RETRIES.key())) {
+            sinkConfig.setMaxRetries(config.getInt(MAX_RETRIES.key()));
         }
-        if (config.hasPath(RETRY_BACKOFF_MULTIPLIER_MS)) {
-            
sinkConfig.setRetryBackoffMultiplierMs(config.getInt(RETRY_BACKOFF_MULTIPLIER_MS));
+        if (config.hasPath(RETRY_BACKOFF_MULTIPLIER_MS.key())) {
+            
sinkConfig.setRetryBackoffMultiplierMs(config.getInt(RETRY_BACKOFF_MULTIPLIER_MS.key()));
         }
-        if (config.hasPath(MAX_RETRY_BACKOFF_MS)) {
-            
sinkConfig.setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS));
+        if (config.hasPath(MAX_RETRY_BACKOFF_MS.key())) {
+            
sinkConfig.setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS.key()));
         }
-        if (config.hasPath(WRITE_TIMEOUT)) {
-            sinkConfig.setWriteTimeout(config.getInt(WRITE_TIMEOUT));
+        if (config.hasPath(WRITE_TIMEOUT.key())) {
+            sinkConfig.setWriteTimeout(config.getInt(WRITE_TIMEOUT.key()));
         }
-        if (config.hasPath(RETENTION_POLICY)) {
-            sinkConfig.setRp(config.getString(RETENTION_POLICY));
+        if (config.hasPath(RETENTION_POLICY.key())) {
+            sinkConfig.setRp(config.getString(RETENTION_POLICY.key()));
         }
-        if (config.hasPath(EPOCH)) {
-            
sinkConfig.setPrecision(TimePrecision.getPrecision(config.getString(EPOCH)));
+        if (config.hasPath(EPOCH.key())) {
+            
sinkConfig.setPrecision(TimePrecision.getPrecision(config.getString(EPOCH.key())));
         }
-        sinkConfig.setMeasurement(config.getString(KEY_MEASUREMENT));
+        sinkConfig.setMeasurement(config.getString(KEY_MEASUREMENT.key()));
         return sinkConfig;
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java
index d0c3fe65e..0cb123524 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.influxdb.config;
 
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import lombok.Getter;
@@ -24,14 +27,39 @@ import lombok.Getter;
 import java.util.List;
 
 @Getter
-public class SourceConfig extends InfluxDBConfig{
-    public static final String SQL = "sql";
-    public static final String SQL_WHERE = "where";
-    public static final String SPLIT_COLUMN = "split_column";
-    private static final String PARTITION_NUM = "partition_num";
-    private static final String UPPER_BOUND = "upper_bound";
-    private static final String LOWER_BOUND = "lower_bound";
-    public static final String DEFAULT_PARTITIONS = "0";
+public class SourceConfig extends InfluxDBConfig {
+
+    public static final Option<String> SQL = Options.key("sql")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("the influxdb server query sql");
+
+    public static final Option<String> SQL_WHERE = Options.key("where")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("the influxdb server query sql where condition");
+
+    public static final Option<String> SPLIT_COLUMN = 
Options.key("split_column")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("the influxdb column which is used as split key");
+
+    public static final Option<String> PARTITION_NUM = 
Options.key("partition_num")
+        .stringType()
+        .defaultValue("0")
+        .withDescription("the influxdb server partition num");
+
+    public static final Option<String> UPPER_BOUND = Options.key("upper_bound")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("the influxdb server upper bound");
+
+    public static final Option<String> LOWER_BOUND = Options.key("lower_bound")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("the influxdb server lower bound");
+
+    public static final String DEFAULT_PARTITIONS = 
PARTITION_NUM.defaultValue();
     private String sql;
     private int partitionNum = 0;
     private String splitKey;
@@ -47,19 +75,19 @@ public class SourceConfig extends InfluxDBConfig{
     public static SourceConfig loadConfig(Config config) {
         SourceConfig sourceConfig = new SourceConfig(config);
 
-        sourceConfig.sql = config.getString(SQL);
+        sourceConfig.sql = config.getString(SQL.key());
 
-        if (config.hasPath(PARTITION_NUM)) {
-            sourceConfig.partitionNum = config.getInt(PARTITION_NUM);
+        if (config.hasPath(PARTITION_NUM.key())) {
+            sourceConfig.partitionNum = config.getInt(PARTITION_NUM.key());
         }
-        if (config.hasPath(UPPER_BOUND)) {
-            sourceConfig.upperBound = config.getInt(UPPER_BOUND);
+        if (config.hasPath(UPPER_BOUND.key())) {
+            sourceConfig.upperBound = config.getInt(UPPER_BOUND.key());
         }
-        if (config.hasPath(LOWER_BOUND)) {
-            sourceConfig.lowerBound = config.getInt(LOWER_BOUND);
+        if (config.hasPath(LOWER_BOUND.key())) {
+            sourceConfig.lowerBound = config.getInt(LOWER_BOUND.key());
         }
-        if (config.hasPath(SPLIT_COLUMN)) {
-            sourceConfig.splitKey = config.getString(SPLIT_COLUMN);
+        if (config.hasPath(SPLIT_COLUMN.key())) {
+            sourceConfig.splitKey = config.getString(SPLIT_COLUMN.key());
         }
         return sourceConfig;
     }
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
index 8d3eb7290..074e5a518 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
@@ -51,7 +51,7 @@ public class InfluxDBSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
 
     @Override
     public void prepare(Config config) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(config, URL, 
KEY_MEASUREMENT);
+        CheckResult result = CheckConfigUtil.checkAllExists(config, URL.key(), 
KEY_MEASUREMENT.key());
         if (!result.isSuccess()) {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
result.getMsg());
         }
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java
new file mode 100644
index 000000000..01d53841e
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java
@@ -0,0 +1,67 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *     contributor license agreements.  See the NOTICE file distributed with
+ *     this work for additional information regarding copyright ownership.
+ *     The ASF licenses this file to You under the Apache License, Version 2.0
+ *     (the "License"); you may not use this file except in compliance with
+ *     the License.  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *     Unless required by applicable law or agreed to in writing, software
+ *     distributed under the License is distributed on an "AS IS" BASIS,
+ *     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *     See the License for the specific language governing permissions and
+ *     limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.sink;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.CONNECT_TIMEOUT_MS;
+import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.DATABASES;
+import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL;
+import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.USERNAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.BATCH_INTERVAL_MS;
+import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.BATCH_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_MEASUREMENT;
+import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_TAGS;
+import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_TIME;
+import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.MAX_RETRIES;
+import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class InfluxDBSinkFactory implements TableSourceFactory {
+
+    @Override
+    public String factoryIdentifier() {
+        return "InfluxDB";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+            .required(
+                URL,
+                DATABASES,
+                KEY_MEASUREMENT
+            )
+            .bundled(USERNAME, PASSWORD)
+            .optional(
+                CONNECT_TIMEOUT_MS,
+                KEY_TAGS,
+                KEY_TIME,
+                BATCH_SIZE,
+                BATCH_INTERVAL_MS,
+                MAX_RETRIES,
+                RETRY_BACKOFF_MULTIPLIER_MS
+            )
+            .build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
index 804e804f5..26230f0ae 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
@@ -65,7 +65,7 @@ public class InfluxDBSource implements 
SeaTunnelSource<SeaTunnelRow, InfluxDBSou
 
     @Override
     public void prepare(Config config) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(config, SQL);
+        CheckResult result = CheckConfigUtil.checkAllExists(config, SQL.key());
         if (!result.isSuccess()) {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
result.getMsg());
         }
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceFactory.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceFactory.java
new file mode 100644
index 000000000..c5cb3ccd9
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceFactory.java
@@ -0,0 +1,63 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *     contributor license agreements.  See the NOTICE file distributed with
+ *     this work for additional information regarding copyright ownership.
+ *     The ASF licenses this file to You under the Apache License, Version 2.0
+ *     (the "License"); you may not use this file except in compliance with
+ *     the License.  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *     Unless required by applicable law or agreed to in writing, software
+ *     distributed under the License is distributed on an "AS IS" BASIS,
+ *     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *     See the License for the specific language governing permissions and
+ *     limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.CONNECT_TIMEOUT_MS;
+import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.DATABASES;
+import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.EPOCH;
+import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.QUERY_TIMEOUT_SEC;
+import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL;
+import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.USERNAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.LOWER_BOUND;
+import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.PARTITION_NUM;
+import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SPLIT_COLUMN;
+import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SQL;
+import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.UPPER_BOUND;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class InfluxDBSourceFactory implements TableSourceFactory {
+    @Override
+    public String factoryIdentifier() {
+        return "InfluxDB";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+            .required(
+                URL,
+                SQL,
+                DATABASES
+            )
+            .bundled(USERNAME, PASSWORD)
+            .bundled(LOWER_BOUND, UPPER_BOUND, PARTITION_NUM, SPLIT_COLUMN)
+            .optional(
+                EPOCH,
+                CONNECT_TIMEOUT_MS,
+                QUERY_TIMEOUT_SEC
+            )
+            .build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
index 139a6e3ad..aad78e6fc 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
@@ -119,7 +119,7 @@ public class InfluxDBSourceSplitEnumerator implements 
SourceSplitEnumerator<Infl
         //calculate numRange base on (lowerBound upperBound partitionNum)
         List<Pair<Long, Long>> rangePairs = 
genSplitNumRange(config.getLowerBound(), config.getUpperBound(), 
config.getPartitionNum());
 
-        String[] sqls = sql.split(SQL_WHERE);
+        String[] sqls = sql.split(SQL_WHERE.key());
         if (sqls.length > 2) {
             throw new IllegalArgumentException("sql should not contain more 
than one where");
         }

Reply via email to