This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 98b593f095 [Feature][Connector-V2] Support multi-table sink feature
for TDengine (#9215)
98b593f095 is described below
commit 98b593f095b049a6c115a57d375df0beeb68fc93
Author: jiazhang <[email protected]>
AuthorDate: Wed Apr 23 20:32:12 2025 +0800
[Feature][Connector-V2] Support multi-table sink feature for TDengine
(#9215)
---
.../seatunnel/api/ConnectorOptionCheckTest.java | 2 -
.../tdengine/config/TDengineCommonOptions.java | 56 +++++++++
.../tdengine/config/TDengineSinkConfig.java | 56 +++++++++
.../tdengine/config/TDengineSinkOptions.java | 35 ++++++
.../tdengine/config/TDengineSourceOptions.java | 41 +++++++
.../seatunnel/tdengine/sink/TDengineSink.java | 36 +++---
.../tdengine/sink/TDengineSinkFactory.java | 58 +++++++++
.../tdengine/sink/TDengineSinkWriter.java | 14 ++-
.../tdengine/source/TDengineSourceFactory.java | 54 +++++++++
.../tdengine/sink/TDengineSinkWriterTest.java | 24 ++--
.../e2e/connector/tdengine/TDengineIT.java | 38 +++++-
.../tdengine/tdengine_fake_to_sink_multitable.conf | 130 +++++++++++++++++++++
.../starter/seatunnel/SeaTunnelConnectorTest.java | 1 -
13 files changed, 499 insertions(+), 46 deletions(-)
diff --git
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 24566951d3..9da7933f7a 100644
---
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -195,7 +195,6 @@ public class ConnectorOptionCheckTest {
whiteList.add("PulsarSinkOptions");
whiteList.add("SlsSinkOptions");
whiteList.add("Neo4jSinkOptions");
- whiteList.add("TDengineSourceOptions");
whiteList.add("PulsarSourceOptions");
whiteList.add("MongodbSinkOptions");
whiteList.add("SlsSourceOptions");
@@ -207,7 +206,6 @@ public class ConnectorOptionCheckTest {
whiteList.add("RocketMqSourceOptions");
whiteList.add("TablestoreSinkOptions");
whiteList.add("TableStoreDBSourceOptions");
- whiteList.add("TDengineSinkOptions");
whiteList.add("Neo4jSourceOptions");
whiteList.add("QdrantSourceOptions");
whiteList.add("SocketSourceOptions");
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineCommonOptions.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineCommonOptions.java
new file mode 100644
index 0000000000..2f9ae2d051
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineCommonOptions.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import lombok.Data;
+
+@Data
+public abstract class TDengineCommonOptions {
+ public static final Option<String> URL =
+ Options.key("url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The TDengine server URL, format:
jdbc:TAOS-RS://host:port");
+
+ public static final Option<String> USERNAME =
+ Options.key("username")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The username for TDengine
authentication");
+
+ public static final Option<String> PASSWORD =
+ Options.key("password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The password for TDengine
authentication");
+
+ public static final Option<String> DATABASE =
+ Options.key("database")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The TDengine database name");
+
+ public static final Option<String> STABLE =
+ Options.key("stable")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The TDengine super table name");
+}
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkConfig.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkConfig.java
new file mode 100644
index 0000000000..b2892c61d6
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkConfig.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import lombok.Builder;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+@Data
+@Builder(builderClassName = "Builder")
+public class TDengineSinkConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private String url;
+ private String username;
+ private String password;
+ private String database;
+ private String stable;
+ private String timezone;
+
+ public static TDengineSinkConfig of(ReadonlyConfig config) {
+ Builder builder = TDengineSinkConfig.builder();
+
+ builder.url(config.get(TDengineSinkOptions.URL));
+ builder.username(config.get(TDengineSinkOptions.USERNAME));
+ builder.password(config.get(TDengineSinkOptions.PASSWORD));
+ builder.database(config.get(TDengineSinkOptions.DATABASE));
+ builder.stable(config.get(TDengineSinkOptions.STABLE));
+
+ Optional<String> optionalTimezone =
config.getOptional(TDengineSinkOptions.TIMEZONE);
+
+
builder.timezone(optionalTimezone.orElseGet(TDengineSinkOptions.TIMEZONE::defaultValue));
+
+ return builder.build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkOptions.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkOptions.java
new file mode 100644
index 0000000000..7f64177656
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkOptions.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class TDengineSinkOptions extends TDengineCommonOptions {
+
+ public static final Option<String> TIMEZONE =
+ Options.key("timezone")
+ .stringType()
+ .defaultValue("UTC")
+ .withDescription("The timezone used for timestamp
conversion, default is UTC");
+}
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceOptions.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceOptions.java
new file mode 100644
index 0000000000..58ce50f92f
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceOptions.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class TDengineSourceOptions extends TDengineCommonOptions {
+
+ public static final Option<String> LOWER_BOUND =
+ Options.key("lowerBound")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The lower bound for data query range");
+
+ public static final Option<String> UPPER_BOUND =
+ Options.key("upperBound")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The upper bound for data query range");
+}
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
index 194c94dda5..ec7319e95d 100644
---
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
@@ -17,40 +17,34 @@
package org.apache.seatunnel.connectors.seatunnel.tdengine.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
-import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-
-import com.google.auto.service.AutoService;
+import
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSinkConfig;
import java.io.IOException;
import java.util.Optional;
-@AutoService(SeaTunnelSink.class)
-public class TDengineSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
- private SeaTunnelRowType seaTunnelRowType;
- private Config pluginConfig;
+public class TDengineSink extends AbstractSimpleSink<SeaTunnelRow, Void>
+ implements SupportMultiTableSink {
- @Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
- }
+ private final TDengineSinkConfig tdengineSinkConfig;
+ private final CatalogTable catalogTable;
- @Override
- public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context)
- throws IOException {
- return new TDengineSinkWriter(pluginConfig, seaTunnelRowType);
+ private final SeaTunnelRowType seaTunnelRowType;
+
+ public TDengineSink(TDengineSinkConfig tdengineSinkConfig, CatalogTable
catalogTable) {
+ this.tdengineSinkConfig = tdengineSinkConfig;
+ this.catalogTable = catalogTable;
+ this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
}
@Override
- public void prepare(Config pluginConfig) {
- this.pluginConfig = pluginConfig;
+ public TDengineSinkWriter createWriter(SinkWriter.Context context) throws
IOException {
+ return new TDengineSinkWriter(tdengineSinkConfig, seaTunnelRowType);
}
@Override
@@ -60,6 +54,6 @@ public class TDengineSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
@Override
public Optional<CatalogTable> getWriteCatalogTable() {
- return super.getWriteCatalogTable();
+ return Optional.of(catalogTable);
}
}
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkFactory.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkFactory.java
new file mode 100644
index 0000000000..976f5a8728
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.options.SinkConnectorCommonOptions;
+import org.apache.seatunnel.api.table.connector.TableSink;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSinkOptions;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class TDengineSinkFactory implements TableSinkFactory {
+ @Override
+ public String factoryIdentifier() {
+ return "TDengine";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(
+ TDengineSinkOptions.URL,
+ TDengineSinkOptions.USERNAME,
+ TDengineSinkOptions.PASSWORD,
+ TDengineSinkOptions.DATABASE,
+ TDengineSinkOptions.STABLE)
+ .optional(
+ TDengineSinkOptions.TIMEZONE,
+ SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
+ .build();
+ }
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ TDengineSinkConfig tdengineSinkConfig =
TDengineSinkConfig.of(context.getOptions());
+ return () -> new TDengineSink(tdengineSinkConfig,
context.getCatalogTable());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
index ba352357f2..4137becdb9 100644
---
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
@@ -20,13 +20,13 @@ package
org.apache.seatunnel.connectors.seatunnel.tdengine.sink;
import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.seatunnel.shade.com.google.common.base.Throwables;
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSinkConfig;
import
org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
import org.apache.commons.lang3.ArrayUtils;
@@ -50,17 +50,19 @@ import java.util.Objects;
import static
org.apache.seatunnel.connectors.seatunnel.tdengine.utils.TDengineUtil.checkDriverExist;
@Slf4j
-public class TDengineSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
{
+public class TDengineSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
+ implements SupportMultiTableSinkWriter<Void> {
private static final DateTimeFormatter FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
private final Connection conn;
- private final TDengineSourceConfig config;
+
+ private final TDengineSinkConfig config;
private int tagsNum;
@SneakyThrows
- public TDengineSinkWriter(Config pluginConfig, SeaTunnelRowType
seaTunnelRowType) {
- config = TDengineSourceConfig.buildSourceConfig(pluginConfig);
+ public TDengineSinkWriter(TDengineSinkConfig config, SeaTunnelRowType
seaTunnelRowType) {
+ this.config = config;
String jdbcUrl =
StringUtils.join(
config.getUrl(),
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceFactory.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceFactory.java
new file mode 100644
index 0000000000..921e4d0d70
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+
+import com.google.auto.service.AutoService;
+
+import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions.DATABASE;
+import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions.LOWER_BOUND;
+import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions.PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions.STABLE;
+import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions.UPPER_BOUND;
+import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions.URL;
+import static
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions.USERNAME;
+
+@AutoService(Factory.class)
+public class TDengineSourceFactory implements TableSourceFactory {
+
+ @Override
+ public String factoryIdentifier() {
+ return "TDengine";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(URL, USERNAME, PASSWORD, DATABASE, STABLE,
LOWER_BOUND, UPPER_BOUND)
+ .build();
+ }
+
+ @Override
+ public Class<? extends SeaTunnelSource> getSourceClass() {
+ return TDengineSource.class;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriterTest.java
b/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriterTest.java
index 174ad55f91..53c794c274 100644
---
a/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriterTest.java
+++
b/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriterTest.java
@@ -18,13 +18,10 @@
package org.apache.seatunnel.connectors.seatunnel.tdengine.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
-
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSinkConfig;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -49,7 +46,8 @@ class TDengineSinkWriterTest {
@BeforeEach
public void setup() {
SeaTunnelRowType rowType;
- Config config;
+
+ TDengineSinkConfig config;
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
String[] fieldNames = new String[] {"id", "name", "description",
"weight"};
SeaTunnelDataType<?>[] dataTypes =
@@ -61,14 +59,14 @@ class TDengineSinkWriterTest {
};
rowType = new SeaTunnelRowType(fieldNames, dataTypes);
config =
- ConfigFactory.empty()
- .withValue(
- "url",
ConfigValueFactory.fromAnyRef("jdbc:TAOS://localhost:6030/"))
- .withValue("database",
ConfigValueFactory.fromAnyRef("test_db"))
- .withValue("stable",
ConfigValueFactory.fromAnyRef("test_stable"))
- .withValue("username",
ConfigValueFactory.fromAnyRef("root"))
- .withValue("password",
ConfigValueFactory.fromAnyRef("taosdata"))
- .withValue("timezone",
ConfigValueFactory.fromAnyRef("UTC"));
+ TDengineSinkConfig.builder()
+ .url("jdbc:TAOS://localhost:6030/")
+ .database("test_db")
+ .stable("test_stable")
+ .username("root")
+ .password("taosdata")
+ .timezone("UTC")
+ .build();
// Mock JDBC objects
Connection mockConnection = Mockito.mock(Connection.class);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java
index 7d91e32f54..6199da00aa 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java
@@ -59,6 +59,8 @@ public class TDengineIT extends TestSuiteBase implements
TestResource {
private Connection connection1;
private Connection connection2;
private int testDataCount;
+ private final int testDataCountMulti_Table1 = 5;
+ private final int testDataCountMulti_Table2 = 7;
@BeforeAll
@Override
@@ -120,6 +122,18 @@ public class TDengineIT extends TestSuiteBase implements
TestResource {
"CREATE STABLE power2.meters2 (ts TIMESTAMP, current
FLOAT, voltage INT, phase FLOAT, off BOOL, nc NCHAR(10)) "
+ "TAGS (location BINARY(64), groupId INT)");
}
+ // create power2.meters3 for multi write test
+ try (Statement stmt = connection2.createStatement()) {
+ stmt.execute(
+ "CREATE STABLE power2.meters3 (ts TIMESTAMP, current
FLOAT, voltage INT, phase FLOAT, off BOOL, nc NCHAR(10)) "
+ + "TAGS (location BINARY(64), groupId INT)");
+ }
+ // create power2.meters4 for multi write test
+ try (Statement stmt = connection2.createStatement()) {
+ stmt.execute(
+ "CREATE STABLE power2.meters4 (ts TIMESTAMP, current
FLOAT, voltage INT, phase FLOAT, off BOOL, nc NCHAR(10)) "
+ + "TAGS (location BINARY(64), groupId INT)");
+ }
return rowCount;
}
@@ -129,15 +143,33 @@ public class TDengineIT extends TestSuiteBase implements
TestResource {
container.executeJob("/tdengine/tdengine_source_to_sink.conf");
Assertions.assertEquals(0, execResult.getExitCode());
- long rowCountInserted = readSinkDataset();
+ long rowCountInserted = readSinkDataset("meters2");
Assertions.assertEquals(rowCountInserted, testDataCount);
}
+ @TestTemplate
+ public void testTDengineMultiWrite(TestContainer container) throws
Exception {
+ Container.ExecResult execResult =
+
container.executeJob("/tdengine/tdengine_fake_to_sink_multitable.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ long rowCountInserted = readSinkDataset("meters3");
+ long rowCountInserted2 = readSinkDataset("meters4");
+ Assertions.assertEquals(rowCountInserted, testDataCountMulti_Table1);
+ Assertions.assertEquals(rowCountInserted2, testDataCountMulti_Table2);
+ }
+
@SneakyThrows
- private long readSinkDataset() {
+ private long readSinkDataset(String stableName) {
+ // Validate table name
+ if (stableName == null || !stableName.matches("^[a-zA-Z0-9_]+$")) {
+ throw new IllegalArgumentException("Invalid table name provided: "
+ stableName);
+ }
+
long rowCount;
+ String sql = String.format("SELECT COUNT(1) FROM power2.%s;",
stableName);
try (Statement stmt = connection2.createStatement();
- ResultSet resultSet = stmt.executeQuery("select count(1) from
power2.meters2;"); ) {
+ ResultSet resultSet = stmt.executeQuery(sql); ) {
resultSet.next();
rowCount = resultSet.getLong(1);
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_fake_to_sink_multitable.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_fake_to_sink_multitable.conf
new file mode 100644
index 0000000000..a47d4c722a
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_fake_to_sink_multitable.conf
@@ -0,0 +1,130 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file demonstrates a batch job that writes two FakeSource tables (meters3, meters4) to TDengine through the multi-table sink
+######
+
+env {
+ parallelism = 2
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ plugin_output = "fake"
+ tables_configs = [
+ {
+ schema = {
+ table = "meters3"
+ fields {
+ device_id = "string"
+ event_time = "timestamp"
+ metric1 = "float"
+ metric2 = "int"
+ metric3 = "float"
+ status_flag = "boolean"
+ notes = "string"
+ location_tag = "string"
+ group_tag = "int"
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = ["d2001", "2023-04-22T14:38:05", 10.3, 219, 0.31, true,
"nc", "California.SanFrancisco", 2]
+ },
+ {
+ kind = INSERT
+ fields = ["d2002", "2023-04-22T15:42:15", 11.8, 221, 0.28, false,
"nc", "California.LosAngeles", 3]
+ },
+ {
+ kind = INSERT
+ fields = ["d2003", "2023-04-22T16:15:30", 12.5, 220, 0.33, true,
"nc", "California.SanDiego", 2]
+ },
+ {
+ kind = INSERT
+ fields = ["d2004", "2023-04-22T17:20:45", 10.7, 218, 0.25, true,
"nc", "California.SanFrancisco", 3]
+ },
+ {
+ kind = INSERT
+ fields = ["d2001", "2023-04-22T18:30:10", 13.2, 222, 0.35, false,
"nc", "California.LosAngeles", 2]
+ }
+ ]
+ },
+ {
+ schema = {
+ table = "meters4"
+ fields {
+ device_id = "string"
+ event_time = "timestamp"
+ metric1 = "float"
+ metric2 = "int"
+ metric3 = "float"
+ status_flag = "boolean"
+ notes = "string"
+ location_tag = "string"
+ group_tag = "int"
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = ["d1005", "2023-04-22T14:38:05", 110.3, 219, 0.31, true,
"nc", "California.SanFrancisco", 2]
+ },
+ {
+ kind = INSERT
+ fields = ["d1006", "2023-04-22T15:42:15", 211.8, 221, 0.28, false,
"nc", "California.LosAngeles", 3]
+ },
+ {
+ kind = INSERT
+ fields = ["d1007", "2023-04-22T16:15:30", 312.5, 220, 0.33, true,
"nc", "California.SanDiego", 2]
+ },
+ {
+ kind = INSERT
+ fields = ["d1008", "2023-04-22T17:20:45", 410.7, 218, 0.25, true,
"nc", "California.SanFrancisco", 3]
+ },
+ {
+ kind = INSERT
+ fields = ["d1005", "2023-04-22T18:30:10", 410.2, 410, 0.35, false,
"nc", "California.LosAngeles", 2]
+ },
+ {
+ kind = INSERT
+ fields = ["d1008", "2023-04-22T18:30:10", 533.2, 220, 0.35, false,
"nc", "California.LosAngeles", 3]
+ },
+ {
+ kind = INSERT
+ fields = ["d1007", "2023-04-22T18:30:10", 513.2, 222, 0.35, false,
"nc", "California.LosAngeles", 2]
+ }
+ ]
+ }
+ ]
+ }
+}
+
+transform {
+}
+
+sink {
+ TDengine {
+ url: "jdbc:TAOS-RS://flink_e2e_tdengine_sink:6041/"
+ username: "root"
+ password: "taosdata"
+ database: "power2"
+ stable: "${table_name}"
+ timezone: "UTC"
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelConnectorTest.java
b/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelConnectorTest.java
index 75abdd73c0..1a900d8c8b 100644
---
a/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelConnectorTest.java
+++
b/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelConnectorTest.java
@@ -67,7 +67,6 @@ public class SeaTunnelConnectorTest extends TestSuiteBase
implements TestResourc
private static final Set<String> EXCLUDE_CONNECTOR =
new HashSet() {
{
- add("TDengine");
add("SelectDBCloud");
}
};