This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 69cd4ae1a2 [Improve][Connector-V2] Fix SqlServer cdc memory leak 
(#8083)
69cd4ae1a2 is described below

commit 69cd4ae1a21be5e54a5f722d7f125b4ff75b3367
Author: Jia Fan <[email protected]>
AuthorDate: Tue Nov 19 21:35:31 2024 +0800

    [Improve][Connector-V2] Fix SqlServer cdc memory leak (#8083)
---
 .../connector/sqlserver/SqlServerConnection.java   | 45 ++++++++++++++++++++++
 1 file changed, 45 insertions(+)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java
index 19c0cc0cbd..2c38dd1a26 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java
@@ -492,6 +492,51 @@ public class SqlServerConnection extends JdbcConnection {
         prepareQuery(queries, preparers, consumer);
     }
 
+    /** Overridden to make sure the prepared statement is closed after the 
query is executed. */
+    @Override
+    public JdbcConnection prepareQuery(
+            String[] multiQuery,
+            StatementPreparer[] preparers,
+            BlockingMultiResultSetConsumer resultConsumer)
+            throws SQLException, InterruptedException {
+        final ResultSet[] resultSets = new ResultSet[multiQuery.length];
+        final PreparedStatement[] preparedStatements = new 
PreparedStatement[multiQuery.length];
+
+        try {
+            for (int i = 0; i < multiQuery.length; i++) {
+                final String query = multiQuery[i];
+                if (LOGGER.isTraceEnabled()) {
+                    LOGGER.trace("running '{}'", query);
+                }
+                final PreparedStatement statement = 
connection().prepareStatement(query);
+                preparedStatements[i] = statement;
+                preparers[i].accept(statement);
+                resultSets[i] = statement.executeQuery();
+            }
+            if (resultConsumer != null) {
+                resultConsumer.accept(resultSets);
+            }
+        } finally {
+            for (ResultSet rs : resultSets) {
+                if (rs != null) {
+                    try {
+                        rs.close();
+                    } catch (Exception ei) {
+                    }
+                }
+            }
+            for (PreparedStatement ps : preparedStatements) {
+                if (ps != null) {
+                    try {
+                        ps.close();
+                    } catch (Exception ei) {
+                    }
+                }
+            }
+        }
+        return this;
+    }
+
     private Lsn getFromLsn(
             String databaseName, SqlServerChangeTable changeTable, Lsn 
intervalFromLsn)
             throws SQLException {

Reply via email to