This is an automated email from the ASF dual-hosted git repository.
zhfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new d30d9985b47 CAMEL-19256: camel-jdbc - close stmt in the finally block
(#9827)
d30d9985b47 is described below
commit d30d9985b479cf5f445b5ced1af510595ff06e37
Author: Zheng Feng <[email protected]>
AuthorDate: Tue Apr 11 19:36:40 2023 +0800
CAMEL-19256: camel-jdbc - close stmt in the finally block (#9827)
---
.../apache/camel/component/jdbc/JdbcProducer.java | 96 ++++++++++++++--------
1 file changed, 63 insertions(+), 33 deletions(-)
diff --git
a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
index 03edc8fb1ab..7e5716fae11 100644
---
a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
+++
b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
@@ -31,7 +31,6 @@ import java.util.Map;
import javax.sql.DataSource;
import org.apache.camel.Exchange;
-import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.DefaultProducer;
import org.apache.camel.support.PropertyBindingSupport;
import org.apache.camel.support.SynchronizationAdapter;
@@ -102,6 +101,21 @@ public class JdbcProducer extends DefaultProducer {
if (shouldCloseResources) {
resetAutoCommit(conn, autoCommit);
closeQuietly(conn);
+ } else {
+ final Connection finalConn = conn;
+ final boolean finalAutoCommit = autoCommit;
+ exchange.getExchangeExtension().addOnCompletion(new
SynchronizationAdapter() {
+ @Override
+ public void onDone(Exchange exchange) {
+ resetAutoCommit(finalConn, finalAutoCommit);
+ closeQuietly(finalConn);
+ }
+
+ @Override
+ public int getOrder() {
+ return LOWEST + 200;
+ }
+ });
}
}
}
@@ -117,6 +131,19 @@ public class JdbcProducer extends DefaultProducer {
} finally {
if (shouldCloseResources &&
!connectionStrategy.isConnectionTransactional(conn, dataSource)) {
closeQuietly(conn);
+ } else {
+ final Connection finalConn = conn;
+ exchange.getExchangeExtension().addOnCompletion(new
SynchronizationAdapter() {
+ @Override
+ public void onDone(Exchange exchange) {
+ closeQuietly(finalConn);
+ }
+
+ @Override
+ public int getOrder() {
+ return LOWEST + 200;
+ }
+ });
}
}
}
@@ -187,6 +214,22 @@ public class JdbcProducer extends DefaultProducer {
if (shouldCloseResources) {
closeQuietly(rs);
closeQuietly(ps);
+ } else {
+ final Statement finalPs = ps;
+ final ResultSet finalRs = rs;
+ exchange.getExchangeExtension().addOnCompletion(new
SynchronizationAdapter() {
+ @Override
+ public void onDone(Exchange exchange) {
+ closeQuietly(finalRs);
+ closeQuietly(finalPs);
+ }
+
+ @Override
+ public int getOrder() {
+ // Make sure this runs before the Connection is closed.
+ return LOWEST + 100;
+ }
+ });
}
}
return shouldCloseResources;
@@ -194,21 +237,12 @@ public class JdbcProducer extends DefaultProducer {
private boolean doCreateAndExecuteSqlStatement(Exchange exchange, String
sql, Connection conn) throws Exception {
+ Statement stmt = null;
ResultSet rs = null;
boolean shouldCloseResources = true;
try {
- // We might need to leave it open to allow post-processing of the
result set. This is why we
- // are not using try-with-resources here.
- final Statement stmt = conn.createStatement();
- // ensure statement is closed (to not leak) when exchange is done
- exchange.getExchangeExtension().addOnCompletion(new
SynchronizationAdapter() {
- @Override
- public void onDone(Exchange exchange) {
- closeQuietly(stmt);
- }
- });
-
+ stmt = conn.createStatement();
bindParameters(exchange, stmt);
LOG.debug("Executing JDBC Statement: {}", sql);
@@ -249,6 +283,23 @@ public class JdbcProducer extends DefaultProducer {
} finally {
if (shouldCloseResources) {
closeQuietly(rs);
+ closeQuietly(stmt);
+ } else {
+ final Statement finalStmt = stmt;
+ final ResultSet finalRs = rs;
+ exchange.getExchangeExtension().addOnCompletion(new
SynchronizationAdapter() {
+ @Override
+ public void onDone(Exchange exchange) {
+ closeQuietly(finalRs);
+ closeQuietly(finalStmt);
+ }
+
+ @Override
+ public int getOrder() {
+ // Make sure this runs before the Connection is closed.
+ return LOWEST + 100;
+ }
+ });
}
}
return shouldCloseResources;
@@ -345,7 +396,6 @@ public class JdbcProducer extends DefaultProducer {
.setBody(new StreamListIterator(
getEndpoint().getCamelContext(),
getEndpoint().getOutputClass(), getEndpoint().getBeanRowMapper(),
iterator));
- exchange.getExchangeExtension().addOnCompletion(new
ResultSetIteratorCompletion(iterator));
// do not close resources as we are in streaming mode
answer = false;
} else if (outputType == JdbcOutputType.SelectList) {
@@ -394,24 +444,4 @@ public class JdbcProducer extends DefaultProducer {
return row;
}
}
-
- private static final class ResultSetIteratorCompletion implements
Synchronization {
- private final ResultSetIterator iterator;
-
- private ResultSetIteratorCompletion(ResultSetIterator iterator) {
- this.iterator = iterator;
- }
-
- @Override
- public void onComplete(Exchange exchange) {
- iterator.close();
- iterator.closeConnection();
- }
-
- @Override
- public void onFailure(Exchange exchange) {
- iterator.close();
- iterator.closeConnection();
- }
- }
}