This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new f31679d  CAMEL-13955: camel-sjms batch now supports completion-aware 
aggregation strategies. Also adds an exchange property detailing how the batch 
was completed, like the Aggregate EIP does.
f31679d is described below

commit f31679dc3b98f451806d6ff6b95b50b44e3fbbfe
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Oct 7 12:52:05 2019 +0200

    CAMEL-13955: camel-sjms batch now supports completion-aware aggregation 
strategies. Also adds an exchange property detailing how the batch was 
completed, like the Aggregate EIP does.
---
 .../component/sjms/batch/SjmsBatchConsumer.java    | 21 ++++++---
 .../sjms/batch/SjmsBatchConsumerTest.java          | 54 ++++++++++++++++++++++
 2 files changed, 69 insertions(+), 6 deletions(-)

diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index 4659b0e..ec6168e 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -403,7 +403,8 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                     if (timeout.compareAndSet(true, false) || 
timeoutInterval.compareAndSet(true, false)) {
                         // trigger timeout
                         LOG.trace("Completion batch due timeout");
-                        completionBatch(session);
+                        String completedBy = completionInterval > 0 ? 
"interval" : "timeout";
+                        completionBatch(session, completedBy);
                         reset();
                         continue;
                     }
@@ -411,7 +412,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                     if (completionSize > 0 && messageCount >= completionSize) {
                         // trigger completion size
                         LOG.trace("Completion batch due size");
-                        completionBatch(session);
+                        completionBatch(session, "size");
                         reset();
                         continue;
                     }
@@ -454,7 +455,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                                     if (complete) {
                                         // trigger completion predicate
                                         LOG.trace("Completion batch due 
predicate");
-                                        completionBatch(session);
+                                        completionBatch(session, "predicate");
                                         reset();
                                     }
                                 } catch (Exception e) {
@@ -492,12 +493,12 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                 aggregatedExchange = null;
             }
 
-            private void completionBatch(final Session session) {
+            private void completionBatch(final Session session, String 
completedBy) {
                 // batch
                 if (aggregatedExchange == null && 
getEndpoint().isSendEmptyMessageWhenIdle()) {
                     processEmptyMessage();
                 } else if (aggregatedExchange != null) {
-                    processBatch(aggregatedExchange, session);
+                    processBatch(aggregatedExchange, session, completedBy);
                 }
             }
 
@@ -546,7 +547,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
         /**
          * Send an message with the batches messages.
          */
-        private void processBatch(Exchange exchange, Session session) {
+        private void processBatch(Exchange exchange, Session session, String 
completedBy) {
             int id = BATCH_COUNT.getAndIncrement();
             int batchSize = exchange.getProperty(Exchange.BATCH_SIZE, 
Integer.class);
             if (LOG.isDebugEnabled()) {
@@ -554,6 +555,14 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                 LOG.debug("Processing batch[" + id + "]:size=" + batchSize + 
":total=" + total);
             }
 
+            if ("timeout".equals(completedBy)) {
+                aggregationStrategy.timeout(exchange, id, batchSize, 
completionTimeout);
+            }
+            exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, 
completedBy);
+
+            // invoke the on completion callback
+            aggregationStrategy.onCompletion(exchange);
+
             SessionCompletion sessionCompletion = new 
SessionCompletion(session);
             exchange.addOnCompletion(sessionCompletion);
             try {
diff --git 
a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
 
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
index 3b3d59d..ecf7d10 100644
--- 
a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
+++ 
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
@@ -25,12 +25,15 @@ import javax.jms.ConnectionFactory;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.builder.AggregationStrategies;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.component.sjms.SjmsComponent;
 import org.apache.camel.component.sjms.support.MockConnectionFactory;
 import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy;
 import org.apache.camel.support.SimpleRegistry;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.apache.camel.util.StopWatch;
@@ -153,6 +156,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport 
{
 
         template.sendBody("direct:in", generateStrings(messageCount));
         mockBatches.assertIsSatisfied();
+
+        Message msg = mockBatches.getExchanges().get(0).getMessage();
+        assertEquals("size", 
msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
     }
 
     @Test
@@ -180,6 +186,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport 
{
         template.sendBody("direct:in", generateStrings(50));
         template.sendBody("direct:in", "Message done");
         mockBatches.assertIsSatisfied();
+
+        Message msg = mockBatches.getExchanges().get(0).getMessage();
+        assertEquals("predicate", 
msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
     }
 
     @Test
@@ -206,6 +215,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport 
{
         template.sendBody("direct:in", generateStrings(messageCount));
         mockBatches.assertIsSatisfied();
         assertFirstMessageBodyOfLength(mockBatches, messageCount);
+
+        Message msg = mockBatches.getExchanges().get(0).getMessage();
+        assertEquals("timeout", 
msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
     }
 
     @Test
@@ -233,6 +245,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport 
{
         template.sendBody("direct:in", generateStrings(messageCount));
 
         mockBatches.assertIsSatisfied();
+
+        Message msg = mockBatches.getExchanges().get(0).getMessage();
+        assertEquals("interval", 
msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
     }
 
     @Test
@@ -384,7 +399,46 @@ public class SjmsBatchConsumerTest extends 
CamelTestSupport {
 
         assertMockEndpointsSatisfied();
         stopWatch.taken();
+    }
+
+    @Test
+    public void testConsumptionCompletionAware() throws Exception {
+        final int completionSize = 5;
 
+        final String queueName = getQueueName();
+        context.addRoutes(new TransactedSendHarness(queueName));
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+                context.getRegistry().bind("groupedStrategy", 
AggregationStrategies.groupedBody());
+
+                
fromF("sjms-batch:%s?completionSize=%s&aggregationStrategy=#groupedStrategy",
+                        queueName, 
completionSize).routeId("batchConsumer").startupOrder(10)
+                        .log(LoggingLevel.DEBUG, "${body.size}")
+                        .to("mock:batches");
+            }
+        });
+        context.start();
+
+        MockEndpoint mockBatches = getMockEndpoint("mock:batches");
+        mockBatches.expectedMessageCount(1);
+
+        template.sendBody("direct:in", "A,B,C,D,E");
+
+        mockBatches.assertIsSatisfied();
+        
+        Message msg = mockBatches.getExchanges().get(0).getMessage();
+        assertNotNull(msg);
+
+        assertEquals("size", 
msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
+
+        List body = msg.getBody(List.class);
+        assertNotNull(body);
+        assertEquals(5, body.size());
+        assertEquals("A", body.get(0));
+        assertEquals("B", body.get(1));
+        assertEquals("C", body.get(2));
+        assertEquals("D", body.get(3));
+        assertEquals("E", body.get(4));
     }
 
     private void assertFirstMessageBodyOfLength(MockEndpoint mockEndpoint, int 
expectedLength) {

Reply via email to