This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b6ebbd47b2 [Bugfix][TDengine] Fix the degree of multiple parallelism 
affects driver loading (#6020)
b6ebbd47b2 is described below

commit b6ebbd47b274b2fe8efd720ddf4ca25f600bb865
Author: happyboy1024 <[email protected]>
AuthorDate: Mon Dec 18 13:34:15 2023 +0800

    [Bugfix][TDengine] Fix the degree of multiple parallelism affects driver 
loading (#6020)
    
    
    
    ---------
    
    Co-authored-by: dengjunjie <[email protected]>
---
 .../exception/TDengineConnectorErrorCode.java      | 42 ++++++++++++++++
 .../tdengine/sink/TDengineSinkWriter.java          |  4 ++
 .../seatunnel/tdengine/utils/TDengineUtil.java     | 58 ++++++++++++++++++++++
 .../tdengine/tdengine_source_to_sink.conf          |  5 +-
 4 files changed, 105 insertions(+), 4 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/exception/TDengineConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/exception/TDengineConnectorErrorCode.java
new file mode 100644
index 0000000000..7d4c64e63b
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/exception/TDengineConnectorErrorCode.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum TDengineConnectorErrorCode implements SeaTunnelErrorCode {
+    LOAD_DRIVER_FAILED("TDengine-01", "Fail to create driver of class");
+
+    private final String code;
+    private final String description;
+
+    TDengineConnectorErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    @Override
+    public String getCode() {
+        return code;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
index e341e26ed1..5c7b13c550 100644
--- 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
@@ -46,6 +46,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Objects;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.tdengine.utils.TDengineUtil.checkDriverExist;
+
 @Slf4j
 public class TDengineSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> 
{
 
@@ -66,6 +68,8 @@ public class TDengineSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
                         config.getUsername(),
                         "&password=",
                         config.getPassword());
+        // check td driver whether exist and if not, try to register
+        checkDriverExist(jdbcUrl);
         conn = DriverManager.getConnection(jdbcUrl);
         try (Statement statement = conn.createStatement()) {
             final ResultSet metaResultSet =
diff --git 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/utils/TDengineUtil.java
 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/utils/TDengineUtil.java
new file mode 100644
index 0000000000..de6d2fe998
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/utils/TDengineUtil.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.utils;
+
+import 
org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+@Slf4j
+public class TDengineUtil {
+
+    public static synchronized void checkDriverExist(String jdbcUrl) {
+        try {
+            DriverManager.getDriver(jdbcUrl);
+        } catch (SQLException e) {
+            log.warn("no available driver found for this {}, waiting for it to 
load", jdbcUrl);
+        }
+
+        String driverName;
+        if (jdbcUrl.startsWith("jdbc:TAOS-RS://")) {
+            driverName = "com.taosdata.jdbc.rs.RestfulDriver";
+        } else {
+            driverName = "com.taosdata.jdbc.TSDBDriver";
+        }
+
+        try {
+            Class<?> clazz =
+                    Class.forName(driverName, true, 
Thread.currentThread().getContextClassLoader());
+            Driver driver = (Driver) 
clazz.getDeclaredConstructor().newInstance();
+            DriverManager.registerDriver(driver);
+        } catch (Exception ex) {
+            throw new TDengineConnectorException(
+                    TDengineConnectorErrorCode.LOAD_DRIVER_FAILED,
+                    "Fail to create driver of class " + driverName,
+                    ex);
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink.conf
index 057c65ee2b..69e6e23ad9 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink.conf
@@ -19,11 +19,8 @@
 ######
 
 env {
-  # You can set flink configuration here
-  execution.parallelism = 2
+  parallelism = 2
   job.mode = "BATCH"
-  #execution.checkpoint.interval = 10000
-  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
 
 source {

Reply via email to