Repository: camel Updated Branches: refs/heads/master 3948a0734 -> a23bd9cc5
CAMEL-9239: camel-sjms - Add completionInterval to batch consumer. Polished the code and fixed some mistakes. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a23bd9cc Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a23bd9cc Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a23bd9cc Branch: refs/heads/master Commit: a23bd9cc5065e21566a24db3face99f12acf41e7 Parents: 3948a07 Author: Claus Ibsen <[email protected]> Authored: Mon Feb 22 09:39:56 2016 +0100 Committer: Claus Ibsen <[email protected]> Committed: Mon Feb 22 09:39:56 2016 +0100 ---------------------------------------------------------------------- .../component/sjms/batch/SjmsBatchConsumer.java | 18 ++++++------------ .../component/sjms/batch/SjmsBatchEndpoint.java | 1 - 2 files changed, 6 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/a23bd9cc/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java index 4ab7437..215a72b 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java @@ -32,10 +32,8 @@ import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; -import javax.jms.ObjectMessage; import javax.jms.Queue; import javax.jms.Session; -import javax.jms.TextMessage; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -327,14 +325,9 @@ public class SjmsBatchConsumer extends DefaultConsumer { startTime = new Date().getTime(); } - // TODO: why only object or text messages? - if (message instanceof ObjectMessage || message instanceof TextMessage) { - final Exchange exchange = getEndpoint().createExchange(message, session); - aggregatedExchange = aggregationStrategy.aggregate(aggregatedExchange, exchange); - aggregatedExchange.setProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, messageCount); - } else { - throw new IllegalArgumentException("Unexpected message type: " + message.getClass().toString()); - } + final Exchange exchange = getEndpoint().createExchange(message, session); + aggregatedExchange = aggregationStrategy.aggregate(aggregatedExchange, exchange); + aggregatedExchange.setProperty(Exchange.BATCH_SIZE, messageCount); } if (usingTimeout && startTime > 0) { @@ -422,9 +415,10 @@ public class SjmsBatchConsumer extends DefaultConsumer { */ private void processBatch(Exchange exchange, Session session) { int id = BATCH_COUNT.getAndIncrement(); - int batchSize = exchange.getProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, Integer.class); + int batchSize = exchange.getProperty(Exchange.BATCH_SIZE, Integer.class); if (LOG.isDebugEnabled()) { - LOG.debug("Processing batch[" + id + "]:size=" + batchSize + ":total=" + MESSAGE_RECEIVED.addAndGet(batchSize)); + long total = MESSAGE_RECEIVED.get() + batchSize; + LOG.debug("Processing batch[" + id + "]:size=" + batchSize + ":total=" + total); } SessionCompletion sessionCompletion = new SessionCompletion(session); http://git-wip-us.apache.org/repos/asf/camel/blob/a23bd9cc/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java index d989e08..9286000 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java @@ -50,7 +50,6 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt public static final int DEFAULT_COMPLETION_SIZE = 200; // the default dispatch queue size in ActiveMQ public static final int DEFAULT_COMPLETION_TIMEOUT = 500; - public static final String PROPERTY_BATCH_SIZE = "CamelSjmsBatchSize"; private JmsBinding binding;
