Author: ningjiang
Date: Tue Jul 26 15:05:42 2011
New Revision: 1151126
URL: http://svn.apache.org/viewvc?rev=1151126&view=rev
Log:
CAMEL-4272 camel-jdbc should provide an option to not set the autoCommit flag
Modified:
camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java
camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
Modified:
camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java?rev=1151126&r1=1151125&r2=1151126&view=diff
==============================================================================
---
camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java
(original)
+++
camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java
Tue Jul 26 15:05:42 2011
@@ -31,6 +31,7 @@ import org.apache.camel.impl.DefaultEndp
public class JdbcEndpoint extends DefaultEndpoint {
private int readSize;
private boolean transacted;
+ private boolean resetAutoCommit = true;
private DataSource dataSource;
private Map<String, Object> parameters;
private boolean useJDBC4ColumnNameAndLabelSemantics = true;
@@ -71,6 +72,14 @@ public class JdbcEndpoint extends Defaul
this.transacted = transacted;
}
+ public boolean isResetAutoCommit() {
+ return resetAutoCommit;
+ }
+
+ public void setResetAutoCommit(boolean resetAutoCommit) {
+ this.resetAutoCommit = resetAutoCommit;
+ }
+
public DataSource getDataSource() {
return dataSource;
}
Modified:
camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java?rev=1151126&r1=1151125&r2=1151126&view=diff
==============================================================================
---
camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
(original)
+++
camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
Tue Jul 26 15:05:42 2011
@@ -58,23 +58,34 @@ public class JdbcProducer extends Defaul
* Execute sql of exchange and set results on output
*/
public void process(Exchange exchange) throws Exception {
+ if (getEndpoint().isResetAutoCommit()) {
+ processingSqlBySettingAutoCommit(exchange);
+ } else {
+ processingSqlWithoutSettingAutoCommit(exchange);
+ }
+ // populate headers
+ exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
+ }
+
+ private void processingSqlBySettingAutoCommit(Exchange exchange) throws
Exception {
String sql = exchange.getIn().getBody(String.class);
Connection conn = null;
Statement stmt = null;
ResultSet rs = null;
Boolean autoCommit = null;
-
try {
conn = dataSource.getConnection();
autoCommit = conn.getAutoCommit();
- conn.setAutoCommit(false);
-
+ if (autoCommit) {
+ conn.setAutoCommit(false);
+ }
+
stmt = conn.createStatement();
-
+
if (parameters != null && !parameters.isEmpty()) {
IntrospectionSupport.setProperties(stmt, parameters);
}
-
+
LOG.debug("Executing JDBC statement: {}", sql);
if (stmt.execute(sql)) {
@@ -98,9 +109,36 @@ public class JdbcProducer extends Defaul
resetAutoCommit(conn, autoCommit);
closeQuietly(conn);
}
+ }
- // populate headers
- exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
+ private void processingSqlWithoutSettingAutoCommit(Exchange exchange)
throws Exception {
+ String sql = exchange.getIn().getBody(String.class);
+ Connection conn = null;
+ Statement stmt = null;
+ ResultSet rs = null;
+ try {
+ conn = dataSource.getConnection();
+
+ stmt = conn.createStatement();
+
+ if (parameters != null && !parameters.isEmpty()) {
+ IntrospectionSupport.setProperties(stmt, parameters);
+ }
+
+ LOG.debug("Executing JDBC statement: {}", sql);
+
+ if (stmt.execute(sql)) {
+ rs = stmt.getResultSet();
+ setResultSet(exchange, rs);
+ } else {
+ int updateCount = stmt.getUpdateCount();
+ exchange.getOut().setHeader(JdbcConstants.JDBC_UPDATE_COUNT,
updateCount);
+ }
+ } finally {
+ closeQuietly(rs);
+ closeQuietly(stmt);
+ closeQuietly(conn);
+ }
}
private void closeQuietly(ResultSet rs) {