This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 56f13e920d Support multi-table sink feature for influxdb (#6278)
56f13e920d is described below
commit 56f13e920d9e4fa03855a00c438a0b51e18ae575
Author: Bibo <[email protected]>
AuthorDate: Tue Feb 27 11:37:59 2024 +0800
Support multi-table sink feature for influxdb (#6278)
---
.../seatunnel/influxdb/config/SinkConfig.java | 24 +++---
.../seatunnel/influxdb/sink/InfluxDBSink.java | 45 +++---------
.../influxdb/sink/InfluxDBSinkFactory.java | 26 ++++++-
.../influxdb/sink/InfluxDBSinkWriter.java | 12 +--
.../e2e/connector/influxdb/InfluxdbIT.java | 62 ++++++++++++++++
.../fake_to_infuxdb_with_multipletable.conf | 85 ++++++++++++++++++++++
6 files changed, 200 insertions(+), 54 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
index 806309bffe..071e3c235f 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
@@ -34,6 +34,7 @@ import java.util.List;
public class SinkConfig extends InfluxDBConfig {
public SinkConfig(Config config) {
super(config);
+ loadConfig(config);
}
public static final Option<String> KEY_TIME =
@@ -103,35 +104,32 @@ public class SinkConfig extends InfluxDBConfig {
private int maxRetryBackoffMs;
private TimePrecision precision = DEFAULT_TIME_PRECISION;
- public static SinkConfig loadConfig(Config config) {
- SinkConfig sinkConfig = new SinkConfig(config);
+ public void loadConfig(Config config) {
if (config.hasPath(KEY_TIME.key())) {
- sinkConfig.setKeyTime(config.getString(KEY_TIME.key()));
+ setKeyTime(config.getString(KEY_TIME.key()));
}
if (config.hasPath(KEY_TAGS.key())) {
- sinkConfig.setKeyTags(config.getStringList(KEY_TAGS.key()));
+ setKeyTags(config.getStringList(KEY_TAGS.key()));
}
if (config.hasPath(MAX_RETRIES.key())) {
- sinkConfig.setMaxRetries(config.getInt(MAX_RETRIES.key()));
+ setMaxRetries(config.getInt(MAX_RETRIES.key()));
}
if (config.hasPath(RETRY_BACKOFF_MULTIPLIER_MS.key())) {
- sinkConfig.setRetryBackoffMultiplierMs(
- config.getInt(RETRY_BACKOFF_MULTIPLIER_MS.key()));
+
setRetryBackoffMultiplierMs(config.getInt(RETRY_BACKOFF_MULTIPLIER_MS.key()));
}
if (config.hasPath(MAX_RETRY_BACKOFF_MS.key())) {
-
sinkConfig.setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS.key()));
+ setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS.key()));
}
if (config.hasPath(WRITE_TIMEOUT.key())) {
- sinkConfig.setWriteTimeout(config.getInt(WRITE_TIMEOUT.key()));
+ setWriteTimeout(config.getInt(WRITE_TIMEOUT.key()));
}
if (config.hasPath(RETENTION_POLICY.key())) {
- sinkConfig.setRp(config.getString(RETENTION_POLICY.key()));
+ setRp(config.getString(RETENTION_POLICY.key()));
}
if (config.hasPath(EPOCH.key())) {
-
sinkConfig.setPrecision(TimePrecision.getPrecision(config.getString(EPOCH.key())));
+
setPrecision(TimePrecision.getPrecision(config.getString(EPOCH.key())));
}
- sinkConfig.setMeasurement(config.getString(KEY_MEASUREMENT.key()));
- return sinkConfig;
+ setMeasurement(config.getString(KEY_MEASUREMENT.key()));
}
}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
index 9cc03272d1..da7ba20f91 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
@@ -17,61 +17,36 @@
package org.apache.seatunnel.connectors.seatunnel.influxdb.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import
org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
-
-import com.google.auto.service.AutoService;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;
import java.io.IOException;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_MEASUREMENT;
+public class InfluxDBSink extends AbstractSimpleSink<SeaTunnelRow, Void>
+ implements SupportMultiTableSink {
-@AutoService(SeaTunnelSink.class)
-public class InfluxDBSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
-
- private Config pluginConfig;
private SeaTunnelRowType seaTunnelRowType;
+ private SinkConfig sinkConfig;
@Override
public String getPluginName() {
return "InfluxDB";
}
- @Override
- public void prepare(Config config) throws PrepareFailException {
- CheckResult result =
- CheckConfigUtil.checkAllExists(config, URL.key(),
KEY_MEASUREMENT.key());
- if (!result.isSuccess()) {
- throw new InfluxdbConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SINK,
result.getMsg()));
- }
- this.pluginConfig = config;
- }
-
- @Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
+ public InfluxDBSink(SinkConfig sinkConfig, CatalogTable catalogTable) {
+ this.sinkConfig = sinkConfig;
+ this.seaTunnelRowType =
catalogTable.getTableSchema().toPhysicalRowDataType();
}
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context)
throws IOException {
- return new InfluxDBSinkWriter(pluginConfig, seaTunnelRowType);
+ return new InfluxDBSinkWriter(sinkConfig, seaTunnelRowType);
}
}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java
index 3d44158e78..81a294e95b 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java
@@ -17,11 +17,20 @@
package org.apache.seatunnel.connectors.seatunnel.influxdb.sink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;
import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.HashMap;
+import java.util.Map;
import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.CONNECT_TIMEOUT_MS;
import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.DATABASES;
@@ -36,6 +45,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConf
import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS;
@AutoService(Factory.class)
+@Slf4j
public class InfluxDBSinkFactory implements TableSinkFactory {
@Override
@@ -46,10 +56,11 @@ public class InfluxDBSinkFactory implements
TableSinkFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(URL, DATABASES, KEY_MEASUREMENT)
+ .required(URL, DATABASES)
.bundled(USERNAME, PASSWORD)
.optional(
CONNECT_TIMEOUT_MS,
+ KEY_MEASUREMENT,
KEY_TAGS,
KEY_TIME,
BATCH_SIZE,
@@ -57,4 +68,17 @@ public class InfluxDBSinkFactory implements TableSinkFactory
{
RETRY_BACKOFF_MULTIPLIER_MS)
.build();
}
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ ReadonlyConfig config = context.getOptions();
+ CatalogTable catalogTable = context.getCatalogTable();
+ if (!config.getOptional(KEY_MEASUREMENT).isPresent()) {
+ Map<String, String> map = config.toMap();
+ map.put(KEY_MEASUREMENT.key(),
catalogTable.getTableId().toTablePath().getFullName());
+ config = ReadonlyConfig.fromMap(new HashMap<>(map));
+ }
+ SinkConfig sinkConfig = new SinkConfig(config.toConfig());
+ return () -> new InfluxDBSink(sinkConfig, catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
index f2d401db51..b0d23c7e79 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
@@ -17,11 +17,11 @@
package org.apache.seatunnel.connectors.seatunnel.influxdb.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.utils.JsonUtils;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import
org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;
@@ -44,7 +44,8 @@ import java.util.List;
import java.util.Optional;
@Slf4j
-public class InfluxDBSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
{
+public class InfluxDBSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
+ implements SupportMultiTableSinkWriter {
private final Serializer serializer;
private InfluxDB influxdb;
@@ -52,9 +53,10 @@ public class InfluxDBSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
private final List<Point> batchList;
private volatile Exception flushException;
- public InfluxDBSinkWriter(Config pluginConfig, SeaTunnelRowType
seaTunnelRowType)
+ public InfluxDBSinkWriter(SinkConfig sinkConfig, SeaTunnelRowType
seaTunnelRowType)
throws ConnectException {
- this.sinkConfig = SinkConfig.loadConfig(pluginConfig);
+ this.sinkConfig = sinkConfig;
+ log.info("sinkConfig is {}", JsonUtils.toJsonString(sinkConfig));
this.serializer =
new DefaultSerializer(
seaTunnelRowType,
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
index ddc7afadac..c139afc5e8 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
@@ -25,7 +25,9 @@ import
org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
import
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
@@ -51,10 +53,12 @@ import java.io.IOException;
import java.net.ConnectException;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
@Slf4j
@@ -242,6 +246,64 @@ public class InfluxdbIT extends TestSuiteBase implements
TestResource {
}
}
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Currently SPARK/FLINK do not support multiple
table read")
+ public void testInfluxdbMultipleWrite(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+
container.executeJob("/fake_to_infuxdb_with_multipletable.conf");
+
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertAll(
+ () -> {
+ Assertions.assertIterableEquals(
+ Stream.<List<Object>>of(
+ Arrays.asList(
+ 1627529632356l,
+ "label_1",
+ "sink_1",
+ 4.3,
+ 200,
+ 2.5,
+ 2,
+ 5,
+ true))
+ .collect(Collectors.toList()),
+ readData("infulxdb_sink_1"));
+ },
+ () -> {
+ Assertions.assertIterableEquals(
+ Stream.<List<Object>>of(
+ Arrays.asList(
+ 1627529632357l,
+ "label_2",
+ "sink_2",
+ 4.3,
+ 200,
+ 2.5,
+ 2,
+ 5,
+ true))
+ .collect(Collectors.toList()),
+ readData("infulxdb_sink_2"));
+ });
+ }
+
+ public List<List<Object>> readData(String tableName) {
+ String sinkSql =
+ String.format(
+ "select time, label, c_string, c_double, c_bigint,
c_float,c_int, c_smallint, c_boolean from %s order by time",
+ tableName);
+ QueryResult sinkQueryResult = influxDB.query(new Query(sinkSql,
INFLUXDB_DATABASE));
+
+ List<List<Object>> sinkValues =
+
sinkQueryResult.getResults().get(0).getSeries().get(0).getValues();
+ return sinkValues;
+ }
+
private void initializeInfluxDBClient() throws ConnectException {
InfluxDBConfig influxDBConfig = new InfluxDBConfig(influxDBConnectUrl);
influxDB = InfluxDBClient.getInfluxDB(influxDBConfig);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/fake_to_infuxdb_with_multipletable.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/fake_to_infuxdb_with_multipletable.conf
new file mode 100644
index 0000000000..eda13ff704
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/fake_to_infuxdb_with_multipletable.conf
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in
seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ tables_configs = [
+ {
+ schema = {
+ table = "infulxdb_sink_1"
+ fields {
+ label = STRING
+ c_string = STRING
+ c_double = DOUBLE
+ c_bigint = BIGINT
+ c_float = FLOAT
+ c_int = INT
+ c_smallint = SMALLINT
+ c_boolean = BOOLEAN
+ time = BIGINT
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = ["label_1", "sink_1", 4.3, 200, 2.5, 2, 5, true,
1627529632356]
+ }
+ ]
+ },
+ {
+ schema = {
+ table = "infulxdb_sink_2"
+ fields {
+ label = STRING
+ c_string = STRING
+ c_double = DOUBLE
+ c_bigint = BIGINT
+ c_float = FLOAT
+ c_int = INT
+ c_smallint = SMALLINT
+ c_boolean = BOOLEAN
+ time = BIGINT
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = ["label_2", "sink_2", 4.3, 200, 2.5, 2, 5, true,
1627529632357]
+ }
+ ]
+ }
+ ]
+ }
+}
+
+sink {
+ InfluxDB {
+ url = "http://influxdb-host:8086"
+ database = "test"
+ key_time = "time"
+ batch_size = 1
+ }
+}
\ No newline at end of file