This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 98b593f095 [Feature][Connector-V2] Support multi-table sink feature 
for TDengine (#9215)
98b593f095 is described below

commit 98b593f095b049a6c115a57d375df0beeb68fc93
Author: jiazhang <[email protected]>
AuthorDate: Wed Apr 23 20:32:12 2025 +0800

    [Feature][Connector-V2] Support multi-table sink feature for TDengine 
(#9215)
---
 .../seatunnel/api/ConnectorOptionCheckTest.java    |   2 -
 .../tdengine/config/TDengineCommonOptions.java     |  56 +++++++++
 .../tdengine/config/TDengineSinkConfig.java        |  56 +++++++++
 .../tdengine/config/TDengineSinkOptions.java       |  35 ++++++
 .../tdengine/config/TDengineSourceOptions.java     |  41 +++++++
 .../seatunnel/tdengine/sink/TDengineSink.java      |  36 +++---
 .../tdengine/sink/TDengineSinkFactory.java         |  58 +++++++++
 .../tdengine/sink/TDengineSinkWriter.java          |  14 ++-
 .../tdengine/source/TDengineSourceFactory.java     |  54 +++++++++
 .../tdengine/sink/TDengineSinkWriterTest.java      |  24 ++--
 .../e2e/connector/tdengine/TDengineIT.java         |  38 +++++-
 .../tdengine/tdengine_fake_to_sink_multitable.conf | 130 +++++++++++++++++++++
 .../starter/seatunnel/SeaTunnelConnectorTest.java  |   1 -
 13 files changed, 499 insertions(+), 46 deletions(-)

diff --git 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 24566951d3..9da7933f7a 100644
--- 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -195,7 +195,6 @@ public class ConnectorOptionCheckTest {
         whiteList.add("PulsarSinkOptions");
         whiteList.add("SlsSinkOptions");
         whiteList.add("Neo4jSinkOptions");
-        whiteList.add("TDengineSourceOptions");
         whiteList.add("PulsarSourceOptions");
         whiteList.add("MongodbSinkOptions");
         whiteList.add("SlsSourceOptions");
@@ -207,7 +206,6 @@ public class ConnectorOptionCheckTest {
         whiteList.add("RocketMqSourceOptions");
         whiteList.add("TablestoreSinkOptions");
         whiteList.add("TableStoreDBSourceOptions");
-        whiteList.add("TDengineSinkOptions");
         whiteList.add("Neo4jSourceOptions");
         whiteList.add("QdrantSourceOptions");
         whiteList.add("SocketSourceOptions");
diff --git 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineCommonOptions.java
 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineCommonOptions.java
new file mode 100644
index 0000000000..2f9ae2d051
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineCommonOptions.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import lombok.Data;
+
+@Data
+public abstract class TDengineCommonOptions {
+    public static final Option<String> URL =
+            Options.key("url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The TDengine server URL, format: 
jdbc:TAOS-RS://host:port");
+
+    public static final Option<String> USERNAME =
+            Options.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The username for TDengine 
authentication");
+
+    public static final Option<String> PASSWORD =
+            Options.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The password for TDengine 
authentication");
+
+    public static final Option<String> DATABASE =
+            Options.key("database")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The TDengine database name");
+
+    public static final Option<String> STABLE =
+            Options.key("stable")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The TDengine super table name");
+}
diff --git 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkConfig.java
 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkConfig.java
new file mode 100644
index 0000000000..b2892c61d6
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkConfig.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import lombok.Builder;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+@Data
+@Builder(builderClassName = "Builder")
+public class TDengineSinkConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private String url;
+    private String username;
+    private String password;
+    private String database;
+    private String stable;
+    private String timezone;
+
+    public static TDengineSinkConfig of(ReadonlyConfig config) {
+        Builder builder = TDengineSinkConfig.builder();
+
+        builder.url(config.get(TDengineSinkOptions.URL));
+        builder.username(config.get(TDengineSinkOptions.USERNAME));
+        builder.password(config.get(TDengineSinkOptions.PASSWORD));
+        builder.database(config.get(TDengineSinkOptions.DATABASE));
+        builder.stable(config.get(TDengineSinkOptions.STABLE));
+
+        Optional<String> optionalTimezone = 
config.getOptional(TDengineSinkOptions.TIMEZONE);
+
+        
builder.timezone(optionalTimezone.orElseGet(TDengineSinkOptions.TIMEZONE::defaultValue));
+
+        return builder.build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkOptions.java
 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkOptions.java
new file mode 100644
index 0000000000..7f64177656
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkOptions.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class TDengineSinkOptions extends TDengineCommonOptions {
+
+    public static final Option<String> TIMEZONE =
+            Options.key("timezone")
+                    .stringType()
+                    .defaultValue("UTC")
+                    .withDescription("The timezone used for timestamp 
conversion, default is UTC");
+}
diff --git 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceOptions.java
 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceOptions.java
new file mode 100644
index 0000000000..58ce50f92f
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceOptions.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class TDengineSourceOptions extends TDengineCommonOptions {
+
+    public static final Option<String> LOWER_BOUND =
+            Options.key("lowerBound")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The lower bound for data query range");
+
+    public static final Option<String> UPPER_BOUND =
+            Options.key("upperBound")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The upper bound for data query range");
+}
diff --git 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
index 194c94dda5..ec7319e95d 100644
--- 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
+++ 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
@@ -17,40 +17,34 @@
 
 package org.apache.seatunnel.connectors.seatunnel.tdengine.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
-import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-
-import com.google.auto.service.AutoService;
+import 
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSinkConfig;
 
 import java.io.IOException;
 import java.util.Optional;
 
-@AutoService(SeaTunnelSink.class)
-public class TDengineSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
-    private SeaTunnelRowType seaTunnelRowType;
-    private Config pluginConfig;
+public class TDengineSink extends AbstractSimpleSink<SeaTunnelRow, Void>
+        implements SupportMultiTableSink {
 
-    @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
-    }
+    private final TDengineSinkConfig tdengineSinkConfig;
+    private final CatalogTable catalogTable;
 
-    @Override
-    public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context)
-            throws IOException {
-        return new TDengineSinkWriter(pluginConfig, seaTunnelRowType);
+    private final SeaTunnelRowType seaTunnelRowType;
+
+    public TDengineSink(TDengineSinkConfig tdengineSinkConfig, CatalogTable 
catalogTable) {
+        this.tdengineSinkConfig = tdengineSinkConfig;
+        this.catalogTable = catalogTable;
+        this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
     }
 
     @Override
-    public void prepare(Config pluginConfig) {
-        this.pluginConfig = pluginConfig;
+    public TDengineSinkWriter createWriter(SinkWriter.Context context) throws 
IOException {
+        return new TDengineSinkWriter(tdengineSinkConfig, seaTunnelRowType);
     }
 
     @Override
@@ -60,6 +54,6 @@ public class TDengineSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
 
     @Override
     public Optional<CatalogTable> getWriteCatalogTable() {
-        return super.getWriteCatalogTable();
+        return Optional.of(catalogTable);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkFactory.java
 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkFactory.java
new file mode 100644
index 0000000000..976f5a8728
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.options.SinkConnectorCommonOptions;
+import org.apache.seatunnel.api.table.connector.TableSink;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import 
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSinkOptions;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class TDengineSinkFactory implements TableSinkFactory {
+    @Override
+    public String factoryIdentifier() {
+        return "TDengine";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(
+                        TDengineSinkOptions.URL,
+                        TDengineSinkOptions.USERNAME,
+                        TDengineSinkOptions.PASSWORD,
+                        TDengineSinkOptions.DATABASE,
+                        TDengineSinkOptions.STABLE)
+                .optional(
+                        TDengineSinkOptions.TIMEZONE,
+                        SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
+                .build();
+    }
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        TDengineSinkConfig tdengineSinkConfig = 
TDengineSinkConfig.of(context.getOptions());
+        return () -> new TDengineSink(tdengineSinkConfig, 
context.getCatalogTable());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
index ba352357f2..4137becdb9 100644
--- 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
@@ -20,13 +20,13 @@ package 
org.apache.seatunnel.connectors.seatunnel.tdengine.sink;
 import 
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
 import org.apache.seatunnel.shade.com.google.common.base.Throwables;
 import org.apache.seatunnel.shade.com.google.common.collect.Lists;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import 
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSinkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
 
 import org.apache.commons.lang3.ArrayUtils;
@@ -50,17 +50,19 @@ import java.util.Objects;
 import static 
org.apache.seatunnel.connectors.seatunnel.tdengine.utils.TDengineUtil.checkDriverExist;
 
 @Slf4j
-public class TDengineSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> 
{
+public class TDengineSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
+        implements SupportMultiTableSinkWriter<Void> {
 
     private static final DateTimeFormatter FORMATTER =
             DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
     private final Connection conn;
-    private final TDengineSourceConfig config;
+
+    private final TDengineSinkConfig config;
     private int tagsNum;
 
     @SneakyThrows
-    public TDengineSinkWriter(Config pluginConfig, SeaTunnelRowType 
seaTunnelRowType) {
-        config = TDengineSourceConfig.buildSourceConfig(pluginConfig);
+    public TDengineSinkWriter(TDengineSinkConfig config, SeaTunnelRowType 
seaTunnelRowType) {
+        this.config = config;
         String jdbcUrl =
                 StringUtils.join(
                         config.getUrl(),
diff --git 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceFactory.java
 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceFactory.java
new file mode 100644
index 0000000000..921e4d0d70
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+
+import com.google.auto.service.AutoService;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions.DATABASE;
+import static 
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions.LOWER_BOUND;
+import static 
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions.STABLE;
+import static 
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions.UPPER_BOUND;
+import static 
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions.URL;
+import static 
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceOptions.USERNAME;
+
+@AutoService(Factory.class)
+public class TDengineSourceFactory implements TableSourceFactory {
+
+    @Override
+    public String factoryIdentifier() {
+        return "TDengine";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(URL, USERNAME, PASSWORD, DATABASE, STABLE, 
LOWER_BOUND, UPPER_BOUND)
+                .build();
+    }
+
+    @Override
+    public Class<? extends SeaTunnelSource> getSourceClass() {
+        return TDengineSource.class;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriterTest.java
 
b/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriterTest.java
index 174ad55f91..53c794c274 100644
--- 
a/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriterTest.java
+++ 
b/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriterTest.java
@@ -18,13 +18,10 @@
 
 package org.apache.seatunnel.connectors.seatunnel.tdengine.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
-
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSinkConfig;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -49,7 +46,8 @@ class TDengineSinkWriterTest {
     @BeforeEach
     public void setup() {
         SeaTunnelRowType rowType;
-        Config config;
+
+        TDengineSinkConfig config;
         TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
         String[] fieldNames = new String[] {"id", "name", "description", 
"weight"};
         SeaTunnelDataType<?>[] dataTypes =
@@ -61,14 +59,14 @@ class TDengineSinkWriterTest {
                 };
         rowType = new SeaTunnelRowType(fieldNames, dataTypes);
         config =
-                ConfigFactory.empty()
-                        .withValue(
-                                "url", 
ConfigValueFactory.fromAnyRef("jdbc:TAOS://localhost:6030/"))
-                        .withValue("database", 
ConfigValueFactory.fromAnyRef("test_db"))
-                        .withValue("stable", 
ConfigValueFactory.fromAnyRef("test_stable"))
-                        .withValue("username", 
ConfigValueFactory.fromAnyRef("root"))
-                        .withValue("password", 
ConfigValueFactory.fromAnyRef("taosdata"))
-                        .withValue("timezone", 
ConfigValueFactory.fromAnyRef("UTC"));
+                TDengineSinkConfig.builder()
+                        .url("jdbc:TAOS://localhost:6030/")
+                        .database("test_db")
+                        .stable("test_stable")
+                        .username("root")
+                        .password("taosdata")
+                        .timezone("UTC")
+                        .build();
 
         // Mock JDBC objects
         Connection mockConnection = Mockito.mock(Connection.class);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java
index 7d91e32f54..6199da00aa 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java
@@ -59,6 +59,8 @@ public class TDengineIT extends TestSuiteBase implements 
TestResource {
     private Connection connection1;
     private Connection connection2;
     private int testDataCount;
+    private final int testDataCountMulti_Table1 = 5;
+    private final int testDataCountMulti_Table2 = 7;
 
     @BeforeAll
     @Override
@@ -120,6 +122,18 @@ public class TDengineIT extends TestSuiteBase implements 
TestResource {
                     "CREATE STABLE power2.meters2 (ts TIMESTAMP, current 
FLOAT, voltage INT, phase FLOAT, off BOOL, nc NCHAR(10)) "
                             + "TAGS (location BINARY(64), groupId INT)");
         }
+        // create power2.meters3 for multi write test
+        try (Statement stmt = connection2.createStatement()) {
+            stmt.execute(
+                    "CREATE STABLE power2.meters3 (ts TIMESTAMP, current 
FLOAT, voltage INT, phase FLOAT, off BOOL, nc NCHAR(10)) "
+                            + "TAGS (location BINARY(64), groupId INT)");
+        }
+        // create power2.meters4 for multi write test
+        try (Statement stmt = connection2.createStatement()) {
+            stmt.execute(
+                    "CREATE STABLE power2.meters4 (ts TIMESTAMP, current 
FLOAT, voltage INT, phase FLOAT, off BOOL, nc NCHAR(10)) "
+                            + "TAGS (location BINARY(64), groupId INT)");
+        }
         return rowCount;
     }
 
@@ -129,15 +143,33 @@ public class TDengineIT extends TestSuiteBase implements 
TestResource {
                 container.executeJob("/tdengine/tdengine_source_to_sink.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
 
-        long rowCountInserted = readSinkDataset();
+        long rowCountInserted = readSinkDataset("meters2");
         Assertions.assertEquals(rowCountInserted, testDataCount);
     }
 
+    @TestTemplate
+    public void testTDengineMultiWrite(TestContainer container) throws 
Exception {
+        Container.ExecResult execResult =
+                
container.executeJob("/tdengine/tdengine_fake_to_sink_multitable.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        long rowCountInserted = readSinkDataset("meters3");
+        long rowCountInserted2 = readSinkDataset("meters4");
+        Assertions.assertEquals(rowCountInserted, testDataCountMulti_Table1);
+        Assertions.assertEquals(rowCountInserted2, testDataCountMulti_Table2);
+    }
+
     @SneakyThrows
-    private long readSinkDataset() {
+    private long readSinkDataset(String stableName) {
+        // Validate table name
+        if (stableName == null || !stableName.matches("^[a-zA-Z0-9_]+$")) {
+            throw new IllegalArgumentException("Invalid table name provided: " 
+ stableName);
+        }
+
         long rowCount;
+        String sql = String.format("SELECT COUNT(1) FROM power2.%s;", 
stableName);
         try (Statement stmt = connection2.createStatement();
-                ResultSet resultSet = stmt.executeQuery("select count(1) from 
power2.meters2;"); ) {
+                ResultSet resultSet = stmt.executeQuery(sql); ) {
             resultSet.next();
             rowCount = resultSet.getLong(1);
         }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_fake_to_sink_multitable.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_fake_to_sink_multitable.conf
new file mode 100644
index 0000000000..a47d4c722a
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_fake_to_sink_multitable.conf
@@ -0,0 +1,130 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in 
seatunnel config
+######
+
+env {
+  parallelism = 2
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    plugin_output = "fake"
+    tables_configs = [
+      {
+        schema = {
+          table = "meters3"
+          fields {
+            device_id = "string"
+            event_time = "timestamp"
+            metric1 = "float"
+            metric2 = "int"
+            metric3 = "float"
+            status_flag = "boolean"
+            notes = "string"
+            location_tag = "string"
+            group_tag = "int"
+          }
+        }
+        rows = [
+          {
+            kind = INSERT
+            fields = ["d2001", "2023-04-22T14:38:05", 10.3, 219, 0.31, true, 
"nc", "California.SanFrancisco", 2]
+          },
+          {
+            kind = INSERT
+            fields = ["d2002", "2023-04-22T15:42:15", 11.8, 221, 0.28, false, 
"nc", "California.LosAngeles", 3]
+          },
+          {
+            kind = INSERT
+            fields = ["d2003", "2023-04-22T16:15:30", 12.5, 220, 0.33, true, 
"nc", "California.SanDiego", 2]
+          },
+          {
+            kind = INSERT
+            fields = ["d2004", "2023-04-22T17:20:45", 10.7, 218, 0.25, true, 
"nc", "California.SanFrancisco", 3]
+          },
+          {
+            kind = INSERT
+            fields = ["d2001", "2023-04-22T18:30:10", 13.2, 222, 0.35, false, 
"nc", "California.LosAngeles", 2]
+          }
+        ]
+      },
+      {                  
+        schema = {
+          table = "meters4"
+          fields {
+            device_id = "string"
+            event_time = "timestamp"
+            metric1 = "float"
+            metric2 = "int"
+            metric3 = "float"
+            status_flag = "boolean"
+            notes = "string"
+            location_tag = "string"
+            group_tag = "int"
+          }
+        }
+        rows = [
+          {
+            kind = INSERT
+            fields = ["d1005", "2023-04-22T14:38:05", 110.3, 219, 0.31, true, 
"nc", "California.SanFrancisco", 2]
+          },
+          {
+            kind = INSERT
+            fields = ["d1006", "2023-04-22T15:42:15", 211.8, 221, 0.28, false, 
"nc", "California.LosAngeles", 3]
+          },
+          {
+            kind = INSERT
+            fields = ["d1007", "2023-04-22T16:15:30", 312.5, 220, 0.33, true, 
"nc", "California.SanDiego", 2]
+          },
+          {
+            kind = INSERT
+            fields = ["d1008", "2023-04-22T17:20:45", 410.7, 218, 0.25, true, 
"nc", "California.SanFrancisco", 3]
+          },
+          {
+            kind = INSERT
+            fields = ["d1005", "2023-04-22T18:30:10", 410.2, 410, 0.35, false, 
"nc", "California.LosAngeles", 2]
+          },
+          {
+            kind = INSERT
+            fields = ["d1008", "2023-04-22T18:30:10", 533.2, 220, 0.35, false, 
"nc", "California.LosAngeles", 3]
+          },
+          {
+            kind = INSERT
+            fields = ["d1007", "2023-04-22T18:30:10", 513.2, 222, 0.35, false, 
"nc", "California.LosAngeles", 2]
+          }
+        ]
+      }
+    ]
+  }
+}
+
+transform {
+}
+
+sink {
+  TDengine {
+    url: "jdbc:TAOS-RS://flink_e2e_tdengine_sink:6041/"
+    username: "root"
+    password: "taosdata"
+    database: "power2"
+    stable: "${table_name}"
+    timezone: "UTC"
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelConnectorTest.java
 
b/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelConnectorTest.java
index 75abdd73c0..1a900d8c8b 100644
--- 
a/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelConnectorTest.java
+++ 
b/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelConnectorTest.java
@@ -67,7 +67,6 @@ public class SeaTunnelConnectorTest extends TestSuiteBase 
implements TestResourc
     private static final Set<String> EXCLUDE_CONNECTOR =
             new HashSet() {
                 {
-                    add("TDengine");
                     add("SelectDBCloud");
                 }
             };


Reply via email to