This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9f498b8133 [Improve] influxdb options (#8966)
9f498b8133 is described below

commit 9f498b81333d2bf82be088be5b1306afed39b11a
Author: Jarvis <[email protected]>
AuthorDate: Tue Mar 18 13:26:54 2025 +0800

    [Improve] influxdb options (#8966)
---
 .../seatunnel/api/ConnectorOptionCheckTest.java    |   2 -
 ...luxDBConfig.java => InfluxDBCommonOptions.java} |  48 +---------
 .../seatunnel/influxdb/config/InfluxDBConfig.java  |  83 +++-------------
 .../{SinkConfig.java => InfluxDBSinkOptions.java}  |  57 +----------
 ...ourceConfig.java => InfluxDBSourceOptions.java} |  56 ++---------
 .../seatunnel/influxdb/config/SinkConfig.java      | 105 ++++-----------------
 .../seatunnel/influxdb/config/SourceConfig.java    |  67 +++----------
 .../influxdb/sink/InfluxDBSinkFactory.java         |  44 ++++-----
 .../seatunnel/influxdb/source/InfluxDBSource.java  |  69 ++++----------
 .../influxdb/source/InfluxDBSourceFactory.java     |  49 ++++++----
 .../source/InfluxDBSourceSplitEnumerator.java      |   8 +-
 11 files changed, 130 insertions(+), 458 deletions(-)

