This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-2.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.x by this push:
new 5be6548 CAMEL-13955: camel-sjms batch now supports aggregation
strategy completion aware. Also add exchange property with detail how it was
completed like the Aggregate EIP does. (#3249)
5be6548 is described below
commit 5be6548b592c219af244f5dcce849a9e2ba002c2
Author: bradhgbst <[email protected]>
AuthorDate: Wed Oct 16 13:59:27 2019 +1000
CAMEL-13955: camel-sjms batch now supports aggregation strategy completion
aware. Also add exchange property with detail how it was completed like the
Aggregate EIP does. (#3249)
---
.../component/sjms/batch/SjmsBatchConsumer.java | 25 +++++++---
.../sjms/batch/SjmsBatchConsumerTest.java | 55 +++++++++++++++++++++-
2 files changed, 73 insertions(+), 7 deletions(-)
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index 3e742f6..6ac149e 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -41,6 +41,8 @@ import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.processor.aggregate.CompletionAwareAggregationStrategy;
+import org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -400,7 +402,8 @@ public class SjmsBatchConsumer extends DefaultConsumer {
if (timeout.compareAndSet(true, false) ||
timeoutInterval.compareAndSet(true, false)) {
// trigger timeout
LOG.trace("Completion batch due timeout");
- completionBatch(session);
+ String completedBy = completionInterval > 0 ?
"interval" : "timeout";
+ completionBatch(session, completedBy);
reset();
continue;
}
@@ -408,7 +411,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
if (completionSize > 0 && messageCount >= completionSize) {
// trigger completion size
LOG.trace("Completion batch due size");
- completionBatch(session);
+ completionBatch(session, "size");
reset();
continue;
}
@@ -451,7 +454,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
if (complete) {
// trigger completion predicate
LOG.trace("Completion batch due predicate");
- completionBatch(session);
+ completionBatch(session, "predicate");
reset();
}
} catch (Exception e) {
@@ -489,12 +492,12 @@ public class SjmsBatchConsumer extends DefaultConsumer {
aggregatedExchange = null;
}
- private void completionBatch(final Session session) {
+ private void completionBatch(final Session session, String
completedBy) {
// batch
if (aggregatedExchange == null &&
getEndpoint().isSendEmptyMessageWhenIdle()) {
processEmptyMessage();
} else if (aggregatedExchange != null) {
- processBatch(aggregatedExchange, session);
+ processBatch(aggregatedExchange, session, completedBy);
}
}
@@ -543,7 +546,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
/**
* Send an message with the batches messages.
*/
- private void processBatch(Exchange exchange, Session session) {
+ private void processBatch(Exchange exchange, Session session, String
completedBy) {
int id = BATCH_COUNT.getAndIncrement();
int batchSize = exchange.getProperty(Exchange.BATCH_SIZE,
Integer.class);
if (LOG.isDebugEnabled()) {
@@ -551,6 +554,16 @@ public class SjmsBatchConsumer extends DefaultConsumer {
LOG.debug("Processing batch[" + id + "]:size=" + batchSize +
":total=" + total);
}
+ if ("timeout".equals(completedBy) && aggregationStrategy
instanceof TimeoutAwareAggregationStrategy) {
+
((TimeoutAwareAggregationStrategy)aggregationStrategy).timeout(exchange, id,
batchSize, completionTimeout);
+ }
+ exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY,
completedBy);
+
+ // invoke the on completion callback
+ if (aggregationStrategy instanceof
CompletionAwareAggregationStrategy) {
+ ((CompletionAwareAggregationStrategy)
aggregationStrategy).onCompletion(exchange);
+ }
+
SessionCompletion sessionCompletion = new
SessionCompletion(session);
exchange.addOnCompletion(sessionCompletion);
try {
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
index 6afc288..a611be1 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
@@ -25,7 +25,9 @@ import javax.jms.ConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
+import org.apache.camel.Message;
import org.apache.camel.Processor;
+import org.apache.camel.util.toolbox.AggregationStrategies;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.sjms.SjmsComponent;
@@ -153,6 +155,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
template.sendBody("direct:in", generateStrings(messageCount));
mockBatches.assertIsSatisfied();
+
+ Message msg = mockBatches.getExchanges().get(0).getMessage();
+ assertEquals("size",
msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
}
@Test
@@ -180,6 +185,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
template.sendBody("direct:in", generateStrings(50));
template.sendBody("direct:in", "Message done");
mockBatches.assertIsSatisfied();
+
+ Message msg = mockBatches.getExchanges().get(0).getMessage();
+ assertEquals("predicate",
msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
}
@Test
@@ -206,6 +214,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
template.sendBody("direct:in", generateStrings(messageCount));
mockBatches.assertIsSatisfied();
assertFirstMessageBodyOfLength(mockBatches, messageCount);
+
+ Message msg = mockBatches.getExchanges().get(0).getMessage();
+ assertEquals("timeout",
msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
}
@Test
@@ -233,6 +244,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
template.sendBody("direct:in", generateStrings(messageCount));
mockBatches.assertIsSatisfied();
+
+ Message msg = mockBatches.getExchanges().get(0).getMessage();
+ assertEquals("interval",
msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
}
@Test
@@ -383,8 +397,47 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
context.startRoute("batchConsumer");
assertMockEndpointsSatisfied();
- stopWatch.stop();
+ stopWatch.taken();
+ }
+
+ @Test
+ public void testConsumptionCompletionAware() throws Exception {
+ final int completionSize = 5;
+ final String queueName = getQueueName();
+ context.addRoutes(new TransactedSendHarness(queueName));
+ context.addRoutes(new RouteBuilder() {
+ public void configure() throws Exception {
+ ((SimpleRegistry)context.getRegistry()).put("groupedStrategy",
AggregationStrategies.groupedBody());
+
+
fromF("sjms-batch:%s?completionSize=%s&aggregationStrategy=#groupedStrategy",
+ queueName,
completionSize).routeId("batchConsumer").startupOrder(10)
+ .log(LoggingLevel.DEBUG, "${body.size}")
+ .to("mock:batches");
+ }
+ });
+ context.start();
+
+ MockEndpoint mockBatches = getMockEndpoint("mock:batches");
+ mockBatches.expectedMessageCount(1);
+
+ template.sendBody("direct:in", "A,B,C,D,E");
+
+ mockBatches.assertIsSatisfied();
+
+ Message msg = mockBatches.getExchanges().get(0).getMessage();
+ assertNotNull(msg);
+
+ assertEquals("size",
msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
+
+ List body = msg.getBody(List.class);
+ assertNotNull(body);
+ assertEquals(5, body.size());
+ assertEquals("A", body.get(0));
+ assertEquals("B", body.get(1));
+ assertEquals("C", body.get(2));
+ assertEquals("D", body.get(3));
+ assertEquals("E", body.get(4));
}
private void assertFirstMessageBodyOfLength(MockEndpoint mockEndpoint, int
expectedLength) {