CAMEL-7433: Allow the aggregation strategy to determine pre-completion when using the aggregator.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/efaa7bf7 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/efaa7bf7 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/efaa7bf7 Branch: refs/heads/master Commit: efaa7bf71a674ac7a98d43b9c187860b04eef9ad Parents: 7973ac5 Author: Claus Ibsen <[email protected]> Authored: Mon Mar 23 10:25:09 2015 +0100 Committer: Claus Ibsen <[email protected]> Committed: Mon Mar 23 11:56:03 2015 +0100 ---------------------------------------------------------------------- .../apache/camel/model/AggregateDefinition.java | 1 + .../processor/aggregate/AggregateProcessor.java | 89 ++++++++++++++------ .../AggregatePreCompleteAwareStrategyTest.java | 54 ++++++++++++ ...gatePreCompleteAwareStrategyTimeoutTest.java | 54 ++++++++++++ .../AggregatePredicateAwareStrategyTest.java | 53 ------------ 5 files changed, 172 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/efaa7bf7/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java index 942d69b..cfcb027 100644 --- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java @@ -41,6 +41,7 @@ import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy; import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy; +import org.apache.camel.processor.aggregate.PreCompletionAwareAggregationStrategy; import 
org.apache.camel.spi.AggregationRepository; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.RouteContext; http://git-wip-us.apache.org/repos/asf/camel/blob/efaa7bf7/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index b365442..fbec104 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -92,6 +92,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor private final Processor processor; private String id; private AggregationStrategy aggregationStrategy; + private boolean preCompletion; private Expression correlationExpression; private AggregateController aggregateController; private final ExecutorService executorService; @@ -376,6 +377,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor LOG.trace("onAggregation +++ start +++ with correlation key: {}", key); List<Exchange> list = new ArrayList<Exchange>(); + String complete = null; Exchange answer; Exchange originalExchange = aggregationRepository.get(newExchange.getContext(), key); @@ -396,31 +398,36 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor ExchangeHelper.prepareAggregation(oldExchange, newExchange); // check if we are pre complete - boolean preComplete; - try { - // put the current aggregated size on the exchange so its avail during completion check - newExchange.setProperty(Exchange.AGGREGATED_SIZE, size); - preComplete = onPreCompletionAggregation(oldExchange, newExchange); - // remove it afterwards - newExchange.removeProperty(Exchange.AGGREGATED_SIZE); - } 
catch (Throwable e) { - // must catch any exception from aggregation - throw new CamelExchangeException("Error occurred during preComplete", newExchange, e); - } - - // check if we are complete - String complete = null; - if (!preComplete && isEagerCheckCompletion()) { + if (preCompletion) { + try { + // put the current aggregated size on the exchange so its avail during completion check + newExchange.setProperty(Exchange.AGGREGATED_SIZE, size); + complete = isPreCompleted(key, oldExchange, newExchange); + // make sure to track timeouts if not complete + if (complete == null) { + trackTimeout(key, newExchange); + } + // remove it afterwards + newExchange.removeProperty(Exchange.AGGREGATED_SIZE); + } catch (Throwable e) { + // must catch any exception from aggregation + throw new CamelExchangeException("Error occurred during preComplete", newExchange, e); + } + } else if (isEagerCheckCompletion()) { // put the current aggregated size on the exchange so its avail during completion check newExchange.setProperty(Exchange.AGGREGATED_SIZE, size); complete = isCompleted(key, newExchange); + // make sure to track timeouts if not complete + if (complete == null) { + trackTimeout(key, newExchange); + } // remove it afterwards newExchange.removeProperty(Exchange.AGGREGATED_SIZE); } - if (preComplete) { + if (preCompletion && complete != null) { // need to pre complete the current group before we aggregate - doAggregationComplete("strategy", list, key, originalExchange, oldExchange); + doAggregationComplete(complete, list, key, originalExchange, oldExchange); // as we complete the current group eager, we should indicate the new group is not complete complete = null; // and clear old/original exchange as we start on a new group @@ -445,8 +452,12 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor answer.setProperty(Exchange.AGGREGATED_SIZE, size); // maybe we should check completion after the aggregation - if (!isEagerCheckCompletion()) { + if 
(!preCompletion && !isEagerCheckCompletion()) { complete = isCompleted(key, answer); + // make sure to track timeouts if not complete + if (complete == null) { + trackTimeout(key, newExchange); + } } if (complete == null) { @@ -515,6 +526,22 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor } /** + * Tests whether the given exchanges is pre-complete or not + * + * @param key the correlation key + * @param oldExchange the existing exchange + * @param newExchange the incoming exchange + * @return <tt>null</tt> if not pre-completed, otherwise a String with the type that triggered the pre-completion + */ + protected String isPreCompleted(String key, Exchange oldExchange, Exchange newExchange) { + boolean complete = false; + if (aggregationStrategy instanceof PreCompletionAwareAggregationStrategy) { + complete = ((PreCompletionAwareAggregationStrategy) aggregationStrategy).preComplete(oldExchange, newExchange); + } + return complete ? "strategy" : null; + } + + /** * Tests whether the given exchange is complete or not * * @param key the correlation key @@ -564,6 +591,11 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor } } + // not complete + return null; + } + + protected void trackTimeout(String key, Exchange exchange) { // timeout can be either evaluated based on an expression or from a fixed value // expression takes precedence boolean timeoutSet = false; @@ -586,9 +618,6 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor } addExchangeToTimeoutMap(key, exchange, getCompletionTimeout()); } - - // not complete - return null; } protected Exchange onAggregation(Exchange oldExchange, Exchange newExchange) { @@ -1182,11 +1211,19 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor @Override protected void doStart() throws Exception { - if (getCompletionTimeout() <= 0 && getCompletionInterval() <= 0 && getCompletionSize() <= 0 && 
getCompletionPredicate() == null - && !isCompletionFromBatchConsumer() && getCompletionTimeoutExpression() == null - && getCompletionSizeExpression() == null) { - throw new IllegalStateException("At least one of the completions options" - + " [completionTimeout, completionInterval, completionSize, completionPredicate, completionFromBatchConsumer] must be set"); + if (aggregationStrategy instanceof PreCompletionAwareAggregationStrategy) { + preCompletion = true; + LOG.info("PreCompletionAwareAggregationStrategy detected. Aggregator {} is in pre-completion mode.", getId()); + } + + if (!preCompletion) { + // if not in pre completion mode then check we configured the completion required + if (getCompletionTimeout() <= 0 && getCompletionInterval() <= 0 && getCompletionSize() <= 0 && getCompletionPredicate() == null + && !isCompletionFromBatchConsumer() && getCompletionTimeoutExpression() == null + && getCompletionSizeExpression() == null) { + throw new IllegalStateException("At least one of the completions options" + + " [completionTimeout, completionInterval, completionSize, completionPredicate, completionFromBatchConsumer] must be set"); + } } if (getCloseCorrelationKeyOnCompletion() != null) { http://git-wip-us.apache.org/repos/asf/camel/blob/efaa7bf7/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTest.java new file mode 100644 index 0000000..f965c90 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTest.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.BodyInPreCompleteAggregatingStrategy; + +/** + * @version + */ +public class AggregatePreCompleteAwareStrategyTest extends ContextTestSupport { + + public void testAggregatePreComplete() throws Exception { + getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C", "X+D+E"); + + template.sendBodyAndHeader("direct:start", "A", "id", 123); + template.sendBodyAndHeader("direct:start", "B", "id", 123); + template.sendBodyAndHeader("direct:start", "C", "id", 123); + template.sendBodyAndHeader("direct:start", "X", "id", 123); + template.sendBodyAndHeader("direct:start", "D", "id", 123); + template.sendBodyAndHeader("direct:start", "E", "id", 123); + template.sendBodyAndHeader("direct:start", "X", "id", 123); + template.sendBodyAndHeader("direct:start", "F", "id", 123); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .aggregate(header("id"), new BodyInPreCompleteAggregatingStrategy()) + .to("mock:aggregated"); + } + }; + } 
+} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/efaa7bf7/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java new file mode 100644 index 0000000..abfda10 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor.aggregator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.BodyInPreCompleteAggregatingStrategy; + +/** + * @version + */ +public class AggregatePreCompleteAwareStrategyTimeoutTest extends ContextTestSupport { + + public void testAggregatePreCompleteTimeout() throws Exception { + getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C", "X+D+E", "X+F"); + + template.sendBodyAndHeader("direct:start", "A", "id", 123); + template.sendBodyAndHeader("direct:start", "B", "id", 123); + template.sendBodyAndHeader("direct:start", "C", "id", 123); + template.sendBodyAndHeader("direct:start", "X", "id", 123); + template.sendBodyAndHeader("direct:start", "D", "id", 123); + template.sendBodyAndHeader("direct:start", "E", "id", 123); + template.sendBodyAndHeader("direct:start", "X", "id", 123); + template.sendBodyAndHeader("direct:start", "F", "id", 123); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .aggregate(header("id"), new BodyInPreCompleteAggregatingStrategy()).completionTimeout(1000) + .to("mock:aggregated"); + } + }; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/efaa7bf7/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePredicateAwareStrategyTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePredicateAwareStrategyTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePredicateAwareStrategyTest.java deleted file mode 100644 index 74fe19b..0000000 --- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePredicateAwareStrategyTest.java +++ 
/dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.processor.aggregator; - -import org.apache.camel.ContextTestSupport; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.processor.BodyInPreCompleteAggregatingStrategy; - -/** - * @version - */ -public class AggregatePredicateAwareStrategyTest extends ContextTestSupport { - - public void testAggregatePreComplete() throws Exception { - getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C", "X+D+E"); - - template.sendBodyAndHeader("direct:start", "A", "id", 123); - template.sendBodyAndHeader("direct:start", "B", "id", 123); - template.sendBodyAndHeader("direct:start", "C", "id", 123); - template.sendBodyAndHeader("direct:start", "X", "id", 123); - template.sendBodyAndHeader("direct:start", "D", "id", 123); - template.sendBodyAndHeader("direct:start", "E", "id", 123); - template.sendBodyAndHeader("direct:start", "X", "id", 123); - - assertMockEndpointsSatisfied(); - } - - @Override - protected RouteBuilder createRouteBuilder() throws Exception { - return new RouteBuilder() { - @Override - public void configure() throws Exception { - from("direct:start") - .aggregate(header("id"), new 
BodyInPreCompleteAggregatingStrategy()).completionSize(5) - .to("mock:aggregated"); - } - }; - } -} \ No newline at end of file
