This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 468e6e8a17 [Fix][Connector-V2] Fix JDBC Statement resource leaks in StarRocksCatalog (#10629)
468e6e8a17 is described below

commit 468e6e8a17cc177f75954fc70f7d18b3b95fd3fb
Author: ic4y <[email protected]>
AuthorDate: Fri Mar 20 20:19:00 2026 +0800

    [Fix][Connector-V2] Fix JDBC Statement resource leaks in StarRocksCatalog (#10629)
---
 .../starrocks/catalog/StarRocksCatalog.java        | 121 +++++++++++----------
 1 file changed, 65 insertions(+), 56 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
index 963bebe928..1b35a76c3a 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
@@ -153,46 +153,47 @@ public class StarRocksCatalog implements Catalog {
             Optional<PrimaryKey> primaryKey =
                     getPrimaryKey(tablePath.getDatabaseName(), 
tablePath.getTableName());
 
-            PreparedStatement ps =
+            try (PreparedStatement ps =
                     conn.prepareStatement(
                             String.format(
                                     "SELECT * FROM %s WHERE 1 = 0;",
-                                    tablePath.getFullNameWithQuoted()));
-
-            ResultSetMetaData tableMetaData = ps.getMetaData();
-
-            TableSchema.Builder builder = TableSchema.builder();
-            buildColumnsWithErrorCheck(
-                    tablePath,
-                    builder,
-                    IntStream.range(1, tableMetaData.getColumnCount() + 
1).iterator(),
-                    i -> {
-                        try {
-                            SeaTunnelDataType<?> type = 
fromJdbcType(tableMetaData, i);
-                            // TODO add default value and test it
-                            return PhysicalColumn.of(
-                                    tableMetaData.getColumnName(i),
-                                    type,
-                                    tableMetaData.getColumnDisplaySize(i),
-                                    tableMetaData.isNullable(i) == 
ResultSetMetaData.columnNullable,
-                                    null,
-                                    tableMetaData.getColumnLabel(i));
-                        } catch (SQLException e) {
-                            throw new RuntimeException(e);
-                        }
-                    });
-
-            primaryKey.ifPresent(builder::primaryKey);
-
-            TableIdentifier tableIdentifier =
-                    TableIdentifier.of(
-                            catalogName, tablePath.getDatabaseName(), 
tablePath.getTableName());
-            return CatalogTable.of(
-                    tableIdentifier,
-                    builder.build(),
-                    buildConnectorOptions(tablePath),
-                    Collections.emptyList(),
-                    "");
+                                    tablePath.getFullNameWithQuoted()))) {
+                ResultSetMetaData tableMetaData = ps.getMetaData();
+
+                TableSchema.Builder builder = TableSchema.builder();
+                buildColumnsWithErrorCheck(
+                        tablePath,
+                        builder,
+                        IntStream.range(1, tableMetaData.getColumnCount() + 
1).iterator(),
+                        i -> {
+                            try {
+                                SeaTunnelDataType<?> type = 
fromJdbcType(tableMetaData, i);
+                                // TODO add default value and test it
+                                return PhysicalColumn.of(
+                                        tableMetaData.getColumnName(i),
+                                        type,
+                                        tableMetaData.getColumnDisplaySize(i),
+                                        tableMetaData.isNullable(i)
+                                                == 
ResultSetMetaData.columnNullable,
+                                        null,
+                                        tableMetaData.getColumnLabel(i));
+                            } catch (SQLException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
+                primaryKey.ifPresent(builder::primaryKey);
+
+                TableIdentifier tableIdentifier =
+                        TableIdentifier.of(
+                                catalogName, tablePath.getDatabaseName(), 
tablePath.getTableName());
+                return CatalogTable.of(
+                        tableIdentifier,
+                        builder.build(),
+                        buildConnectorOptions(tablePath),
+                        Collections.emptyList(),
+                        "");
+            }
         } catch (Exception e) {
             throw new CatalogException(
                     String.format("Failed getting table %s", 
tablePath.getFullName()), e);
@@ -216,10 +217,11 @@ public class StarRocksCatalog implements Catalog {
     public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
             throws TableNotExistException, CatalogException {
         try {
-            conn.createStatement()
-                    .execute(
-                            StarRocksSaveModeUtil.INSTANCE.getDropTableSql(
-                                    tablePath, ignoreIfNotExists));
+            try (Statement stmt = conn.createStatement()) {
+                stmt.execute(
+                        StarRocksSaveModeUtil.INSTANCE.getDropTableSql(
+                                tablePath, ignoreIfNotExists));
+            }
         } catch (Exception e) {
             throw new CatalogException(
                     String.format("Failed listing database in catalog %s", 
catalogName), e);
@@ -230,8 +232,9 @@ public class StarRocksCatalog implements Catalog {
             throws TableNotExistException, CatalogException {
         try {
             if (ignoreIfNotExists) {
-                conn.createStatement()
-                        
.execute(StarRocksSaveModeUtil.INSTANCE.getTruncateTableSql(tablePath));
+                try (Statement stmt = conn.createStatement()) {
+                    
stmt.execute(StarRocksSaveModeUtil.INSTANCE.getTruncateTableSql(tablePath));
+                }
             }
         } catch (Exception e) {
             throw new CatalogException(
@@ -242,7 +245,9 @@ public class StarRocksCatalog implements Catalog {
 
     public void executeSql(TablePath tablePath, String sql) {
         try {
-            conn.createStatement().execute(sql);
+            try (Statement stmt = conn.createStatement()) {
+                stmt.execute(sql);
+            }
         } catch (Exception e) {
             throw new CatalogException(String.format("Failed EXECUTE SQL in 
catalog %s", sql), e);
         }
@@ -266,10 +271,11 @@ public class StarRocksCatalog implements Catalog {
     public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
             throws DatabaseAlreadyExistException, CatalogException {
         try {
-            conn.createStatement()
-                    .execute(
-                            
StarRocksSaveModeUtil.INSTANCE.getCreateDatabaseSql(
-                                    tablePath.getDatabaseName(), 
ignoreIfExists));
+            try (Statement stmt = conn.createStatement()) {
+                stmt.execute(
+                        StarRocksSaveModeUtil.INSTANCE.getCreateDatabaseSql(
+                                tablePath.getDatabaseName(), ignoreIfExists));
+            }
         } catch (Exception e) {
             throw new CatalogException(
                     String.format("Failed listing database in catalog %s", 
catalogName), e);
@@ -280,10 +286,11 @@ public class StarRocksCatalog implements Catalog {
     public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
             throws DatabaseNotExistException, CatalogException {
         try {
-            conn.createStatement()
-                    .execute(
-                            StarRocksSaveModeUtil.INSTANCE.getDropDatabaseSql(
-                                    tablePath.getDatabaseName(), 
ignoreIfNotExists));
+            try (Statement stmt = conn.createStatement()) {
+                stmt.execute(
+                        StarRocksSaveModeUtil.INSTANCE.getDropDatabaseSql(
+                                tablePath.getDatabaseName(), 
ignoreIfNotExists));
+            }
         } catch (Exception e) {
             throw new CatalogException(
                     String.format("Failed listing database in catalog %s", 
catalogName), e);
@@ -371,7 +378,9 @@ public class StarRocksCatalog implements Catalog {
             throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
         try {
             log.info("create table sql is :{}", sql);
-            conn.createStatement().execute(sql);
+            try (Statement stmt = conn.createStatement()) {
+                stmt.execute(sql);
+            }
         } catch (Exception e) {
             throw new CatalogException(
                     String.format("Failed create table in catalog %s, sql 
:[%s]", catalogName, sql),
@@ -449,9 +458,9 @@ public class StarRocksCatalog implements Catalog {
     protected Optional<PrimaryKey> getPrimaryKey(String schema, String table) 
throws SQLException {
 
         List<String> pkFields = new ArrayList<>();
-        try (ResultSet rs =
-                conn.createStatement()
-                        .executeQuery(
+        try (Statement stmt = conn.createStatement();
+                ResultSet rs =
+                        stmt.executeQuery(
                                 String.format(
                                         "SELECT COLUMN_NAME FROM 
information_schema.columns where TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' AND 
COLUMN_KEY = 'PRI' ORDER BY ORDINAL_POSITION",
                                         schema, table))) {

Reply via email to