This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 630e88479 [Feature][Connector-V2] influxdb sink connector (#3174)
630e88479 is described below
commit 630e88479138e54224227e2fa974835a23b4bb31
Author: Bibo <[email protected]>
AuthorDate: Tue Nov 8 18:53:18 2022 +0800
[Feature][Connector-V2] influxdb sink connector (#3174)
* [Feature][Connector-V2] Add influxDB connector sink
* fix doc style
* remove old e2e for influxdb
* fix e2e and License header
* add Changelog
* delete useless log4j file
* mv scheduler to constructor of InfluxDBSinkWriter
* remove InfluxDBSinkWriter useless synchronized
---
docs/en/connector-v2/sink/InfluxDB.md | 104 +++++++++++
plugin-mapping.properties | 1 +
.../seatunnel/influxdb/client/InfluxDBClient.java | 15 ++
.../seatunnel/influxdb/config/InfluxDBConfig.java | 36 +---
.../seatunnel/influxdb/config/SinkConfig.java | 97 ++++++++++
.../seatunnel/influxdb/config/SourceConfig.java | 67 +++++++
.../seatunnel/influxdb/config/TimePrecision.java | 49 +++++
.../influxdb/serialize/DefaultSerializer.java | 162 ++++++++++++++++
.../seatunnel/influxdb/serialize/Serializer.java | 26 +++
.../seatunnel/influxdb/sink/InfluxDBSink.java | 75 ++++++++
.../influxdb/sink/InfluxDBSinkWriter.java | 176 ++++++++++++++++++
.../seatunnel/influxdb/source/InfluxDBSource.java | 23 ++-
.../source/InfluxDBSourceSplitEnumerator.java | 12 +-
.../connector-influxdb-e2e/pom.xml | 5 +-
.../influxdb/InfluxDBSourceToAssertIT.java | 122 ------------
.../e2e/connector/influxdb/InfluxdbIT.java | 205 +++++++++++++++++++++
.../src/test/resources/influxdb-to-influxdb.conf | 58 ++++++
.../test/resources/influxdb_source_to_assert.conf | 188 -------------------
18 files changed, 1056 insertions(+), 365 deletions(-)
diff --git a/docs/en/connector-v2/sink/InfluxDB.md
b/docs/en/connector-v2/sink/InfluxDB.md
new file mode 100644
index 000000000..cff9787f9
--- /dev/null
+++ b/docs/en/connector-v2/sink/InfluxDB.md
@@ -0,0 +1,104 @@
+# InfluxDB
+
+> InfluxDB sink connector
+
+## Description
+
+Write data to InfluxDB.
+
+## Key features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+
+## Options
+
+| name | type | required | default value
|
+|-----------------------------|----------|----------|-------------------------------|
+| url | string | yes | -
|
+| database | string | yes |
|
+| measurement | string | yes |
|
+| username | string | no | -
|
+| password | string | no | -
|
+| key_time                    | string   | no       | processing time
|
+| key_tags | array | no | exclude `field` &
`key_time` |
+| batch_size | int | no | 1024
|
+| batch_interval_ms | int | no | -
|
+| max_retries | int | no | -
|
+| retry_backoff_multiplier_ms | int      | no       | -
|
+| max_retry_backoff_ms        | int      | no       | -
|
+| connect_timeout_ms | long | no | 15000
|
+
+### url [string]
+
+the url to connect to InfluxDB e.g.
+```
+http://influxdb-host:8086
+```
+
+### database [string]
+
+The name of `influxDB` database
+
+### measurement [string]
+
+The name of `influxDB` measurement
+
+### username [string]
+
+`influxDB` user username
+
+### password [string]
+
+`influxDB` user password
+
+### key_time [string]
+
+Specify field-name of the `influxDB` measurement timestamp in SeaTunnelRow. If
not specified, use processing-time as timestamp
+
+### key_tags [array]
+
+Specify field-name of the `influxDB` measurement tags in SeaTunnelRow.
+If not specified, include all fields with `influxDB` measurement field
+
+### batch_size [int]
+
+For batch writing, when the number of buffers reaches the number of
`batch_size` or the time reaches `batch_interval_ms`, the data will be flushed
into the influxDB
+
+### batch_interval_ms [int]
+
+For batch writing, when the number of buffers reaches the number of
`batch_size` or the time reaches `batch_interval_ms`, the data will be flushed
into the influxDB
+
+### max_retries [int]
+
+The number of retries to flush failed
+
+### retry_backoff_multiplier_ms [int]
+
+Using as a multiplier for generating the next delay for backoff
+
+### max_retry_backoff_ms [int]
+
+The amount of time to wait before attempting to retry a request to `influxDB`
+
+### connect_timeout_ms [long]
+the timeout for connecting to InfluxDB, in milliseconds
+
+## Examples
+```hocon
+sink {
+ InfluxDB {
+ url = "http://influxdb-host:8086"
+ database = "test"
+ measurement = "sink"
+ key_time = "time"
+ key_tags = ["label"]
+ batch_size = 1
+ }
+}
+
+```
+
+## Changelog
+
+### next version
+
+- Add InfluxDB Sink Connector
\ No newline at end of file
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 11d7fd4a9..1ac1e2687 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -140,3 +140,4 @@ seatunnel.sink.S3File = connector-file-s3
seatunnel.source.Amazondynamodb = connector-amazondynamodb
seatunnel.sink.Amazondynamodb = connector-amazondynamodb
seatunnel.sink.StarRocks = connector-starrocks
+seatunnel.sink.InfluxDB = connector-influxdb
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java
index 8743d5aa2..3ad3a99d5 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.influxdb.client;
import
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;
import lombok.extern.slf4j.Slf4j;
import okhttp3.HttpUrl;
@@ -75,4 +76,18 @@ public class InfluxDBClient {
log.info("connect influxdb successful. sever version :{}.", version);
return influxDB;
}
+
+ public static void setWriteProperty(InfluxDB influxDB, SinkConfig
sinkConfig) {
+ String rp = sinkConfig.getRp();
+ if (!StringUtils.isEmpty(rp)) {
+ influxDB.setRetentionPolicy(rp);
+ }
+ }
+
+    /**
+     * Builds an InfluxDB client configured for writing: target database plus
+     * optional write properties (e.g. retention policy) from the sink config.
+     */
+    public static InfluxDB getWriteClient(SinkConfig sinkConfig) throws
ConnectException {
+        InfluxDB influxDB = getInfluxDB(sinkConfig);
+        influxDB.setDatabase(sinkConfig.getDatabase());
+        // Apply write properties to the SAME instance we return; previously a
+        // second client was created here and the retention policy was lost.
+        setWriteProperty(influxDB, sinkConfig);
+        return influxDB;
+    }
}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
index 9a04e7d4a..1332c5cab 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
@@ -23,7 +23,6 @@ import com.google.common.annotations.VisibleForTesting;
import lombok.Data;
import java.io.Serializable;
-import java.util.List;
@Data
public class InfluxDBConfig implements Serializable {
@@ -33,34 +32,16 @@ public class InfluxDBConfig implements Serializable {
public static final String URL = "url";
private static final String CONNECT_TIMEOUT_MS = "connect_timeout_ms";
private static final String QUERY_TIMEOUT_SEC = "query_timeout_sec";
-
- public static final String SQL = "sql";
- public static final String SQL_WHERE = "where";
-
public static final String DATABASES = "database";
- public static final String SPLIT_COLUMN = "split_column";
- private static final String PARTITION_NUM = "partition_num";
- private static final String UPPER_BOUND = "upper_bound";
- private static final String LOWER_BOUND = "lower_bound";
-
-
private static final String DEFAULT_FORMAT = "MSGPACK";
- private static final String EPOCH = "epoch";
-
- public static final String DEFAULT_PARTITIONS = "0";
+ protected static final String EPOCH = "epoch";
private static final int DEFAULT_QUERY_TIMEOUT_SEC = 3;
private static final long DEFAULT_CONNECT_TIMEOUT_MS = 15000;
-
private static final String DEFAULT_EPOCH = "n";
private String url;
private String username;
private String password;
- private String sql;
- private int partitionNum = 0;
- private String splitKey;
- private long lowerBound;
- private long upperBound;
private String database;
private String format = DEFAULT_FORMAT;
@@ -69,11 +50,8 @@ public class InfluxDBConfig implements Serializable {
private String epoch = DEFAULT_EPOCH;
- List<Integer> columnsIndex;
-
public InfluxDBConfig(Config config) {
this.url = config.getString(URL);
- this.sql = config.getString(SQL);
if (config.hasPath(USERNAME)) {
this.username = config.getString(USERNAME);
@@ -81,18 +59,6 @@ public class InfluxDBConfig implements Serializable {
if (config.hasPath(PASSWORD)) {
this.password = config.getString(PASSWORD);
}
- if (config.hasPath(PARTITION_NUM)) {
- this.partitionNum = config.getInt(PARTITION_NUM);
- }
- if (config.hasPath(UPPER_BOUND)) {
- this.upperBound = config.getInt(UPPER_BOUND);
- }
- if (config.hasPath(LOWER_BOUND)) {
- this.lowerBound = config.getInt(LOWER_BOUND);
- }
- if (config.hasPath(SPLIT_COLUMN)) {
- this.splitKey = config.getString(SPLIT_COLUMN);
- }
if (config.hasPath(DATABASES)) {
this.database = config.getString(DATABASES);
}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
new file mode 100644
index 000000000..c97d807fa
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+import java.util.List;
+
+@Setter
+@Getter
+@ToString
+public class SinkConfig extends InfluxDBConfig{
+ public SinkConfig(Config config) {
+ super(config);
+ }
+
+ private static final String KEY_TIME = "key_time";
+ private static final String KEY_TAGS = "key_tags";
+ public static final String KEY_MEASUREMENT = "measurement";
+
+ private static final String BATCH_SIZE = "batch_size";
+ private static final String BATCH_INTERVAL_MS = "batch_interval_ms";
+ private static final String MAX_RETRIES = "max_retries";
+ private static final String WRITE_TIMEOUT = "write_timeout";
+ private static final String RETRY_BACKOFF_MULTIPLIER_MS =
"retry_backoff_multiplier_ms";
+ private static final String MAX_RETRY_BACKOFF_MS = "max_retry_backoff_ms";
+ private static final String RETENTION_POLICY = "rp";
+ private static final int DEFAULT_BATCH_SIZE = 1024;
+ private static final int DEFAULT_WRITE_TIMEOUT = 5;
+ private static final TimePrecision DEFAULT_TIME_PRECISION =
TimePrecision.NS;
+
+ private String rp;
+ private String measurement;
+ private int writeTimeout = DEFAULT_WRITE_TIMEOUT;
+ private String keyTime;
+ private List<String> keyTags;
+ private int batchSize = DEFAULT_BATCH_SIZE;
+ private Integer batchIntervalMs;
+ private int maxRetries;
+ private int retryBackoffMultiplierMs;
+ private int maxRetryBackoffMs;
+ private TimePrecision precision = DEFAULT_TIME_PRECISION;
+
+ public static SinkConfig loadConfig(Config config) {
+ SinkConfig sinkConfig = new SinkConfig(config);
+
+ if (config.hasPath(KEY_TIME)) {
+ sinkConfig.setKeyTime(config.getString(KEY_TIME));
+ }
+ if (config.hasPath(KEY_TAGS)) {
+ sinkConfig.setKeyTags(config.getStringList(KEY_TAGS));
+ }
+ if (config.hasPath(BATCH_INTERVAL_MS)) {
+ sinkConfig.setBatchIntervalMs(config.getInt(BATCH_INTERVAL_MS));
+ }
+ if (config.hasPath(MAX_RETRIES)) {
+ sinkConfig.setMaxRetries(config.getInt(MAX_RETRIES));
+ }
+ if (config.hasPath(RETRY_BACKOFF_MULTIPLIER_MS)) {
+
sinkConfig.setRetryBackoffMultiplierMs(config.getInt(RETRY_BACKOFF_MULTIPLIER_MS));
+ }
+ if (config.hasPath(MAX_RETRY_BACKOFF_MS)) {
+
sinkConfig.setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS));
+ }
+ if (config.hasPath(WRITE_TIMEOUT)) {
+ sinkConfig.setWriteTimeout(config.getInt(WRITE_TIMEOUT));
+ }
+ if (config.hasPath(RETENTION_POLICY)) {
+ sinkConfig.setRp(config.getString(RETENTION_POLICY));
+ }
+ if (config.hasPath(EPOCH)) {
+
sinkConfig.setPrecision(TimePrecision.getPrecision(config.getString(EPOCH)));
+ }
+ sinkConfig.setMeasurement(config.getString(KEY_MEASUREMENT));
+ return sinkConfig;
+ }
+
+}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java
new file mode 100644
index 000000000..d0c3fe65e
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Getter;
+
+import java.util.List;
+
+@Getter
+public class SourceConfig extends InfluxDBConfig{
+ public static final String SQL = "sql";
+ public static final String SQL_WHERE = "where";
+ public static final String SPLIT_COLUMN = "split_column";
+ private static final String PARTITION_NUM = "partition_num";
+ private static final String UPPER_BOUND = "upper_bound";
+ private static final String LOWER_BOUND = "lower_bound";
+ public static final String DEFAULT_PARTITIONS = "0";
+ private String sql;
+ private int partitionNum = 0;
+ private String splitKey;
+ private long lowerBound;
+ private long upperBound;
+
+ List<Integer> columnsIndex;
+
+ public SourceConfig(Config config) {
+ super(config);
+ }
+
+ public static SourceConfig loadConfig(Config config) {
+ SourceConfig sourceConfig = new SourceConfig(config);
+
+ sourceConfig.sql = config.getString(SQL);
+
+ if (config.hasPath(PARTITION_NUM)) {
+ sourceConfig.partitionNum = config.getInt(PARTITION_NUM);
+ }
+ if (config.hasPath(UPPER_BOUND)) {
+ sourceConfig.upperBound = config.getInt(UPPER_BOUND);
+ }
+ if (config.hasPath(LOWER_BOUND)) {
+ sourceConfig.lowerBound = config.getInt(LOWER_BOUND);
+ }
+ if (config.hasPath(SPLIT_COLUMN)) {
+ sourceConfig.splitKey = config.getString(SPLIT_COLUMN);
+ }
+ return sourceConfig;
+ }
+
+}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/TimePrecision.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/TimePrecision.java
new file mode 100644
index 000000000..18af2cdd6
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/TimePrecision.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.config;
+
+import java.util.concurrent.TimeUnit;
+
+public enum TimePrecision {
+ NS("NS", TimeUnit.NANOSECONDS),
+ U("U", TimeUnit.MICROSECONDS),
+ MS("MS", TimeUnit.MILLISECONDS),
+ S("S", TimeUnit.SECONDS),
+ M("M", TimeUnit.MINUTES),
+ H("H", TimeUnit.HOURS);
+ private String desc;
+ private TimeUnit precision;
+
+ TimePrecision(String desc, TimeUnit precision) {
+ this.desc = desc;
+ this.precision = precision;
+ }
+
+ public TimeUnit getTimeUnit() {
+ return this.precision;
+ }
+
+ public static TimePrecision getPrecision(String desc) {
+ for (TimePrecision timePrecision : TimePrecision.values()) {
+ if (desc.equals(timePrecision.desc)) {
+ return timePrecision;
+ }
+ }
+ return TimePrecision.NS;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java
new file mode 100644
index 000000000..8cc458939
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import com.google.common.base.Strings;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.influxdb.dto.Point;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class DefaultSerializer implements Serializer {
+ private SeaTunnelRowType seaTunnelRowType;
+
+ private final BiConsumer<SeaTunnelRow, Point.Builder> timestampExtractor;
+ private final BiConsumer<SeaTunnelRow, Point.Builder> fieldExtractor;
+ private final BiConsumer<SeaTunnelRow, Point.Builder> tagExtractor;
+ private String measurement;
+
+ private TimeUnit precision;
+
+ public DefaultSerializer(SeaTunnelRowType seaTunnelRowType, TimeUnit
precision, List<String> tagKeys,
+ String timestampKey,
+ String measurement) {
+ this.measurement = measurement;
+ this.seaTunnelRowType = seaTunnelRowType;
+ this.timestampExtractor = createTimestampExtractor(seaTunnelRowType,
timestampKey);
+ this.tagExtractor = createTagExtractor(seaTunnelRowType, tagKeys);
+ List<String> fieldKeys = getFieldKeys(seaTunnelRowType, timestampKey,
tagKeys);
+ this.fieldExtractor = createFieldExtractor(seaTunnelRowType,
fieldKeys);
+ this.precision = precision;
+ }
+
+ @Override
+ public Point serialize(SeaTunnelRow seaTunnelRow) {
+ Point.Builder builder = Point.measurement(measurement);
+ timestampExtractor.accept(seaTunnelRow, builder);
+ tagExtractor.accept(seaTunnelRow, builder);
+ fieldExtractor.accept(seaTunnelRow, builder);
+ return builder.build();
+ }
+
+ private BiConsumer<SeaTunnelRow, Point.Builder>
createFieldExtractor(SeaTunnelRowType seaTunnelRowType, List<String> fieldKeys)
{
+ return (row, builder) -> {
+ for (int i = 0; i < fieldKeys.size(); i++) {
+ String field = fieldKeys.get(i);
+ int indexOfSeaTunnelRow = seaTunnelRowType.indexOf(field);
+ SeaTunnelDataType dataType =
seaTunnelRowType.getFieldType(indexOfSeaTunnelRow);
+ Object val = row.getField(indexOfSeaTunnelRow);
+ switch (dataType.getSqlType()) {
+ case BOOLEAN:
+ builder.addField(field, Boolean.valueOf((Boolean)
val));
+ break;
+ case SMALLINT:
+ builder.addField(field, Short.valueOf((Short) val));
+ break;
+ case INT:
+ builder.addField(field, ((Number) val).intValue());
+ break;
+ case BIGINT:
+                        // Only the timestamp field may be a bigint; however it is
processed separately in the timestamp extractor
+ builder.addField(field, ((Number) val).longValue());
+ break;
+ case FLOAT:
+ builder.addField(field, ((Number) val).floatValue());
+ break;
+ case DOUBLE:
+ builder.addField(field, ((Number) val).doubleValue());
+ break;
+ case STRING:
+ builder.addField(field, val.toString());
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported
dataType: " + dataType);
+ }
+ }
+ };
+ }
+
+    /**
+     * Builds the extractor that stamps each Point with a timestamp.
+     * If no timeKey is configured, processing time is used for every row;
+     * otherwise the value of the configured field is converted per its type.
+     */
+    private BiConsumer<SeaTunnelRow, Point.Builder>
createTimestampExtractor(SeaTunnelRowType seaTunnelRowType,
+                                                                             String
timeKey) {
+        //not config timeKey, use processing time
+        if (Strings.isNullOrEmpty(timeKey)) {
+            return (row, builder) -> builder.time(System.currentTimeMillis(),
precision);
+        }
+
+        int timeFieldIndex = seaTunnelRowType.indexOf(timeKey);
+        return (row, builder) -> {
+            Object time = row.getField(timeFieldIndex);
+            if (time == null) {
+                // Null time field: fall back to processing time and STOP.
+                // Without this return the switch below dereferences the null
+                // value (e.g. Long.parseLong(null)) and throws NPE.
+                builder.time(System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
+                return;
+            }
+            SeaTunnelDataType<?> timestampFieldType =
seaTunnelRowType.getFieldType(timeFieldIndex);
+            switch (timestampFieldType.getSqlType()) {
+                case STRING:
+                    builder.time(Long.parseLong((String) time), precision);
+                    break;
+                case TIMESTAMP:
+                    builder.time(LocalDateTime.class.cast(time)
+                        .atZone(ZoneOffset.UTC)
+                        .toInstant()
+                        .toEpochMilli(), precision);
+                    break;
+                case BIGINT:
+                    builder.time((Long) time, precision);
+                    break;
+                default:
+                    throw new UnsupportedOperationException("Unsupported data
type: " + timestampFieldType);
+            }
+        };
+    }
+
+ private BiConsumer<SeaTunnelRow, Point.Builder>
createTagExtractor(SeaTunnelRowType seaTunnelRowType,
+ List<String>
tagKeys) {
+ //not config tagKeys
+ if (CollectionUtils.isEmpty(tagKeys)) {
+ return (row, builder) -> {};
+ }
+
+ return (row, builder) -> {
+ for (int i = 0; i < tagKeys.size(); i++) {
+ String tagKey = tagKeys.get(i);
+ int indexOfSeaTunnelRow = seaTunnelRowType.indexOf(tagKey);
+ builder.tag(tagKey,
row.getField(indexOfSeaTunnelRow).toString());
+ }
+ };
+ }
+
+ private List<String> getFieldKeys(SeaTunnelRowType seaTunnelRowType,
+ String timestampKey,
+ List<String> tagKeys) {
+ return Stream.of(seaTunnelRowType.getFieldNames())
+ .filter(name -> CollectionUtils.isEmpty(tagKeys) ||
!tagKeys.contains(name))
+ .filter(name -> StringUtils.isEmpty(timestampKey) ||
!name.equals(timestampKey))
+ .collect(Collectors.toList());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/Serializer.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/Serializer.java
new file mode 100644
index 000000000..b910efafd
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/Serializer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.influxdb.dto.Point;
+
+public interface Serializer {
+ Point serialize(SeaTunnelRow seaTunnelRow);
+}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
new file mode 100644
index 000000000..8d3eb7290
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.sink;
+
+import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL;
+import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_MEASUREMENT;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+@AutoService(SeaTunnelSink.class)
+public class InfluxDBSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+
+ private Config pluginConfig;
+ private SeaTunnelRowType seaTunnelRowType;
+
+ @Override
+ public String getPluginName() {
+ return "InfluxDB";
+ }
+
+    @Override
+    public void prepare(Config config) throws PrepareFailException {
+        // Require the minimal sink options: the InfluxDB url and the target measurement.
+        CheckResult result = CheckConfigUtil.checkAllExists(config, URL,
KEY_MEASUREMENT);
+        if (!result.isSuccess()) {
+            // This is a sink connector: report failures as PluginType.SINK
+            // (was incorrectly PluginType.SOURCE).
+            throw new PrepareFailException(getPluginName(), PluginType.SINK,
result.getMsg());
+        }
+        this.pluginConfig = config;
+    }
+
+ @Override
+ public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ }
+
+ @Override
+ public SeaTunnelDataType getConsumedType() {
+ return seaTunnelRowType;
+ }
+
+ @Override
+ public AbstractSinkWriter createWriter(SinkWriter.Context context) throws
IOException {
+ return new InfluxDBSinkWriter(pluginConfig, seaTunnelRowType);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
new file mode 100644
index 000000000..809a3eaaa
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import
org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.influxdb.serialize.DefaultSerializer;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.serialize.Serializer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.influxdb.InfluxDB;
+import org.influxdb.dto.BatchPoints;
+import org.influxdb.dto.Point;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class InfluxDBSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
{
+
+ private final Serializer serializer;
+ private InfluxDB influxDB;
+ private SinkConfig sinkConfig;
+ private final List<Point> batchList;
+ private ScheduledExecutorService scheduler;
+ private ScheduledFuture<?> scheduledFuture;
+ private volatile Exception flushException;
+ private final Integer batchIntervalMs;
+
+ public InfluxDBSinkWriter(Config pluginConfig,
+ SeaTunnelRowType seaTunnelRowType) throws
ConnectException {
+ this.sinkConfig = SinkConfig.loadConfig(pluginConfig);
+ this.batchIntervalMs = sinkConfig.getBatchIntervalMs();
+ this.serializer = new DefaultSerializer(
+ seaTunnelRowType, sinkConfig.getPrecision().getTimeUnit(),
sinkConfig.getKeyTags(), sinkConfig.getKeyTime(), sinkConfig.getMeasurement());
+ this.batchList = new ArrayList<>();
+
+ if (batchIntervalMs != null) {
+ scheduler = Executors.newSingleThreadScheduledExecutor(
+ new
ThreadFactoryBuilder().setNameFormat("influxDB-sink-output-%s").build());
+ scheduledFuture = scheduler.scheduleAtFixedRate(
+ () -> {
+ try {
+ flush();
+ } catch (IOException e) {
+ flushException = e;
+ }
+ },
+ batchIntervalMs,
+ batchIntervalMs,
+ TimeUnit.MILLISECONDS);
+ }
+
+ connect();
+ }
+
+ @Override
+ public void write(SeaTunnelRow element) throws IOException {
+ Point record = serializer.serialize(element);
+ write(record);
+ }
+
+ @SneakyThrows
+ @Override
+ public Optional<Void> prepareCommit() {
+ // Flush to storage before snapshot state is performed
+ flush();
+ return super.prepareCommit();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (scheduledFuture != null) {
+ scheduledFuture.cancel(false);
+ scheduler.shutdown();
+ }
+
+ flush();
+
+ if (influxDB != null) {
+ influxDB.close();
+ influxDB = null;
+ }
+ }
+
+ public void write(Point record) throws IOException {
+ checkFlushException();
+
+ batchList.add(record);
+ if (sinkConfig.getBatchSize() > 0
+ && batchList.size() >= sinkConfig.getBatchSize()) {
+ flush();
+ }
+ }
+
+ public void flush() throws IOException {
+ checkFlushException();
+ if (batchList.isEmpty()) {
+ return;
+ }
+ BatchPoints.Builder batchPoints =
BatchPoints.database(sinkConfig.getDatabase());
+ for (int i = 0; i <= sinkConfig.getMaxRetries(); i++) {
+ try {
+ batchPoints.points(batchList);
+ influxDB.write(batchPoints.build());
+ } catch (Exception e) {
+ log.error("Writing records to influxdb failed, retry times =
{}", i, e);
+ if (i >= sinkConfig.getMaxRetries()) {
+ throw new IOException("Writing records to InfluxDB
failed.", e);
+ }
+
+ try {
+ long backoff =
Math.min(sinkConfig.getRetryBackoffMultiplierMs() * i,
+ sinkConfig.getMaxRetryBackoffMs());
+ Thread.sleep(backoff);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw new IOException(
+ "Unable to flush; interrupted while doing another
attempt.", e);
+ }
+ }
+ }
+
+ batchList.clear();
+ }
+
+ private void checkFlushException() {
+ if (flushException != null) {
+ throw new RuntimeException("Writing records to InfluxDB failed.",
flushException);
+ }
+ }
+
+ public void connect() throws ConnectException {
+ if (influxDB == null) {
+ influxDB = InfluxDBClient.getWriteClient(sinkConfig);
+ String version = influxDB.version();
+ if (!influxDB.ping().isGood()) {
+ String errorMessage =
+ String.format(
+ "connect influxdb failed, due to influxdb
version info is unknown, the url is: {%s}",
+ sinkConfig.getUrl());
+ throw new ConnectException(errorMessage);
+ }
+ log.info("connect influxdb successful. sever version :{}.",
version);
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
index bc971476b..804e804f5 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
@@ -17,8 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.SQL;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL;
+import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SQL;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.source.Boundedness;
@@ -33,7 +32,7 @@ import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import
org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
-import
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -53,7 +52,7 @@ import java.util.stream.Collectors;
@AutoService(SeaTunnelSource.class)
public class InfluxDBSource implements SeaTunnelSource<SeaTunnelRow,
InfluxDBSourceSplit, InfluxDBSourceState> {
private SeaTunnelRowType typeInfo;
- private InfluxDBConfig influxDBConfig;
+ private SourceConfig sourceConfig;
private List<Integer> columnsIndexList;
@@ -66,15 +65,15 @@ public class InfluxDBSource implements
SeaTunnelSource<SeaTunnelRow, InfluxDBSou
@Override
public void prepare(Config config) throws PrepareFailException {
- CheckResult result = CheckConfigUtil.checkAllExists(config, URL, SQL);
+ CheckResult result = CheckConfigUtil.checkAllExists(config, SQL);
if (!result.isSuccess()) {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
result.getMsg());
}
try {
- this.influxDBConfig = new InfluxDBConfig(config);
+ this.sourceConfig = SourceConfig.loadConfig(config);
SeaTunnelSchema seatunnelSchema =
SeaTunnelSchema.buildWithConfig(config);
this.typeInfo = seatunnelSchema.getSeaTunnelRowType();
- this.columnsIndexList =
initColumnsIndex(InfluxDBClient.getInfluxDB(influxDBConfig));
+ this.columnsIndexList =
initColumnsIndex(InfluxDBClient.getInfluxDB(sourceConfig));
} catch (Exception e) {
throw new PrepareFailException("InfluxDB", PluginType.SOURCE,
e.toString());
}
@@ -92,26 +91,26 @@ public class InfluxDBSource implements
SeaTunnelSource<SeaTunnelRow, InfluxDBSou
@Override
public SourceReader createReader(SourceReader.Context readerContext)
throws Exception {
- return new InfluxdbSourceReader(influxDBConfig, readerContext,
typeInfo, columnsIndexList);
+ return new InfluxdbSourceReader(sourceConfig, readerContext, typeInfo,
columnsIndexList);
}
@Override
public SourceSplitEnumerator
createEnumerator(SourceSplitEnumerator.Context enumeratorContext) throws
Exception {
- return new InfluxDBSourceSplitEnumerator(enumeratorContext,
influxDBConfig);
+ return new InfluxDBSourceSplitEnumerator(enumeratorContext,
sourceConfig);
}
@Override
public SourceSplitEnumerator<InfluxDBSourceSplit, InfluxDBSourceState>
restoreEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit>
enumeratorContext, InfluxDBSourceState checkpointState) throws Exception {
- return new InfluxDBSourceSplitEnumerator(enumeratorContext,
checkpointState, influxDBConfig);
+ return new InfluxDBSourceSplitEnumerator(enumeratorContext,
checkpointState, sourceConfig);
}
private List<Integer> initColumnsIndex(InfluxDB influxDB) {
//query one row to get column info
- String query = influxDBConfig.getSql() + QUERY_LIMIT;
+ String query = sourceConfig.getSql() + QUERY_LIMIT;
List<String> fieldNames = new ArrayList<>();
try {
QueryResult queryResult = influxDB.query(
- new Query(query, influxDBConfig.getDatabase()));
+ new Query(query, sourceConfig.getDatabase()));
List<QueryResult.Series> serieList =
queryResult.getResults().get(0).getSeries();
fieldNames.addAll(serieList.get(0).getColumns());
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
index d22eba116..139a6e3ad 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
@@ -17,10 +17,10 @@
package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
-import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.SQL_WHERE;
+import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SQL_WHERE;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState;
import lombok.extern.slf4j.Slf4j;
@@ -37,17 +37,17 @@ import java.util.Set;
@Slf4j
public class InfluxDBSourceSplitEnumerator implements
SourceSplitEnumerator<InfluxDBSourceSplit, InfluxDBSourceState> {
- final InfluxDBConfig config;
+ final SourceConfig config;
private final Context<InfluxDBSourceSplit> context;
private final Map<Integer, List<InfluxDBSourceSplit>> pendingSplit;
private final Object stateLock = new Object();
private volatile boolean shouldEnumerate;
- public
InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit>
context, InfluxDBConfig config) {
+ public
InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit>
context, SourceConfig config) {
this(context, null, config);
}
- public
InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit>
context, InfluxDBSourceState sourceState, InfluxDBConfig config) {
+ public
InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit>
context, InfluxDBSourceState sourceState, SourceConfig config) {
this.context = context;
this.config = config;
this.pendingSplit = new HashMap<>();
@@ -113,7 +113,7 @@ public class InfluxDBSourceSplitEnumerator implements
SourceSplitEnumerator<Infl
Set<InfluxDBSourceSplit> influxDBSourceSplits = new HashSet<>();
// no need numPartitions, use one partition
if (config.getPartitionNum() == 0) {
- influxDBSourceSplits.add(new
InfluxDBSourceSplit(InfluxDBConfig.DEFAULT_PARTITIONS, sql));
+ influxDBSourceSplits.add(new
InfluxDBSourceSplit(SourceConfig.DEFAULT_PARTITIONS, sql));
return influxDBSourceSplits;
}
//calculate numRange base on (lowerBound upperBound partitionNum)
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/pom.xml
index a3b93102e..652782b06 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/pom.xml
@@ -26,6 +26,7 @@
<artifactId>connector-influxdb-e2e</artifactId>
<dependencies>
+ <!-- SeaTunnel connectors -->
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-influxdb</artifactId>
@@ -34,9 +35,9 @@
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-assert</artifactId>
+ <artifactId>connector-console</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxDBSourceToAssertIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxDBSourceToAssertIT.java
deleted file mode 100644
index d39aa4f39..000000000
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxDBSourceToAssertIT.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.connector.influxdb;
-
-import static org.awaitility.Awaitility.given;
-
-import
org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
-import
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
-import org.apache.seatunnel.e2e.common.TestResource;
-import org.apache.seatunnel.e2e.common.TestSuiteBase;
-import org.apache.seatunnel.e2e.common.container.TestContainer;
-
-import lombok.extern.slf4j.Slf4j;
-import org.influxdb.InfluxDB;
-import org.influxdb.dto.BatchPoints;
-import org.influxdb.dto.Point;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.TestTemplate;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.DockerLoggerFactory;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.util.Date;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
-
-@Slf4j
-public class InfluxDBSourceToAssertIT extends TestSuiteBase implements
TestResource {
-
- private static final String INFLUXDB_DOCKER_IMAGE = "influxdb:1.8";
- private static final String INFLUXDB_CONTAINER_HOST = "influxdb-host";
- private static final int INFLUXDB_CONTAINER_PORT = 8086;
- private static final String INFLUXDB_DATABASE = "test";
- private static final String INFLUXDB_MEASUREMENT = "test";
-
- private GenericContainer<?> influxDBServer;
- private InfluxDB influxDB;
-
- @BeforeAll
- @Override
- public void startUp() {
- influxDBServer = new GenericContainer<>(INFLUXDB_DOCKER_IMAGE)
- .withNetwork(NETWORK)
- .withNetworkAliases(INFLUXDB_CONTAINER_HOST)
- .withExposedPorts(INFLUXDB_CONTAINER_PORT)
- .withLogConsumer(new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(INFLUXDB_DOCKER_IMAGE)));
- Startables.deepStart(Stream.of(influxDBServer)).join();
- log.info("influxdb container started");
- given().ignoreExceptions()
- .await()
- .atLeast(100, TimeUnit.MILLISECONDS)
- .pollInterval(500, TimeUnit.MILLISECONDS)
- .atMost(30, TimeUnit.SECONDS)
- .untilAsserted(() -> initializeInfluxDBClient());
- batchInsertData();
- }
-
- @TestTemplate
- public void testInfluxDBSource(TestContainer container) throws
IOException, InterruptedException {
- Container.ExecResult execResult =
container.executeJob("/influxdb_source_to_assert.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
- }
-
- private void initializeInfluxDBClient() throws ConnectException {
- InfluxDBConfig influxDBConfig = new
InfluxDBConfig(String.format("http://%s:%s", influxDBServer.getHost(),
influxDBServer.getFirstMappedPort()));
- influxDB = InfluxDBClient.getInfluxDB(influxDBConfig);
- }
-
- public void batchInsertData() {
- influxDB.createDatabase(INFLUXDB_DATABASE);
- BatchPoints batchPoints = BatchPoints
- .database(INFLUXDB_DATABASE)
- .build();
- for (int i = 0; i < 100; i++) {
- Point point = Point.measurement(INFLUXDB_MEASUREMENT)
- .time(new Date().getTime(), TimeUnit.NANOSECONDS)
- .tag("label", String.format("label_%s", i))
- .addField("f1", String.format("f1_%s", i))
- .addField("f2", Double.valueOf(i + 1))
- .addField("f3", Long.valueOf(i + 2))
- .addField("f4", Float.valueOf(i + 3))
- .addField("f5", Integer.valueOf(i))
- .addField("f6", (short) (i + 4))
- .addField("f7", i % 2 == 0 ? Boolean.TRUE : Boolean.FALSE)
- .build();
- batchPoints.point(point);
- }
- influxDB.write(batchPoints);
- }
-
- @AfterAll
- @Override
- public void tearDown() {
- if (influxDB != null) {
- influxDB.close();
- }
- if (influxDBServer != null) {
- influxDBServer.stop();
- }
- }
-}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
new file mode 100644
index 000000000..20cc6dce0
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.influxdb;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
+import
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.influxdb.InfluxDB;
+import org.influxdb.dto.BatchPoints;
+import org.influxdb.dto.Point;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+@Slf4j
+public class InfluxdbIT extends TestSuiteBase implements TestResource {
+ private static final String IMAGE = "influxdb:1.8";
+ private static final String HOST = "influxdb-host";
+ private static final int PORT = 8086;
+ private static final String INFLUXDB_DATABASE = "test";
+ private static final String INFLUXDB_SOURCE_MEASUREMENT = "source";
+ private static final String INFLUXDB_SINK_MEASUREMENT = "sink";
+
+
+ private static final Tuple2<SeaTunnelRowType, List<SeaTunnelRow>>
TEST_DATASET = generateTestDataSet();
+
+ private GenericContainer<?> influxdbContainer;
+ private String influxDBConnectUrl;
+
+ private InfluxDB influxDB;
+
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {
+ this.influxdbContainer = new
GenericContainer<>(DockerImageName.parse(IMAGE))
+ .withNetwork(NETWORK)
+ .withNetworkAliases(HOST)
+ .withExposedPorts(PORT)
+ .withLogConsumer(new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)))
+ .waitingFor(new HostPortWaitStrategy()
+ .withStartupTimeout(Duration.ofMinutes(2)));
+ Startables.deepStart(Stream.of(influxdbContainer)).join();
+ influxDBConnectUrl = String.format("http://%s:%s",
influxdbContainer.getHost(), influxdbContainer.getFirstMappedPort());
+ log.info("Influxdb container started");
+ this.initializeInfluxDBClient();
+ this.initSourceData();
+ }
+
+ private void initSourceData() {
+ influxDB.createDatabase(INFLUXDB_DATABASE);
+ BatchPoints batchPoints = BatchPoints
+ .database(INFLUXDB_DATABASE)
+ .build();
+ List<SeaTunnelRow> rows = TEST_DATASET._2();
+ SeaTunnelRowType rowType = TEST_DATASET._1();
+
+ for (int i = 0; i < rows.size(); i++) {
+ SeaTunnelRow row = rows.get(i);
+ Point point = Point.measurement(INFLUXDB_SOURCE_MEASUREMENT)
+ .time((Long) row.getField(0), TimeUnit.NANOSECONDS)
+ .tag(rowType.getFieldName(1), (String) row.getField(1))
+ .addField(rowType.getFieldName(2), (String)
row.getField(2))
+ .addField(rowType.getFieldName(3), (Double)
row.getField(3))
+ .addField(rowType.getFieldName(4), (Long) row.getField(4))
+ .addField(rowType.getFieldName(5), (Float) row.getField(5))
+ .addField(rowType.getFieldName(6), (Integer)
row.getField(6))
+ .addField(rowType.getFieldName(7), (Short) row.getField(7))
+ .addField(rowType.getFieldName(8), (Boolean)
row.getField(8))
+ .build();
+ batchPoints.point(point);
+ }
+ influxDB.write(batchPoints);
+ }
+
+ private static Tuple2<SeaTunnelRowType, List<SeaTunnelRow>>
generateTestDataSet() {
+ SeaTunnelRowType rowType = new SeaTunnelRowType(
+ new String[]{
+ "time",
+ "label",
+ "c_string",
+ "c_double",
+ "c_bigint",
+ "c_float",
+ "c_int",
+ "c_smallint",
+ "c_boolean"
+ },
+ new SeaTunnelDataType[]{
+ BasicType.LONG_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.DOUBLE_TYPE,
+ BasicType.LONG_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.SHORT_TYPE,
+ BasicType.BOOLEAN_TYPE
+ }
+ );
+
+ List<SeaTunnelRow> rows = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ SeaTunnelRow row = new SeaTunnelRow(
+ new Object[]{
+ new Date().getTime(),
+ String.format("label_%s", i),
+ String.format("f1_%s", i),
+ Double.parseDouble("1.1"),
+ Long.parseLong("1"),
+ Float.parseFloat("1.1"),
+ Integer.valueOf(i),
+ Short.parseShort("1"),
+ i % 2 == 0 ? Boolean.TRUE : Boolean.FALSE
+ });
+ rows.add(row);
+ }
+ return Tuple2.apply(rowType, rows);
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() throws Exception {
+ influxDB.close();
+ influxdbContainer.stop();
+ }
+
+ @TestTemplate
+ public void testInfluxdb(TestContainer container) throws IOException,
InterruptedException {
+ Container.ExecResult execResult =
container.executeJob("/influxdb-to-influxdb.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ String sourceSql = String.format("select * from %s order by time",
INFLUXDB_SOURCE_MEASUREMENT);
+ String sinkSql = String.format("select * from %s order by time",
INFLUXDB_SINK_MEASUREMENT);
+ QueryResult sourceQueryResult = influxDB.query(new Query(sourceSql,
INFLUXDB_DATABASE));
+ QueryResult sinkQueryResult = influxDB.query(new Query(sinkSql,
INFLUXDB_DATABASE));
+ //assert data count
+ Assertions.assertEquals(sourceQueryResult.getResults().size(),
sinkQueryResult.getResults().size());
+ //assert data values
+ List<List<Object>> sourceValues =
sourceQueryResult.getResults().get(0).getSeries().get(0).getValues();
+ List<List<Object>> sinkValues =
sinkQueryResult.getResults().get(0).getSeries().get(0).getValues();
+ int rowSize = sourceValues.size();
+ int colSize = sourceValues.get(0).size();
+
+ for (int row = 0; row < rowSize; row++) {
+ for (int col = 0; col < colSize; col++) {
+ Object sourceColValue = sourceValues.get(row).get(col);
+ Object sinkColValue = sinkValues.get(row).get(col);
+
+ if (!Objects.deepEquals(sourceColValue, sinkColValue)) {
+ Assertions.assertEquals(sourceColValue, sinkColValue);
+ }
+ }
+
+ }
+ }
+
+ private void initializeInfluxDBClient() throws ConnectException {
+ InfluxDBConfig influxDBConfig = new InfluxDBConfig(influxDBConnectUrl);
+ influxDB = InfluxDBClient.getInfluxDB(influxDBConfig);
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf
new file mode 100644
index 000000000..f95af29a2
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ InfluxDB {
+ url = "http://influxdb-host:8086"
+ sql = "select label, c_string, c_double, c_bigint, c_float, c_int,
c_smallint, c_boolean from source"
+ database = "test"
+ upper_bound = 99
+ lower_bound = 0
+ partition_num = 4
+ split_column = "c_int"
+ fields {
+ label = STRING
+ c_string = STRING
+ c_double = DOUBLE
+ c_bigint = BIGINT
+ c_float = FLOAT
+ c_int = INT
+ c_smallint = SMALLINT
+ c_boolean = BOOLEAN
+ time = BIGINT
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ InfluxDB {
+ url = "http://influxdb-host:8086"
+ database = "test"
+ measurement = "sink"
+ key_time = "time"
+ key_tags = ["label"]
+ batch_size = 1
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb_source_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb_source_to_assert.conf
deleted file mode 100644
index ea0e6e177..000000000
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb_source_to_assert.conf
+++ /dev/null
@@ -1,188 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
-
-env {
- execution.parallelism = 1
- job.mode = "BATCH"
-
- # You can set spark configuration here
- spark.app.name = "SeaTunnel"
- spark.executor.instances = 2
- spark.executor.cores = 1
- spark.executor.memory = "1g"
- spark.master = local
-}
-
-source {
- # This is a example source plugin **only for test and demonstrate the feature source plugin**
- InfluxDB {
- url = "http://influxdb-host:8086"
- sql = "select label, f1, f2, f3, f4, f5, f6, f7 from test"
- database = "test"
- upper_bound = 99
- lower_bound = 0
- partition_num = 4
- split_column = "f5"
- fields {
- label = STRING
- f1 = STRING
- f2 = DOUBLE
- f3 = BIGINT
- f4 = FLOAT
- f5 = INT
- f6 = SMALLINT
- f7 = BOOLEAN
- }
- }
-}
-
-sink {
- Assert {
- rules =
- {
- row_rules = [
- {
- rule_type = MAX_ROW
- rule_value = 100
- },
- {
- rule_type = MIN_ROW
- rule_value = 100
- }
- ],
- field_rules = [{
- field_name = f1
- field_type = string
- field_value = [
- {
- rule_type = NOT_NULL
- },
- {
- rule_type = MIN_LENGTH
- rule_value = 4
- },
- {
- rule_type = MAX_LENGTH
- rule_value = 5
- }
- ]
- },{
- field_name = f2
- field_type = double
- field_value = [
- {
- rule_type = NOT_NULL
- },
- {
- rule_type = MIN
- rule_value = 1
- },
- {
- rule_type = MAX
- rule_value = 100
- }
- ]
- },{
- field_name = f3
- field_type = long
- field_value = [
- {
- rule_type = NOT_NULL
- },
- {
- rule_type = MIN
- rule_value = 2
- },
- {
- rule_type = MAX
- rule_value = 101
- }
- ]
- },{
- field_name = f4
- field_type = float
- field_value = [
- {
- rule_type = NOT_NULL
- },
- {
- rule_type = MIN
- rule_value = 3
- },
- {
- rule_type = MAX
- rule_value = 102
- }
- ]
- },{
- field_name = f5
- field_type = int
- field_value = [
- {
- rule_type = NOT_NULL
- },
- {
- rule_type = MIN
- rule_value = 0
- },
- {
- rule_type = MAX
- rule_value = 99
- }
- ]
- },{
- field_name = f6
- field_type = short
- field_value = [
- {
- rule_type = NOT_NULL
- },
- {
- rule_type = MIN
- rule_value = 4
- },
- {
- rule_type = MAX
- rule_value = 103
- }
- ]
- },{
- field_name = f7
- field_type = boolean
- field_value = [
- {
- rule_type = NOT_NULL
- },
- {
- rule_type = MIN
- rule_value = 0
- },
- {
- rule_type = MAX
- rule_value = 1
- }
- ]
- }
- ]
- }
- }
- # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
- # please go to https://seatunnel.apache.org/docs/category/sink-v2
-}
\ No newline at end of file