Repository: camel
Updated Branches:
  refs/heads/master 3948a0734 -> a23bd9cc5


CAMEL-9239: camel-sjms - Add completionInterval to batch consumer. Polished the 
code and fixed some mistakes.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a23bd9cc
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a23bd9cc
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a23bd9cc

Branch: refs/heads/master
Commit: a23bd9cc5065e21566a24db3face99f12acf41e7
Parents: 3948a07
Author: Claus Ibsen <[email protected]>
Authored: Mon Feb 22 09:39:56 2016 +0100
Committer: Claus Ibsen <[email protected]>
Committed: Mon Feb 22 09:39:56 2016 +0100

----------------------------------------------------------------------
 .../component/sjms/batch/SjmsBatchConsumer.java   | 18 ++++++------------
 .../component/sjms/batch/SjmsBatchEndpoint.java   |  1 -
 2 files changed, 6 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a23bd9cc/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index 4ab7437..215a72b 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -32,10 +32,8 @@ import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
-import javax.jms.ObjectMessage;
 import javax.jms.Queue;
 import javax.jms.Session;
-import javax.jms.TextMessage;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -327,14 +325,9 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                                 startTime = new Date().getTime();
                             }
 
-                            // TODO: why only object or text messages?
-                            if (message instanceof ObjectMessage || message 
instanceof TextMessage) {
-                                final Exchange exchange = 
getEndpoint().createExchange(message, session);
-                                aggregatedExchange = 
aggregationStrategy.aggregate(aggregatedExchange, exchange);
-                                
aggregatedExchange.setProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, 
messageCount);
-                            } else {
-                                throw new IllegalArgumentException("Unexpected 
message type: " + message.getClass().toString());
-                            }
+                            final Exchange exchange = 
getEndpoint().createExchange(message, session);
+                            aggregatedExchange = 
aggregationStrategy.aggregate(aggregatedExchange, exchange);
+                            
aggregatedExchange.setProperty(Exchange.BATCH_SIZE, messageCount);
                         }
 
                         if (usingTimeout && startTime > 0) {
@@ -422,9 +415,10 @@ public class SjmsBatchConsumer extends DefaultConsumer {
          */
         private void processBatch(Exchange exchange, Session session) {
             int id = BATCH_COUNT.getAndIncrement();
-            int batchSize = 
exchange.getProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, Integer.class);
+            int batchSize = exchange.getProperty(Exchange.BATCH_SIZE, 
Integer.class);
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Processing batch[" + id + "]:size=" + batchSize + 
":total=" + MESSAGE_RECEIVED.addAndGet(batchSize));
+                long total = MESSAGE_RECEIVED.get() + batchSize;
+                LOG.debug("Processing batch[" + id + "]:size=" + batchSize + 
":total=" + total);
             }
 
             SessionCompletion sessionCompletion = new 
SessionCompletion(session);

http://git-wip-us.apache.org/repos/asf/camel/blob/a23bd9cc/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
index d989e08..9286000 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
@@ -50,7 +50,6 @@ public class SjmsBatchEndpoint extends DefaultEndpoint 
implements HeaderFilterSt
 
     public static final int DEFAULT_COMPLETION_SIZE = 200; // the default 
dispatch queue size in ActiveMQ
     public static final int DEFAULT_COMPLETION_TIMEOUT = 500;
-    public static final String PROPERTY_BATCH_SIZE = "CamelSjmsBatchSize";
 
     private JmsBinding binding;
 

Reply via email to