This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f9f12d4 [Improve]Release JDBC connection resources (#474)
1f9f12d4 is described below

commit 1f9f12d4fb0b605d3873cb2865f093aae1ab785e
Author: wudongliang <[email protected]>
AuthorDate: Fri Aug 30 18:14:28 2024 +0800

    [Improve]Release JDBC connection resources (#474)
---
 .../doris/flink/catalog/doris/DorisSystem.java     | 32 ++++++++++++----------
 .../connection/SimpleJdbcConnectionProvider.java   |  2 +-
 2 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
index 427eb8b3..9f7ed8d5 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.Statement;
@@ -103,10 +104,11 @@ public class DorisSystem implements Serializable {
     }
 
     public void execute(String sql) {
-        try (Statement statement =
-                jdbcConnectionProvider.getOrEstablishConnection().createStatement()) {
+        try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection();
+                Statement statement = connection.createStatement()) {
             statement.execute(sql);
         } catch (Exception e) {
+            LOG.error("SQL query could not be executed: {}", sql, e);
             throw new DorisSystemException(
                     String.format("SQL query could not be executed: %s", sql), e);
         }
@@ -116,18 +118,19 @@ public class DorisSystem implements Serializable {
             String sql, int columnIndex, Predicate<String> filterFunc, Object... params) {
 
         List<String> columnValues = Lists.newArrayList();
-        try (PreparedStatement ps =
-                jdbcConnectionProvider.getOrEstablishConnection().prepareStatement(sql)) {
+        try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection();
+                PreparedStatement ps = connection.prepareStatement(sql)) {
             if (Objects.nonNull(params) && params.length > 0) {
                 for (int i = 0; i < params.length; i++) {
                     ps.setObject(i + 1, params[i]);
                 }
             }
-            ResultSet rs = ps.executeQuery();
-            while (rs.next()) {
-                String columnValue = rs.getString(columnIndex);
-                if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) {
-                    columnValues.add(columnValue);
+            try (ResultSet rs = ps.executeQuery()) {
+                while (rs.next()) {
+                    String columnValue = rs.getString(columnIndex);
+                    if (filterFunc == null || filterFunc.test(columnValue)) {
+                        columnValues.add(columnValue);
+                    }
                 }
             }
             return columnValues;
@@ -152,16 +155,17 @@ public class DorisSystem implements Serializable {
                         databaseName, tableName);
 
         Map<String, String> columnValues = new HashMap<>();
-        try (PreparedStatement ps =
-                jdbcConnectionProvider.getOrEstablishConnection().prepareStatement(sql)) {
-            ResultSet rs = ps.executeQuery();
+        try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection();
+                PreparedStatement ps = connection.prepareStatement(sql);
+                ResultSet rs = ps.executeQuery()) {
             while (rs.next()) {
-                String filedName = rs.getString(1);
+                String fieldName = rs.getString(1);
                 String datatype = rs.getString(2);
-                columnValues.put(filedName, datatype);
+                columnValues.put(fieldName, datatype);
             }
             return columnValues;
         } catch (Exception e) {
+            LOG.error("SQL query could not be executed: {}", sql, e);
             throw new DorisSystemException(
                     String.format("The following SQL query could not be executed: %s", sql), e);
         }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java
index 68a92310..03156847 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java
@@ -42,7 +42,7 @@ public class SimpleJdbcConnectionProvider implements JdbcConnectionProvider, Ser
 
     @Override
     public Connection getOrEstablishConnection() throws ClassNotFoundException, SQLException {
-        if (connection != null && !connection.isClosed() && connection.isValid(10000)) {
+        if (connection != null && !connection.isClosed() && connection.isValid(10)) {
             return connection;
         }
         try {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to