This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 853e973212 [Improve][Connector-V2] Close all ResultSet after used (#7389)
853e973212 is described below
commit 853e973212956751250d8b1212f413cfcf3ca0fd
Author: Jia Fan <[email protected]>
AuthorDate: Fri Aug 16 15:41:59 2024 +0800
[Improve][Connector-V2] Close all ResultSet after used (#7389)
* [Improve][Connector-V2] Close all ResultSet after used
* update
---
.../cdc/base/dialect/JdbcDataSourceDialect.java | 93 +++++++++++-----------
.../sink/client/ClickhouseSinkWriter.java | 5 +-
.../connectors/doris/catalog/DorisCatalog.java | 80 ++++++++++---------
.../jdbc/catalog/AbstractJdbcCatalog.java | 9 ++-
.../catalog/oceanbase/OceanBaseMySqlCatalog.java | 10 ++-
.../seatunnel/jdbc/catalog/utils/CatalogUtils.java | 88 ++++++++++----------
.../jdbc/catalog/utils/JdbcColumnConverter.java | 34 +++++---
.../jdbc/internal/dialect/hive/HiveDialect.java | 7 +-
.../jdbc/internal/dialect/iris/IrisDialect.java | 6 +-
.../dialect/tablestore/TablestoreDialect.java | 6 +-
.../jdbc/internal/dialect/xugu/XuguDialect.java | 6 +-
.../starrocks/catalog/StarRocksCatalog.java | 39 +++++----
.../tdengine/sink/TDengineSinkWriter.java | 9 ++-
.../src/test/java/mongodb/MongodbCDCIT.java | 5 +-
.../connectors/seatunnel/cdc/mysql/MysqlCDCIT.java | 6 +-
.../connector/cdc/sqlserver/SqlServerCDCIT.java | 5 +-
.../seatunnel/clickhouse/ClickhouseIT.java | 59 +++++++-------
.../clickhouse/ClickhouseSinkCDCChangelogIT.java | 8 +-
.../e2e/connector/doris/AbstractDorisIT.java | 5 +-
.../e2e/connector/doris/DorisCDCSinkIT.java | 8 +-
.../seatunnel/e2e/connector/doris/DorisIT.java | 15 ++--
.../seatunnel/jdbc/JdbcPostgresIdentifierIT.java | 5 +-
.../seatunnel/jdbc/JdbcOceanBaseITBase.java | 9 +--
.../connectors/seatunnel/jdbc/JdbcPostgresIT.java | 4 +-
.../seatunnel/jdbc/JdbcSinkCDCChangelogIT.java | 4 +-
.../seatunnel/jdbc/JdbcMySqlCreateTableIT.java | 15 ++--
.../seatunnel/jdbc/JdbcSqlServerCreateTableIT.java | 15 ++--
.../connectors/seatunnel/jdbc/JdbcDorisIT.java | 6 +-
.../connectors/seatunnel/jdbc/JdbcDorisdbIT.java | 6 +-
.../connectors/seatunnel/jdbc/JdbcIrisIT.java | 8 +-
.../e2e/connector/kafka/KafkaFormatIT.java | 7 +-
.../e2e/connector/pulsar/CanalToPulsarIT.java | 5 +-
.../connector/starrocks/StarRocksCDCSinkIT.java | 4 +-
.../e2e/connector/starrocks/StarRocksIT.java | 6 +-
.../e2e/connector/tdengine/TDengineIT.java | 4 +-
35 files changed, 326 insertions(+), 275 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
index 80c9c81cca..05e9a89c04 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
@@ -77,22 +77,24 @@ public interface JdbcDataSourceDialect extends
DataSourceDialect<JdbcSourceConfi
DatabaseMetaData metaData = jdbcConnection.connection().getMetaData();
+ // seq -> column name
+ List<Pair<Integer, String>> primaryKeyColumns = new ArrayList<>();
+ String pkName = null;
+
// According to the Javadoc of
java.sql.DatabaseMetaData#getPrimaryKeys,
// the returned primary key columns are ordered by COLUMN_NAME, not by
KEY_SEQ.
// We need to sort them based on the KEY_SEQ value.
- ResultSet rs =
- metaData.getPrimaryKeys(tableId.catalog(), tableId.schema(),
tableId.table());
- // seq -> column name
- List<Pair<Integer, String>> primaryKeyColumns = new ArrayList<>();
- String pkName = null;
- while (rs.next()) {
- // all the PK_NAME should be the same
- pkName = rs.getString("PK_NAME");
- String columnName = rs.getString("COLUMN_NAME");
- int keySeq = rs.getInt("KEY_SEQ");
- // KEY_SEQ is 1-based index
- primaryKeyColumns.add(Pair.of(keySeq, columnName));
+ try (ResultSet rs =
+ metaData.getPrimaryKeys(tableId.catalog(), tableId.schema(),
tableId.table())) {
+ while (rs.next()) {
+ // all the PK_NAME should be the same
+ pkName = rs.getString("PK_NAME");
+ String columnName = rs.getString("COLUMN_NAME");
+ int keySeq = rs.getInt("KEY_SEQ");
+ // KEY_SEQ is 1-based index
+ primaryKeyColumns.add(Pair.of(keySeq, columnName));
+ }
}
// initialize size
List<String> pkFields =
@@ -121,41 +123,42 @@ public interface JdbcDataSourceDialect extends
DataSourceDialect<JdbcSourceConfi
throws SQLException {
DatabaseMetaData metaData = jdbcConnection.connection().getMetaData();
- ResultSet resultSet =
+ try (ResultSet resultSet =
metaData.getIndexInfo(
- tableId.catalog(), tableId.schema(), tableId.table(),
false, false);
- // index name -> index
- Map<String, ConstraintKey> constraintKeyMap = new HashMap<>();
- while (resultSet.next()) {
- String columnName = resultSet.getString("COLUMN_NAME");
- if (columnName == null) {
- continue;
+ tableId.catalog(), tableId.schema(), tableId.table(),
false, false)) {
+ // index name -> index
+ Map<String, ConstraintKey> constraintKeyMap = new HashMap<>();
+ while (resultSet.next()) {
+ String columnName = resultSet.getString("COLUMN_NAME");
+ if (columnName == null) {
+ continue;
+ }
+
+ String indexName = resultSet.getString("INDEX_NAME");
+ boolean noUnique = resultSet.getBoolean("NON_UNIQUE");
+
+ ConstraintKey constraintKey =
+ constraintKeyMap.computeIfAbsent(
+ indexName,
+ s -> {
+ ConstraintKey.ConstraintType
constraintType =
+
ConstraintKey.ConstraintType.INDEX_KEY;
+ if (!noUnique) {
+ constraintType =
ConstraintKey.ConstraintType.UNIQUE_KEY;
+ }
+ return ConstraintKey.of(
+ constraintType, indexName, new
ArrayList<>());
+ });
+
+ ConstraintKey.ColumnSortType sortType =
+ "A".equals(resultSet.getString("ASC_OR_DESC"))
+ ? ConstraintKey.ColumnSortType.ASC
+ : ConstraintKey.ColumnSortType.DESC;
+ ConstraintKey.ConstraintKeyColumn constraintKeyColumn =
+ new ConstraintKey.ConstraintKeyColumn(columnName,
sortType);
+ constraintKey.getColumnNames().add(constraintKeyColumn);
}
-
- String indexName = resultSet.getString("INDEX_NAME");
- boolean noUnique = resultSet.getBoolean("NON_UNIQUE");
-
- ConstraintKey constraintKey =
- constraintKeyMap.computeIfAbsent(
- indexName,
- s -> {
- ConstraintKey.ConstraintType constraintType =
- ConstraintKey.ConstraintType.INDEX_KEY;
- if (!noUnique) {
- constraintType =
ConstraintKey.ConstraintType.UNIQUE_KEY;
- }
- return ConstraintKey.of(
- constraintType, indexName, new
ArrayList<>());
- });
-
- ConstraintKey.ColumnSortType sortType =
- "A".equals(resultSet.getString("ASC_OR_DESC"))
- ? ConstraintKey.ColumnSortType.ASC
- : ConstraintKey.ColumnSortType.DESC;
- ConstraintKey.ConstraintKeyColumn constraintKeyColumn =
- new ConstraintKey.ConstraintKeyColumn(columnName,
sortType);
- constraintKey.getColumnNames().add(constraintKeyColumn);
+ return new ArrayList<>(constraintKeyMap.values());
}
- return new ArrayList<>(constraintKeyMap.values());
}
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
index 74119d5b46..b5f1505d11 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
@@ -210,8 +210,9 @@ public class ClickhouseSinkWriter
return false;
}
String configKey = "allow_experimental_lightweight_delete";
- try (Statement stmt = clickhouseConnection.createStatement()) {
- ResultSet resultSet = stmt.executeQuery("SHOW SETTINGS ILIKE '%" +
configKey + "%'");
+ try (Statement stmt = clickhouseConnection.createStatement();
+ ResultSet resultSet =
+ stmt.executeQuery("SHOW SETTINGS ILIKE '%" + configKey
+ "%'")) {
while (resultSet.next()) {
String name = resultSet.getString("name");
if (name.equalsIgnoreCase(configKey)) {
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
index e4d5aea5d6..146d364652 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
@@ -148,8 +148,9 @@ public class DorisCatalog implements Catalog {
private String getDorisVersion() throws SQLException {
String dorisVersion = null;
try (PreparedStatement preparedStatement =
-
conn.prepareStatement(DorisCatalogUtil.QUERY_DORIS_VERSION_QUERY)) {
- ResultSet resultSet = preparedStatement.executeQuery();
+
conn.prepareStatement(DorisCatalogUtil.QUERY_DORIS_VERSION_QUERY);
+ ResultSet resultSet = preparedStatement.executeQuery()) {
+
while (resultSet.next()) {
dorisVersion = resultSet.getString(2);
}
@@ -180,8 +181,9 @@ public class DorisCatalog implements Catalog {
public boolean databaseExists(String databaseName) throws CatalogException
{
try (PreparedStatement ps =
conn.prepareStatement(DorisCatalogUtil.DATABASE_QUERY)) {
ps.setString(1, databaseName);
- ResultSet rs = ps.executeQuery();
- return rs.next();
+ try (ResultSet rs = ps.executeQuery()) {
+ return rs.next();
+ }
} catch (SQLException e) {
throw new CatalogException("check database exists failed", e);
}
@@ -190,8 +192,8 @@ public class DorisCatalog implements Catalog {
@Override
public List<String> listDatabases() throws CatalogException {
List<String> databases = new ArrayList<>();
- try (PreparedStatement ps =
conn.prepareStatement(DorisCatalogUtil.ALL_DATABASES_QUERY)) {
- ResultSet rs = ps.executeQuery();
+ try (PreparedStatement ps =
conn.prepareStatement(DorisCatalogUtil.ALL_DATABASES_QUERY);
+ ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
String database = rs.getString(1);
databases.add(database);
@@ -210,10 +212,11 @@ public class DorisCatalog implements Catalog {
try (PreparedStatement ps =
conn.prepareStatement(DorisCatalogUtil.TABLES_QUERY_WITH_DATABASE_QUERY)) {
ps.setString(1, databaseName);
- ResultSet rs = ps.executeQuery();
- while (rs.next()) {
- String table = rs.getString(1);
- tables.add(table);
+ try (ResultSet rs = ps.executeQuery()) {
+ while (rs.next()) {
+ String table = rs.getString(1);
+ tables.add(table);
+ }
}
} catch (SQLException e) {
throw new CatalogException(
@@ -229,8 +232,9 @@ public class DorisCatalog implements Catalog {
conn.prepareStatement(DorisCatalogUtil.TABLES_QUERY_WITH_IDENTIFIER_QUERY)) {
ps.setString(1, tablePath.getDatabaseName());
ps.setString(2, tablePath.getTableName());
- ResultSet rs = ps.executeQuery();
- return rs.next();
+ try (ResultSet rs = ps.executeQuery()) {
+ return rs.next();
+ }
} catch (SQLException e) {
throw new CatalogException(
String.format("check table [%s] exists failed",
tablePath.getFullName()), e);
@@ -248,18 +252,19 @@ public class DorisCatalog implements Catalog {
try (PreparedStatement ps =
conn.prepareStatement(DorisCatalogUtil.TABLE_SCHEMA_QUERY)) {
ps.setString(1, tablePath.getDatabaseName());
ps.setString(2, tablePath.getTableName());
- ResultSet rs = ps.executeQuery();
- Map<String, String> options = connectorOptions();
- buildTableSchemaWithErrorCheck(
- tablePath, rs, builder, options, Collections.emptyList());
- return CatalogTable.of(
- TableIdentifier.of(
- catalogName, tablePath.getDatabaseName(),
tablePath.getTableName()),
- builder.build(),
- options,
- Collections.emptyList(),
- "",
- catalogName);
+ try (ResultSet rs = ps.executeQuery()) {
+ Map<String, String> options = connectorOptions();
+ buildTableSchemaWithErrorCheck(
+ tablePath, rs, builder, options,
Collections.emptyList());
+ return CatalogTable.of(
+ TableIdentifier.of(
+ catalogName, tablePath.getDatabaseName(),
tablePath.getTableName()),
+ builder.build(),
+ options,
+ Collections.emptyList(),
+ "",
+ catalogName);
+ }
} catch (SeaTunnelRuntimeException e) {
throw e;
} catch (Exception e) {
@@ -279,17 +284,18 @@ public class DorisCatalog implements Catalog {
try (PreparedStatement ps =
conn.prepareStatement(DorisCatalogUtil.TABLE_SCHEMA_QUERY)) {
ps.setString(1, tablePath.getDatabaseName());
ps.setString(2, tablePath.getTableName());
- ResultSet rs = ps.executeQuery();
- Map<String, String> options = connectorOptions();
- buildTableSchemaWithErrorCheck(tablePath, rs, builder, options,
fieldNames);
- return CatalogTable.of(
- TableIdentifier.of(
- catalogName, tablePath.getDatabaseName(),
tablePath.getTableName()),
- builder.build(),
- options,
- Collections.emptyList(),
- "",
- catalogName);
+ try (ResultSet rs = ps.executeQuery()) {
+ Map<String, String> options = connectorOptions();
+ buildTableSchemaWithErrorCheck(tablePath, rs, builder,
options, fieldNames);
+ return CatalogTable.of(
+ TableIdentifier.of(
+ catalogName, tablePath.getDatabaseName(),
tablePath.getTableName()),
+ builder.build(),
+ options,
+ Collections.emptyList(),
+ "",
+ catalogName);
+ }
} catch (SeaTunnelRuntimeException e) {
throw e;
} catch (Exception e) {
@@ -480,8 +486,8 @@ public class DorisCatalog implements Catalog {
public boolean isExistsData(TablePath tablePath) {
String tableName = tablePath.getFullName();
String sql = String.format("select * from %s limit 1;", tableName);
- try (PreparedStatement ps = conn.prepareStatement(sql)) {
- ResultSet resultSet = ps.executeQuery();
+ try (PreparedStatement ps = conn.prepareStatement(sql);
+ ResultSet resultSet = ps.executeQuery()) {
return resultSet.next();
} catch (SQLException e) {
throw new CatalogException(String.format("Failed executeSql error
%s", sql), e);
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index 210bb779e0..247a68c62d 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -570,9 +570,9 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
protected List<String> queryString(String url, String sql,
ResultSetConsumer<String> consumer)
throws SQLException {
- try (PreparedStatement ps = getConnection(url).prepareStatement(sql)) {
+ try (PreparedStatement ps = getConnection(url).prepareStatement(sql);
+ ResultSet rs = ps.executeQuery()) {
List<String> result = new ArrayList<>();
- ResultSet rs = ps.executeQuery();
while (rs.next()) {
String value = consumer.apply(rs);
if (value != null) {
@@ -643,8 +643,9 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
Connection connection = getConnection(dbUrl);
String sql = getExistDataSql(tablePath);
- try (PreparedStatement ps = connection.prepareStatement(sql)) {
- ResultSet resultSet = ps.executeQuery();
+ try (PreparedStatement ps = connection.prepareStatement(sql);
+ ResultSet resultSet = ps.executeQuery()) {
+
return resultSet.next();
} catch (SQLException e) {
throw new CatalogException(String.format("Failed executeSql error
%s", sql), e);
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
index b876e33cc8..ceeff2587f 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
@@ -208,10 +208,12 @@ public class OceanBaseMySqlCatalog extends
AbstractJdbcCatalog {
@Override
public CatalogTable getTable(String sqlQuery) throws SQLException {
Connection defaultConnection = getConnection(defaultUrl);
- Statement statement = defaultConnection.createStatement();
- ResultSetMetaData metaData =
statement.executeQuery(sqlQuery).getMetaData();
- return CatalogUtils.getCatalogTable(
- metaData, new OceanBaseMySqlTypeMapper(typeConverter),
sqlQuery);
+ try (Statement statement = defaultConnection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sqlQuery)) {
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ return CatalogUtils.getCatalogTable(
+ metaData, new OceanBaseMySqlTypeMapper(typeConverter),
sqlQuery);
+ }
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
index 6f8574401f..bb224c4624 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
@@ -106,22 +106,23 @@ public class CatalogUtils {
// According to the Javadoc of
java.sql.DatabaseMetaData#getPrimaryKeys,
// the returned primary key columns are ordered by COLUMN_NAME, not by
KEY_SEQ.
// We need to sort them based on the KEY_SEQ value.
- ResultSet rs =
+ // seq -> column name
+ List<Pair<Integer, String>> primaryKeyColumns = new ArrayList<>();
+ String pkName = null;
+ try (ResultSet rs =
metaData.getPrimaryKeys(
tablePath.getDatabaseName(),
tablePath.getSchemaName(),
- tablePath.getTableName());
+ tablePath.getTableName())) {
- // seq -> column name
- List<Pair<Integer, String>> primaryKeyColumns = new ArrayList<>();
- String pkName = null;
- while (rs.next()) {
- String columnName = rs.getString("COLUMN_NAME");
- // all the PK_NAME should be the same
- pkName = cleanKeyName(rs.getString("PK_NAME"));
- int keySeq = rs.getInt("KEY_SEQ");
- // KEY_SEQ is 1-based index
- primaryKeyColumns.add(Pair.of(keySeq, columnName));
+ while (rs.next()) {
+ String columnName = rs.getString("COLUMN_NAME");
+ // all the PK_NAME should be the same
+ pkName = cleanKeyName(rs.getString("PK_NAME"));
+ int keySeq = rs.getInt("KEY_SEQ");
+ // KEY_SEQ is 1-based index
+ primaryKeyColumns.add(Pair.of(keySeq, columnName));
+ }
}
// initialize size
List<String> pkFields =
@@ -139,45 +140,46 @@ public class CatalogUtils {
public static List<ConstraintKey> getConstraintKeys(
DatabaseMetaData metadata, TablePath tablePath) throws
SQLException {
// We set approximate to true to avoid querying the statistics table,
which is slow.
- ResultSet resultSet =
+ try (ResultSet resultSet =
metadata.getIndexInfo(
tablePath.getDatabaseName(),
tablePath.getSchemaName(),
tablePath.getTableName(),
false,
- true);
- // index name -> index
- Map<String, ConstraintKey> constraintKeyMap = new HashMap<>();
- while (resultSet.next()) {
- String columnName = resultSet.getString("COLUMN_NAME");
- if (columnName == null) {
- continue;
- }
- String indexName = cleanKeyName(resultSet.getString("INDEX_NAME"));
- boolean noUnique = resultSet.getBoolean("NON_UNIQUE");
+ true)) {
+ // index name -> index
+ Map<String, ConstraintKey> constraintKeyMap = new HashMap<>();
+ while (resultSet.next()) {
+ String columnName = resultSet.getString("COLUMN_NAME");
+ if (columnName == null) {
+ continue;
+ }
+ String indexName =
cleanKeyName(resultSet.getString("INDEX_NAME"));
+ boolean noUnique = resultSet.getBoolean("NON_UNIQUE");
- ConstraintKey constraintKey =
- constraintKeyMap.computeIfAbsent(
- indexName,
- s -> {
- ConstraintKey.ConstraintType constraintType =
- ConstraintKey.ConstraintType.INDEX_KEY;
- if (!noUnique) {
- constraintType =
ConstraintKey.ConstraintType.UNIQUE_KEY;
- }
- return ConstraintKey.of(
- constraintType, indexName, new
ArrayList<>());
- });
+ ConstraintKey constraintKey =
+ constraintKeyMap.computeIfAbsent(
+ indexName,
+ s -> {
+ ConstraintKey.ConstraintType
constraintType =
+
ConstraintKey.ConstraintType.INDEX_KEY;
+ if (!noUnique) {
+ constraintType =
ConstraintKey.ConstraintType.UNIQUE_KEY;
+ }
+ return ConstraintKey.of(
+ constraintType, indexName, new
ArrayList<>());
+ });
- ConstraintKey.ColumnSortType sortType =
- "A".equals(resultSet.getString("ASC_OR_DESC"))
- ? ConstraintKey.ColumnSortType.ASC
- : ConstraintKey.ColumnSortType.DESC;
- ConstraintKey.ConstraintKeyColumn constraintKeyColumn =
- new ConstraintKey.ConstraintKeyColumn(columnName,
sortType);
- constraintKey.getColumnNames().add(constraintKeyColumn);
+ ConstraintKey.ColumnSortType sortType =
+ "A".equals(resultSet.getString("ASC_OR_DESC"))
+ ? ConstraintKey.ColumnSortType.ASC
+ : ConstraintKey.ColumnSortType.DESC;
+ ConstraintKey.ConstraintKeyColumn constraintKeyColumn =
+ new ConstraintKey.ConstraintKeyColumn(columnName,
sortType);
+ constraintKey.getColumnNames().add(constraintKeyColumn);
+ }
+ return new ArrayList<>(constraintKeyMap.values());
}
- return new ArrayList<>(constraintKeyMap.values());
}
private static String cleanKeyName(String keyName) {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/JdbcColumnConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/JdbcColumnConverter.java
index 87b0f54d20..664141b450 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/JdbcColumnConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/JdbcColumnConverter.java
@@ -72,25 +72,33 @@ public class JdbcColumnConverter {
public static List<Column> convert(DatabaseMetaData metadata, TablePath
tablePath)
throws SQLException {
- ResultSet columnsResultSet =
+ List<Column> columns = new ArrayList<>();
+
+ try (ResultSet columnsResultSet =
metadata.getColumns(
tablePath.getDatabaseName(),
tablePath.getSchemaName(),
tablePath.getTableName(),
- null);
+ null)) {
- List<Column> columns = new ArrayList<>();
- while (columnsResultSet.next()) {
- String columnName = columnsResultSet.getString("COLUMN_NAME");
- int jdbcType = columnsResultSet.getInt("DATA_TYPE");
- String nativeType = columnsResultSet.getString("TYPE_NAME");
- int columnSize = columnsResultSet.getInt("COLUMN_SIZE");
- int decimalDigits = columnsResultSet.getInt("DECIMAL_DIGITS");
- int nullable = columnsResultSet.getInt("NULLABLE");
+ while (columnsResultSet.next()) {
+ String columnName = columnsResultSet.getString("COLUMN_NAME");
+ int jdbcType = columnsResultSet.getInt("DATA_TYPE");
+ String nativeType = columnsResultSet.getString("TYPE_NAME");
+ int columnSize = columnsResultSet.getInt("COLUMN_SIZE");
+ int decimalDigits = columnsResultSet.getInt("DECIMAL_DIGITS");
+ int nullable = columnsResultSet.getInt("NULLABLE");
- Column column =
- convert(columnName, jdbcType, nativeType, nullable,
columnSize, decimalDigits);
- columns.add(column);
+ Column column =
+ convert(
+ columnName,
+ jdbcType,
+ nativeType,
+ nullable,
+ columnSize,
+ decimalDigits);
+ columns.add(column);
+ }
}
return columns;
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialect.java
index 08e68632f7..adee0ba38f 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialect.java
@@ -25,6 +25,8 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDiale
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Optional;
@@ -55,7 +57,10 @@ public class HiveDialect implements JdbcDialect {
@Override
public ResultSetMetaData getResultSetMetaData(Connection conn, String
query)
throws SQLException {
- return conn.prepareStatement(query).executeQuery().getMetaData();
+ try (PreparedStatement preparedStatement =
conn.prepareStatement(query);
+ ResultSet resultSet = preparedStatement.executeQuery()) {
+ return resultSet.getMetaData();
+ }
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisDialect.java
index 5be550cdef..e64974081d 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisDialect.java
@@ -193,7 +193,9 @@ public class IrisDialect implements JdbcDialect {
@Override
public ResultSetMetaData getResultSetMetaData(Connection conn, String
query)
throws SQLException {
- PreparedStatement ps = conn.prepareStatement(query);
- return ps.executeQuery().getMetaData();
+ try (PreparedStatement ps = conn.prepareStatement(query);
+ ResultSet resultSet = ps.executeQuery()) {
+ return resultSet.getMetaData();
+ }
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java
index 7a1edbeee7..9506ffd997 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java
@@ -24,6 +24,7 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDiale
import java.sql.Connection;
import java.sql.PreparedStatement;
+import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Optional;
@@ -61,6 +62,9 @@ public class TablestoreDialect implements JdbcDialect {
@Override
public ResultSetMetaData getResultSetMetaData(Connection conn, String
query)
throws SQLException {
- return conn.prepareStatement(query).executeQuery().getMetaData();
+ try (PreparedStatement preparedStatement =
conn.prepareStatement(query);
+ ResultSet resultSet = preparedStatement.executeQuery()) {
+ return resultSet.getMetaData();
+ }
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialect.java
index 7340b099b8..adf2cf2190 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialect.java
@@ -225,7 +225,9 @@ public class XuguDialect implements JdbcDialect {
@Override
public ResultSetMetaData getResultSetMetaData(Connection conn, String
query)
throws SQLException {
- PreparedStatement ps = conn.prepareStatement(query);
- return ps.executeQuery().getMetaData();
+ try (PreparedStatement ps = conn.prepareStatement(query);
+ ResultSet resultSet = ps.executeQuery()) {
+ return resultSet.getMetaData();
+ }
}
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
index 8a14b08efe..e71aae291a 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
@@ -56,6 +56,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -107,12 +108,10 @@ public class StarRocksCatalog implements Catalog {
@Override
public List<String> listDatabases() throws CatalogException {
- try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd)) {
-
- PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;");
-
+ try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd);
+ PreparedStatement ps = conn.prepareStatement("SHOW
DATABASES;");
+ ResultSet rs = ps.executeQuery()) {
List<String> databases = new ArrayList<>();
- ResultSet rs = ps.executeQuery();
while (rs.next()) {
String databaseName = rs.getString(1);
@@ -136,11 +135,10 @@ public class StarRocksCatalog implements Catalog {
}
try (Connection conn =
- DriverManager.getConnection(
- urlInfo.getUrlWithDatabase(databaseName), username,
pwd)) {
- PreparedStatement ps = conn.prepareStatement("SHOW TABLES;");
-
- ResultSet rs = ps.executeQuery();
+ DriverManager.getConnection(
+ urlInfo.getUrlWithDatabase(databaseName),
username, pwd);
+ PreparedStatement ps = conn.prepareStatement("SHOW TABLES;");
+ ResultSet rs = ps.executeQuery()) {
List<String> tables = new ArrayList<>();
@@ -259,9 +257,10 @@ public class StarRocksCatalog implements Catalog {
}
public boolean isExistsData(TablePath tablePath) {
- try (Connection connection = DriverManager.getConnection(defaultUrl,
username, pwd)) {
- String sql = String.format("select * from %s limit 1",
tablePath.getFullName());
- ResultSet resultSet =
connection.createStatement().executeQuery(sql);
+ String sql = String.format("select * from %s limit 1",
tablePath.getFullName());
+ try (Connection connection = DriverManager.getConnection(defaultUrl,
username, pwd);
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql)) {
if (resultSet == null) {
return false;
}
@@ -455,13 +454,13 @@ public class StarRocksCatalog implements Catalog {
protected Optional<PrimaryKey> getPrimaryKey(String schema, String table)
throws SQLException {
List<String> pkFields = new ArrayList<>();
- try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd)) {
- ResultSet rs =
- conn.createStatement()
- .executeQuery(
- String.format(
- "SELECT COLUMN_NAME FROM
information_schema.columns where TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' AND
COLUMN_KEY = 'PRI' ORDER BY ORDINAL_POSITION",
- schema, table));
+ try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd);
+ ResultSet rs =
+ conn.createStatement()
+ .executeQuery(
+ String.format(
+ "SELECT COLUMN_NAME FROM
information_schema.columns where TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' AND
COLUMN_KEY = 'PRI' ORDER BY ORDINAL_POSITION",
+ schema, table))) {
while (rs.next()) {
String columnName = rs.getString("COLUMN_NAME");
pkFields.add(columnName);
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
index 5c7b13c550..ed05e64937 100644
---
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
@@ -71,10 +71,11 @@ public class TDengineSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
// check td driver whether exist and if not, try to register
checkDriverExist(jdbcUrl);
conn = DriverManager.getConnection(jdbcUrl);
- try (Statement statement = conn.createStatement()) {
- final ResultSet metaResultSet =
- statement.executeQuery(
- "desc " + config.getDatabase() + "." +
config.getStable());
+ try (Statement statement = conn.createStatement();
+ final ResultSet metaResultSet =
+ statement.executeQuery(
+ "desc " + config.getDatabase() + "." +
config.getStable())) {
+
while (metaResultSet.next()) {
if (StringUtils.equals("TAG",
metaResultSet.getString("note"))) {
tagsNum++;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
index 01a4c0a0f5..3789731354 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
@@ -233,8 +233,9 @@ public class MongodbCDCIT extends TestSuiteBase implements
TestResource {
}
private List<List<Object>> querySql() {
- try (Connection connection = getJdbcConnection()) {
- ResultSet resultSet =
connection.createStatement().executeQuery(MongodbCDCIT.SINK_SQL);
+ try (Connection connection = getJdbcConnection();
+ ResultSet resultSet =
+
connection.createStatement().executeQuery(MongodbCDCIT.SINK_SQL)) {
List<List<Object>> result = new ArrayList<>();
int columnCount = resultSet.getMetaData().getColumnCount();
while (resultSet.next()) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
index 7fab60f9fc..a98a5cd4d2 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
@@ -44,6 +44,7 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -500,8 +501,9 @@ public class MysqlCDCIT extends TestSuiteBase implements
TestResource {
}
private List<List<Object>> query(String sql) {
- try (Connection connection = getJdbcConnection()) {
- ResultSet resultSet =
connection.createStatement().executeQuery(sql);
+ try (Connection connection = getJdbcConnection();
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql)) {
List<List<Object>> result = new ArrayList<>();
int columnCount = resultSet.getMetaData().getColumnCount();
while (resultSet.next()) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
index 1216c69645..8a1814e6ae 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
@@ -375,8 +375,9 @@ public class SqlServerCDCIT extends TestSuiteBase
implements TestResource {
}
private List<List<Object>> querySql(String sql) {
- try (Connection connection = getJdbcConnection()) {
- ResultSet resultSet =
connection.createStatement().executeQuery(sql);
+ try (Connection connection = getJdbcConnection();
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql)) {
List<List<Object>> result = new ArrayList<>();
int columnCount = resultSet.getMetaData().getColumnCount();
while (resultSet.next()) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
index c0a4254739..66ee281740 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
@@ -357,39 +357,40 @@ public class ClickhouseIT extends TestSuiteBase
implements TestResource {
List<String> columnList =
Arrays.stream(generateTestDataSet().getKey().getFieldNames())
.collect(Collectors.toList());
- Statement sourceStatement = connection.createStatement();
- Statement sinkStatement = connection.createStatement();
- ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql);
- ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
- Assertions.assertEquals(
- sourceResultSet.getMetaData().getColumnCount(),
- sinkResultSet.getMetaData().getColumnCount());
- while (sourceResultSet.next()) {
- if (sinkResultSet.next()) {
- for (String column : columnList) {
- Object source = sourceResultSet.getObject(column);
- Object sink = sinkResultSet.getObject(column);
- if (!Objects.deepEquals(source, sink)) {
- InputStream sourceAsciiStream =
sourceResultSet.getBinaryStream(column);
- InputStream sinkAsciiStream =
sinkResultSet.getBinaryStream(column);
- String sourceValue =
- IOUtils.toString(sourceAsciiStream,
StandardCharsets.UTF_8);
- String sinkValue =
- IOUtils.toString(sinkAsciiStream,
StandardCharsets.UTF_8);
- Assertions.assertEquals(sourceValue, sinkValue);
+ try (Statement sourceStatement = connection.createStatement();
+ Statement sinkStatement = connection.createStatement();
+ ResultSet sourceResultSet =
sourceStatement.executeQuery(sourceSql);
+ ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql))
{
+ Assertions.assertEquals(
+ sourceResultSet.getMetaData().getColumnCount(),
+ sinkResultSet.getMetaData().getColumnCount());
+ while (sourceResultSet.next()) {
+ if (sinkResultSet.next()) {
+ for (String column : columnList) {
+ Object source = sourceResultSet.getObject(column);
+ Object sink = sinkResultSet.getObject(column);
+ if (!Objects.deepEquals(source, sink)) {
+ InputStream sourceAsciiStream =
sourceResultSet.getBinaryStream(column);
+ InputStream sinkAsciiStream =
sinkResultSet.getBinaryStream(column);
+ String sourceValue =
+ IOUtils.toString(sourceAsciiStream,
StandardCharsets.UTF_8);
+ String sinkValue =
+ IOUtils.toString(sinkAsciiStream,
StandardCharsets.UTF_8);
+ Assertions.assertEquals(sourceValue, sinkValue);
+ }
+ Assertions.assertTrue(true);
}
- Assertions.assertTrue(true);
}
}
+ String columns = String.join(",",
generateTestDataSet().getKey().getFieldNames());
+ Assertions.assertTrue(
+ compare(String.format(CONFIG.getString(COMPARE_SQL),
columns, columns)));
}
- String columns = String.join(",",
generateTestDataSet().getKey().getFieldNames());
- Assertions.assertTrue(
- compare(String.format(CONFIG.getString(COMPARE_SQL), columns,
columns)));
}
private Boolean compare(String sql) {
- try (Statement statement = connection.createStatement()) {
- ResultSet resultSet = statement.executeQuery(sql);
+ try (Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql)) {
return !resultSet.next();
} catch (SQLException e) {
throw new RuntimeException("result compare error", e);
@@ -397,9 +398,9 @@ public class ClickhouseIT extends TestSuiteBase implements
TestResource {
}
private void assertHasData(String table) {
- try (Statement statement = connection.createStatement()) {
- String sql = String.format("select * from %s.%s limit 1",
DATABASE, table);
- ResultSet source = statement.executeQuery(sql);
+ String sql = String.format("select * from %s.%s limit 1", DATABASE,
table);
+ try (Statement statement = connection.createStatement();
+ ResultSet source = statement.executeQuery(sql); ) {
Assertions.assertTrue(source.next());
} catch (SQLException e) {
throw new RuntimeException("test clickhouse server image error",
e);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java
index 3d46ac8c55..5d9c1c848b 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java
@@ -189,10 +189,10 @@ public class ClickhouseSinkCDCChangelogIT extends
TestSuiteBase implements TestR
private void checkSinkTableRows() throws SQLException {
Set<List<Object>> actual = new HashSet<>();
- try (Statement statement = connection.createStatement()) {
- ResultSet resultSet =
- statement.executeQuery(
- String.format("select * from %s.%s", DATABASE,
SINK_TABLE));
+ try (Statement statement = connection.createStatement();
+ ResultSet resultSet =
+ statement.executeQuery(
+ String.format("select * from %s.%s", DATABASE,
SINK_TABLE))) {
while (resultSet.next()) {
List<Object> row =
Arrays.asList(
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
index 458a900b4b..8392f9ae33 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
@@ -94,8 +94,11 @@ public abstract class AbstractDorisIT extends TestSuiteBase
implements TestResou
DriverManager.getConnection(String.format(URL,
container.getHost()), props);
try (Statement statement = jdbcConnection.createStatement()) {
statement.execute(SET_SQL);
- ResultSet resultSet;
+ ResultSet resultSet = null;
do {
+ if (resultSet != null) {
+ resultSet.close();
+ }
resultSet = statement.executeQuery(SHOW_BE);
} while (!isBeReady(resultSet, Duration.ofSeconds(1L)));
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java
index 33108b8b8e..7fa699d998 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java
@@ -155,8 +155,8 @@ public class DorisCDCSinkIT extends AbstractDorisIT {
.untilAsserted(
() -> {
Set<List<Object>> actual = new HashSet<>();
- try (Statement sinkStatement =
jdbcConnection.createStatement()) {
- ResultSet sinkResultSet =
sinkStatement.executeQuery(sinkSql);
+ try (Statement sinkStatement =
jdbcConnection.createStatement();
+ ResultSet sinkResultSet =
sinkStatement.executeQuery(sinkSql)) {
while (sinkResultSet.next()) {
List<Object> row =
Arrays.asList(
@@ -178,8 +178,8 @@ public class DorisCDCSinkIT extends AbstractDorisIT {
.untilAsserted(
() -> {
Set<List<Object>> actual = new HashSet<>();
- try (Statement sinkStatement =
jdbcConnection.createStatement()) {
- ResultSet sinkResultSet =
sinkStatement.executeQuery(sinkSql);
+ try (Statement sinkStatement =
jdbcConnection.createStatement();
+ ResultSet sinkResultSet =
sinkStatement.executeQuery(sinkSql)) {
while (sinkResultSet.next()) {
List<Object> row =
Arrays.asList(
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
index 6b7a3a7f48..e9b81100de 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
@@ -203,13 +203,14 @@ public class DorisIT extends AbstractDorisIT {
conn.prepareStatement(DorisCatalogUtil.TABLE_SCHEMA_QUERY)) {
ps.setString(1, sinkDB);
ps.setString(2, DUPLICATE_TABLE);
- ResultSet resultSet = ps.executeQuery();
- while (resultSet.next()) {
- String columnName = resultSet.getString("COLUMN_NAME");
- String columnType = resultSet.getString("COLUMN_TYPE");
- Assertions.assertEquals(
-
checkColumnTypeMap.get(columnName).toUpperCase(Locale.ROOT),
- columnType.toUpperCase(Locale.ROOT));
+ try (ResultSet resultSet = ps.executeQuery()) {
+ while (resultSet.next()) {
+ String columnName = resultSet.getString("COLUMN_NAME");
+ String columnType = resultSet.getString("COLUMN_TYPE");
+ Assertions.assertEquals(
+
checkColumnTypeMap.get(columnName).toUpperCase(Locale.ROOT),
+ columnType.toUpperCase(Locale.ROOT));
+ }
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java
index a7094044aa..c4037ecce6 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java
@@ -345,8 +345,9 @@ public class JdbcPostgresIdentifierIT extends TestSuiteBase
implements TestResou
}
private List<List<Object>> querySql(String sql) {
- try (Connection connection = getJdbcConnection()) {
- ResultSet resultSet =
connection.createStatement().executeQuery(sql);
+ try (Connection connection = getJdbcConnection();
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql)) {
List<List<Object>> result = new ArrayList<>();
int columnCount = resultSet.getMetaData().getColumnCount();
while (resultSet.next()) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
index 6cdc38780a..844c2fc00c 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
@@ -51,11 +51,10 @@ public abstract class JdbcOceanBaseITBase extends
AbstractJdbcIT {
String.format("select * from %s order by 1",
getFullTableName(OCEANBASE_SOURCE));
String sinkSql =
String.format("select * from %s order by 1",
getFullTableName(OCEANBASE_SINK));
- try {
- Statement sourceStatement = connection.createStatement();
- Statement sinkStatement = connection.createStatement();
- ResultSet sourceResultSet =
sourceStatement.executeQuery(sourceSql);
- ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
+ try (Statement sourceStatement = connection.createStatement();
+ Statement sinkStatement = connection.createStatement();
+ ResultSet sourceResultSet =
sourceStatement.executeQuery(sourceSql);
+ ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql))
{
Assertions.assertEquals(
sourceResultSet.getMetaData().getColumnCount(),
sinkResultSet.getMetaData().getColumnCount());
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
index 6993b99336..d357d238b7 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
@@ -491,8 +491,8 @@ public class JdbcPostgresIT extends TestSuiteBase
implements TestResource {
}
private List<List<Object>> querySql(String sql) {
- try (Connection connection = getJdbcConnection()) {
- ResultSet resultSet =
connection.createStatement().executeQuery(sql);
+ try (Connection connection = getJdbcConnection();
+ ResultSet resultSet =
connection.createStatement().executeQuery(sql)) {
List<List<Object>> result = new ArrayList<>();
int columnCount = resultSet.getMetaData().getColumnCount();
while (resultSet.next()) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSinkCDCChangelogIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSinkCDCChangelogIT.java
index dd812efb12..cea5099e3b 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSinkCDCChangelogIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSinkCDCChangelogIT.java
@@ -110,8 +110,8 @@ public class JdbcSinkCDCChangelogIT extends TestSuiteBase
implements TestResourc
postgreSQLContainer.getJdbcUrl(),
postgreSQLContainer.getUsername(),
postgreSQLContainer.getPassword())) {
- try (Statement statement = connection.createStatement()) {
- ResultSet resultSet = statement.executeQuery("select * from
sink");
+ try (Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery("select *
from sink")) {
while (resultSet.next()) {
List<Object> row =
Arrays.asList(
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlCreateTableIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlCreateTableIT.java
index 30c6783897..bf3c9f654f 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlCreateTableIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlCreateTableIT.java
@@ -356,8 +356,9 @@ public class JdbcMySqlCreateTableIT extends TestSuiteBase
implements TestResourc
}
private boolean checkMysql(String sql) {
- try (Connection connection = getJdbcMySqlConnection()) {
- ResultSet resultSet =
connection.createStatement().executeQuery(sql);
+ try (Connection connection = getJdbcMySqlConnection();
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql)) {
boolean tableExists = false;
if (resultSet.next()) {
tableExists = resultSet.getBoolean(1);
@@ -369,8 +370,9 @@ public class JdbcMySqlCreateTableIT extends TestSuiteBase
implements TestResourc
}
private boolean checkPG(String sql) {
- try (Connection connection = getJdbcPgConnection()) {
- ResultSet resultSet =
connection.createStatement().executeQuery(sql);
+ try (Connection connection = getJdbcPgConnection();
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql)) {
boolean tableExists = false;
if (resultSet.next()) {
tableExists = resultSet.getBoolean(1);
@@ -382,8 +384,9 @@ public class JdbcMySqlCreateTableIT extends TestSuiteBase
implements TestResourc
}
private boolean checkSqlServer(String sql) {
- try (Connection connection = getJdbcSqlServerConnection()) {
- ResultSet resultSet =
connection.createStatement().executeQuery(sql);
+ try (Connection connection = getJdbcSqlServerConnection();
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql)) {
boolean tableExists = false;
if (resultSet.next()) {
tableExists = resultSet.getInt(1) == 1;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerCreateTableIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerCreateTableIT.java
index ae2e625b15..9c8639160f 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerCreateTableIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerCreateTableIT.java
@@ -362,8 +362,9 @@ public class JdbcSqlServerCreateTableIT extends
TestSuiteBase implements TestRes
}
private boolean checkMysql(String sql) {
- try (Connection connection = getJdbcMySqlConnection()) {
- ResultSet resultSet =
connection.createStatement().executeQuery(sql);
+ try (Connection connection = getJdbcMySqlConnection();
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql)) {
boolean tableExists = false;
if (resultSet.next()) {
tableExists = resultSet.getBoolean(1);
@@ -375,8 +376,9 @@ public class JdbcSqlServerCreateTableIT extends
TestSuiteBase implements TestRes
}
private boolean checkPG(String sql) {
- try (Connection connection = getJdbcPgConnection()) {
- ResultSet resultSet =
connection.createStatement().executeQuery(sql);
+ try (Connection connection = getJdbcPgConnection();
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql)) {
boolean tableExists = false;
if (resultSet.next()) {
tableExists = resultSet.getBoolean(1);
@@ -388,8 +390,9 @@ public class JdbcSqlServerCreateTableIT extends
TestSuiteBase implements TestRes
}
private boolean checkSqlServer(String sql) {
- try (Connection connection = getJdbcSqlServerConnection()) {
- ResultSet resultSet =
connection.createStatement().executeQuery(sql);
+ try (Connection connection = getJdbcSqlServerConnection();
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql)) {
boolean tableExists = false;
if (resultSet.next()) {
tableExists = resultSet.getInt(1) == 1;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisIT.java
index 24bd606775..c4477ae9be 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisIT.java
@@ -340,9 +340,9 @@ public class JdbcDorisIT extends TestSuiteBase implements
TestResource {
}
private void assertHasData(String table) {
- try (Statement statement = jdbcConnection.createStatement()) {
- String sql = String.format("select * from %s.%s limit 1",
DATABASE, table);
- ResultSet source = statement.executeQuery(sql);
+ String sql = String.format("select * from %s.%s limit 1", DATABASE,
table);
+ try (Statement statement = jdbcConnection.createStatement();
+ ResultSet source = statement.executeQuery(sql); ) {
Assertions.assertTrue(source.next());
} catch (Exception e) {
throw new RuntimeException("Test doris server image error", e);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisdbIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisdbIT.java
index 4c6aaa245c..ef8bc78821 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisdbIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisdbIT.java
@@ -332,9 +332,9 @@ public class JdbcDorisdbIT extends TestSuiteBase implements
TestResource {
}
private void assertHasData(String table) {
- try (Statement statement = jdbcConnection.createStatement()) {
- String sql = String.format("select * from %s.%s limit 1",
DATABASE, table);
- ResultSet source = statement.executeQuery(sql);
+ String sql = String.format("select * from %s.%s limit 1", DATABASE,
table);
+ try (Statement statement = jdbcConnection.createStatement();
+ ResultSet source = statement.executeQuery(sql)) {
Assertions.assertTrue(source.next());
} catch (Exception e) {
throw new RuntimeException("Test doris server image error", e);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java
index b99c823de8..2a48e16fa5 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java
@@ -326,10 +326,10 @@ public class JdbcIrisIT extends AbstractJdbcIT {
public void testUpsert(TestContainer container) throws IOException,
InterruptedException {
Container.ExecResult execResult =
container.executeJob("/jdbc_iris_upsert.conf");
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
- try (Statement statement = connection.createStatement()) {
- ResultSet sink =
- statement.executeQuery(
- "SELECT * FROM test.e2e_upsert_table_sink ORDER BY
pk_id");
+ try (Statement statement = connection.createStatement();
+ ResultSet sink =
+ statement.executeQuery(
+ "SELECT * FROM test.e2e_upsert_table_sink
ORDER BY pk_id")) {
String[] fieldNames = new String[] {"pk_id", "name", "score"};
Object[] sinkResult = toArrayResult(sink, fieldNames);
Assertions.assertEquals(2, sinkResult.length);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
index ec7b0173ff..bbd92de12e 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
@@ -996,11 +996,10 @@ public class KafkaFormatIT extends TestSuiteBase
implements TestResource {
POSTGRESQL_CONTAINER.getJdbcUrl(),
POSTGRESQL_CONTAINER.getUsername(),
POSTGRESQL_CONTAINER.getPassword())) {
- try (Statement statement = connection.createStatement()) {
+ try (Statement statement = connection.createStatement();
+ ResultSet resultSet =
+ statement.executeQuery("select * from " +
tableName + " order by id")) {
PostgresJdbcRowConverter postgresJdbcRowConverter = new
PostgresJdbcRowConverter();
- ResultSet resultSet =
- statement.executeQuery("select * from " + tableName +
" order by id");
-
while (resultSet.next()) {
SeaTunnelRow row =
postgresJdbcRowConverter.toInternal(
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/CanalToPulsarIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/CanalToPulsarIT.java
index f273296ec8..d9a5774d88 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/CanalToPulsarIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/CanalToPulsarIT.java
@@ -309,8 +309,9 @@ public class CanalToPulsarIT extends TestSuiteBase
implements TestResource {
POSTGRESQL_CONTAINER.getJdbcUrl(),
POSTGRESQL_CONTAINER.getUsername(),
POSTGRESQL_CONTAINER.getPassword())) {
- try (Statement statement = connection.createStatement()) {
- ResultSet resultSet = statement.executeQuery("SELECT * FROM sink ORDER BY id");
+ try (Statement statement = connection.createStatement();
+ ResultSet resultSet =
+ statement.executeQuery("SELECT * FROM sink ORDER BY id"); ) {
while (resultSet.next()) {
List<Object> row =
Arrays.asList(
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksCDCSinkIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksCDCSinkIT.java
index 1a16662f99..9b83254cc6 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksCDCSinkIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksCDCSinkIT.java
@@ -144,8 +144,8 @@ public class StarRocksCDCSinkIT extends TestSuiteBase
implements TestResource {
String sinkSql = String.format("select * from %s.%s", DATABASE, SINK_TABLE);
Set<List<Object>> actual = new HashSet<>();
- try (Statement sinkStatement = jdbcConnection.createStatement()) {
- ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
+ try (Statement sinkStatement = jdbcConnection.createStatement();
+ ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql); ) {
while (sinkResultSet.next()) {
List<Object> row =
Arrays.asList(
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
index a536cf0231..6f9e41ba02 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
@@ -334,9 +334,9 @@ public class StarRocksIT extends TestSuiteBase implements
TestResource {
}
private void assertHasData(String table) {
- try (Statement statement = jdbcConnection.createStatement()) {
- String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
- ResultSet source = statement.executeQuery(sql);
+ String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
+ try (Statement statement = jdbcConnection.createStatement();
+ ResultSet source = statement.executeQuery(sql)) {
Assertions.assertTrue(source.next());
} catch (Exception e) {
throw new RuntimeException("test starrocks server image error", e);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java
index 10724ac2c6..a96d34bd0c 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java
@@ -136,8 +136,8 @@ public class TDengineIT extends TestSuiteBase implements
TestResource {
@SneakyThrows
private long readSinkDataset() {
long rowCount;
- try (Statement stmt = connection2.createStatement()) {
- ResultSet resultSet = stmt.executeQuery("select count(1) from power2.meters2;");
+ try (Statement stmt = connection2.createStatement();
+ ResultSet resultSet = stmt.executeQuery("select count(1) from power2.meters2;"); ) {
resultSet.next();
rowCount = resultSet.getLong(1);
}