This is an automated email from the ASF dual-hosted git repository.

ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 630e88479 [Feature][Connector-V2] influxdb sink connector (#3174)
630e88479 is described below

commit 630e88479138e54224227e2fa974835a23b4bb31
Author: Bibo <[email protected]>
AuthorDate: Tue Nov 8 18:53:18 2022 +0800

    [Feature][Connector-V2] influxdb sink connector (#3174)
    
    * [Feature][Connector-V2] Add influxDB connector sink
    
    * fix doc style
    
    * remove old e2e for influxdb
    
    * fix e2e and License header
    
    * add Changelog
    
    * delete useless log4j file
    
    * mv scheduler to constructor of InfluxDBSinkWriter
    
    * remove InfluxDBSinkWriter useless synchronized
---
 docs/en/connector-v2/sink/InfluxDB.md              | 104 +++++++++++
 plugin-mapping.properties                          |   1 +
 .../seatunnel/influxdb/client/InfluxDBClient.java  |  15 ++
 .../seatunnel/influxdb/config/InfluxDBConfig.java  |  36 +---
 .../seatunnel/influxdb/config/SinkConfig.java      |  97 ++++++++++
 .../seatunnel/influxdb/config/SourceConfig.java    |  67 +++++++
 .../seatunnel/influxdb/config/TimePrecision.java   |  49 +++++
 .../influxdb/serialize/DefaultSerializer.java      | 162 ++++++++++++++++
 .../seatunnel/influxdb/serialize/Serializer.java   |  26 +++
 .../seatunnel/influxdb/sink/InfluxDBSink.java      |  75 ++++++++
 .../influxdb/sink/InfluxDBSinkWriter.java          | 176 ++++++++++++++++++
 .../seatunnel/influxdb/source/InfluxDBSource.java  |  23 ++-
 .../source/InfluxDBSourceSplitEnumerator.java      |  12 +-
 .../connector-influxdb-e2e/pom.xml                 |   5 +-
 .../influxdb/InfluxDBSourceToAssertIT.java         | 122 ------------
 .../e2e/connector/influxdb/InfluxdbIT.java         | 205 +++++++++++++++++++++
 .../src/test/resources/influxdb-to-influxdb.conf   |  58 ++++++
 .../test/resources/influxdb_source_to_assert.conf  | 188 -------------------
 18 files changed, 1056 insertions(+), 365 deletions(-)

diff --git a/docs/en/connector-v2/sink/InfluxDB.md 
b/docs/en/connector-v2/sink/InfluxDB.md
new file mode 100644
index 000000000..cff9787f9
--- /dev/null
+++ b/docs/en/connector-v2/sink/InfluxDB.md
@@ -0,0 +1,104 @@
+# InfluxDB
+
+> InfluxDB sink connector
+
+## Description
+
+Write data to InfluxDB.
+
+## Key features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+
+## Options
+
+| name                        | type     | required | default value            
     |
+|-----------------------------|----------|----------|-------------------------------|
+| url                         | string   | yes      | -                        
     |
+| database                    | string   | yes      |                          
     |
+| measurement                 | string   | yes      |                          
     |
+| username                    | string   | no       | -                        
     |
+| password                    | string   | no       | -                        
     |
+| key_time                    | string   | no       | processing time               |
+| key_tags                    | array    | no       | exclude `field` & 
`key_time`  |
+| batch_size                  | int      | no       | 1024                     
     |
+| batch_interval_ms           | int      | no       | -                        
     |
+| max_retries                 | int      | no       | -                        
     |
+| retry_backoff_multiplier_ms | int      | no       | -                             |
+| max_retry_backoff_ms        | int      | no       | -                             |
+| connect_timeout_ms          | long     | no       | 15000                    
     |
+
+### url [string]
+
+The URL used to connect to influxDB, e.g.
+``` 
+http://influxdb-host:8086
+```
+
+### database [string]
+
+The name of `influxDB` database
+
+### measurement [string]
+
+The name of `influxDB` measurement
+
+### username [string]
+
+`influxDB` user username
+
+### password [string]
+
+`influxDB` user password
+
+### key_time [string]
+
+Specifies the name of the SeaTunnelRow field that holds the `influxDB` measurement timestamp. If not specified, the processing time is used as the timestamp
+
+### key_tags [array]
+
+Specifies the names of the SeaTunnelRow fields that are written as `influxDB` measurement tags.
+If not specified, no tags are written and all fields other than `key_time` are written as `influxDB` measurement fields
+
+### batch_size [int]
+
+For batch writing, when the number of buffered records reaches `batch_size` or the elapsed time reaches `batch_interval_ms`, the data will be flushed into influxDB
+
+### batch_interval_ms [int]
+
+For batch writing, when the number of buffered records reaches `batch_size` or the elapsed time reaches `batch_interval_ms`, the data will be flushed into influxDB
+
+### max_retries [int]
+
+The number of times to retry a failed flush
+
+### retry_backoff_multiplier_ms [int]
+
+Used as a multiplier for generating the next backoff delay
+
+### max_retry_backoff_ms [int]
+
+The amount of time to wait before attempting to retry a request to `influxDB`
+
+### connect_timeout_ms [long]
+
+The timeout for connecting to InfluxDB, in milliseconds
+
+## Examples
+```hocon
+sink {
+    InfluxDB {
+        url = "http://influxdb-host:8086"
+        database = "test"
+        measurement = "sink"
+        key_time = "time"
+        key_tags = ["label"]
+        batch_size = 1
+    }
+}
+
+```
+
+## Changelog
+
+### next version
+
+- Add InfluxDB Sink Connector
\ No newline at end of file
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 11d7fd4a9..1ac1e2687 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -140,3 +140,4 @@ seatunnel.sink.S3File = connector-file-s3
 seatunnel.source.Amazondynamodb = connector-amazondynamodb
 seatunnel.sink.Amazondynamodb = connector-amazondynamodb
 seatunnel.sink.StarRocks = connector-starrocks
+seatunnel.sink.InfluxDB = connector-influxdb
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java
index 8743d5aa2..3ad3a99d5 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.influxdb.client;
 
 import 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;
 
 import lombok.extern.slf4j.Slf4j;
 import okhttp3.HttpUrl;
@@ -75,4 +76,18 @@ public class InfluxDBClient {
         log.info("connect influxdb successful. sever version :{}.", version);
         return influxDB;
     }
+
+    /**
+     * Applies the sink's write-side settings to an InfluxDB client.
+     *
+     * <p>The retention policy from {@code sinkConfig} is set on {@code influxDB} only when it is
+     * neither null nor empty; otherwise the client is left untouched.
+     *
+     * @param influxDB the client to configure
+     * @param sinkConfig the sink configuration providing the retention policy
+     */
+    public static void setWriteProperty(InfluxDB influxDB, SinkConfig sinkConfig) {
+        String retentionPolicy = sinkConfig.getRp();
+        if (StringUtils.isEmpty(retentionPolicy)) {
+            return;
+        }
+        influxDB.setRetentionPolicy(retentionPolicy);
+    }
+
+    /**
+     * Creates an InfluxDB client that is ready for writing.
+     *
+     * <p>The client is bound to the database from {@code sinkConfig} and then receives the
+     * write-side settings (retention policy) via {@link #setWriteProperty}.
+     *
+     * @param sinkConfig the sink configuration
+     * @return the configured client
+     * @throws ConnectException if the connection to InfluxDB cannot be established
+     */
+    public static InfluxDB getWriteClient(SinkConfig sinkConfig) throws ConnectException {
+        InfluxDB influxDB = getInfluxDB(sinkConfig);
+        influxDB.setDatabase(sinkConfig.getDatabase());
+        // Configure the client that is actually returned. Calling getInfluxDB() again here would
+        // open a second connection, apply the retention policy to it and then discard it.
+        setWriteProperty(influxDB, sinkConfig);
+        return influxDB;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
index 9a04e7d4a..1332c5cab 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
@@ -23,7 +23,6 @@ import com.google.common.annotations.VisibleForTesting;
 import lombok.Data;
 
 import java.io.Serializable;
-import java.util.List;
 
 @Data
 public class InfluxDBConfig implements Serializable {
@@ -33,34 +32,16 @@ public class InfluxDBConfig implements Serializable {
     public static final String URL = "url";
     private static final String CONNECT_TIMEOUT_MS = "connect_timeout_ms";
     private static final String QUERY_TIMEOUT_SEC = "query_timeout_sec";
-
-    public static final String SQL = "sql";
-    public static final String SQL_WHERE = "where";
-
     public static final String DATABASES = "database";
-    public static final String SPLIT_COLUMN = "split_column";
-    private static final String PARTITION_NUM = "partition_num";
-    private static final String UPPER_BOUND = "upper_bound";
-    private static final String LOWER_BOUND = "lower_bound";
-
-
     private static final String DEFAULT_FORMAT = "MSGPACK";
-    private static final String EPOCH = "epoch";
-
-    public static final String DEFAULT_PARTITIONS = "0";
+    protected static final String EPOCH = "epoch";
     private static final int DEFAULT_QUERY_TIMEOUT_SEC = 3;
     private static final long DEFAULT_CONNECT_TIMEOUT_MS = 15000;
-
     private static final String DEFAULT_EPOCH = "n";
 
     private String url;
     private String username;
     private String password;
-    private String sql;
-    private int partitionNum = 0;
-    private String splitKey;
-    private long lowerBound;
-    private long upperBound;
     private String database;
 
     private String format = DEFAULT_FORMAT;
@@ -69,11 +50,8 @@ public class InfluxDBConfig implements Serializable {
 
     private String epoch = DEFAULT_EPOCH;
 
-    List<Integer> columnsIndex;
-
     public InfluxDBConfig(Config config) {
         this.url = config.getString(URL);
-        this.sql = config.getString(SQL);
 
         if (config.hasPath(USERNAME)) {
             this.username = config.getString(USERNAME);
@@ -81,18 +59,6 @@ public class InfluxDBConfig implements Serializable {
         if (config.hasPath(PASSWORD)) {
             this.password = config.getString(PASSWORD);
         }
-        if (config.hasPath(PARTITION_NUM)) {
-            this.partitionNum = config.getInt(PARTITION_NUM);
-        }
-        if (config.hasPath(UPPER_BOUND)) {
-            this.upperBound = config.getInt(UPPER_BOUND);
-        }
-        if (config.hasPath(LOWER_BOUND)) {
-            this.lowerBound = config.getInt(LOWER_BOUND);
-        }
-        if (config.hasPath(SPLIT_COLUMN)) {
-            this.splitKey = config.getString(SPLIT_COLUMN);
-        }
         if (config.hasPath(DATABASES)) {
             this.database = config.getString(DATABASES);
         }
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
new file mode 100644
index 000000000..c97d807fa
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+import java.util.List;
+
+/**
+ * Configuration of the InfluxDB sink.
+ *
+ * <p>Extends the connection settings of {@link InfluxDBConfig} with the sink-only options
+ * (measurement, timestamp/tag keys, batching, retry and write settings). Use
+ * {@link #loadConfig(Config)} to build a fully populated instance; the constructor alone only
+ * reads the options handled by {@link InfluxDBConfig}.
+ */
+@Setter
+@Getter
+@ToString
+public class SinkConfig extends InfluxDBConfig {
+    public SinkConfig(Config config) {
+        super(config);
+    }
+
+    private static final String KEY_TIME = "key_time";
+    private static final String KEY_TAGS = "key_tags";
+    public static final String KEY_MEASUREMENT = "measurement";
+
+    private static final String BATCH_SIZE = "batch_size";
+    private static final String BATCH_INTERVAL_MS = "batch_interval_ms";
+    private static final String MAX_RETRIES = "max_retries";
+    private static final String WRITE_TIMEOUT = "write_timeout";
+    private static final String RETRY_BACKOFF_MULTIPLIER_MS = "retry_backoff_multiplier_ms";
+    private static final String MAX_RETRY_BACKOFF_MS = "max_retry_backoff_ms";
+    private static final String RETENTION_POLICY = "rp";
+    private static final int DEFAULT_BATCH_SIZE = 1024;
+    private static final int DEFAULT_WRITE_TIMEOUT = 5;
+    private static final TimePrecision DEFAULT_TIME_PRECISION = TimePrecision.NS;
+
+    private String rp;
+    private String measurement;
+    private int writeTimeout = DEFAULT_WRITE_TIMEOUT;
+    private String keyTime;
+    private List<String> keyTags;
+    private int batchSize = DEFAULT_BATCH_SIZE;
+    private Integer batchIntervalMs;
+    private int maxRetries;
+    private int retryBackoffMultiplierMs;
+    private int maxRetryBackoffMs;
+    private TimePrecision precision = DEFAULT_TIME_PRECISION;
+
+    /**
+     * Builds a {@link SinkConfig} from the sink's plugin configuration.
+     *
+     * <p>{@code measurement} is mandatory and read unconditionally; every other sink option is
+     * read only when present, otherwise the field keeps its default.
+     *
+     * @param config the plugin configuration of the sink
+     * @return the populated sink configuration
+     */
+    public static SinkConfig loadConfig(Config config) {
+        SinkConfig sinkConfig = new SinkConfig(config);
+
+        if (config.hasPath(KEY_TIME)) {
+            sinkConfig.setKeyTime(config.getString(KEY_TIME));
+        }
+        if (config.hasPath(KEY_TAGS)) {
+            sinkConfig.setKeyTags(config.getStringList(KEY_TAGS));
+        }
+        // batch_size was declared but never read, so a user-supplied value was silently ignored
+        // and the default of 1024 was always used.
+        if (config.hasPath(BATCH_SIZE)) {
+            sinkConfig.setBatchSize(config.getInt(BATCH_SIZE));
+        }
+        if (config.hasPath(BATCH_INTERVAL_MS)) {
+            sinkConfig.setBatchIntervalMs(config.getInt(BATCH_INTERVAL_MS));
+        }
+        if (config.hasPath(MAX_RETRIES)) {
+            sinkConfig.setMaxRetries(config.getInt(MAX_RETRIES));
+        }
+        if (config.hasPath(RETRY_BACKOFF_MULTIPLIER_MS)) {
+            sinkConfig.setRetryBackoffMultiplierMs(config.getInt(RETRY_BACKOFF_MULTIPLIER_MS));
+        }
+        if (config.hasPath(MAX_RETRY_BACKOFF_MS)) {
+            sinkConfig.setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS));
+        }
+        if (config.hasPath(WRITE_TIMEOUT)) {
+            sinkConfig.setWriteTimeout(config.getInt(WRITE_TIMEOUT));
+        }
+        if (config.hasPath(RETENTION_POLICY)) {
+            sinkConfig.setRp(config.getString(RETENTION_POLICY));
+        }
+        if (config.hasPath(EPOCH)) {
+            sinkConfig.setPrecision(TimePrecision.getPrecision(config.getString(EPOCH)));
+        }
+        sinkConfig.setMeasurement(config.getString(KEY_MEASUREMENT));
+        return sinkConfig;
+    }
+
+}
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java
new file mode 100644
index 000000000..d0c3fe65e
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Getter;
+
+import java.util.List;
+
+/**
+ * Configuration of the InfluxDB source.
+ *
+ * <p>Extends the connection settings of {@link InfluxDBConfig} with the query and the optional
+ * split settings. Use {@link #loadConfig(Config)} to build a fully populated instance; the
+ * constructor alone only reads the options handled by {@link InfluxDBConfig}.
+ */
+@Getter
+public class SourceConfig extends InfluxDBConfig {
+    public static final String SQL = "sql";
+    public static final String SQL_WHERE = "where";
+    public static final String SPLIT_COLUMN = "split_column";
+    private static final String PARTITION_NUM = "partition_num";
+    private static final String UPPER_BOUND = "upper_bound";
+    private static final String LOWER_BOUND = "lower_bound";
+    public static final String DEFAULT_PARTITIONS = "0";
+    private String sql;
+    private int partitionNum = 0;
+    private String splitKey;
+    private long lowerBound;
+    private long upperBound;
+
+    List<Integer> columnsIndex;
+
+    public SourceConfig(Config config) {
+        super(config);
+    }
+
+    /**
+     * Builds a {@link SourceConfig} from the source's plugin configuration.
+     *
+     * <p>{@code sql} is mandatory and read unconditionally; the split options are read only when
+     * present, otherwise the fields keep their defaults.
+     *
+     * @param config the plugin configuration of the source
+     * @return the populated source configuration
+     */
+    public static SourceConfig loadConfig(Config config) {
+        SourceConfig sourceConfig = new SourceConfig(config);
+
+        sourceConfig.sql = config.getString(SQL);
+
+        if (config.hasPath(PARTITION_NUM)) {
+            sourceConfig.partitionNum = config.getInt(PARTITION_NUM);
+        }
+        // The bounds are stored as long, so read them as long: getInt() rejects values outside
+        // the int range.
+        if (config.hasPath(UPPER_BOUND)) {
+            sourceConfig.upperBound = config.getLong(UPPER_BOUND);
+        }
+        if (config.hasPath(LOWER_BOUND)) {
+            sourceConfig.lowerBound = config.getLong(LOWER_BOUND);
+        }
+        if (config.hasPath(SPLIT_COLUMN)) {
+            sourceConfig.splitKey = config.getString(SPLIT_COLUMN);
+        }
+        return sourceConfig;
+    }
+
+}
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/TimePrecision.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/TimePrecision.java
new file mode 100644
index 000000000..18af2cdd6
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/TimePrecision.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.config;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Time precisions supported by the InfluxDB sink, each mapped to the matching {@link TimeUnit}.
+ */
+public enum TimePrecision {
+    NS("NS", TimeUnit.NANOSECONDS),
+    U("U", TimeUnit.MICROSECONDS),
+    MS("MS", TimeUnit.MILLISECONDS),
+    S("S", TimeUnit.SECONDS),
+    M("M", TimeUnit.MINUTES),
+    H("H", TimeUnit.HOURS);
+    private final String desc;
+    private final TimeUnit precision;
+
+    TimePrecision(String desc, TimeUnit precision) {
+        this.desc = desc;
+        this.precision = precision;
+    }
+
+    /**
+     * @return the {@link TimeUnit} this precision stands for
+     */
+    public TimeUnit getTimeUnit() {
+        return this.precision;
+    }
+
+    /**
+     * Resolves a precision from its textual descriptor.
+     *
+     * <p>The comparison ignores case, so both {@code "ms"} and {@code "MS"} resolve to
+     * {@link #MS}. A {@code null} or unknown descriptor falls back to {@link #NS}.
+     *
+     * @param desc the descriptor to look up, may be {@code null}
+     * @return the matching precision, or {@link #NS} when nothing matches
+     */
+    public static TimePrecision getPrecision(String desc) {
+        for (TimePrecision timePrecision : TimePrecision.values()) {
+            // Call equalsIgnoreCase on the constant so that a null argument cannot throw.
+            if (timePrecision.desc.equalsIgnoreCase(desc)) {
+                return timePrecision;
+            }
+        }
+        return TimePrecision.NS;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java
new file mode 100644
index 000000000..8cc458939
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import com.google.common.base.Strings;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.influxdb.dto.Point;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Default {@link Serializer} that turns a {@link SeaTunnelRow} into an InfluxDB {@link Point}.
+ *
+ * <p>The columns named in {@code tagKeys} are written as tags, the column named by
+ * {@code timestampKey} supplies the point time, and every remaining column of the row type is
+ * written as a field.
+ */
+public class DefaultSerializer implements Serializer {
+    private final SeaTunnelRowType seaTunnelRowType;
+
+    private final BiConsumer<SeaTunnelRow, Point.Builder> timestampExtractor;
+    private final BiConsumer<SeaTunnelRow, Point.Builder> fieldExtractor;
+    private final BiConsumer<SeaTunnelRow, Point.Builder> tagExtractor;
+    private final String measurement;
+
+    private final TimeUnit precision;
+
+    /**
+     * @param seaTunnelRowType the type of the rows that will be serialized
+     * @param precision the unit in which STRING and BIGINT timestamp columns are expressed
+     * @param tagKeys names of the columns written as tags; may be null or empty
+     * @param timestampKey name of the column holding the point time; when null or empty the
+     *     processing time is used
+     * @param measurement the target measurement name
+     */
+    public DefaultSerializer(SeaTunnelRowType seaTunnelRowType, TimeUnit precision, List<String> tagKeys,
+                             String timestampKey,
+                             String measurement) {
+        // Assigned first because the timestamp extractor created below reads this field.
+        this.precision = precision;
+        this.measurement = measurement;
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.timestampExtractor = createTimestampExtractor(seaTunnelRowType, timestampKey);
+        this.tagExtractor = createTagExtractor(seaTunnelRowType, tagKeys);
+        List<String> fieldKeys = getFieldKeys(seaTunnelRowType, timestampKey, tagKeys);
+        this.fieldExtractor = createFieldExtractor(seaTunnelRowType, fieldKeys);
+    }
+
+    @Override
+    public Point serialize(SeaTunnelRow seaTunnelRow) {
+        Point.Builder builder = Point.measurement(measurement);
+        timestampExtractor.accept(seaTunnelRow, builder);
+        tagExtractor.accept(seaTunnelRow, builder);
+        fieldExtractor.accept(seaTunnelRow, builder);
+        return builder.build();
+    }
+
+    /**
+     * Builds the extractor that copies the given columns into the point as fields.
+     *
+     * @throws UnsupportedOperationException (from the returned extractor) for a column whose SQL
+     *     type is not BOOLEAN, SMALLINT, INT, BIGINT, FLOAT, DOUBLE or STRING
+     */
+    private BiConsumer<SeaTunnelRow, Point.Builder> createFieldExtractor(SeaTunnelRowType seaTunnelRowType, List<String> fieldKeys) {
+        return (row, builder) -> {
+            for (int i = 0; i < fieldKeys.size(); i++) {
+                String field = fieldKeys.get(i);
+                int indexOfSeaTunnelRow = seaTunnelRowType.indexOf(field);
+                SeaTunnelDataType<?> dataType = seaTunnelRowType.getFieldType(indexOfSeaTunnelRow);
+                Object val = row.getField(indexOfSeaTunnelRow);
+                if (val == null) {
+                    // A null value cannot be converted below (it would throw a
+                    // NullPointerException), so the field is left out of the point instead.
+                    continue;
+                }
+                switch (dataType.getSqlType()) {
+                    case BOOLEAN:
+                        builder.addField(field, Boolean.valueOf((Boolean) val));
+                        break;
+                    case SMALLINT:
+                        builder.addField(field, Short.valueOf((Short) val));
+                        break;
+                    case INT:
+                        builder.addField(field, ((Number) val).intValue());
+                        break;
+                    case BIGINT:
+                        // A BIGINT timestamp column never reaches this branch: the timestamp key
+                        // is excluded from the field keys and handled by the timestamp extractor.
+                        builder.addField(field, ((Number) val).longValue());
+                        break;
+                    case FLOAT:
+                        builder.addField(field, ((Number) val).floatValue());
+                        break;
+                    case DOUBLE:
+                        builder.addField(field, ((Number) val).doubleValue());
+                        break;
+                    case STRING:
+                        builder.addField(field, val.toString());
+                        break;
+                    default:
+                        throw new UnsupportedOperationException("Unsupported dataType: " + dataType);
+                }
+            }
+        };
+    }
+
+    /**
+     * Builds the extractor that sets the point time.
+     *
+     * <p>Without a {@code timeKey}, or when the row's time value is null, the current processing
+     * time is used. STRING and BIGINT columns are interpreted in the configured precision; a
+     * TIMESTAMP column is converted to epoch milliseconds at UTC.
+     *
+     * @throws UnsupportedOperationException (from the returned extractor) when the time column
+     *     is not of SQL type STRING, TIMESTAMP or BIGINT
+     */
+    private BiConsumer<SeaTunnelRow, Point.Builder> createTimestampExtractor(SeaTunnelRowType seaTunnelRowType,
+                                                                  String timeKey) {
+        // No timeKey configured: use processing time. System.currentTimeMillis() is always in
+        // milliseconds, so it must not be tagged with the configured precision.
+        if (Strings.isNullOrEmpty(timeKey)) {
+            return (row, builder) -> builder.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+        }
+
+        int timeFieldIndex = seaTunnelRowType.indexOf(timeKey);
+        SeaTunnelDataType<?> timestampFieldType = seaTunnelRowType.getFieldType(timeFieldIndex);
+        return (row, builder) -> {
+            Object time = row.getField(timeFieldIndex);
+            if (time == null) {
+                builder.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+                // Stop here: falling through into the switch would dereference the null value.
+                return;
+            }
+            switch (timestampFieldType.getSqlType()) {
+                case STRING:
+                    builder.time(Long.parseLong((String) time), precision);
+                    break;
+                case TIMESTAMP:
+                    // toEpochMilli() yields milliseconds regardless of the configured precision.
+                    builder.time(LocalDateTime.class.cast(time)
+                        .atZone(ZoneOffset.UTC)
+                        .toInstant()
+                        .toEpochMilli(), TimeUnit.MILLISECONDS);
+                    break;
+                case BIGINT:
+                    builder.time((Long) time, precision);
+                    break;
+                default:
+                    throw new UnsupportedOperationException("Unsupported data type: " + timestampFieldType);
+            }
+        };
+    }
+
+    /**
+     * Builds the extractor that copies the tag columns into the point; a no-op when no tag keys
+     * are configured.
+     */
+    private BiConsumer<SeaTunnelRow, Point.Builder> createTagExtractor(SeaTunnelRowType seaTunnelRowType,
+                                                            List<String> tagKeys) {
+        // No tagKeys configured
+        if (CollectionUtils.isEmpty(tagKeys)) {
+            return (row, builder) -> {};
+        }
+
+        return (row, builder) -> {
+            for (int i = 0; i < tagKeys.size(); i++) {
+                String tagKey = tagKeys.get(i);
+                int indexOfSeaTunnelRow = seaTunnelRowType.indexOf(tagKey);
+                Object tagValue = row.getField(indexOfSeaTunnelRow);
+                if (tagValue == null) {
+                    // Skip a missing tag value rather than fail on toString().
+                    continue;
+                }
+                builder.tag(tagKey, tagValue.toString());
+            }
+        };
+    }
+
+    /**
+     * Returns the names of all columns that are neither a tag key nor the timestamp key.
+     */
+    private List<String> getFieldKeys(SeaTunnelRowType seaTunnelRowType,
+                                            String timestampKey,
+                                            List<String> tagKeys) {
+        return Stream.of(seaTunnelRowType.getFieldNames())
+                    .filter(name -> CollectionUtils.isEmpty(tagKeys) || !tagKeys.contains(name))
+                    .filter(name -> StringUtils.isEmpty(timestampKey) || !name.equals(timestampKey))
+                    .collect(Collectors.toList());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/Serializer.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/Serializer.java
new file mode 100644
index 000000000..b910efafd
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/Serializer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.influxdb.dto.Point;
+
+/**
+ * Converts a {@link SeaTunnelRow} into an InfluxDB {@link Point}.
+ */
+public interface Serializer {
+    /**
+     * @param seaTunnelRow the row to convert
+     * @return the point built from the row
+     */
+    Point serialize(SeaTunnelRow seaTunnelRow);
+}
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
new file mode 100644
index 000000000..8d3eb7290
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.sink;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL;
+import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_MEASUREMENT;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+/**
+ * SeaTunnel sink plugin, registered under the name {@code InfluxDB}, that creates
+ * {@link InfluxDBSinkWriter} instances from the validated plugin configuration.
+ */
+@AutoService(SeaTunnelSink.class)
+public class InfluxDBSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+
+    /** Plugin configuration captured in {@link #prepare(Config)} and handed to each writer. */
+    private Config pluginConfig;
+    /** Row type of the upstream data, set through {@link #setTypeInfo(SeaTunnelRowType)}. */
+    private SeaTunnelRowType seaTunnelRowType;
+
+    @Override
+    public String getPluginName() {
+        return "InfluxDB";
+    }
+
+    /**
+     * Checks that the required options ({@code URL} and {@code KEY_MEASUREMENT}) are present
+     * and stores the configuration for later writer creation.
+     *
+     * @param config the sink plugin configuration
+     * @throws PrepareFailException if a required option is missing
+     */
+    @Override
+    public void prepare(Config config) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(config, URL, KEY_MEASUREMENT);
+        if (!result.isSuccess()) {
+            // This plugin is a sink, so the failure must be attributed to PluginType.SINK.
+            throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg());
+        }
+        this.pluginConfig = config;
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    @Override
+    public SeaTunnelDataType getConsumedType() {
+        return seaTunnelRowType;
+    }
+
+    /**
+     * Creates a writer bound to the stored configuration and row type.
+     *
+     * @param context the writer context (not used by this sink)
+     * @return a new {@link InfluxDBSinkWriter}
+     * @throws IOException if the writer cannot be created, e.g. the connection attempt fails
+     */
+    @Override
+    public AbstractSinkWriter createWriter(SinkWriter.Context context) throws IOException {
+        return new InfluxDBSinkWriter(pluginConfig, seaTunnelRowType);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
new file mode 100644
index 000000000..809a3eaaa
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import 
org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.influxdb.serialize.DefaultSerializer;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.serialize.Serializer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.influxdb.InfluxDB;
+import org.influxdb.dto.BatchPoints;
+import org.influxdb.dto.Point;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Sink writer that buffers serialized points and writes them to InfluxDB in batches.
+ *
+ * <p>The buffer is flushed when it reaches the configured batch size, on
+ * {@link #prepareCommit()}, on {@link #close()} and, when a batch interval is configured,
+ * periodically from a single-threaded scheduler.
+ */
+@Slf4j
+public class InfluxDBSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private final Serializer serializer;
+    private InfluxDB influxDB;
+    private SinkConfig sinkConfig;
+    // Pending points. Touched by the writing thread and by the scheduler thread, so every
+    // access goes through the synchronized write(Point)/flush() methods.
+    private final List<Point> batchList;
+    private ScheduledExecutorService scheduler;
+    private ScheduledFuture<?> scheduledFuture;
+    // Failure recorded by the background flush; rethrown by the next write()/flush() call.
+    private volatile Exception flushException;
+    private final Integer batchIntervalMs;
+
+    /**
+     * Loads the sink configuration, connects to InfluxDB and, if a batch interval is
+     * configured, starts the periodic background flush.
+     *
+     * @param pluginConfig     the sink plugin configuration
+     * @param seaTunnelRowType the type of the rows that will be written
+     * @throws ConnectException if the InfluxDB server cannot be reached
+     */
+    public InfluxDBSinkWriter(Config pluginConfig,
+                              SeaTunnelRowType seaTunnelRowType) throws ConnectException {
+        this.sinkConfig = SinkConfig.loadConfig(pluginConfig);
+        this.batchIntervalMs = sinkConfig.getBatchIntervalMs();
+        this.serializer = new DefaultSerializer(
+                seaTunnelRowType, sinkConfig.getPrecision().getTimeUnit(),
+                sinkConfig.getKeyTags(), sinkConfig.getKeyTime(), sinkConfig.getMeasurement());
+        this.batchList = new ArrayList<>();
+
+        // Connect before starting the scheduler: if the connection fails the constructor
+        // throws, and a scheduler started earlier would leave its thread running forever.
+        connect();
+
+        if (batchIntervalMs != null) {
+            scheduler = Executors.newSingleThreadScheduledExecutor(
+                    new ThreadFactoryBuilder().setNameFormat("influxDB-sink-output-%s").build());
+            scheduledFuture = scheduler.scheduleAtFixedRate(
+                () -> {
+                    try {
+                        flush();
+                    } catch (IOException e) {
+                        flushException = e;
+                    }
+                },
+                batchIntervalMs,
+                batchIntervalMs,
+                TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Serializes the row and adds the resulting point to the current batch.
+     *
+     * @param element the row to write
+     * @throws IOException if the batch had to be flushed and the flush failed
+     */
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        Point record = serializer.serialize(element);
+        write(record);
+    }
+
+    @SneakyThrows
+    @Override
+    public Optional<Void> prepareCommit() {
+        // Flush to storage before snapshot state is performed
+        flush();
+        return super.prepareCommit();
+    }
+
+    /**
+     * Stops the periodic flush, writes any remaining points and closes the client.
+     *
+     * @throws IOException if the final flush fails; the client is closed regardless
+     */
+    @Override
+    public void close() throws IOException {
+        if (scheduledFuture != null) {
+            scheduledFuture.cancel(false);
+            scheduler.shutdown();
+        }
+
+        try {
+            flush();
+        } finally {
+            // Release the client even when the last flush throws.
+            if (influxDB != null) {
+                influxDB.close();
+                influxDB = null;
+            }
+        }
+    }
+
+    /**
+     * Adds a point to the batch and flushes once the configured batch size is reached.
+     *
+     * @param record the point to buffer
+     * @throws IOException if a triggered flush fails
+     */
+    public synchronized void write(Point record) throws IOException {
+        checkFlushException();
+
+        batchList.add(record);
+        if (sinkConfig.getBatchSize() > 0
+                && batchList.size() >= sinkConfig.getBatchSize()) {
+            flush();
+        }
+    }
+
+    /**
+     * Writes all buffered points to InfluxDB, retrying up to the configured number of
+     * retries with a capped linear backoff, and clears the buffer on success.
+     *
+     * @throws IOException if all attempts fail or the thread is interrupted while backing off
+     */
+    public synchronized void flush() throws IOException {
+        checkFlushException();
+        if (batchList.isEmpty()) {
+            return;
+        }
+        for (int i = 0; i <= sinkConfig.getMaxRetries(); i++) {
+            try {
+                // Use a fresh builder per attempt so a retry never accumulates the same
+                // points twice in one request.
+                BatchPoints.Builder batchPoints = BatchPoints.database(sinkConfig.getDatabase());
+                batchPoints.points(batchList);
+                influxDB.write(batchPoints.build());
+                // Success: stop here instead of writing the batch again on the next iteration.
+                break;
+            } catch (Exception e) {
+                log.error("Writing records to influxdb failed, retry times = {}", i, e);
+                if (i >= sinkConfig.getMaxRetries()) {
+                    throw new IOException("Writing records to InfluxDB failed.", e);
+                }
+
+                try {
+                    long backoff = Math.min(sinkConfig.getRetryBackoffMultiplierMs() * i,
+                            sinkConfig.getMaxRetryBackoffMs());
+                    Thread.sleep(backoff);
+                } catch (InterruptedException ex) {
+                    // Restore the interrupt flag and report the write failure that led here.
+                    Thread.currentThread().interrupt();
+                    throw new IOException(
+                            "Unable to flush; interrupted while doing another attempt.", e);
+                }
+            }
+        }
+
+        batchList.clear();
+    }
+
+    /** Rethrows a failure recorded by the background flush, if there is one. */
+    private void checkFlushException() {
+        if (flushException != null) {
+            throw new RuntimeException("Writing records to InfluxDB failed.", flushException);
+        }
+    }
+
+    /**
+     * Creates the write client and verifies the server answers a ping. Does nothing when a
+     * client already exists.
+     *
+     * @throws ConnectException if the ping is not successful
+     */
+    public void connect() throws ConnectException {
+        if (influxDB != null) {
+            return;
+        }
+        InfluxDB client = InfluxDBClient.getWriteClient(sinkConfig);
+        boolean connected = false;
+        try {
+            String version = client.version();
+            if (!client.ping().isGood()) {
+                String errorMessage =
+                        String.format(
+                                "connect influxdb failed, due to influxdb version info is unknown, the url is: {%s}",
+                                sinkConfig.getUrl());
+                throw new ConnectException(errorMessage);
+            }
+            log.info("connect influxdb successful. sever version :{}.", version);
+            connected = true;
+        } finally {
+            // Do not leak the client when the connectivity check fails.
+            if (!connected) {
+                client.close();
+            }
+        }
+        influxDB = client;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
index bc971476b..804e804f5 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
@@ -17,8 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.SQL;
-import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL;
+import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SQL;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.source.Boundedness;
@@ -33,7 +32,7 @@ import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
 import 
org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
-import 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -53,7 +52,7 @@ import java.util.stream.Collectors;
 @AutoService(SeaTunnelSource.class)
 public class InfluxDBSource implements SeaTunnelSource<SeaTunnelRow, 
InfluxDBSourceSplit, InfluxDBSourceState>  {
     private SeaTunnelRowType typeInfo;
-    private InfluxDBConfig influxDBConfig;
+    private SourceConfig sourceConfig;
 
     private List<Integer> columnsIndexList;
 
@@ -66,15 +65,15 @@ public class InfluxDBSource implements 
SeaTunnelSource<SeaTunnelRow, InfluxDBSou
 
     @Override
     public void prepare(Config config) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(config, URL, SQL);
+        CheckResult result = CheckConfigUtil.checkAllExists(config, SQL);
         if (!result.isSuccess()) {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
result.getMsg());
         }
         try {
-            this.influxDBConfig = new InfluxDBConfig(config);
+            this.sourceConfig = SourceConfig.loadConfig(config);
             SeaTunnelSchema seatunnelSchema = 
SeaTunnelSchema.buildWithConfig(config);
             this.typeInfo = seatunnelSchema.getSeaTunnelRowType();
-            this.columnsIndexList = 
initColumnsIndex(InfluxDBClient.getInfluxDB(influxDBConfig));
+            this.columnsIndexList = 
initColumnsIndex(InfluxDBClient.getInfluxDB(sourceConfig));
         } catch (Exception e) {
             throw new PrepareFailException("InfluxDB", PluginType.SOURCE, 
e.toString());
         }
@@ -92,26 +91,26 @@ public class InfluxDBSource implements 
SeaTunnelSource<SeaTunnelRow, InfluxDBSou
 
     @Override
     public SourceReader createReader(SourceReader.Context readerContext) 
throws Exception {
-        return new InfluxdbSourceReader(influxDBConfig, readerContext, 
typeInfo, columnsIndexList);
+        return new InfluxdbSourceReader(sourceConfig, readerContext, typeInfo, 
columnsIndexList);
     }
 
     @Override
     public SourceSplitEnumerator 
createEnumerator(SourceSplitEnumerator.Context enumeratorContext) throws 
Exception {
-        return new InfluxDBSourceSplitEnumerator(enumeratorContext, 
influxDBConfig);
+        return new InfluxDBSourceSplitEnumerator(enumeratorContext, 
sourceConfig);
     }
 
     @Override
     public SourceSplitEnumerator<InfluxDBSourceSplit, InfluxDBSourceState> 
restoreEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> 
enumeratorContext, InfluxDBSourceState checkpointState) throws Exception {
-        return new InfluxDBSourceSplitEnumerator(enumeratorContext, 
checkpointState, influxDBConfig);
+        return new InfluxDBSourceSplitEnumerator(enumeratorContext, 
checkpointState, sourceConfig);
     }
 
     private List<Integer> initColumnsIndex(InfluxDB influxDB)  {
         //query one row to get column info
-        String query = influxDBConfig.getSql() + QUERY_LIMIT;
+        String query = sourceConfig.getSql() + QUERY_LIMIT;
         List<String> fieldNames = new ArrayList<>();
         try {
             QueryResult queryResult = influxDB.query(
-                    new Query(query, influxDBConfig.getDatabase()));
+                    new Query(query, sourceConfig.getDatabase()));
 
             List<QueryResult.Series> serieList = 
queryResult.getResults().get(0).getSeries();
             fieldNames.addAll(serieList.get(0).getColumns());
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
index d22eba116..139a6e3ad 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
@@ -17,10 +17,10 @@
 
 package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.SQL_WHERE;
+import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SQL_WHERE;
 
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState;
 
 import lombok.extern.slf4j.Slf4j;
@@ -37,17 +37,17 @@ import java.util.Set;
 
 @Slf4j
 public class InfluxDBSourceSplitEnumerator implements 
SourceSplitEnumerator<InfluxDBSourceSplit, InfluxDBSourceState> {
-    final InfluxDBConfig config;
+    final SourceConfig config;
     private final Context<InfluxDBSourceSplit> context;
     private final Map<Integer, List<InfluxDBSourceSplit>> pendingSplit;
     private final Object stateLock = new Object();
     private volatile boolean shouldEnumerate;
 
-    public 
InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit>
 context, InfluxDBConfig config) {
+    public 
InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit>
 context, SourceConfig config) {
         this(context, null, config);
     }
 
-    public 
InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit>
 context, InfluxDBSourceState sourceState, InfluxDBConfig config) {
+    public 
InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit>
 context, InfluxDBSourceState sourceState, SourceConfig config) {
         this.context = context;
         this.config = config;
         this.pendingSplit = new HashMap<>();
@@ -113,7 +113,7 @@ public class InfluxDBSourceSplitEnumerator implements 
SourceSplitEnumerator<Infl
         Set<InfluxDBSourceSplit> influxDBSourceSplits = new HashSet<>();
         // no need numPartitions, use one partition
         if (config.getPartitionNum() == 0) {
-            influxDBSourceSplits.add(new 
InfluxDBSourceSplit(InfluxDBConfig.DEFAULT_PARTITIONS, sql));
+            influxDBSourceSplits.add(new 
InfluxDBSourceSplit(SourceConfig.DEFAULT_PARTITIONS, sql));
             return influxDBSourceSplits;
         }
         //calculate numRange base on (lowerBound upperBound partitionNum)
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/pom.xml
index a3b93102e..652782b06 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/pom.xml
@@ -26,6 +26,7 @@
     <artifactId>connector-influxdb-e2e</artifactId>
 
     <dependencies>
+        <!-- SeaTunnel connectors -->
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>connector-influxdb</artifactId>
@@ -34,9 +35,9 @@
         </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-assert</artifactId>
+            <artifactId>connector-console</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
     </dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxDBSourceToAssertIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxDBSourceToAssertIT.java
deleted file mode 100644
index d39aa4f39..000000000
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxDBSourceToAssertIT.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.connector.influxdb;
-
-import static org.awaitility.Awaitility.given;
-
-import 
org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
-import 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
-import org.apache.seatunnel.e2e.common.TestResource;
-import org.apache.seatunnel.e2e.common.TestSuiteBase;
-import org.apache.seatunnel.e2e.common.container.TestContainer;
-
-import lombok.extern.slf4j.Slf4j;
-import org.influxdb.InfluxDB;
-import org.influxdb.dto.BatchPoints;
-import org.influxdb.dto.Point;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.TestTemplate;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.DockerLoggerFactory;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.util.Date;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
-
-@Slf4j
-public class InfluxDBSourceToAssertIT extends TestSuiteBase implements 
TestResource {
-
-    private static final String INFLUXDB_DOCKER_IMAGE = "influxdb:1.8";
-    private static final String INFLUXDB_CONTAINER_HOST = "influxdb-host";
-    private static final int INFLUXDB_CONTAINER_PORT = 8086;
-    private static final String INFLUXDB_DATABASE = "test";
-    private static final String INFLUXDB_MEASUREMENT = "test";
-
-    private GenericContainer<?> influxDBServer;
-    private  InfluxDB influxDB;
-
-    @BeforeAll
-    @Override
-    public void startUp() {
-        influxDBServer = new GenericContainer<>(INFLUXDB_DOCKER_IMAGE)
-            .withNetwork(NETWORK)
-            .withNetworkAliases(INFLUXDB_CONTAINER_HOST)
-            .withExposedPorts(INFLUXDB_CONTAINER_PORT)
-            .withLogConsumer(new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(INFLUXDB_DOCKER_IMAGE)));
-        Startables.deepStart(Stream.of(influxDBServer)).join();
-        log.info("influxdb container started");
-        given().ignoreExceptions()
-            .await()
-            .atLeast(100, TimeUnit.MILLISECONDS)
-            .pollInterval(500, TimeUnit.MILLISECONDS)
-            .atMost(30, TimeUnit.SECONDS)
-            .untilAsserted(() -> initializeInfluxDBClient());
-        batchInsertData();
-    }
-
-    @TestTemplate
-    public void testInfluxDBSource(TestContainer container) throws 
IOException, InterruptedException {
-        Container.ExecResult execResult = 
container.executeJob("/influxdb_source_to_assert.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
-    }
-
-    private void initializeInfluxDBClient() throws ConnectException {
-        InfluxDBConfig influxDBConfig = new 
InfluxDBConfig(String.format("http://%s:%s";, influxDBServer.getHost(), 
influxDBServer.getFirstMappedPort()));
-        influxDB = InfluxDBClient.getInfluxDB(influxDBConfig);
-    }
-
-    public void batchInsertData() {
-        influxDB.createDatabase(INFLUXDB_DATABASE);
-        BatchPoints batchPoints = BatchPoints
-                .database(INFLUXDB_DATABASE)
-                .build();
-        for (int i = 0; i < 100; i++) {
-            Point point = Point.measurement(INFLUXDB_MEASUREMENT)
-                    .time(new Date().getTime(), TimeUnit.NANOSECONDS)
-                    .tag("label", String.format("label_%s", i))
-                    .addField("f1", String.format("f1_%s", i))
-                    .addField("f2", Double.valueOf(i + 1))
-                    .addField("f3", Long.valueOf(i + 2))
-                    .addField("f4", Float.valueOf(i + 3))
-                    .addField("f5", Integer.valueOf(i))
-                    .addField("f6", (short) (i + 4))
-                    .addField("f7", i % 2 == 0 ? Boolean.TRUE : Boolean.FALSE)
-                    .build();
-            batchPoints.point(point);
-        }
-        influxDB.write(batchPoints);
-    }
-
-    @AfterAll
-    @Override
-    public void tearDown() {
-        if (influxDB != null) {
-            influxDB.close();
-        }
-        if (influxDBServer != null) {
-            influxDBServer.stop();
-        }
-    }
-}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
new file mode 100644
index 000000000..20cc6dce0
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.influxdb;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
+import 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.influxdb.InfluxDB;
+import org.influxdb.dto.BatchPoints;
+import org.influxdb.dto.Point;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+/**
+ * End-to-end test that seeds an InfluxDB container with rows in the {@code source}
+ * measurement, runs the {@code influxdb-to-influxdb.conf} job and compares the contents of
+ * the {@code sink} measurement with the source.
+ */
+@Slf4j
+public class InfluxdbIT extends TestSuiteBase implements TestResource {
+    private static final String IMAGE = "influxdb:1.8";
+    private static final String HOST = "influxdb-host";
+    private static final int PORT = 8086;
+    private static final String INFLUXDB_DATABASE = "test";
+    private static final String INFLUXDB_SOURCE_MEASUREMENT = "source";
+    private static final String INFLUXDB_SINK_MEASUREMENT = "sink";
+
+
+    private static final Tuple2<SeaTunnelRowType, List<SeaTunnelRow>> TEST_DATASET = generateTestDataSet();
+
+    private GenericContainer<?> influxdbContainer;
+    private String influxDBConnectUrl;
+
+    private InfluxDB influxDB;
+
+    /** Starts the InfluxDB container, connects a client to it and seeds the source data. */
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        this.influxdbContainer = new GenericContainer<>(DockerImageName.parse(IMAGE))
+            .withNetwork(NETWORK)
+            .withNetworkAliases(HOST)
+            .withExposedPorts(PORT)
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)))
+            .waitingFor(new HostPortWaitStrategy()
+                .withStartupTimeout(Duration.ofMinutes(2)));
+        Startables.deepStart(Stream.of(influxdbContainer)).join();
+        influxDBConnectUrl = String.format("http://%s:%s",
+            influxdbContainer.getHost(), influxdbContainer.getFirstMappedPort());
+        log.info("Influxdb container started");
+        this.initializeInfluxDBClient();
+        this.initSourceData();
+    }
+
+    /** Creates the test database and writes every row of the data set to the source measurement. */
+    private void initSourceData() {
+        influxDB.createDatabase(INFLUXDB_DATABASE);
+        BatchPoints batchPoints = BatchPoints
+                .database(INFLUXDB_DATABASE)
+                .build();
+        List<SeaTunnelRow> rows = TEST_DATASET._2();
+        SeaTunnelRowType rowType = TEST_DATASET._1();
+
+        for (int i = 0; i < rows.size(); i++) {
+            SeaTunnelRow row = rows.get(i);
+            // Field 0 is the timestamp, field 1 the tag, the remaining fields are values.
+            Point point = Point.measurement(INFLUXDB_SOURCE_MEASUREMENT)
+                    .time((Long) row.getField(0), TimeUnit.NANOSECONDS)
+                    .tag(rowType.getFieldName(1), (String) row.getField(1))
+                    .addField(rowType.getFieldName(2), (String) row.getField(2))
+                    .addField(rowType.getFieldName(3), (Double) row.getField(3))
+                    .addField(rowType.getFieldName(4), (Long) row.getField(4))
+                    .addField(rowType.getFieldName(5), (Float) row.getField(5))
+                    .addField(rowType.getFieldName(6), (Integer) row.getField(6))
+                    .addField(rowType.getFieldName(7), (Short) row.getField(7))
+                    .addField(rowType.getFieldName(8), (Boolean) row.getField(8))
+                    .build();
+            batchPoints.point(point);
+        }
+        influxDB.write(batchPoints);
+    }
+
+    /**
+     * Builds the row type and 100 rows used to seed the source measurement.
+     *
+     * @return the row type paired with the generated rows
+     */
+    private static Tuple2<SeaTunnelRowType, List<SeaTunnelRow>> generateTestDataSet() {
+        SeaTunnelRowType rowType = new SeaTunnelRowType(
+            new String[]{
+                "time",
+                "label",
+                "c_string",
+                "c_double",
+                "c_bigint",
+                "c_float",
+                "c_int",
+                "c_smallint",
+                "c_boolean"
+            },
+            new SeaTunnelDataType[]{
+                BasicType.LONG_TYPE,
+                BasicType.STRING_TYPE,
+                BasicType.STRING_TYPE,
+                BasicType.DOUBLE_TYPE,
+                BasicType.LONG_TYPE,
+                BasicType.FLOAT_TYPE,
+                BasicType.INT_TYPE,
+                BasicType.SHORT_TYPE,
+                BasicType.BOOLEAN_TYPE
+            }
+        );
+
+        List<SeaTunnelRow> rows = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            SeaTunnelRow row = new SeaTunnelRow(
+                new Object[]{
+                    new Date().getTime(),
+                    String.format("label_%s", i),
+                    String.format("f1_%s", i),
+                    Double.parseDouble("1.1"),
+                    Long.parseLong("1"),
+                    Float.parseFloat("1.1"),
+                    Integer.valueOf(i),
+                    Short.parseShort("1"),
+                    i % 2 == 0 ? Boolean.TRUE : Boolean.FALSE
+                });
+            rows.add(row);
+        }
+        return Tuple2.apply(rowType, rows);
+    }
+
+    /** Releases the client and the container; tolerates a start-up that failed part-way. */
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        if (influxDB != null) {
+            influxDB.close();
+        }
+        if (influxdbContainer != null) {
+            influxdbContainer.stop();
+        }
+    }
+
+    /** Runs the job and verifies the sink measurement holds the same rows as the source. */
+    @TestTemplate
+    public void testInfluxdb(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/influxdb-to-influxdb.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        String sourceSql = String.format("select * from %s order by time", INFLUXDB_SOURCE_MEASUREMENT);
+        String sinkSql = String.format("select * from %s order by time", INFLUXDB_SINK_MEASUREMENT);
+        QueryResult sourceQueryResult = influxDB.query(new Query(sourceSql, INFLUXDB_DATABASE));
+        QueryResult sinkQueryResult = influxDB.query(new Query(sinkSql, INFLUXDB_DATABASE));
+        //assert result count
+        Assertions.assertEquals(sourceQueryResult.getResults().size(), sinkQueryResult.getResults().size());
+        List<List<Object>> sourceValues = sourceQueryResult.getResults().get(0).getSeries().get(0).getValues();
+        List<List<Object>> sinkValues = sinkQueryResult.getResults().get(0).getSeries().get(0).getValues();
+        //assert data count: getResults() holds one entry per statement, so the number of
+        //rows has to be compared on the series values themselves
+        Assertions.assertEquals(sourceValues.size(), sinkValues.size());
+        //assert data values
+        int rowSize = sourceValues.size();
+        int colSize = sourceValues.get(0).size();
+
+        for (int row = 0; row < rowSize; row++) {
+            for (int col = 0; col < colSize; col++) {
+                Object sourceColValue = sourceValues.get(row).get(col);
+                Object sinkColValue = sinkValues.get(row).get(col);
+
+                if (!Objects.deepEquals(sourceColValue, sinkColValue)) {
+                    Assertions.assertEquals(sourceColValue, sinkColValue);
+                }
+            }
+
+        }
+    }
+
+    /** Opens the client used by the test against the container's mapped port. */
+    private void initializeInfluxDBClient() throws ConnectException {
+        InfluxDBConfig influxDBConfig = new InfluxDBConfig(influxDBConnectUrl);
+        influxDB = InfluxDBClient.getInfluxDB(influxDBConfig);
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf
new file mode 100644
index 000000000..f95af29a2
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Batch job: copy the "source" measurement to the "sink" measurement of the same database.
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+}
+
+source {
+    InfluxDB {
+        url = "http://influxdb-host:8086"
+        sql = "select label, c_string, c_double, c_bigint, c_float, c_int, c_smallint, c_boolean from source"
+        database = "test"
+        # NOTE(review): presumably the split_column range [lower_bound, upper_bound]
+        # is divided into partition_num splits -- confirm against the source connector docs.
+        upper_bound = 99
+        lower_bound = 0
+        partition_num = 4
+        split_column = "c_int"
+        # Declared type of every selected column, plus the "time" column.
+        fields {
+            label = STRING
+            c_string = STRING
+            c_double = DOUBLE
+            c_bigint = BIGINT
+            c_float = FLOAT
+            c_int = INT
+            c_smallint = SMALLINT
+            c_boolean = BOOLEAN
+            time = BIGINT
+            }
+    }
+}
+
+transform {
+}
+
+sink {
+    InfluxDB {
+        url = "http://influxdb-host:8086"
+        database = "test"
+        measurement = "sink"
+        # "time" is named as the time key and "label" as the only tag key.
+        key_time = "time"
+        key_tags = ["label"]
+        # NOTE(review): presumably flushes after every row -- confirm against the sink connector docs.
+        batch_size = 1
+    }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb_source_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb_source_to_assert.conf
deleted file mode 100644
index ea0e6e177..000000000
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb_source_to_assert.conf
+++ /dev/null
@@ -1,188 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
-
-env {
-  execution.parallelism = 1
-  job.mode = "BATCH"
-
-  # You can set spark configuration here
-  spark.app.name = "SeaTunnel"
-  spark.executor.instances = 2
-  spark.executor.cores = 1
-  spark.executor.memory = "1g"
-  spark.master = local
-}
-
-source {
-  # This is a example source plugin **only for test and demonstrate the feature source plugin**
-    InfluxDB {
-        url = "http://influxdb-host:8086"
-        sql = "select label, f1, f2, f3, f4, f5, f6, f7 from test"
-        database = "test"
-        upper_bound = 99
-        lower_bound = 0
-        partition_num = 4
-        split_column = "f5"
-        fields {
-            label = STRING
-            f1 = STRING
-            f2 = DOUBLE
-            f3 = BIGINT
-            f4 = FLOAT
-            f5 = INT
-            f6 = SMALLINT
-            f7 = BOOLEAN
-            }
-    }
-}
-
-sink {
-     Assert {
-         rules =
-          {
-            row_rules = [
-              {
-                rule_type = MAX_ROW
-                rule_value = 100
-              },
-              {
-                rule_type = MIN_ROW
-                rule_value = 100
-              }
-            ],
-           field_rules =  [{
-                 field_name = f1
-                 field_type = string
-                 field_value = [
-                     {
-                         rule_type = NOT_NULL
-                     },
-                     {
-                         rule_type = MIN_LENGTH
-                         rule_value = 4
-                     },
-                     {
-                          rule_type = MAX_LENGTH
-                          rule_value = 5
-                     }
-                 ]
-             },{
-                 field_name = f2
-                 field_type = double
-                 field_value = [
-                     {
-                         rule_type = NOT_NULL
-                     },
-                     {
-                         rule_type = MIN
-                         rule_value = 1
-                     },
-                     {
-                          rule_type = MAX
-                          rule_value = 100
-                     }
-                 ]
-             },{
-                 field_name = f3
-                 field_type = long
-                 field_value = [
-                      {
-                           rule_type = NOT_NULL
-                      },
-                      {
-                           rule_type = MIN
-                           rule_value = 2
-                       },
-                       {
-                            rule_type = MAX
-                            rule_value = 101
-                       }
-                 ]
-             },{
-                 field_name = f4
-                 field_type = float
-                 field_value = [
-                      {
-                           rule_type = NOT_NULL
-                      },
-                      {
-                           rule_type = MIN
-                           rule_value = 3
-                       },
-                       {
-                            rule_type = MAX
-                            rule_value = 102
-                       }
-                 ]
-             },{
-                 field_name = f5
-                 field_type = int
-                 field_value = [
-                      {
-                           rule_type = NOT_NULL
-                      },
-                      {
-                           rule_type = MIN
-                           rule_value = 0
-                       },
-                       {
-                            rule_type = MAX
-                            rule_value = 99
-                       }
-                 ]
-             },{
-                 field_name = f6
-                 field_type = short
-                 field_value = [
-                      {
-                           rule_type = NOT_NULL
-                      },
-                      {
-                           rule_type = MIN
-                           rule_value = 4
-                       },
-                       {
-                            rule_type = MAX
-                            rule_value = 103
-                       }
-                 ]
-             },{
-                 field_name = f7
-                 field_type = boolean
-                 field_value = [
-                      {
-                           rule_type = NOT_NULL
-                      },
-                      {
-                           rule_type = MIN
-                           rule_value = 0
-                       },
-                       {
-                            rule_type = MAX
-                            rule_value = 1
-                       }
-                 ]
-             }
-             ]
-        }
-     }
-  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
-  # please go to https://seatunnel.apache.org/docs/category/sink-v2
-}
\ No newline at end of file

Reply via email to