Author: davsclaus
Date: Thu Jan 17 13:45:55 2013
New Revision: 1434669
URL: http://svn.apache.org/viewvc?rev=1434669&view=rev
Log:
CAMEL-5976: camel-sql consumer can now do onConsume to delete row after
processing etc.
Added:
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteBatchCompleteTest.java
- copied, changed from r1434662,
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTransformTest.java
- copied, changed from r1434662,
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
Modified:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
Modified:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java?rev=1434669&r1=1434668&r2=1434669&view=diff
==============================================================================
---
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
(original)
+++
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
Thu Jan 17 13:45:55 2013
@@ -60,5 +60,27 @@ public class DefaultSqlProcessingStrateg
});
}
+ @Override
+ public int commitBatchComplete(final SqlEndpoint endpoint, final
JdbcTemplate jdbcTemplate, final String query) throws Exception {
+ final String preparedQuery =
endpoint.getPrepareStatementStrategy().prepareQuery(query,
endpoint.isAllowNamedParameters());
+
+ return jdbcTemplate.execute(preparedQuery, new
PreparedStatementCallback<Integer>() {
+ public Integer doInPreparedStatement(PreparedStatement ps) throws
SQLException {
+ int expected = ps.getParameterMetaData().getParameterCount();
+ if (expected != 0) {
+ throw new IllegalArgumentException("Query
onConsumeBatchComplete " + query + " cannot have parameters, was " + expected);
+ }
+
+ LOG.trace("Execute query {}", query);
+ ps.execute();
+
+ int updateCount = ps.getUpdateCount();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Update count {}", updateCount);
+ }
+ return updateCount;
+ };
+ });
+ }
}
Modified:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java?rev=1434669&r1=1434668&r2=1434669&view=diff
==============================================================================
---
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
(original)
+++
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
Thu Jan 17 13:45:55 2013
@@ -59,9 +59,17 @@ public class SqlComponent extends Defaul
if (onConsume != null) {
onConsume = onConsume.replaceAll(parameterPlaceholderSubstitute,
"?");
}
+ String onConsumeBatchComplete = getAndRemoveParameter(parameters,
"consumer.onConsumeBatchComplete", String.class);
+ if (onConsumeBatchComplete == null) {
+ onConsumeBatchComplete = getAndRemoveParameter(parameters,
"onConsumeBatchComplete", String.class);
+ }
+ if (onConsumeBatchComplete != null) {
+ onConsumeBatchComplete =
onConsumeBatchComplete.replaceAll(parameterPlaceholderSubstitute, "?");
+ }
SqlEndpoint endpoint = new SqlEndpoint(uri, this, jdbcTemplate, query);
endpoint.setOnConsume(onConsume);
+ endpoint.setOnConsumeBatchComplete(onConsumeBatchComplete);
return endpoint;
}
Modified:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java?rev=1434669&r1=1434668&r2=1434669&view=diff
==============================================================================
---
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
(original)
+++
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
Thu Jan 17 13:45:55 2013
@@ -46,6 +46,7 @@ public class SqlConsumer extends Schedul
private final JdbcTemplate jdbcTemplate;
private String onConsume;
+ private String onConsumeBatchComplete;
private boolean useIterator = true;
private boolean routeEmptyResultSet;
private int expectedUpdateCount = -1;
@@ -178,6 +179,19 @@ public class SqlConsumer extends Schedul
}
}
+ try {
+ if (onConsumeBatchComplete != null) {
+ int updateCount =
getEndpoint().getProcessingStrategy().commitBatchComplete(getEndpoint(),
jdbcTemplate, onConsumeBatchComplete);
+ log.debug("onConsumeBatchComplete update count {}",
updateCount);
+ }
+ } catch (Exception e) {
+ if (breakBatchOnConsumeFail) {
+ throw e;
+ } else {
+ handleException("Error executing onConsumeBatchComplete query
" + onConsumeBatchComplete, e);
+ }
+ }
+
return total;
}
@@ -197,6 +211,14 @@ public class SqlConsumer extends Schedul
this.onConsume = onConsume;
}
+ public String getOnConsumeBatchComplete() {
+ return onConsumeBatchComplete;
+ }
+
+ public void setOnConsumeBatchComplete(String onConsumeBatchComplete) {
+ this.onConsumeBatchComplete = onConsumeBatchComplete;
+ }
+
/**
* Indicates how resultset should be delivered to the route
*/
Modified:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java?rev=1434669&r1=1434668&r2=1434669&view=diff
==============================================================================
---
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
(original)
+++
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
Thu Jan 17 13:45:55 2013
@@ -37,6 +37,7 @@ public class SqlEndpoint extends Default
private SqlProcessingStrategy processingStrategy = new
DefaultSqlProcessingStrategy();
private SqlPrepareStatementStrategy prepareStatementStrategy = new
DefaultSqlPrepareStatementStrategy();
private String onConsume;
+ private String onConsumeBatchComplete;
private boolean allowNamedParameters = true;
// TODO: onConsumeBatchDone to execute a query when batch done
@@ -54,6 +55,7 @@ public class SqlEndpoint extends Default
SqlConsumer consumer = new SqlConsumer(this, processor, jdbcTemplate,
query);
consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
consumer.setOnConsume(getOnConsume());
+ consumer.setOnConsumeBatchComplete(getOnConsumeBatchComplete());
configureConsumer(consumer);
return consumer;
}
@@ -122,6 +124,14 @@ public class SqlEndpoint extends Default
this.onConsume = onConsume;
}
+ public String getOnConsumeBatchComplete() {
+ return onConsumeBatchComplete;
+ }
+
+ public void setOnConsumeBatchComplete(String onConsumeBatchComplete) {
+ this.onConsumeBatchComplete = onConsumeBatchComplete;
+ }
+
public boolean isAllowNamedParameters() {
return allowNamedParameters;
}
Modified:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java?rev=1434669&r1=1434668&r2=1434669&view=diff
==============================================================================
---
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
(original)
+++
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
Thu Jan 17 13:45:55 2013
@@ -36,4 +36,16 @@ public interface SqlProcessingStrategy {
* @throws Exception can be thrown in case of error
*/
int commit(SqlEndpoint endpoint, Exchange exchange, Object data,
JdbcTemplate jdbcTemplate, String query) throws Exception;
+
+ /**
+ * Commit callback when the batch is complete. This allows you to do one
extra query after all rows has been processed in the batch.
+ *
+ * @param endpoint the endpoint
+ * @param jdbcTemplate The JDBC template
+ * @param query The SQL query to execute
+ * @return the update count if the query returned an update count
+ * @throws Exception can be thrown in case of error
+ */
+ int commitBatchComplete(SqlEndpoint endpoint, JdbcTemplate jdbcTemplate,
String query) throws Exception;
+
}
Copied:
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteBatchCompleteTest.java
(from r1434662,
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteBatchCompleteTest.java?p2=camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteBatchCompleteTest.java&p1=camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java&r1=1434662&r2=1434669&rev=1434669&view=diff
==============================================================================
---
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
(original)
+++
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteBatchCompleteTest.java
Thu Jan 17 13:45:55 2013
@@ -16,11 +16,6 @@
*/
package org.apache.camel.component.sql;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.builder.NotifyBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
@@ -35,7 +30,7 @@ import org.springframework.jdbc.datasour
/**
*
*/
-public class SqlConsumerDeleteTest extends CamelTestSupport {
+public class SqlConsumerDeleteBatchCompleteTest extends CamelTestSupport {
private EmbeddedDatabase db;
private JdbcTemplate jdbcTemplate;
@@ -60,24 +55,13 @@ public class SqlConsumerDeleteTest exten
@Test
public void testConsume() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedMessageCount(3);
+ mock.expectedMessageCount(1);
assertMockEndpointsSatisfied();
- List<Exchange> exchanges = mock.getReceivedExchanges();
- assertEquals(3, exchanges.size());
-
- assertEquals(1, exchanges.get(0).getIn().getBody(Map.class).get("ID"));
- assertEquals("Camel",
exchanges.get(0).getIn().getBody(Map.class).get("PROJECT"));
- assertEquals(2, exchanges.get(1).getIn().getBody(Map.class).get("ID"));
- assertEquals("AMQ",
exchanges.get(1).getIn().getBody(Map.class).get("PROJECT"));
- assertEquals(3, exchanges.get(2).getIn().getBody(Map.class).get("ID"));
- assertEquals("Linux",
exchanges.get(2).getIn().getBody(Map.class).get("PROJECT"));
-
// give it a little tine to delete
Thread.sleep(500);
- // there should only be 1 row in the table
assertEquals("Should have deleted all 3 rows", 0,
jdbcTemplate.queryForInt("select count(*) from projects"));
}
@@ -88,7 +72,7 @@ public class SqlConsumerDeleteTest exten
public void configure() throws Exception {
getContext().getComponent("sql",
SqlComponent.class).setDataSource(db);
- from("sql:select * from projects order by
id?consumer.onConsume=delete from projects where id = :#id")
+ from("sql:select * from projects order by
id?consumer.onConsumeBatchComplete=delete from projects")
.to("mock:result");
}
};
Modified:
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java?rev=1434669&r1=1434668&r2=1434669&view=diff
==============================================================================
---
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
(original)
+++
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
Thu Jan 17 13:45:55 2013
@@ -77,7 +77,6 @@ public class SqlConsumerDeleteTest exten
// give it a little tine to delete
Thread.sleep(500);
- // there should only be 1 row in the table
assertEquals("Should have deleted all 3 rows", 0,
jdbcTemplate.queryForInt("select count(*) from projects"));
}
Copied:
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTransformTest.java
(from r1434662,
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTransformTest.java?p2=camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTransformTest.java&p1=camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java&r1=1434662&r2=1434669&rev=1434669&view=diff
==============================================================================
---
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
(original)
+++
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTransformTest.java
Thu Jan 17 13:45:55 2013
@@ -16,11 +16,6 @@
*/
package org.apache.camel.component.sql;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.builder.NotifyBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
@@ -35,7 +30,7 @@ import org.springframework.jdbc.datasour
/**
*
*/
-public class SqlConsumerDeleteTest extends CamelTestSupport {
+public class SqlConsumerDeleteTransformTest extends CamelTestSupport {
private EmbeddedDatabase db;
private JdbcTemplate jdbcTemplate;
@@ -60,24 +55,13 @@ public class SqlConsumerDeleteTest exten
@Test
public void testConsume() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedMessageCount(3);
+ mock.expectedBodiesReceived("The project is Camel", "The project is
AMQ", "The project is Linux");
assertMockEndpointsSatisfied();
- List<Exchange> exchanges = mock.getReceivedExchanges();
- assertEquals(3, exchanges.size());
-
- assertEquals(1, exchanges.get(0).getIn().getBody(Map.class).get("ID"));
- assertEquals("Camel",
exchanges.get(0).getIn().getBody(Map.class).get("PROJECT"));
- assertEquals(2, exchanges.get(1).getIn().getBody(Map.class).get("ID"));
- assertEquals("AMQ",
exchanges.get(1).getIn().getBody(Map.class).get("PROJECT"));
- assertEquals(3, exchanges.get(2).getIn().getBody(Map.class).get("ID"));
- assertEquals("Linux",
exchanges.get(2).getIn().getBody(Map.class).get("PROJECT"));
-
// give it a little tine to delete
Thread.sleep(500);
- // there should only be 1 row in the table
assertEquals("Should have deleted all 3 rows", 0,
jdbcTemplate.queryForInt("select count(*) from projects"));
}
@@ -88,7 +72,10 @@ public class SqlConsumerDeleteTest exten
public void configure() throws Exception {
getContext().getComponent("sql",
SqlComponent.class).setDataSource(db);
+ // even if we transform the exchange we can still do onConsume
as we have the original data at
+ // the point when onConsume is executed
from("sql:select * from projects order by
id?consumer.onConsume=delete from projects where id = :#id")
+ .transform().simple("The project is ${body[project]}")
.to("mock:result");
}
};