This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 468e6e8a17 [Fix][Connector-V2] Fix JDBC Statement resource leaks in
StarRocksCatalog (#10629)
468e6e8a17 is described below
commit 468e6e8a17cc177f75954fc70f7d18b3b95fd3fb
Author: ic4y <[email protected]>
AuthorDate: Fri Mar 20 20:19:00 2026 +0800
[Fix][Connector-V2] Fix JDBC Statement resource leaks in StarRocksCatalog
(#10629)
---
.../starrocks/catalog/StarRocksCatalog.java | 121 +++++++++++----------
1 file changed, 65 insertions(+), 56 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
index 963bebe928..1b35a76c3a 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
@@ -153,46 +153,47 @@ public class StarRocksCatalog implements Catalog {
Optional<PrimaryKey> primaryKey =
getPrimaryKey(tablePath.getDatabaseName(),
tablePath.getTableName());
- PreparedStatement ps =
+ try (PreparedStatement ps =
conn.prepareStatement(
String.format(
"SELECT * FROM %s WHERE 1 = 0;",
- tablePath.getFullNameWithQuoted()));
-
- ResultSetMetaData tableMetaData = ps.getMetaData();
-
- TableSchema.Builder builder = TableSchema.builder();
- buildColumnsWithErrorCheck(
- tablePath,
- builder,
- IntStream.range(1, tableMetaData.getColumnCount() +
1).iterator(),
- i -> {
- try {
- SeaTunnelDataType<?> type =
fromJdbcType(tableMetaData, i);
- // TODO add default value and test it
- return PhysicalColumn.of(
- tableMetaData.getColumnName(i),
- type,
- tableMetaData.getColumnDisplaySize(i),
- tableMetaData.isNullable(i) ==
ResultSetMetaData.columnNullable,
- null,
- tableMetaData.getColumnLabel(i));
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
- });
-
- primaryKey.ifPresent(builder::primaryKey);
-
- TableIdentifier tableIdentifier =
- TableIdentifier.of(
- catalogName, tablePath.getDatabaseName(),
tablePath.getTableName());
- return CatalogTable.of(
- tableIdentifier,
- builder.build(),
- buildConnectorOptions(tablePath),
- Collections.emptyList(),
- "");
+ tablePath.getFullNameWithQuoted()))) {
+ ResultSetMetaData tableMetaData = ps.getMetaData();
+
+ TableSchema.Builder builder = TableSchema.builder();
+ buildColumnsWithErrorCheck(
+ tablePath,
+ builder,
+ IntStream.range(1, tableMetaData.getColumnCount() +
1).iterator(),
+ i -> {
+ try {
+ SeaTunnelDataType<?> type =
fromJdbcType(tableMetaData, i);
+ // TODO add default value and test it
+ return PhysicalColumn.of(
+ tableMetaData.getColumnName(i),
+ type,
+ tableMetaData.getColumnDisplaySize(i),
+ tableMetaData.isNullable(i)
+ ==
ResultSetMetaData.columnNullable,
+ null,
+ tableMetaData.getColumnLabel(i));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ primaryKey.ifPresent(builder::primaryKey);
+
+ TableIdentifier tableIdentifier =
+ TableIdentifier.of(
+ catalogName, tablePath.getDatabaseName(),
tablePath.getTableName());
+ return CatalogTable.of(
+ tableIdentifier,
+ builder.build(),
+ buildConnectorOptions(tablePath),
+ Collections.emptyList(),
+ "");
+ }
} catch (Exception e) {
throw new CatalogException(
String.format("Failed getting table %s",
tablePath.getFullName()), e);
@@ -216,10 +217,11 @@ public class StarRocksCatalog implements Catalog {
public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
try {
- conn.createStatement()
- .execute(
- StarRocksSaveModeUtil.INSTANCE.getDropTableSql(
- tablePath, ignoreIfNotExists));
+ try (Statement stmt = conn.createStatement()) {
+ stmt.execute(
+ StarRocksSaveModeUtil.INSTANCE.getDropTableSql(
+ tablePath, ignoreIfNotExists));
+ }
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s",
catalogName), e);
@@ -230,8 +232,9 @@ public class StarRocksCatalog implements Catalog {
throws TableNotExistException, CatalogException {
try {
if (ignoreIfNotExists) {
- conn.createStatement()
-
.execute(StarRocksSaveModeUtil.INSTANCE.getTruncateTableSql(tablePath));
+ try (Statement stmt = conn.createStatement()) {
+
stmt.execute(StarRocksSaveModeUtil.INSTANCE.getTruncateTableSql(tablePath));
+ }
}
} catch (Exception e) {
throw new CatalogException(
@@ -242,7 +245,9 @@ public class StarRocksCatalog implements Catalog {
public void executeSql(TablePath tablePath, String sql) {
try {
- conn.createStatement().execute(sql);
+ try (Statement stmt = conn.createStatement()) {
+ stmt.execute(sql);
+ }
} catch (Exception e) {
throw new CatalogException(String.format("Failed EXECUTE SQL in
catalog %s", sql), e);
}
@@ -266,10 +271,11 @@ public class StarRocksCatalog implements Catalog {
public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
throws DatabaseAlreadyExistException, CatalogException {
try {
- conn.createStatement()
- .execute(
-
StarRocksSaveModeUtil.INSTANCE.getCreateDatabaseSql(
- tablePath.getDatabaseName(),
ignoreIfExists));
+ try (Statement stmt = conn.createStatement()) {
+ stmt.execute(
+ StarRocksSaveModeUtil.INSTANCE.getCreateDatabaseSql(
+ tablePath.getDatabaseName(), ignoreIfExists));
+ }
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s",
catalogName), e);
@@ -280,10 +286,11 @@ public class StarRocksCatalog implements Catalog {
public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException {
try {
- conn.createStatement()
- .execute(
- StarRocksSaveModeUtil.INSTANCE.getDropDatabaseSql(
- tablePath.getDatabaseName(),
ignoreIfNotExists));
+ try (Statement stmt = conn.createStatement()) {
+ stmt.execute(
+ StarRocksSaveModeUtil.INSTANCE.getDropDatabaseSql(
+ tablePath.getDatabaseName(),
ignoreIfNotExists));
+ }
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s",
catalogName), e);
@@ -371,7 +378,9 @@ public class StarRocksCatalog implements Catalog {
throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {
try {
log.info("create table sql is :{}", sql);
- conn.createStatement().execute(sql);
+ try (Statement stmt = conn.createStatement()) {
+ stmt.execute(sql);
+ }
} catch (Exception e) {
throw new CatalogException(
String.format("Failed create table in catalog %s, sql
:[%s]", catalogName, sql),
@@ -449,9 +458,9 @@ public class StarRocksCatalog implements Catalog {
protected Optional<PrimaryKey> getPrimaryKey(String schema, String table)
throws SQLException {
List<String> pkFields = new ArrayList<>();
- try (ResultSet rs =
- conn.createStatement()
- .executeQuery(
+ try (Statement stmt = conn.createStatement();
+ ResultSet rs =
+ stmt.executeQuery(
String.format(
"SELECT COLUMN_NAME FROM
information_schema.columns where TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' AND
COLUMN_KEY = 'PRI' ORDER BY ORDINAL_POSITION",
schema, table))) {