This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b6ebbd47b2 [Bugfix][TDengine] Fix the degree of multiple parallelism
affects driver loading (#6020)
b6ebbd47b2 is described below
commit b6ebbd47b274b2fe8efd720ddf4ca25f600bb865
Author: happyboy1024 <[email protected]>
AuthorDate: Mon Dec 18 13:34:15 2023 +0800
[Bugfix][TDengine] Fix the degree of multiple parallelism affects driver
loading (#6020)
---------
Co-authored-by: dengjunjie <[email protected]>
---
.../exception/TDengineConnectorErrorCode.java | 42 ++++++++++++++++
.../tdengine/sink/TDengineSinkWriter.java | 4 ++
.../seatunnel/tdengine/utils/TDengineUtil.java | 58 ++++++++++++++++++++++
.../tdengine/tdengine_source_to_sink.conf | 5 +-
4 files changed, 105 insertions(+), 4 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/exception/TDengineConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/exception/TDengineConnectorErrorCode.java
new file mode 100644
index 0000000000..7d4c64e63b
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/exception/TDengineConnectorErrorCode.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum TDengineConnectorErrorCode implements SeaTunnelErrorCode {
+ LOAD_DRIVER_FAILED("TDengine-01", "Fail to create driver of class");
+
+ private final String code;
+ private final String description;
+
+ TDengineConnectorErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return code;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
index e341e26ed1..5c7b13c550 100644
---
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
@@ -46,6 +46,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Objects;
+import static
org.apache.seatunnel.connectors.seatunnel.tdengine.utils.TDengineUtil.checkDriverExist;
+
@Slf4j
public class TDengineSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
{
@@ -66,6 +68,8 @@ public class TDengineSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
config.getUsername(),
"&password=",
config.getPassword());
+ // check td driver whether exist and if not, try to register
+ checkDriverExist(jdbcUrl);
conn = DriverManager.getConnection(jdbcUrl);
try (Statement statement = conn.createStatement()) {
final ResultSet metaResultSet =
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/utils/TDengineUtil.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/utils/TDengineUtil.java
new file mode 100644
index 0000000000..de6d2fe998
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/utils/TDengineUtil.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.utils;
+
+import
org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+@Slf4j
+public class TDengineUtil {
+
+ public static synchronized void checkDriverExist(String jdbcUrl) {
+ try {
+ DriverManager.getDriver(jdbcUrl);
+ } catch (SQLException e) {
+ log.warn("no available driver found for this {}, waiting for it to
load", jdbcUrl);
+ }
+
+ String driverName;
+ if (jdbcUrl.startsWith("jdbc:TAOS-RS://")) {
+ driverName = "com.taosdata.jdbc.rs.RestfulDriver";
+ } else {
+ driverName = "com.taosdata.jdbc.TSDBDriver";
+ }
+
+ try {
+ Class<?> clazz =
+ Class.forName(driverName, true,
Thread.currentThread().getContextClassLoader());
+ Driver driver = (Driver)
clazz.getDeclaredConstructor().newInstance();
+ DriverManager.registerDriver(driver);
+ } catch (Exception ex) {
+ throw new TDengineConnectorException(
+ TDengineConnectorErrorCode.LOAD_DRIVER_FAILED,
+ "Fail to create driver of class " + driverName,
+ ex);
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink.conf
index 057c65ee2b..69e6e23ad9 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink.conf
@@ -19,11 +19,8 @@
######
env {
- # You can set flink configuration here
- execution.parallelism = 2
+ parallelism = 2
job.mode = "BATCH"
- #execution.checkpoint.interval = 10000
- #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
source {