This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new f31679d CAMEL-13955: camel-sjms batch now supports completion-aware
aggregation strategies. Also adds an exchange property detailing how the
batch was completed, like the Aggregate EIP does.
f31679d is described below
commit f31679dc3b98f451806d6ff6b95b50b44e3fbbfe
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Oct 7 12:52:05 2019 +0200
CAMEL-13955: camel-sjms batch now supports completion-aware aggregation
strategies. Also adds an exchange property detailing how the batch was
completed, like the Aggregate EIP does.
---
.../component/sjms/batch/SjmsBatchConsumer.java | 21 ++++++---
.../sjms/batch/SjmsBatchConsumerTest.java | 54 ++++++++++++++++++++++
2 files changed, 69 insertions(+), 6 deletions(-)
diff --git
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index 4659b0e..ec6168e 100644
---
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -403,7 +403,8 @@ public class SjmsBatchConsumer extends DefaultConsumer {
if (timeout.compareAndSet(true, false) ||
timeoutInterval.compareAndSet(true, false)) {
// trigger timeout
LOG.trace("Completion batch due timeout");
- completionBatch(session);
+ String completedBy = completionInterval > 0 ?
"interval" : "timeout";
+ completionBatch(session, completedBy);
reset();
continue;
}
@@ -411,7 +412,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
if (completionSize > 0 && messageCount >= completionSize) {
// trigger completion size
LOG.trace("Completion batch due size");
- completionBatch(session);
+ completionBatch(session, "size");
reset();
continue;
}
@@ -454,7 +455,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
if (complete) {
// trigger completion predicate
LOG.trace("Completion batch due
predicate");
- completionBatch(session);
+ completionBatch(session, "predicate");
reset();
}
} catch (Exception e) {
@@ -492,12 +493,12 @@ public class SjmsBatchConsumer extends DefaultConsumer {
aggregatedExchange = null;
}
- private void completionBatch(final Session session) {
+ private void completionBatch(final Session session, String
completedBy) {
// batch
if (aggregatedExchange == null &&
getEndpoint().isSendEmptyMessageWhenIdle()) {
processEmptyMessage();
} else if (aggregatedExchange != null) {
- processBatch(aggregatedExchange, session);
+ processBatch(aggregatedExchange, session, completedBy);
}
}
@@ -546,7 +547,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
/**
* Send an message with the batches messages.
*/
- private void processBatch(Exchange exchange, Session session) {
+ private void processBatch(Exchange exchange, Session session, String
completedBy) {
int id = BATCH_COUNT.getAndIncrement();
int batchSize = exchange.getProperty(Exchange.BATCH_SIZE,
Integer.class);
if (LOG.isDebugEnabled()) {
@@ -554,6 +555,14 @@ public class SjmsBatchConsumer extends DefaultConsumer {
LOG.debug("Processing batch[" + id + "]:size=" + batchSize +
":total=" + total);
}
+ if ("timeout".equals(completedBy)) {
+ aggregationStrategy.timeout(exchange, id, batchSize,
completionTimeout);
+ }
+ exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY,
completedBy);
+
+ // invoke the on completion callback
+ aggregationStrategy.onCompletion(exchange);
+
SessionCompletion sessionCompletion = new
SessionCompletion(session);
exchange.addOnCompletion(sessionCompletion);
try {
diff --git
a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
index 3b3d59d..ecf7d10 100644
---
a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
+++
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
@@ -25,12 +25,15 @@ import javax.jms.ConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
+import org.apache.camel.Message;
import org.apache.camel.Processor;
+import org.apache.camel.builder.AggregationStrategies;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.sjms.SjmsComponent;
import org.apache.camel.component.sjms.support.MockConnectionFactory;
import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy;
import org.apache.camel.support.SimpleRegistry;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.camel.util.StopWatch;
@@ -153,6 +156,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport
{
template.sendBody("direct:in", generateStrings(messageCount));
mockBatches.assertIsSatisfied();
+
+ Message msg = mockBatches.getExchanges().get(0).getMessage();
+ assertEquals("size",
msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
}
@Test
@@ -180,6 +186,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport
{
template.sendBody("direct:in", generateStrings(50));
template.sendBody("direct:in", "Message done");
mockBatches.assertIsSatisfied();
+
+ Message msg = mockBatches.getExchanges().get(0).getMessage();
+ assertEquals("predicate",
msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
}
@Test
@@ -206,6 +215,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport
{
template.sendBody("direct:in", generateStrings(messageCount));
mockBatches.assertIsSatisfied();
assertFirstMessageBodyOfLength(mockBatches, messageCount);
+
+ Message msg = mockBatches.getExchanges().get(0).getMessage();
+ assertEquals("timeout",
msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
}
@Test
@@ -233,6 +245,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport
{
template.sendBody("direct:in", generateStrings(messageCount));
mockBatches.assertIsSatisfied();
+
+ Message msg = mockBatches.getExchanges().get(0).getMessage();
+ assertEquals("interval",
msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
}
@Test
@@ -384,7 +399,46 @@ public class SjmsBatchConsumerTest extends
CamelTestSupport {
assertMockEndpointsSatisfied();
stopWatch.taken();
+ }
+
+ @Test
+ public void testConsumptionCompletionAware() throws Exception {
+ final int completionSize = 5;
+ final String queueName = getQueueName();
+ context.addRoutes(new TransactedSendHarness(queueName));
+ context.addRoutes(new RouteBuilder() {
+ public void configure() throws Exception {
+ context.getRegistry().bind("groupedStrategy",
AggregationStrategies.groupedBody());
+
+
fromF("sjms-batch:%s?completionSize=%s&aggregationStrategy=#groupedStrategy",
+ queueName,
completionSize).routeId("batchConsumer").startupOrder(10)
+ .log(LoggingLevel.DEBUG, "${body.size}")
+ .to("mock:batches");
+ }
+ });
+ context.start();
+
+ MockEndpoint mockBatches = getMockEndpoint("mock:batches");
+ mockBatches.expectedMessageCount(1);
+
+ template.sendBody("direct:in", "A,B,C,D,E");
+
+ mockBatches.assertIsSatisfied();
+
+ Message msg = mockBatches.getExchanges().get(0).getMessage();
+ assertNotNull(msg);
+
+ assertEquals("size",
msg.getExchange().getProperty(Exchange.AGGREGATED_COMPLETED_BY));
+
+ List body = msg.getBody(List.class);
+ assertNotNull(body);
+ assertEquals(5, body.size());
+ assertEquals("A", body.get(0));
+ assertEquals("B", body.get(1));
+ assertEquals("C", body.get(2));
+ assertEquals("D", body.get(3));
+ assertEquals("E", body.get(4));
}
private void assertFirstMessageBodyOfLength(MockEndpoint mockEndpoint, int
expectedLength) {