Author: hadrian
Date: Wed Nov 5 17:21:28 2008
New Revision: 711750
URL: http://svn.apache.org/viewvc?rev=711750&view=rev
Log:
CAMEL-630. Patch applied with many thanks to Clayton.
Added:
activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/strategy/
activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/strategy/DefaultIBatisProcessingStategy.java
activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/strategy/IBatisProcessingStrategy.java
activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisQueueTest.java
Modified:
activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisComponent.java
activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisEndpoint.java
activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java
activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisProducer.java
activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisPollingDelayRouteTest.java
activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisRouteTest.java
activemq/camel/trunk/components/camel-ibatis/src/test/resources/org/apache/camel/component/ibatis/Account.xml
Modified:
activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisComponent.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisComponent.java?rev=711750&r1=711749&r2=711750&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisComponent.java
(original)
+++
activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisComponent.java
Wed Nov 5 17:21:28 2008
@@ -18,11 +18,13 @@
import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
import java.util.Map;
import com.ibatis.sqlmap.client.SqlMapClient;
import com.ibatis.sqlmap.client.SqlMapClientBuilder;
import org.apache.camel.Endpoint;
+import org.apache.camel.component.ResourceBasedComponent;
import org.apache.camel.impl.DefaultComponent;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,55 +36,99 @@
* for performing SQL operations using an XML mapping file to abstract away
the SQL
*
* @version $Revision$
+ *
+ * <pre>
+ * Ibatis Component used to read/write to a database.
+ *
+ * <u>Requires one of the following:</u>
+ *
+ * 1. A Sql Map config file either on the root of
+ * the classpath or explicitly set.
+ *
+ * <b>OR</b>
+ *
+ * 2. A SqlMapClient explicitly set.
+ *
+ * Using Ibatis as a source of data (<from>) you can use this component
+ * to treat a database table as a logical queue.
+ * Details are available in the {@link IBatisPollingConsumer}
+ *
+ * Using Ibatis as a destination for data (<to>) you can use this
+ * component to run an insert statement either on a single message or if the
+ * delivered content contains a collection of messages it can iterate through
+ * the collection and run the insert on each element.
+ * Details are available in the {@link IBatisProducer}
+ * </pre>
+ *
+ * @see IBatisProducer
+ * @see IBatisPollingConsumer
*/
-public class IBatisComponent extends DefaultComponent {
- public static final String DEFAULT_CONFIG_URI = "SqlMapConfig.xml";
+public class IBatisComponent extends ResourceBasedComponent {
private static final transient Log LOG =
LogFactory.getLog(IBatisComponent.class);
-
-
+ private static final String DEFAULT_CONFIG_URI =
"classpath:SqlMapConfig.xml";
private SqlMapClient sqlMapClient;
- private Resource sqlMapResource;
+ private String sqlMapConfig = DEFAULT_CONFIG_URI;
+ private boolean useTransactions = true;
- public IBatisComponent() {
+ public IBatisComponent(){
}
- public IBatisComponent(SqlMapClient sqlMapClient) {
+ public IBatisComponent(SqlMapClient sqlMapClient){
this.sqlMapClient = sqlMapClient;
}
// Properties
//-------------------------------------------------------------------------
+
+ /**
+ * Returns the configured SqlMapClient.
+ *
+ * @return com.ibatis.sqlmap.client.SqlMapClient
+ * @throws IOException If configured with a SqlMapConfig and there
+ * is a problem reading the resource.
+ */
public SqlMapClient getSqlMapClient() throws IOException {
if (sqlMapClient == null) {
sqlMapClient = createSqlMapClient();
}
return sqlMapClient;
}
-
+
+ /**
+ * Sets the SqlMapClient
+ * @param sqlMapClient The client
+ */
public void setSqlMapClient(SqlMapClient sqlMapClient) {
this.sqlMapClient = sqlMapClient;
}
- public Resource getSqlMapResource() {
- if (sqlMapResource == null) {
- sqlMapResource = new ClassPathResource(DEFAULT_CONFIG_URI);
- LOG.debug("Defaulting to use the iBatis configuration from: " +
sqlMapResource);
- }
- return sqlMapResource;
- }
-
- public void setSqlMapResource(Resource sqlMapResource) {
- this.sqlMapResource = sqlMapResource;
- }
-
- // Implementation methods
- //-------------------------------------------------------------------------
- protected Endpoint createEndpoint(String uri, String remaining, Map
parameters) throws Exception {
- return new IBatisEndpoint(uri, this, remaining);
- }
-
- protected SqlMapClient createSqlMapClient() throws IOException {
- InputStream in = getSqlMapResource().getInputStream();
- return SqlMapClientBuilder.buildSqlMapClient(in);
+ /**
+ * The Spring uri of the SqlMapConfig
+ * @return java.lang.String
+ */
+ public String getSqlMapConfig() {
+ return sqlMapConfig;
+ }
+
+ /**
+ * Creates an IbatisEndpoint for use by an IbatisConsumer or
IbatisProducer.
+ */
+ @Override
+ protected IBatisEndpoint createEndpoint(String uri, String remaining, Map
params) throws Exception {
+ return new IBatisEndpoint(uri, this, remaining, params);
+ }
+
+ private SqlMapClient createSqlMapClient() throws IOException {
+ Resource resource = resolveMandatoryResource(sqlMapConfig);
+ InputStream is = resource.getInputStream();
+ return SqlMapClientBuilder.buildSqlMapClient(new
InputStreamReader(is));
+ }
+
+ public boolean isUseTransactions() {
+ return useTransactions;
+ }
+
+ public void setUseTransactions(boolean useTransactions) {
+ this.useTransactions = useTransactions;
}
}
Modified:
activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisEndpoint.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisEndpoint.java?rev=711750&r1=711749&r2=711750&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisEndpoint.java
(original)
+++
activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisEndpoint.java
Wed Nov 5 17:21:28 2008
@@ -19,13 +19,21 @@
import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
+import java.util.Map;
import com.ibatis.sqlmap.client.SqlMapClient;
+import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.PollingConsumer;
+import org.apache.camel.Processor;
import org.apache.camel.Producer;
+import
org.apache.camel.component.ibatis.strategy.DefaultIBatisProcessingStategy;
+import org.apache.camel.component.ibatis.strategy.IBatisProcessingStrategy;
import org.apache.camel.impl.DefaultPollingEndpoint;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* An <a href="http://activemq.apache.org/camel/ibatis.html>iBatis Endpoint</a>
@@ -33,24 +41,43 @@
*
* @version $Revision$
*/
-public class IBatisEndpoint extends DefaultPollingEndpoint {
- private final String entityName;
+public class IBatisEndpoint extends DefaultPollingEndpoint<Exchange> {
+ private static final transient Log logger =
LogFactory.getLog(IBatisEndpoint.class);
- public IBatisEndpoint(String endpointUri, IBatisComponent component,
String entityName) {
- super(endpointUri, component);
- this.entityName = entityName;
- }
+ private IBatisProcessingStrategy strategy;
+ /**
+ * Indicates if transactions are necessary. Defaulted in IBatisComponent.
+ */
+ private boolean useTransactions;
+ /**
+ * Statement to run when polling or processing
+ */
+ private String statement;
+ /**
+ * Name of a strategy to use for dealing w/
+ * polling a database and consuming the message. Can be a bean name
+ * or a class name.
+ */
+ private String consumeStrategyName;
+ /**
+ * URI parameters
+ */
+ private Map params;
- public IBatisEndpoint(String endpointUri, String entityName) {
- super(endpointUri);
- this.entityName = entityName;
+ public IBatisEndpoint(String uri, IBatisComponent component,
+ String statement, Map params) throws Exception {
+
+ super(uri, component);
+ this.params = params;
+ setUseTransactions(component.isUseTransactions());
+ setStatement(statement);
}
@Override
public IBatisComponent getComponent() {
return (IBatisComponent) super.getComponent();
}
-
+
public boolean isSingleton() {
return true;
}
@@ -60,26 +87,106 @@
}
@Override
- public PollingConsumer createPollingConsumer() throws Exception {
+ public IBatisPollingConsumer createConsumer(Processor processor) throws
Exception {
+ IBatisPollingConsumer consumer = new IBatisPollingConsumer(this,
processor);
+ configureConsumer(consumer);
+ return consumer;
+ }
+/*
+ @Override
+ public PollingConsumer<Exchange> createPollingConsumer() throws Exception {
return new IBatisPollingConsumer(this);
}
-
+*/
/**
- * Returns the iBatis SQL client
+ * @return SqlMapClient
+ * @throws IOException if the component is configured with a SqlMapConfig
+ * and there is a problem reading the file
*/
- public SqlMapClient getSqlClient() throws IOException {
+ public SqlMapClient getSqlMapClient() throws IOException {
return getComponent().getSqlMapClient();
}
- public String getEntityName() {
- return entityName;
+ /**
+ * Gets the IbatisProcessingStrategy to use when consuming messages
+ * from the database
+ * @return IbatisProcessingStrategy
+ * @throws Exception
+ */
+ public IBatisProcessingStrategy getProcessingStrategy() throws Exception {
+ if (strategy == null) {
+ String strategyName = (String) params.get("consumeStrategy");
+ strategy = getStrategy(strategyName, new
DefaultIBatisProcessingStategy());
+ }
+ return strategy;
}
- public void query(Message message) throws IOException, SQLException {
- String name = getEntityName();
- List list = getSqlClient().queryForList(name);
- message.setBody(list);
- message.setHeader("org.apache.camel.ibatis.queryName", name);
+ /**
+ * Statement to run when polling or processing
+ * @return name of the statement
+ */
+ public String getStatement() {
+ return statement;
+ }
+
+ /**
+ * Statement to run when polling or processing
+ * @param statement
+ */
+ public void setStatement(String statement) {
+ this.statement = statement;
+ }
+ /**
+ * Resolves a strategy in the camelContext or by class name
+ * @param name
+ * @param defaultStrategy
+ * @return IbatisProcessingStrategy
+ * @throws Exception
+ */
+ private IBatisProcessingStrategy getStrategy(String name,
IBatisProcessingStrategy defaultStrategy) throws Exception {
+
+ if (name == null) {
+ return defaultStrategy;
+ }
+
+ IBatisProcessingStrategy strategy =
getComponent().getCamelContext().getRegistry().lookup(name,
IBatisProcessingStrategy.class);
+ if (strategy == null) {
+ try {
+ Class<?> clazz = ObjectHelper.loadClass(name);
+ if (clazz != null) {
+ strategy = ObjectHelper.newInstance(clazz,
IBatisProcessingStrategy.class);
+ }
+ } catch(Exception e) {
+ logger.error("Failed to resolve/create processing strategy ("
+ name + ")", e);
+ throw e;
+ }
+ }
+
+ return strategy != null ? strategy : defaultStrategy;
+ }
+
+ /**
+ * Indicates if transactions should be used when calling statements.
Useful if using a comma separated list when
+ * consuming records.
+ * @return boolean
+ */
+ public boolean isUseTransactions() {
+ return useTransactions;
+ }
+
+ /**
+ * Sets indicator to use transactions for consuming and error handling
statements.
+ * @param useTransactions
+ */
+ public void setUseTransactions(boolean useTransactions) {
+ this.useTransactions = useTransactions;
+ }
+
+ public String getConsumeStrategyName() {
+ return consumeStrategyName;
+ }
+
+ public void setConsumeStrategyName(String consumeStrategyName) {
+ this.consumeStrategyName = consumeStrategyName;
}
}
Modified:
activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java?rev=711750&r1=711749&r2=711750&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java
(original)
+++
activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java
Wed Nov 5 17:21:28 2008
@@ -16,44 +16,182 @@
*/
package org.apache.camel.component.ibatis;
+import java.util.List;
+
+import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
-import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.impl.PollingConsumerSupport;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.ScheduledPollConsumer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* @version $Revision$
+ * <pre>
+ * Ibatis Camel Component used to read data from a database.
+ *
+ * Example Configuration :
+ * <route>
+ * <from uri="ibatis:selectRecords" />
+ * <to uri="jms:destinationQueue" />
+ * </route>
+ *
+ *
+ * This also can be configured to treat a table as a logical queue by defining
+ * an "onConsume" statement.
+ *
+ * Example Configuration :
+ * <route>
+ * <from
uri="ibatis:selectRecords?consumer.onConsume=updateRecord" />
+ * <to uri="jms:destinationQueue" />
+ * </route>
+ *
+ * By default, if the select statement contains multiple rows, it will
+ * iterate over the set and deliver each row to the route. If this is not the
+ * desired behavior then set "useIterator=false". This will
deliver the entire
+ * set to the route as a list.
+ * </pre>
+ *
+ * <b>URI Options</b>
+ * <table border="1">
+ * <thead>
+ * <th>Name</th>
+ * <th>Default Value</th>
+ * <th>description</th>
+ * </thead>
+ * <tbody>
+ * <tr>
+ * <td>initialDelay</td>
+ * <td>1000 ms</td>
+ * <td>time before polling starts</td>
+ * </tr>
+ * <tr>
+ * <td>delay</td>
+ * <td>500 ms</td>
+ * <td>time before the next poll</td>
+ * </tr>
+ * <tr>
+ * <td>timeUnit</td>
+ * <td>MILLISECONDS</td>
+ * <td>Time unit to use for delay properties (NANOSECONDS, MICROSECONDS,
+ * MILLISECONDS, SECONDS)</td>
+ * </tr>
+ * <tr>
+ * <td>useIterator</td>
+ * <td>true</td>
+ * <td>If true, processes one exchange per row. If false processes one exchange
+ * for all rows</td>
+ * </tr>
+ * <tr>
+ * <td>onConsume</td>
+ * <td>null</td>
+ * <td>statement to run after data has been processed</td>
+ * </tr>
+ * </tbody> </table>
+ *
+ * @see strategy.IBatisProcessingStrategy
*/
-public class IBatisPollingConsumer extends PollingConsumerSupport {
- private final IBatisEndpoint endpoint;
-
- public IBatisPollingConsumer(IBatisEndpoint endpoint) {
- super(endpoint);
- this.endpoint = endpoint;
- }
-
- public Exchange receive(long timeout) {
- return receiveNoWait();
- }
-
- public Exchange receive() {
- return receiveNoWait();
- }
-
- public Exchange receiveNoWait() {
- try {
- Exchange exchange = endpoint.createExchange();
- Message in = exchange.getIn();
- endpoint.query(in);
- return exchange;
- } catch (Exception e) {
- throw new RuntimeCamelException("Failed to poll: " + endpoint + ".
Reason: " + e, e);
+public class IBatisPollingConsumer extends ScheduledPollConsumer<Exchange> {
+ private static Log logger = LogFactory.getLog(IBatisPollingConsumer.class);
+ /**
+ * Statement to run after data has been processed in the route
+ */
+ private String onConsume;
+ /**
+ * Process resultset individually or as a list
+ */
+ private boolean useIterator = true;
+
+ public IBatisPollingConsumer(IBatisEndpoint endpoint, Processor processor)
throws Exception {
+ super(endpoint, processor);
+ }
+
+ public IBatisEndpoint getEndpoint() {
+ return (IBatisEndpoint) super.getEndpoint();
+ }
+
+ /**
+ * Polls the database
+ */
+ @Override
+ protected void poll() throws Exception {
+ IBatisEndpoint endpoint = getEndpoint();
+ List data = endpoint.getProcessingStrategy().poll(this, getEndpoint());
+ if (useIterator) {
+ for (Object object : data) {
+ if (!super.isStopped()) {
+ process(object);
+ }
+ }
+ } else {
+ process(data);
}
}
- protected void doStart() throws Exception {
+ /**
+ * delivers the content
+ *
+ * @param data
+ * a single row object if useIterator=true otherwise the entire
+ * result set
+ */
+ protected void process(final Object data) throws Exception {
+ final IBatisEndpoint endpoint = getEndpoint();
+ final Exchange exchange =
endpoint.createExchange(ExchangePattern.InOnly);
+
+ Message msg = exchange.getIn();
+ msg.setBody(data);
+ msg.setHeader("org.apache.camel.ibatis.queryName",
endpoint.getStatement());
+
+ logger.debug("Setting message");
+
+ getAsyncProcessor().process(exchange, new AsyncCallback() {
+ public void done(boolean sync) {
+ try {
+ if (onConsume != null) {
+ endpoint.getProcessingStrategy().commit(endpoint,
exchange, data, onConsume);
+ }
+ } catch (Exception e) {
+ handleException(e);
+ }
+ }
+ });
+ }
+
+ /**
+ * Gets the statement to run after successful processing
+ * @return Name of the statement
+ */
+ public String getOnConsume() {
+ return onConsume;
+ }
+
+ /**
+ * Sets the statement to run after successful processing
+ * @param onConsume The name of the statement
+ */
+ public void setOnConsume(String onConsume) {
+ this.onConsume = onConsume;
}
- protected void doStop() throws Exception {
+
+ /**
+ * Indicates how resultset should be delivered to the route
+ * @return boolean
+ */
+ public boolean isUseIterator() {
+ return useIterator;
+ }
+
+ /**
+ * Sets how resultset should be delivered to route.
+ * Indicates delivery as either a list or individual object.
+ * defaults to true.
+ * @param useIterator
+ */
+ public void setUseIterator(boolean useIterator) {
+ this.useIterator = useIterator;
}
}
Modified:
activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisProducer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisProducer.java?rev=711750&r1=711749&r2=711750&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisProducer.java
(original)
+++
activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisProducer.java
Wed Nov 5 17:21:28 2008
@@ -17,19 +17,25 @@
package org.apache.camel.component.ibatis;
import java.util.Iterator;
+import java.util.List;
import org.apache.camel.Exchange;
+import org.apache.camel.Message;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.util.ObjectHelper;
+import com.ibatis.sqlmap.client.SqlMapClient;
+
/**
* @version $Revision$
*/
-public class IBatisProducer extends DefaultProducer {
- private final IBatisEndpoint endpoint;
+public class IBatisProducer extends DefaultProducer<Exchange> {
+ private String statement;
+ private IBatisEndpoint endpoint;
public IBatisProducer(IBatisEndpoint endpoint) {
super(endpoint);
+ statement = endpoint.getStatement();
this.endpoint = endpoint;
}
@@ -38,26 +44,24 @@
return (IBatisEndpoint) super.getEndpoint();
}
+ /**
+ * Calls insert on the SqlMapClient.
+ */
public void process(Exchange exchange) throws Exception {
+ SqlMapClient client = endpoint.getSqlMapClient();
Object body = exchange.getIn().getBody();
if (body == null) {
// must be a poll so lets do a query
- endpoint.query(exchange.getOut(true));
+ Message msg = exchange.getOut(true);
+ List list = client.queryForList(statement);
+ msg.setBody(list);
+ msg.setHeader("org.apache.camel.ibatis.queryName", statement);
} else {
- String operation = getOperationName(exchange);
-
// lets handle arrays or collections of objects
Iterator iter = ObjectHelper.createIterator(body);
while (iter.hasNext()) {
- endpoint.getSqlClient().insert(operation, iter.next());
+ client.insert(statement, iter.next());
}
}
}
-
- /**
- * Returns the iBatis insert operation name
- */
- protected String getOperationName(Exchange exchange) {
- return endpoint.getEntityName();
- }
}
Added:
activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/strategy/DefaultIBatisProcessingStategy.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/strategy/DefaultIBatisProcessingStategy.java?rev=711750&view=auto
==============================================================================
---
activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/strategy/DefaultIBatisProcessingStategy.java
(added)
+++
activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/strategy/DefaultIBatisProcessingStategy.java
Wed Nov 5 17:21:28 2008
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ibatis.strategy;
+
+import java.sql.Connection;
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.ibatis.IBatisEndpoint;
+import org.apache.camel.component.ibatis.IBatisPollingConsumer;
+
+import com.ibatis.sqlmap.client.SqlMapClient;
+
+/**
+ * Default strategy for consuming messages for a route
+ */
+public class DefaultIBatisProcessingStategy implements
IBatisProcessingStrategy {
+ /**
+ * Calls update on the SqlMapClient using the consumeStatement.
+ * Will call multiple statements if the consumeStatement is a comma
separated list.
+ * The parameter passed to the statement is the original data delivered to
the route.
+ */
+ public void commit(IBatisEndpoint endpoint, Exchange exchange, Object
data, String consumeStatement) throws Exception {
+
+ SqlMapClient client = endpoint.getSqlMapClient();
+ boolean useTrans = endpoint.isUseTransactions();
+ String[] statements = consumeStatement.split(",");
+ try{
+ if (useTrans){
+
client.startTransaction(Connection.TRANSACTION_REPEATABLE_READ);
+ }
+ for (String statement: statements) {
+ client.update(statement.trim(), data);
+ }
+ if (useTrans){
+ client.commitTransaction();
+ }
+ } finally {
+ if (useTrans) {
+ client.endTransaction();
+ }
+ }
+ }
+
+ public List poll(IBatisPollingConsumer consumer, IBatisEndpoint endpoint)
throws Exception {
+ SqlMapClient client = endpoint.getSqlMapClient();
+ return client.queryForList(endpoint.getStatement(), null);
+ }
+}
Added:
activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/strategy/IBatisProcessingStrategy.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/strategy/IBatisProcessingStrategy.java?rev=711750&view=auto
==============================================================================
---
activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/strategy/IBatisProcessingStrategy.java
(added)
+++
activemq/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/strategy/IBatisProcessingStrategy.java
Wed Nov 5 17:21:28 2008
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ibatis.strategy;
+
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.ibatis.IBatisEndpoint;
+import org.apache.camel.component.ibatis.IBatisPollingConsumer;
+
+/**
+ * Processing strategy for dealing with IBatis records
+ *
+ */
+public interface IBatisProcessingStrategy {
+
+ /**
+ * Called when record is being queried.
+ * @param consumer The Ibatis Polling Consumer
+ * @param endpoint The Ibatis Endpoint
+ * @return Results of the query as a java.util.List
+ * @throws Exception
+ */
+ List poll (IBatisPollingConsumer consumer, IBatisEndpoint endpoint)
throws Exception;
+
+ /**
+ * Called if there is a statement to be run after processing
+ * @param endpoint The Ibatis Endpoint
+ * @param exchange The exchange after it has been processed
+ * @param data The original data delivered to the route
+ * @param consumeStatement The update statement to run
+ * @throws Exception
+ */
+ void commit(IBatisEndpoint endpoint, Exchange exchange, Object data,
String consumeStatement) throws Exception;
+}
Modified:
activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisPollingDelayRouteTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisPollingDelayRouteTest.java?rev=711750&r1=711749&r2=711750&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisPollingDelayRouteTest.java
(original)
+++
activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisPollingDelayRouteTest.java
Wed Nov 5 17:21:28 2008
@@ -89,7 +89,7 @@
private Connection createConnection() throws Exception {
IBatisEndpoint endpoint =
resolveMandatoryEndpoint("ibatis:selectAllAccounts", IBatisEndpoint.class);
- return endpoint.getSqlClient().getDataSource().getConnection();
+ return endpoint.getSqlMapClient().getDataSource().getConnection();
}
}
\ No newline at end of file
Added:
activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisQueueTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisQueueTest.java?rev=711750&view=auto
==============================================================================
---
activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisQueueTest.java
(added)
+++
activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisQueueTest.java
Wed Nov 5 17:21:28 2008
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ibatis;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+public class IBatisQueueTest extends ContextTestSupport {
+
+ public void testConsume() throws Exception {
+
+ MockEndpoint endpoint = getMockEndpoint("mock:results");
+ endpoint.expectedMinimumMessageCount(2);
+
+
+ Account account = new Account();
+ account.setId(1);
+ account.setFirstName("Bob");
+ account.setLastName("Denver");
+ account.setEmailAddress("[EMAIL PROTECTED]");
+
+ template.sendBody("direct:start", account);
+
+ account = new Account();
+ account.setId(2);
+ account.setFirstName("Alan");
+ account.setLastName("Hale");
+ account.setEmailAddress("[EMAIL PROTECTED]");
+
+ template.sendBody("direct:start", account);
+
+ assertMockEndpointsSatisifed();
+
+ // now lets poll that the account has been inserted
+ Object answer =
template.sendBody("ibatis:selectProcessedAccounts", null);
+ List body = assertIsInstanceOf(List.class, answer);
+
+ assertEquals("Wrong size: " + body, 2, body.size());
+ Account actual = assertIsInstanceOf(Account.class, body.get(0));
+
+ assertEquals("Account.getFirstName()", "Bob",
actual.getFirstName());
+ assertEquals("Account.getLastName()", "Denver",
actual.getLastName());
+
+ answer = template.sendBody("ibatis:selectUnprocessedAccounts",
null);
+
+
+
+ body = assertIsInstanceOf(List.class, answer);
+ assertEquals("Wrong size: " + body, 0, body.size());
+
+
+ }
+
+
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+
from("ibatis:selectUnprocessedAccounts?consumer.onConsume=consumeAccount").to("mock:results");
+
+ from("direct:start").to("ibatis:insertAccount");
+
+
+ }
+ };
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+
+ // lets create the database...
+ IBatisEndpoint endpoint =
resolveMandatoryEndpoint("ibatis:Account", IBatisEndpoint.class);
+ Connection connection =
endpoint.getSqlMapClient().getDataSource().getConnection();
+ Statement statement = connection.createStatement();
+ statement.execute("create table ACCOUNT ( ACC_ID INTEGER ,
ACC_FIRST_NAME VARCHAR(255), ACC_LAST_NAME VARCHAR(255), ACC_EMAIL
VARCHAR(255), PROCESSED BOOLEAN DEFAULT false)");
+ connection.close();
+ }
+
+ @Override
+ protected void tearDown() throws Exception{
+ super.tearDown();
+ IBatisEndpoint endpoint =
resolveMandatoryEndpoint("ibatis:Account", IBatisEndpoint.class);
+ Connection connection =
endpoint.getSqlMapClient().getDataSource().getConnection();
+ Statement statement = connection.createStatement();
+ statement.execute("drop table ACCOUNT");
+ connection.close();
+
+
+ }
+
+}
Modified:
activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisRouteTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisRouteTest.java?rev=711750&r1=711749&r2=711750&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisRouteTest.java
(original)
+++
activemq/camel/trunk/components/camel-ibatis/src/test/java/org/apache/camel/component/ibatis/IBatisRouteTest.java
Wed Nov 5 17:21:28 2008
@@ -91,7 +91,6 @@
private Connection createConnection() throws Exception {
IBatisEndpoint endpoint = resolveMandatoryEndpoint("ibatis:Account",
IBatisEndpoint.class);
- return endpoint.getSqlClient().getDataSource().getConnection();
+ return endpoint.getSqlMapClient().getDataSource().getConnection();
}
-
}
Modified:
activemq/camel/trunk/components/camel-ibatis/src/test/resources/org/apache/camel/component/ibatis/Account.xml
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ibatis/src/test/resources/org/apache/camel/component/ibatis/Account.xml?rev=711750&r1=711749&r2=711750&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ibatis/src/test/resources/org/apache/camel/component/ibatis/Account.xml
(original)
+++
activemq/camel/trunk/components/camel-ibatis/src/test/resources/org/apache/camel/component/ibatis/Account.xml
Wed Nov 5 17:21:28 2008
@@ -80,4 +80,16 @@
delete from ACCOUNT where ACC_ID = #id#
</delete>
+ <select id="selectUnprocessedAccounts" resultMap="AccountResult">
+ select * from ACCOUNT where PROCESSED = false
+ </select>
+
+ <select id="selectProcessedAccounts" resultMap="AccountResult">
+ select * from ACCOUNT where PROCESSED = true
+ </select>
+
+ <update id="consumeAccount" parameterClass="Account">
+ update ACCOUNT set PROCESSED = true where ACC_ID = #id#
+ </update>
+
</sqlMap>
\ No newline at end of file