This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 56f13e920d Support multi-table sink feature for influxdb (#6278)
56f13e920d is described below

commit 56f13e920d9e4fa03855a00c438a0b51e18ae575
Author: Bibo <[email protected]>
AuthorDate: Tue Feb 27 11:37:59 2024 +0800

    Support multi-table sink feature for influxdb (#6278)
---
 .../seatunnel/influxdb/config/SinkConfig.java      | 24 +++---
 .../seatunnel/influxdb/sink/InfluxDBSink.java      | 45 +++---------
 .../influxdb/sink/InfluxDBSinkFactory.java         | 26 ++++++-
 .../influxdb/sink/InfluxDBSinkWriter.java          | 12 +--
 .../e2e/connector/influxdb/InfluxdbIT.java         | 62 ++++++++++++++++
 .../fake_to_infuxdb_with_multipletable.conf        | 85 ++++++++++++++++++++++
 6 files changed, 200 insertions(+), 54 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
index 806309bffe..071e3c235f 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
@@ -34,6 +34,7 @@ import java.util.List;
 public class SinkConfig extends InfluxDBConfig {
     public SinkConfig(Config config) {
         super(config);
+        loadConfig(config);
     }
 
     public static final Option<String> KEY_TIME =
@@ -103,35 +104,32 @@ public class SinkConfig extends InfluxDBConfig {
     private int maxRetryBackoffMs;
     private TimePrecision precision = DEFAULT_TIME_PRECISION;
 
-    public static SinkConfig loadConfig(Config config) {
-        SinkConfig sinkConfig = new SinkConfig(config);
+    public void loadConfig(Config config) {
 
         if (config.hasPath(KEY_TIME.key())) {
-            sinkConfig.setKeyTime(config.getString(KEY_TIME.key()));
+            setKeyTime(config.getString(KEY_TIME.key()));
         }
         if (config.hasPath(KEY_TAGS.key())) {
-            sinkConfig.setKeyTags(config.getStringList(KEY_TAGS.key()));
+            setKeyTags(config.getStringList(KEY_TAGS.key()));
         }
         if (config.hasPath(MAX_RETRIES.key())) {
-            sinkConfig.setMaxRetries(config.getInt(MAX_RETRIES.key()));
+            setMaxRetries(config.getInt(MAX_RETRIES.key()));
         }
         if (config.hasPath(RETRY_BACKOFF_MULTIPLIER_MS.key())) {
-            sinkConfig.setRetryBackoffMultiplierMs(
-                    config.getInt(RETRY_BACKOFF_MULTIPLIER_MS.key()));
+            
setRetryBackoffMultiplierMs(config.getInt(RETRY_BACKOFF_MULTIPLIER_MS.key()));
         }
         if (config.hasPath(MAX_RETRY_BACKOFF_MS.key())) {
-            
sinkConfig.setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS.key()));
+            setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS.key()));
         }
         if (config.hasPath(WRITE_TIMEOUT.key())) {
-            sinkConfig.setWriteTimeout(config.getInt(WRITE_TIMEOUT.key()));
+            setWriteTimeout(config.getInt(WRITE_TIMEOUT.key()));
         }
         if (config.hasPath(RETENTION_POLICY.key())) {
-            sinkConfig.setRp(config.getString(RETENTION_POLICY.key()));
+            setRp(config.getString(RETENTION_POLICY.key()));
         }
         if (config.hasPath(EPOCH.key())) {
-            
sinkConfig.setPrecision(TimePrecision.getPrecision(config.getString(EPOCH.key())));
+            
setPrecision(TimePrecision.getPrecision(config.getString(EPOCH.key())));
         }
-        sinkConfig.setMeasurement(config.getString(KEY_MEASUREMENT.key()));
-        return sinkConfig;
+        setMeasurement(config.getString(KEY_MEASUREMENT.key()));
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
index 9cc03272d1..da7ba20f91 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
@@ -17,61 +17,36 @@
 
 package org.apache.seatunnel.connectors.seatunnel.influxdb.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import 
org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
-
-import com.google.auto.service.AutoService;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;
 
 import java.io.IOException;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL;
-import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_MEASUREMENT;
+public class InfluxDBSink extends AbstractSimpleSink<SeaTunnelRow, Void>
+        implements SupportMultiTableSink {
 
-@AutoService(SeaTunnelSink.class)
-public class InfluxDBSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
-
-    private Config pluginConfig;
     private SeaTunnelRowType seaTunnelRowType;
+    private SinkConfig sinkConfig;
 
     @Override
     public String getPluginName() {
         return "InfluxDB";
     }
 
-    @Override
-    public void prepare(Config config) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(config, URL.key(), 
KEY_MEASUREMENT.key());
-        if (!result.isSuccess()) {
-            throw new InfluxdbConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, 
result.getMsg()));
-        }
-        this.pluginConfig = config;
-    }
-
-    @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
+    public InfluxDBSink(SinkConfig sinkConfig, CatalogTable catalogTable) {
+        this.sinkConfig = sinkConfig;
+        this.seaTunnelRowType = 
catalogTable.getTableSchema().toPhysicalRowDataType();
     }
 
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context)
             throws IOException {
-        return new InfluxDBSinkWriter(pluginConfig, seaTunnelRowType);
+        return new InfluxDBSinkWriter(sinkConfig, seaTunnelRowType);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java
index 3d44158e78..81a294e95b 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java
@@ -17,11 +17,20 @@
 
 package org.apache.seatunnel.connectors.seatunnel.influxdb.sink;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;
 
 import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.HashMap;
+import java.util.Map;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.CONNECT_TIMEOUT_MS;
 import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.DATABASES;
@@ -36,6 +45,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConf
 import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS;
 
 @AutoService(Factory.class)
+@Slf4j
 public class InfluxDBSinkFactory implements TableSinkFactory {
 
     @Override
@@ -46,10 +56,11 @@ public class InfluxDBSinkFactory implements 
TableSinkFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(URL, DATABASES, KEY_MEASUREMENT)
+                .required(URL, DATABASES)
                 .bundled(USERNAME, PASSWORD)
                 .optional(
                         CONNECT_TIMEOUT_MS,
+                        KEY_MEASUREMENT,
                         KEY_TAGS,
                         KEY_TIME,
                         BATCH_SIZE,
@@ -57,4 +68,17 @@ public class InfluxDBSinkFactory implements TableSinkFactory 
{
                         RETRY_BACKOFF_MULTIPLIER_MS)
                 .build();
     }
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        ReadonlyConfig config = context.getOptions();
+        CatalogTable catalogTable = context.getCatalogTable();
+        if (!config.getOptional(KEY_MEASUREMENT).isPresent()) {
+            Map<String, String> map = config.toMap();
+            map.put(KEY_MEASUREMENT.key(), 
catalogTable.getTableId().toTablePath().getFullName());
+            config = ReadonlyConfig.fromMap(new HashMap<>(map));
+        }
+        SinkConfig sinkConfig = new SinkConfig(config.toConfig());
+        return () -> new InfluxDBSink(sinkConfig, catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
index f2d401db51..b0d23c7e79 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
@@ -17,11 +17,11 @@
 
 package org.apache.seatunnel.connectors.seatunnel.influxdb.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.utils.JsonUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 import 
org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
 import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;
@@ -44,7 +44,8 @@ import java.util.List;
 import java.util.Optional;
 
 @Slf4j
-public class InfluxDBSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> 
{
+public class InfluxDBSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
+        implements SupportMultiTableSinkWriter {
 
     private final Serializer serializer;
     private InfluxDB influxdb;
@@ -52,9 +53,10 @@ public class InfluxDBSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
     private final List<Point> batchList;
     private volatile Exception flushException;
 
-    public InfluxDBSinkWriter(Config pluginConfig, SeaTunnelRowType 
seaTunnelRowType)
+    public InfluxDBSinkWriter(SinkConfig sinkConfig, SeaTunnelRowType 
seaTunnelRowType)
             throws ConnectException {
-        this.sinkConfig = SinkConfig.loadConfig(pluginConfig);
+        this.sinkConfig = sinkConfig;
+        log.info("sinkConfig is {}", JsonUtils.toJsonString(sinkConfig));
         this.serializer =
                 new DefaultSerializer(
                         seaTunnelRowType,
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
index ddc7afadac..c139afc5e8 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
@@ -25,7 +25,9 @@ import 
org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
 import 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 
 import org.influxdb.InfluxDB;
 import org.influxdb.dto.BatchPoints;
@@ -51,10 +53,12 @@ import java.io.IOException;
 import java.net.ConnectException;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 @Slf4j
@@ -242,6 +246,64 @@ public class InfluxdbIT extends TestSuiteBase implements 
TestResource {
         }
     }
 
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "Currently SPARK/FLINK do not support multiple 
table read")
+    public void testInfluxdbMultipleWrite(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                
container.executeJob("/fake_to_infuxdb_with_multipletable.conf");
+
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertAll(
+                () -> {
+                    Assertions.assertIterableEquals(
+                            Stream.<List<Object>>of(
+                                            Arrays.asList(
+                                                    1627529632356l,
+                                                    "label_1",
+                                                    "sink_1",
+                                                    4.3,
+                                                    200,
+                                                    2.5,
+                                                    2,
+                                                    5,
+                                                    true))
+                                    .collect(Collectors.toList()),
+                            readData("infulxdb_sink_1"));
+                },
+                () -> {
+                    Assertions.assertIterableEquals(
+                            Stream.<List<Object>>of(
+                                            Arrays.asList(
+                                                    1627529632357l,
+                                                    "label_2",
+                                                    "sink_2",
+                                                    4.3,
+                                                    200,
+                                                    2.5,
+                                                    2,
+                                                    5,
+                                                    true))
+                                    .collect(Collectors.toList()),
+                            readData("infulxdb_sink_2"));
+                });
+    }
+
+    public List<List<Object>> readData(String tableName) {
+        String sinkSql =
+                String.format(
+                        "select time, label, c_string, c_double, c_bigint, 
c_float,c_int, c_smallint, c_boolean from %s order by time",
+                        tableName);
+        QueryResult sinkQueryResult = influxDB.query(new Query(sinkSql, 
INFLUXDB_DATABASE));
+
+        List<List<Object>> sinkValues =
+                
sinkQueryResult.getResults().get(0).getSeries().get(0).getValues();
+        return sinkValues;
+    }
+
     private void initializeInfluxDBClient() throws ConnectException {
         InfluxDBConfig influxDBConfig = new InfluxDBConfig(influxDBConnectUrl);
         influxDB = InfluxDBClient.getInfluxDB(influxDBConfig);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/fake_to_infuxdb_with_multipletable.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/fake_to_infuxdb_with_multipletable.conf
new file mode 100644
index 0000000000..eda13ff704
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/fake_to_infuxdb_with_multipletable.conf
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    tables_configs = [
+       {
+        schema = {
+          table = "infulxdb_sink_1"
+         fields {
+                    label = STRING
+                    c_string = STRING
+                    c_double = DOUBLE
+                    c_bigint = BIGINT
+                    c_float = FLOAT
+                    c_int = INT
+                    c_smallint = SMALLINT
+                    c_boolean = BOOLEAN
+                    time = BIGINT
+           }
+        }
+            rows = [
+              {
+                kind = INSERT
+                fields = ["label_1", "sink_1", 4.3, 200, 2.5, 2, 5, true, 
1627529632356]
+              }
+              ]
+       },
+       {
+       schema = {
+         table = "infulxdb_sink_2"
+              fields {
+                    label = STRING
+                    c_string = STRING
+                    c_double = DOUBLE
+                    c_bigint = BIGINT
+                    c_float = FLOAT
+                    c_int = INT
+                    c_smallint = SMALLINT
+                    c_boolean = BOOLEAN
+                    time = BIGINT
+              }
+       }
+           rows = [
+             {
+               kind = INSERT
+               fields = ["label_2", "sink_2", 4.3, 200, 2.5, 2, 5, true, 
1627529632357]
+             }
+             ]
+      }
+    ]
+  }
+}
+
+sink {
+  InfluxDB {
+    url = "http://influxdb-host:8086";
+    database = "test"
+    key_time = "time"
+    batch_size = 1
+  }
+}
\ No newline at end of file

Reply via email to