Repository: camel Updated Branches: refs/heads/master 472903bf2 -> 3f9651578
CAMEL-7433: Allow aggregation strategy to determine pre complete when using aggregator. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/50945969 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/50945969 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/50945969 Branch: refs/heads/master Commit: 5094596967e93417dc86e5f3fa38b08fe44c9797 Parents: 472903b Author: Claus Ibsen <[email protected]> Authored: Mon Mar 23 07:43:41 2015 +0100 Committer: Claus Ibsen <[email protected]> Committed: Mon Mar 23 11:55:56 2015 +0100 ---------------------------------------------------------------------- .../processor/aggregate/AggregateProcessor.java | 57 ++++++++++---------- .../PreCompletionAwareAggregationStrategy.java | 32 +++++++++++ 2 files changed, 62 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/50945969/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index b71c0bf..9b93c36 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -422,44 +422,47 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor List<Exchange> list = new ArrayList<Exchange>(); - // only need to update aggregation repository if we are not complete if (complete == null) { + // only need to update aggregation repository if we are not complete doAggregationRepositoryAdd(newExchange.getContext(), key, originalExchange, answer); - // we are not complete so the answer should be null - answer = null; } else { - // if batch consumer completion is enabled then we need to complete the group - if ("consumer".equals(complete)) { - for (String batchKey : batchConsumerCorrelationKeys) { - Exchange batchAnswer; - if (batchKey.equals(key)) { - // skip the current aggregated key as we have already aggregated it and have the answer - batchAnswer = answer; - } else { - batchAnswer = aggregationRepository.get(camelContext, batchKey); - } + // if we are complete then add the answer to the list + doAggregationComplete(complete, list, key, originalExchange, answer); + } - if (batchAnswer != null) { - batchAnswer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete); - onCompletion(batchKey, originalExchange, batchAnswer, false); - list.add(batchAnswer); - } + LOG.trace("onAggregation +++ end +++ with correlation key: {}", key); + return list; + } + + protected void doAggregationComplete(String complete, List<Exchange> list, String key, Exchange originalExchange, Exchange answer) { + if ("consumer".equals(complete)) { + for (String batchKey : batchConsumerCorrelationKeys) { + Exchange batchAnswer; + if (batchKey.equals(key)) { + // skip the current aggregated key as we have already aggregated it and have the answer + batchAnswer = answer; + } else { + batchAnswer = aggregationRepository.get(camelContext, batchKey); + } + + if (batchAnswer != null) { + batchAnswer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete); + onCompletion(batchKey, originalExchange, batchAnswer, false); + list.add(batchAnswer); } - batchConsumerCorrelationKeys.clear(); - // we have already submitted to completion, so answer should be null - answer = null; - } else { - // we are complete for this exchange - answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete); - answer = onCompletion(key, originalExchange, answer, false); } + batchConsumerCorrelationKeys.clear(); + // we have already submitted to completion, so answer should be null + answer = null; + } else { + // we are complete for this exchange + answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete); + answer = onCompletion(key, originalExchange, answer, false); } - LOG.trace("onAggregation +++ end +++ with correlation key: {}", key); if (answer != null) { list.add(answer); } - return list; } protected void doAggregationRepositoryAdd(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) { http://git-wip-us.apache.org/repos/asf/camel/blob/50945969/camel-core/src/main/java/org/apache/camel/processor/aggregate/PreCompletionAwareAggregationStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/PreCompletionAwareAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/PreCompletionAwareAggregationStrategy.java new file mode 100644 index 0000000..53fc3f0 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/PreCompletionAwareAggregationStrategy.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregate; + +import org.apache.camel.Exchange; + +public interface PreCompletionAwareAggregationStrategy extends AggregationStrategy { + + /** + * Determines if the aggregation should complete the current group, and start a new group, or the aggregation + * should continue using the current group. + * + * @param oldExchange the oldest exchange (is <tt>null</tt> on first aggregation as we only have the new exchange) + * @param newExchange the newest exchange (can be <tt>null</tt> if there was no data possible to acquire) + * @return <tt>true</tt> to complete current group and start a new group, or <tt>false</tt> to keep using current + */ + boolean preComplete(Exchange oldExchange, Exchange newExchange); +}
