This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 69cd4ae1a2 [Improve][Connector-V2] Fix SqlServer cdc memory leak
(#8083)
69cd4ae1a2 is described below
commit 69cd4ae1a21be5e54a5f722d7f125b4ff75b3367
Author: Jia Fan <[email protected]>
AuthorDate: Tue Nov 19 21:35:31 2024 +0800
[Improve][Connector-V2] Fix SqlServer cdc memory leak (#8083)
---
.../connector/sqlserver/SqlServerConnection.java | 45 ++++++++++++++++++++++
1 file changed, 45 insertions(+)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java
index 19c0cc0cbd..2c38dd1a26 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java
@@ -492,6 +492,51 @@ public class SqlServerConnection extends JdbcConnection {
prepareQuery(queries, preparers, consumer);
}
+ /** Overridden to make sure the prepared statement is closed after the
query is executed. */
+ @Override
+ public JdbcConnection prepareQuery(
+ String[] multiQuery,
+ StatementPreparer[] preparers,
+ BlockingMultiResultSetConsumer resultConsumer)
+ throws SQLException, InterruptedException {
+ final ResultSet[] resultSets = new ResultSet[multiQuery.length];
+ final PreparedStatement[] preparedStatements = new
PreparedStatement[multiQuery.length];
+
+ try {
+ for (int i = 0; i < multiQuery.length; i++) {
+ final String query = multiQuery[i];
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("running '{}'", query);
+ }
+ final PreparedStatement statement =
connection().prepareStatement(query);
+ preparedStatements[i] = statement;
+ preparers[i].accept(statement);
+ resultSets[i] = statement.executeQuery();
+ }
+ if (resultConsumer != null) {
+ resultConsumer.accept(resultSets);
+ }
+ } finally {
+ for (ResultSet rs : resultSets) {
+ if (rs != null) {
+ try {
+ rs.close();
+ } catch (Exception ei) {
+ }
+ }
+ }
+ for (PreparedStatement ps : preparedStatements) {
+ if (ps != null) {
+ try {
+ ps.close();
+ } catch (Exception ei) {
+ }
+ }
+ }
+ }
+ return this;
+ }
+
private Lsn getFromLsn(
String databaseName, SqlServerChangeTable changeTable, Lsn
intervalFromLsn)
throws SQLException {