diff --git a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index de24681ac5..243e06922e 100644
--- a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -195,8 +195,6 @@ public class ConnectorOptionCheckTest {
         whiteList.add("MilvusSourceOptions");
         whiteList.add("RocketMqSinkOptions");
         whiteList.add("MaxcomputeSourceOptions");
-        whiteList.add("InfluxDBSourceOptions");
-        whiteList.add("InfluxDBSinkOptions");
         whiteList.add("KuduSourceOptions");
         whiteList.add("SocketSinkOptions");
         whiteList.add("SelectDBSinkOptions");
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBCommonOptions.java
similarity index 62%
copy from seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
copy to seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBCommonOptions.java
index eb05f53671..362ff09681 100644
--- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBCommonOptions.java
@@ -17,18 +17,10 @@
 
 package org.apache.seatunnel.connectors.seatunnel.influxdb.config;
 
-import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
-import lombok.Data;
-
-import java.io.Serializable;
-
-@Data
-public class InfluxDBConfig implements Serializable {
+public class InfluxDBCommonOptions {
 
     public static final Option<String> USERNAME =
             Options.key("username")
@@ -71,42 +63,4 @@ public class InfluxDBConfig implements Serializable {
                     .stringType()
                     .defaultValue("n")
                     .withDescription("the influxdb server query epoch");
-
-    private static final String DEFAULT_FORMAT = "MSGPACK";
-    private String url;
-    private String username;
-    private String password;
-    private String database;
-    private String format = DEFAULT_FORMAT;
-    private int queryTimeOut = QUERY_TIMEOUT_SEC.defaultValue();
-    private long connectTimeOut = CONNECT_TIMEOUT_MS.defaultValue();
-    private String epoch = EPOCH.defaultValue();
-
-    public InfluxDBConfig(Config config) {
-        this.url = config.getString(URL.key());
-
-        if (config.hasPath(USERNAME.key())) {
-            this.username = config.getString(USERNAME.key());
-        }
-        if (config.hasPath(PASSWORD.key())) {
-            this.password = config.getString(PASSWORD.key());
-        }
-        if (config.hasPath(DATABASES.key())) {
-            this.database = config.getString(DATABASES.key());
-        }
-        if (config.hasPath(EPOCH.key())) {
-            this.epoch = config.getString(EPOCH.key());
-        }
-        if (config.hasPath(CONNECT_TIMEOUT_MS.key())) {
-            this.connectTimeOut = config.getLong(CONNECT_TIMEOUT_MS.key());
-        }
-        if (config.hasPath(QUERY_TIMEOUT_SEC.key())) {
-            this.queryTimeOut = config.getInt(QUERY_TIMEOUT_SEC.key());
-        }
-    }
-
-    @VisibleForTesting
-    public InfluxDBConfig(String url) {
-        this.url = url;
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
index eb05f53671..c7d1ddc5f5 100644
--- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
@@ -18,10 +18,8 @@
 package org.apache.seatunnel.connectors.seatunnel.influxdb.config;
 
 import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 
 import lombok.Data;
 
@@ -30,79 +28,24 @@ import java.io.Serializable;
 @Data
 public class InfluxDBConfig implements Serializable {
 
-    public static final Option<String> USERNAME =
-            Options.key("username")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("the influxdb server username");
-
-    public static final Option<String> PASSWORD =
-            Options.key("password")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("the influxdb server password");
-
-    public static final Option<String> URL =
-            Options.key("url")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("the influxdb server url");
-
-    public static final Option<Long> CONNECT_TIMEOUT_MS =
-            Options.key("connect_timeout_ms")
-                    .longType()
-                    .defaultValue(15000L)
-                    .withDescription("the influxdb client connect timeout ms");
-
-    public static final Option<Integer> QUERY_TIMEOUT_SEC =
-            Options.key("query_timeout_sec")
-                    .intType()
-                    .defaultValue(3)
-                    .withDescription("the influxdb client query timeout ms");
-
-    public static final Option<String> DATABASES =
-            Options.key("database")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("the influxdb server database");
-
-    public static final Option<String> EPOCH =
-            Options.key("epoch")
-                    .stringType()
-                    .defaultValue("n")
-                    .withDescription("the influxdb server query epoch");
-
     private static final String DEFAULT_FORMAT = "MSGPACK";
     private String url;
     private String username;
     private String password;
     private String database;
     private String format = DEFAULT_FORMAT;
-    private int queryTimeOut = QUERY_TIMEOUT_SEC.defaultValue();
-    private long connectTimeOut = CONNECT_TIMEOUT_MS.defaultValue();
-    private String epoch = EPOCH.defaultValue();
-
-    public InfluxDBConfig(Config config) {
-        this.url = config.getString(URL.key());
-
-        if (config.hasPath(USERNAME.key())) {
-            this.username = config.getString(USERNAME.key());
-        }
-        if (config.hasPath(PASSWORD.key())) {
-            this.password = config.getString(PASSWORD.key());
-        }
-        if (config.hasPath(DATABASES.key())) {
-            this.database = config.getString(DATABASES.key());
-        }
-        if (config.hasPath(EPOCH.key())) {
-            this.epoch = config.getString(EPOCH.key());
-        }
-        if (config.hasPath(CONNECT_TIMEOUT_MS.key())) {
-            this.connectTimeOut = config.getLong(CONNECT_TIMEOUT_MS.key());
-        }
-        if (config.hasPath(QUERY_TIMEOUT_SEC.key())) {
-            this.queryTimeOut = config.getInt(QUERY_TIMEOUT_SEC.key());
-        }
+    private int queryTimeOut;
+    private long connectTimeOut;
+    private String epoch;
+
+    public InfluxDBConfig(ReadonlyConfig config) {
+        this.url = config.get(InfluxDBCommonOptions.URL);
+        this.username = config.get(InfluxDBCommonOptions.USERNAME);
+        this.password = config.get(InfluxDBCommonOptions.PASSWORD);
+        this.database = config.get(InfluxDBCommonOptions.DATABASES);
+        this.epoch = config.get(InfluxDBCommonOptions.EPOCH);
+        this.connectTimeOut = config.get(InfluxDBCommonOptions.CONNECT_TIMEOUT_MS);
+        this.queryTimeOut = config.get(InfluxDBCommonOptions.QUERY_TIMEOUT_SEC);
     }
 
     @VisibleForTesting
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBSinkOptions.java
similarity index 62%
copy from seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
copy to seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBSinkOptions.java
index 071e3c235f..34bb61160a 100644
--- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBSinkOptions.java
@@ -17,25 +17,12 @@
 
 package org.apache.seatunnel.connectors.seatunnel.influxdb.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-
 import java.util.List;
 
-@Setter
-@Getter
-@ToString
-public class SinkConfig extends InfluxDBConfig {
-    public SinkConfig(Config config) {
-        super(config);
-        loadConfig(config);
-    }
+public class InfluxDBSinkOptions extends InfluxDBCommonOptions {
 
     public static final Option<String> KEY_TIME =
             Options.key("key_time")
@@ -90,46 +77,4 @@ public class SinkConfig extends InfluxDBConfig {
                     .stringType()
                     .noDefaultValue()
                     .withDescription("the influxdb client retention policy");
-
-    private static final TimePrecision DEFAULT_TIME_PRECISION = TimePrecision.NS;
-
-    private String rp;
-    private String measurement;
-    private int writeTimeout = WRITE_TIMEOUT.defaultValue();
-    private String keyTime;
-    private List<String> keyTags;
-    private int batchSize = BATCH_SIZE.defaultValue();
-    private int maxRetries;
-    private int retryBackoffMultiplierMs;
-    private int maxRetryBackoffMs;
-    private TimePrecision precision = DEFAULT_TIME_PRECISION;
-
-    public void loadConfig(Config config) {
-
-        if (config.hasPath(KEY_TIME.key())) {
-            setKeyTime(config.getString(KEY_TIME.key()));
-        }
-        if (config.hasPath(KEY_TAGS.key())) {
-            setKeyTags(config.getStringList(KEY_TAGS.key()));
-        }
-        if (config.hasPath(MAX_RETRIES.key())) {
-            setMaxRetries(config.getInt(MAX_RETRIES.key()));
-        }
-        if (config.hasPath(RETRY_BACKOFF_MULTIPLIER_MS.key())) {
-            setRetryBackoffMultiplierMs(config.getInt(RETRY_BACKOFF_MULTIPLIER_MS.key()));
-        }
-        if (config.hasPath(MAX_RETRY_BACKOFF_MS.key())) {
-            setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS.key()));
-        }
-        if (config.hasPath(WRITE_TIMEOUT.key())) {
-            setWriteTimeout(config.getInt(WRITE_TIMEOUT.key()));
-        }
-        if (config.hasPath(RETENTION_POLICY.key())) {
-            setRp(config.getString(RETENTION_POLICY.key()));
-        }
-        if (config.hasPath(EPOCH.key())) {
-            setPrecision(TimePrecision.getPrecision(config.getString(EPOCH.key())));
-        }
-        setMeasurement(config.getString(KEY_MEASUREMENT.key()));
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBSourceOptions.java
similarity index 57%
copy from seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java
copy to seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBSourceOptions.java
index 50ca08ec55..bdca8df284 100644
--- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBSourceOptions.java
@@ -17,17 +17,10 @@
 
 package org.apache.seatunnel.connectors.seatunnel.influxdb.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
-import lombok.Getter;
-
-import java.util.List;
-
-@Getter
-public class SourceConfig extends InfluxDBConfig {
+public class InfluxDBSourceOptions extends InfluxDBCommonOptions {
 
     public static final Option<String> SQL =
             Options.key("sql")
@@ -47,54 +40,21 @@ public class SourceConfig extends InfluxDBConfig {
                     .noDefaultValue()
                     .withDescription("the influxdb column which is used as split key");
 
-    public static final Option<String> PARTITION_NUM =
+    public static final Option<Integer> PARTITION_NUM =
             Options.key("partition_num")
-                    .stringType()
-                    .defaultValue("0")
+                    .intType()
+                    .defaultValue(0)
                     .withDescription("the influxdb server partition num");
 
-    public static final Option<String> UPPER_BOUND =
+    public static final Option<Integer> UPPER_BOUND =
             Options.key("upper_bound")
-                    .stringType()
+                    .intType()
                     .noDefaultValue()
                     .withDescription("the influxdb server upper bound");
 
-    public static final Option<String> LOWER_BOUND =
+    public static final Option<Integer> LOWER_BOUND =
             Options.key("lower_bound")
-                    .stringType()
+                    .intType()
                     .noDefaultValue()
                     .withDescription("the influxdb server lower bound");
-
-    public static final String DEFAULT_PARTITIONS = PARTITION_NUM.defaultValue();
-    private String sql;
-    private int partitionNum = 0;
-    private String splitKey;
-    private long lowerBound;
-    private long upperBound;
-
-    List<Integer> columnsIndex;
-
-    public SourceConfig(Config config) {
-        super(config);
-    }
-
-    public static SourceConfig loadConfig(Config config) {
-        SourceConfig sourceConfig = new SourceConfig(config);
-
-        sourceConfig.sql = config.getString(SQL.key());
-
-        if (config.hasPath(PARTITION_NUM.key())) {
-            sourceConfig.partitionNum = config.getInt(PARTITION_NUM.key());
-        }
-        if (config.hasPath(UPPER_BOUND.key())) {
-            sourceConfig.upperBound = config.getInt(UPPER_BOUND.key());
-        }
-        if (config.hasPath(LOWER_BOUND.key())) {
-            sourceConfig.lowerBound = config.getInt(LOWER_BOUND.key());
-        }
-        if (config.hasPath(SPLIT_COLUMN.key())) {
-            sourceConfig.splitKey = config.getString(SPLIT_COLUMN.key());
-        }
-        return sourceConfig;
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
index 071e3c235f..b1eaabfe2a 100644
--- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
@@ -17,10 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.influxdb.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 
 import lombok.Getter;
 import lombok.Setter;
@@ -32,104 +29,42 @@ import java.util.List;
 @Getter
 @ToString
 public class SinkConfig extends InfluxDBConfig {
-    public SinkConfig(Config config) {
+
+    public SinkConfig(ReadonlyConfig config) {
         super(config);
         loadConfig(config);
     }
 
-    public static final Option<String> KEY_TIME =
-            Options.key("key_time")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("the influxdb server key time");
-
-    public static final Option<List<String>> KEY_TAGS =
-            Options.key("key_tags")
-                    .listType()
-                    .noDefaultValue()
-                    .withDescription("the influxdb server key tags");
-
-    public static final Option<String> KEY_MEASUREMENT =
-            Options.key("measurement")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("the influxdb server measurement");
-
-    public static final Option<Integer> BATCH_SIZE =
-            Options.key("batch_size")
-                    .intType()
-                    .defaultValue(1024)
-                    .withDescription("batch size of the influxdb client");
-
-    public static final Option<Integer> MAX_RETRIES =
-            Options.key("max_retries")
-                    .intType()
-                    .noDefaultValue()
-                    .withDescription("max retries of the influxdb client");
-
-    public static final Option<Integer> WRITE_TIMEOUT =
-            Options.key("write_timeout")
-                    .intType()
-                    .defaultValue(5)
-                    .withDescription("the influxdb client write data timeout");
-
-    public static final Option<Integer> RETRY_BACKOFF_MULTIPLIER_MS =
-            Options.key("retry_backoff_multiplier_ms")
-                    .intType()
-                    .noDefaultValue()
-                    .withDescription("the influxdb client retry backoff multiplier ms");
-
-    public static final Option<Integer> MAX_RETRY_BACKOFF_MS =
-            Options.key("max_retry_backoff_ms")
-                    .intType()
-                    .noDefaultValue()
-                    .withDescription("the influxdb client max retry backoff ms");
-
-    public static final Option<String> RETENTION_POLICY =
-            Options.key("rp")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("the influxdb client retention policy");
-
     private static final TimePrecision DEFAULT_TIME_PRECISION = TimePrecision.NS;
 
     private String rp;
     private String measurement;
-    private int writeTimeout = WRITE_TIMEOUT.defaultValue();
+    private int writeTimeout;
     private String keyTime;
     private List<String> keyTags;
-    private int batchSize = BATCH_SIZE.defaultValue();
+    private int batchSize;
     private int maxRetries;
     private int retryBackoffMultiplierMs;
     private int maxRetryBackoffMs;
     private TimePrecision precision = DEFAULT_TIME_PRECISION;
 
-    public void loadConfig(Config config) {
-
-        if (config.hasPath(KEY_TIME.key())) {
-            setKeyTime(config.getString(KEY_TIME.key()));
-        }
-        if (config.hasPath(KEY_TAGS.key())) {
-            setKeyTags(config.getStringList(KEY_TAGS.key()));
-        }
-        if (config.hasPath(MAX_RETRIES.key())) {
-            setMaxRetries(config.getInt(MAX_RETRIES.key()));
-        }
-        if (config.hasPath(RETRY_BACKOFF_MULTIPLIER_MS.key())) {
-            setRetryBackoffMultiplierMs(config.getInt(RETRY_BACKOFF_MULTIPLIER_MS.key()));
-        }
-        if (config.hasPath(MAX_RETRY_BACKOFF_MS.key())) {
-            setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS.key()));
-        }
-        if (config.hasPath(WRITE_TIMEOUT.key())) {
-            setWriteTimeout(config.getInt(WRITE_TIMEOUT.key()));
+    public void loadConfig(ReadonlyConfig config) {
+        setKeyTime(config.get(InfluxDBSinkOptions.KEY_TIME));
+        setKeyTags(config.get(InfluxDBSinkOptions.KEY_TAGS));
+        setBatchSize(config.get(InfluxDBSinkOptions.BATCH_SIZE));
+        if (config.getOptional(InfluxDBSinkOptions.MAX_RETRIES).isPresent()) {
+            setMaxRetries(config.get(InfluxDBSinkOptions.MAX_RETRIES));
         }
-        if (config.hasPath(RETENTION_POLICY.key())) {
-            setRp(config.getString(RETENTION_POLICY.key()));
+        if (config.getOptional(InfluxDBSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS).isPresent()) {
+            setRetryBackoffMultiplierMs(
+                    config.get(InfluxDBSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS));
         }
-        if (config.hasPath(EPOCH.key())) {
-            setPrecision(TimePrecision.getPrecision(config.getString(EPOCH.key())));
+        if (config.getOptional(InfluxDBSinkOptions.MAX_RETRY_BACKOFF_MS).isPresent()) {
+            setMaxRetryBackoffMs(config.get(InfluxDBSinkOptions.MAX_RETRY_BACKOFF_MS));
         }
-        setMeasurement(config.getString(KEY_MEASUREMENT.key()));
+        setWriteTimeout(config.get(InfluxDBSinkOptions.WRITE_TIMEOUT));
+        setRp(config.get(InfluxDBSinkOptions.RETENTION_POLICY));
+        setPrecision(TimePrecision.getPrecision(config.get(InfluxDBSinkOptions.EPOCH)));
+        setMeasurement(config.get(InfluxDBSinkOptions.KEY_MEASUREMENT));
     }
 }
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java
index 50ca08ec55..67cd62052c 100644
--- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java
@@ -17,10 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.influxdb.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 
 import lombok.Getter;
 
@@ -29,43 +26,7 @@ import java.util.List;
 @Getter
 public class SourceConfig extends InfluxDBConfig {
 
-    public static final Option<String> SQL =
-            Options.key("sql")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("the influxdb server query sql");
-
-    public static final Option<String> SQL_WHERE =
-            Options.key("where")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("the influxdb server query sql where condition");
-
-    public static final Option<String> SPLIT_COLUMN =
-            Options.key("split_column")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("the influxdb column which is used as split key");
-
-    public static final Option<String> PARTITION_NUM =
-            Options.key("partition_num")
-                    .stringType()
-                    .defaultValue("0")
-                    .withDescription("the influxdb server partition num");
-
-    public static final Option<String> UPPER_BOUND =
-            Options.key("upper_bound")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("the influxdb server upper bound");
-
-    public static final Option<String> LOWER_BOUND =
-            Options.key("lower_bound")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("the influxdb server lower bound");
-
-    public static final String DEFAULT_PARTITIONS = PARTITION_NUM.defaultValue();
+    public static final int DEFAULT_PARTITIONS = InfluxDBSourceOptions.PARTITION_NUM.defaultValue();
     private String sql;
     private int partitionNum = 0;
     private String splitKey;
@@ -74,26 +35,22 @@ public class SourceConfig extends InfluxDBConfig {
 
     List<Integer> columnsIndex;
 
-    public SourceConfig(Config config) {
+    public SourceConfig(ReadonlyConfig config) {
         super(config);
     }
 
-    public static SourceConfig loadConfig(Config config) {
+    public static SourceConfig loadConfig(ReadonlyConfig config) {
         SourceConfig sourceConfig = new SourceConfig(config);
-
-        sourceConfig.sql = config.getString(SQL.key());
-
-        if (config.hasPath(PARTITION_NUM.key())) {
-            sourceConfig.partitionNum = config.getInt(PARTITION_NUM.key());
-        }
-        if (config.hasPath(UPPER_BOUND.key())) {
-            sourceConfig.upperBound = config.getInt(UPPER_BOUND.key());
+        sourceConfig.sql = config.get(InfluxDBSourceOptions.SQL);
+        sourceConfig.partitionNum = config.get(InfluxDBSourceOptions.PARTITION_NUM);
+        if (config.getOptional(InfluxDBSourceOptions.UPPER_BOUND).isPresent()) {
+            sourceConfig.upperBound = config.get(InfluxDBSourceOptions.UPPER_BOUND);
         }
-        if (config.hasPath(LOWER_BOUND.key())) {
-            sourceConfig.lowerBound = config.getInt(LOWER_BOUND.key());
+        if (config.getOptional(InfluxDBSourceOptions.LOWER_BOUND).isPresent()) {
+            sourceConfig.lowerBound = config.get(InfluxDBSourceOptions.LOWER_BOUND);
         }
-        if (config.hasPath(SPLIT_COLUMN.key())) {
-            sourceConfig.splitKey = config.getString(SPLIT_COLUMN.key());
+        if (config.getOptional(InfluxDBSourceOptions.SPLIT_COLUMN).isPresent()) {
+            sourceConfig.splitKey = config.get(InfluxDBSourceOptions.SPLIT_COLUMN);
         }
         return sourceConfig;
     }
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java
index 6206d7b8d5..ad2b726155 100644
--- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBSinkOptions;
 import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;
 
 import com.google.auto.service.AutoService;
@@ -33,18 +34,6 @@ import lombok.extern.slf4j.Slf4j;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.CONNECT_TIMEOUT_MS;
-import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.DATABASES;
-import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.PASSWORD;
-import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL;
-import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.USERNAME;
-import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.BATCH_SIZE;
-import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_MEASUREMENT;
-import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_TAGS;
-import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_TIME;
-import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.MAX_RETRIES;
-import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS;
-
 @AutoService(Factory.class)
 @Slf4j
 public class InfluxDBSinkFactory implements TableSinkFactory {
@@ -57,16 +46,21 @@ public class InfluxDBSinkFactory implements TableSinkFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(URL, DATABASES)
-                .bundled(USERNAME, PASSWORD)
+                .required(InfluxDBSinkOptions.URL, InfluxDBSinkOptions.DATABASES)
+                .bundled(InfluxDBSinkOptions.USERNAME, InfluxDBSinkOptions.PASSWORD)
                 .optional(
-                        CONNECT_TIMEOUT_MS,
-                        KEY_MEASUREMENT,
-                        KEY_TAGS,
-                        KEY_TIME,
-                        BATCH_SIZE,
-                        MAX_RETRIES,
-                        RETRY_BACKOFF_MULTIPLIER_MS,
+                        InfluxDBSinkOptions.CONNECT_TIMEOUT_MS,
+                        InfluxDBSinkOptions.KEY_MEASUREMENT,
+                        InfluxDBSinkOptions.KEY_TAGS,
+                        InfluxDBSinkOptions.KEY_TIME,
+                        InfluxDBSinkOptions.BATCH_SIZE,
+                        InfluxDBSinkOptions.MAX_RETRIES,
+                        InfluxDBSinkOptions.WRITE_TIMEOUT,
+                        InfluxDBSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS,
+                        InfluxDBSinkOptions.MAX_RETRY_BACKOFF_MS,
+                        InfluxDBSinkOptions.RETENTION_POLICY,
+                        InfluxDBSinkOptions.QUERY_TIMEOUT_SEC,
+                        InfluxDBSinkOptions.EPOCH,
                         SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .build();
     }
@@ -75,12 +69,14 @@ public class InfluxDBSinkFactory implements TableSinkFactory {
     public TableSink createSink(TableSinkFactoryContext context) {
         ReadonlyConfig config = context.getOptions();
         CatalogTable catalogTable = context.getCatalogTable();
-        if (!config.getOptional(KEY_MEASUREMENT).isPresent()) {
+        if (!config.getOptional(InfluxDBSinkOptions.KEY_MEASUREMENT).isPresent()) {
             Map<String, String> map = config.toMap();
-            map.put(KEY_MEASUREMENT.key(), catalogTable.getTableId().toTablePath().getFullName());
+            map.put(
+                    InfluxDBSinkOptions.KEY_MEASUREMENT.key(),
+                    catalogTable.getTableId().toTablePath().getFullName());
             config = ReadonlyConfig.fromMap(new HashMap<>(map));
         }
-        SinkConfig sinkConfig = new SinkConfig(config.toConfig());
+        SinkConfig sinkConfig = new SinkConfig(config);
         return () -> new InfluxDBSink(sinkConfig, catalogTable);
     }
 }
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
index 23bc69a440..282880482e 100644
--- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
@@ -17,25 +17,14 @@
 
 package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.options.ConnectorCommonOptions;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.source.SupportColumnProjection;
 import org.apache.seatunnel.api.source.SupportParallelism;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
 import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorErrorCode;
@@ -46,59 +35,35 @@ import org.influxdb.InfluxDB;
 import org.influxdb.dto.Query;
 import org.influxdb.dto.QueryResult;
 
-import com.google.auto.service.AutoService;
 import lombok.extern.slf4j.Slf4j;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SQL;
-
 @Slf4j
-@AutoService(SeaTunnelSource.class)
 public class InfluxDBSource
         implements SeaTunnelSource<SeaTunnelRow, InfluxDBSourceSplit, InfluxDBSourceState>,
                 SupportParallelism,
                 SupportColumnProjection {
-    private SeaTunnelRowType typeInfo;
-    private SourceConfig sourceConfig;
 
-    private List<Integer> columnsIndexList;
+    private final CatalogTable catalogTable;
+    private final SourceConfig sourceConfig;
 
     private static final String QUERY_LIMIT = " limit 1";
 
-    @Override
-    public String getPluginName() {
-        return "InfluxDB";
+    public InfluxDBSource(CatalogTable catalogTable, SourceConfig sourceConfig) {
+        this.catalogTable = catalogTable;
+        this.sourceConfig = sourceConfig;
     }
 
     @Override
-    public void prepare(Config config) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        config, SQL.key(), ConnectorCommonOptions.SCHEMA.key());
-        if (!result.isSuccess()) {
-            throw new InfluxdbConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, result.getMsg()));
-        }
-        try {
-            this.sourceConfig = SourceConfig.loadConfig(config);
-            this.typeInfo = CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
-            this.columnsIndexList = initColumnsIndex(InfluxDBClient.getInfluxDB(sourceConfig));
-        } catch (Exception e) {
-            throw new InfluxdbConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, ExceptionUtils.getMessage(e)));
-        }
+    public String getPluginName() {
+        return "InfluxDB";
     }
 
     @Override
@@ -106,14 +71,11 @@ public class InfluxDBSource
         return Boundedness.BOUNDED;
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
-        return typeInfo;
-    }
-
     @Override
     public SourceReader createReader(SourceReader.Context readerContext) throws Exception {
-        return new InfluxdbSourceReader(sourceConfig, readerContext, typeInfo, columnsIndexList);
+        List<Integer> columnsIndexList = initColumnsIndex(InfluxDBClient.getInfluxDB(sourceConfig));
+        return new InfluxdbSourceReader(
+                sourceConfig, readerContext, catalogTable.getSeaTunnelRowType(), columnsIndexList);
     }
 
     @Override
@@ -130,6 +92,11 @@ public class InfluxDBSource
         return new InfluxDBSourceSplitEnumerator(enumeratorContext, checkpointState, sourceConfig);
     }
 
+    @Override
+    public List<CatalogTable> getProducedCatalogTables() {
+        return Collections.singletonList(catalogTable);
+    }
+
     private List<Integer> initColumnsIndex(InfluxDB influxdb) {
         // query one row to get column info
         String sql = sourceConfig.getSql();
@@ -148,7 +115,7 @@ public class InfluxDBSource
             List<QueryResult.Series> serieList = queryResult.getResults().get(0).getSeries();
             List<String> fieldNames = new ArrayList<>(serieList.get(0).getColumns());
 
-            return Arrays.stream(typeInfo.getFieldNames())
+            return Arrays.stream(catalogTable.getSeaTunnelRowType().getFieldNames())
                     .map(fieldNames::indexOf)
                     .collect(Collectors.toList());
         } catch (Exception e) {
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceFactory.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceFactory.java
index 0c03aa3223..9836c4ef48 100644
--- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceFactory.java
@@ -20,23 +20,18 @@ package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.options.ConnectorCommonOptions;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig;
 
 import com.google.auto.service.AutoService;
 
-import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.CONNECT_TIMEOUT_MS;
-import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.DATABASES;
-import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.EPOCH;
-import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.PASSWORD;
-import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.QUERY_TIMEOUT_SEC;
-import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL;
-import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.USERNAME;
-import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.LOWER_BOUND;
-import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.PARTITION_NUM;
-import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SPLIT_COLUMN;
-import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SQL;
-import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.UPPER_BOUND;
+import java.io.Serializable;
 
 @AutoService(Factory.class)
 public class InfluxDBSourceFactory implements TableSourceFactory {
@@ -48,13 +43,35 @@ public class InfluxDBSourceFactory implements TableSourceFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(URL, SQL, DATABASES, ConnectorCommonOptions.SCHEMA)
-                .bundled(USERNAME, PASSWORD)
-                .bundled(LOWER_BOUND, UPPER_BOUND, PARTITION_NUM, SPLIT_COLUMN)
-                .optional(EPOCH, CONNECT_TIMEOUT_MS, QUERY_TIMEOUT_SEC)
+                .required(
+                        InfluxDBSourceOptions.URL,
+                        InfluxDBSourceOptions.SQL,
+                        InfluxDBSourceOptions.DATABASES,
+                        ConnectorCommonOptions.SCHEMA)
+                .bundled(InfluxDBSourceOptions.USERNAME, InfluxDBSourceOptions.PASSWORD)
+                .bundled(
+                        InfluxDBSourceOptions.LOWER_BOUND,
+                        InfluxDBSourceOptions.UPPER_BOUND,
+                        InfluxDBSourceOptions.PARTITION_NUM,
+                        InfluxDBSourceOptions.SPLIT_COLUMN)
+                .optional(
+                        InfluxDBSourceOptions.EPOCH,
+                        InfluxDBSourceOptions.SQL_WHERE,
+                        InfluxDBSourceOptions.CONNECT_TIMEOUT_MS,
+                        InfluxDBSourceOptions.QUERY_TIMEOUT_SEC)
                 .build();
     }
 
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
+        return () ->
+                (SeaTunnelSource<T, SplitT, StateT>)
+                        new InfluxDBSource(
+                                CatalogTableUtil.buildWithConfig(context.getOptions()),
+                                SourceConfig.loadConfig(context.getOptions()));
+    }
+
     @Override
     public Class<? extends SeaTunnelSource> getSourceClass() {
         return InfluxDBSource.class;
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
index 690ea29181..98acc30e9f 100644
--- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
 
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBSourceOptions;
 import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState;
@@ -36,8 +37,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SQL_WHERE;
-
 @Slf4j
 public class InfluxDBSourceSplitEnumerator
         implements SourceSplitEnumerator<InfluxDBSourceSplit, InfluxDBSourceState> {
@@ -119,7 +118,8 @@ public class InfluxDBSourceSplitEnumerator
         Set<InfluxDBSourceSplit> influxDBSourceSplits = new HashSet<>();
         // no need numPartitions, use one partition
         if (config.getPartitionNum() == 0) {
-            influxDBSourceSplits.add(new InfluxDBSourceSplit(SourceConfig.DEFAULT_PARTITIONS, sql));
+            influxDBSourceSplits.add(
+                    new InfluxDBSourceSplit(String.valueOf(SourceConfig.DEFAULT_PARTITIONS), sql));
             return influxDBSourceSplits;
         }
         // calculate numRange base on (lowerBound upperBound partitionNum)
@@ -127,7 +127,7 @@ public class InfluxDBSourceSplitEnumerator
                 genSplitNumRange(
                         config.getLowerBound(), config.getUpperBound(), config.getPartitionNum());
 
-        String[] sqls = sql.split(SQL_WHERE.key());
+        String[] sqls = sql.split(InfluxDBSourceOptions.SQL_WHERE.key());
         if (sqls.length > 2) {
             throw new InfluxdbConnectorException(
                     CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,


Reply via email to