This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit ab8a08e8717c860cc2bd6f6b8d4d716d3ee782dc
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Aug 5 14:10:08 2019 +0200

    CAMEL-6325: Add discardOnAggregationFailure option to aggregate EIP
---
 .../processor/aggregate/AggregateProcessor.java    |  58 +++++++--
 .../apache/camel/model/AggregateDefinition.java    |  27 +++-
 .../org/apache/camel/reifier/AggregateReifier.java |   8 ++
 .../aggregator/AggregateDiscardOnFailureTest.java  | 140 +++++++++++++++++++++
 4 files changed, 219 insertions(+), 14 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index efb0b2a..c7dbb09 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -229,6 +229,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
     private boolean completionOnNewCorrelationGroup;
     private AtomicInteger batchConsumerCounter = new AtomicInteger();
     private boolean discardOnCompletionTimeout;
+    private boolean discardOnAggregationFailure;
     private boolean forceCompletionOnStop;
     private boolean completeAllOnStop;
     private long completionTimeoutCheckerInterval = 1000;
@@ -477,7 +478,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat

         if (preCompletion && complete != null) {
             // need to pre complete the current group before we aggregate
-            doAggregationComplete(complete, list, key, originalExchange, oldExchange);
+            doAggregationComplete(complete, list, key, originalExchange, oldExchange, false);
             // as we complete the current group eager, we should indicate the new group is not complete
             complete = null;
             // and clear old/original exchange as we start on a new group
@@ -490,11 +491,24 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
         }

         // aggregate the exchanges
+        boolean aggregateFailed = false;
         try {
             answer = onAggregation(oldExchange, newExchange);
         } catch (Throwable e) {
-            // must catch any exception from aggregation
-            throw new CamelExchangeException("Error occurred during aggregation", newExchange, e);
+            aggregateFailed = true;
+            if (isDiscardOnAggregationFailure()) {
+                // discard due to failure in aggregation strategy
+                log.debug("Aggregation for correlation key {} discarding aggregated exchange: {} due to failure in AggregationStrategy caused by: {}", key, oldExchange, e.getMessage());
+                complete = COMPLETED_BY_STRATEGY;
+                answer = oldExchange;
+                if (answer == null) {
+                    // first message in group failed during aggregation and we should just discard this
+                    return null;
+                }
+            } else {
+                // must catch any exception from aggregation
+                throw new CamelExchangeException("Error occurred during aggregation", newExchange, e);
+            }
         }
         if (answer == null) {
             throw new CamelExchangeException("AggregationStrategy " + aggregationStrategy + " returned null which is not allowed", newExchange);
@@ -529,19 +543,20 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
             }
         }

-        if (complete == null) {
+        if (!aggregateFailed && complete == null) {
             // only need to update aggregation repository if we are not complete
             doAggregationRepositoryAdd(newExchange.getContext(), key, originalExchange, answer);
         } else {
             // if we are complete then add the answer to the list
-            doAggregationComplete(complete, list, key, originalExchange, answer);
+            doAggregationComplete(complete, list, key, originalExchange, answer, aggregateFailed);
         }

         log.trace("onAggregation +++ end +++ with correlation key: {}", key);
         return list;
     }

-    protected void doAggregationComplete(String complete, List<Exchange> list, String key, Exchange originalExchange, Exchange answer) {
+    protected void doAggregationComplete(String complete, List<Exchange> list, String key,
+                                         Exchange originalExchange, Exchange answer, boolean aggregateFailed) {
         if (COMPLETED_BY_CONSUMER.equals(complete)) {
             for (String batchKey : batchConsumerCorrelationKeys) {
                 Exchange batchAnswer;
@@ -554,7 +569,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat

                 if (batchAnswer != null) {
                     batchAnswer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
-                    onCompletion(batchKey, originalExchange, batchAnswer, false);
+                    onCompletion(batchKey, originalExchange, batchAnswer, false, aggregateFailed);
                     list.add(batchAnswer);
                 }
             }
@@ -564,7 +579,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
         } else if (answer != null) {
             // we are complete for this exchange
             answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
-            answer = onCompletion(key, originalExchange, answer, false);
+            answer = onCompletion(key, originalExchange, answer, false, aggregateFailed);
         }

         if (answer != null) {
@@ -685,7 +700,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
         return aggregationStrategy.aggregate(oldExchange, newExchange);
     }

-    protected Exchange onCompletion(final String key, final Exchange original, final Exchange aggregated, boolean fromTimeout) {
+    protected Exchange onCompletion(final String key, final Exchange original, final Exchange aggregated, boolean fromTimeout, boolean aggregateFailed) {
         // store the correlation key as property before we remove so the repository has that information
         if (original != null) {
             original.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, key);
@@ -726,6 +741,15 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
             redeliveryState.remove(aggregated.getExchangeId());
             // the completion was from timeout and we should just discard it
             answer = null;
+        } else if (aggregateFailed && isDiscardOnAggregationFailure()) {
+            // discard due to failed aggregation
+            log.debug("Aggregation for correlation key {} discarding aggregated exchange: {}", key, aggregated);
+            // must confirm the discarded exchange
+            aggregationRepository.confirm(aggregated.getContext(), aggregated.getExchangeId());
+            // and remove redelivery state as well
+            redeliveryState.remove(aggregated.getExchangeId());
+            // the aggregation failed so we should just discard it
+            answer = null;
         } else {
             // the aggregated exchange should be published (sent out)
             answer = aggregated;
@@ -1006,6 +1030,14 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
         this.discardOnCompletionTimeout = discardOnCompletionTimeout;
     }

+    public boolean isDiscardOnAggregationFailure() {
+        return discardOnAggregationFailure;
+    }
+
+    public void setDiscardOnAggregationFailure(boolean discardOnAggregationFailure) {
+        this.discardOnAggregationFailure = discardOnAggregationFailure;
+    }
+
     public void setForceCompletionOnStop(boolean forceCompletionOnStop) {
         this.forceCompletionOnStop = forceCompletionOnStop;
     }
@@ -1165,7 +1197,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat

                     // indicate it was completed by timeout
                    answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, COMPLETED_BY_TIMEOUT);
                    try {
-                        answer = onCompletion(key, answer, answer, true);
+                        answer = onCompletion(key, answer, answer, true, false);
                        if (answer != null) {
                            onSubmitCompletion(key, answer);
                        }
@@ -1213,7 +1245,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
                    // indicate it was completed by interval
                    exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, COMPLETED_BY_INTERVAL);
                    try {
-                        Exchange answer = onCompletion(key, exchange, exchange, false);
+                        Exchange answer = onCompletion(key, exchange, exchange, false, false);
                        if (answer != null) {
                            onSubmitCompletion(key, answer);
                        }
@@ -1573,7 +1605,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
                log.trace("Force completion triggered for correlation key: {}", key);
                // indicate it was completed by a force completion request
                exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, COMPLETED_BY_FORCE);
-                Exchange answer = onCompletion(key, exchange, exchange, false);
+                Exchange answer = onCompletion(key, exchange, exchange, false, false);
                if (answer != null) {
                    onSubmitCompletion(key, answer);
                }
@@ -1615,7 +1647,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
            log.trace("Force completion triggered for correlation key: {}", key);
            // indicate it was completed by a force completion request
            exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, COMPLETED_BY_FORCE);
-            Exchange answer = onCompletion(key, exchange, exchange, false);
+            Exchange answer = onCompletion(key, exchange, exchange, false, false);
            if (answer != null) {
                onSubmitCompletion(key, answer);
            }
diff --git a/core/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index 62c8b13..0392f9d 100644
--- a/core/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/core/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -109,6 +109,8 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
     @XmlAttribute
     private Boolean discardOnCompletionTimeout;
     @XmlAttribute
+    private Boolean discardOnAggregationFailure;
+    @XmlAttribute
     private Boolean forceCompletionOnStop;
     @XmlAttribute
     private Boolean completeAllOnStop;
@@ -450,7 +452,15 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
     public void setDiscardOnCompletionTimeout(Boolean discardOnCompletionTimeout) {
         this.discardOnCompletionTimeout = discardOnCompletionTimeout;
     }
-
+
+    public Boolean getDiscardOnAggregationFailure() {
+        return discardOnAggregationFailure;
+    }
+
+    public void setDiscardOnAggregationFailure(Boolean discardOnAggregationFailure) {
+        this.discardOnAggregationFailure = discardOnAggregationFailure;
+    }
+
     public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) {
         this.timeoutCheckerExecutorService = timeoutCheckerExecutorService;
     }
@@ -555,9 +565,24 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
     }

     /**
+     * Discards the aggregated message when aggregation failed (an exception was thrown from {@link AggregationStrategy}).
+     * This means the partly aggregated message is dropped and not sent out of the aggregator.
+     * <p/>
+     * This option cannot be used together with completionFromBatchConsumer.
+     *
+     * @return builder
+     */
+    public AggregateDefinition discardOnAggregationFailure() {
+        setDiscardOnAggregationFailure(true);
+        return this;
+    }
+
+    /**
      * Enables the batch completion mode where we aggregate from a {@link org.apache.camel.BatchConsumer}
      * and aggregate the total number of exchanges the {@link org.apache.camel.BatchConsumer} has reported
      * as total by checking the exchange property {@link org.apache.camel.Exchange#BATCH_COMPLETE} when its complete.
+     * <p/>
+     * This option cannot be used together with discardOnAggregationFailure.
      *
      * @return builder
      */
diff --git a/core/camel-core/src/main/java/org/apache/camel/reifier/AggregateReifier.java b/core/camel-core/src/main/java/org/apache/camel/reifier/AggregateReifier.java
index 4334755..3d00fd0 100644
--- a/core/camel-core/src/main/java/org/apache/camel/reifier/AggregateReifier.java
+++ b/core/camel-core/src/main/java/org/apache/camel/reifier/AggregateReifier.java
@@ -100,6 +100,11 @@ public class AggregateReifier extends ProcessorReifier<AggregateDefinition> {
         answer.setTimeoutCheckerExecutorService(timeoutThreadPool);
         answer.setShutdownTimeoutCheckerExecutorService(shutdownTimeoutThreadPool);

+        if (definition.getCompletionFromBatchConsumer() != null && definition.getCompletionFromBatchConsumer()
+                && definition.getDiscardOnAggregationFailure() != null && definition.getDiscardOnAggregationFailure()) {
+            throw new IllegalArgumentException("Cannot use both completionFromBatchConsumer and discardOnAggregationFailure on: " + definition);
+        }
+
         // set other options
         answer.setParallelProcessing(parallel);
         if (definition.getOptimisticLocking() != null) {
@@ -148,6 +153,9 @@ public class AggregateReifier extends ProcessorReifier<AggregateDefinition> {
         if (definition.getDiscardOnCompletionTimeout() != null) {
             answer.setDiscardOnCompletionTimeout(definition.getDiscardOnCompletionTimeout());
         }
+        if (definition.getDiscardOnAggregationFailure() != null) {
+            answer.setDiscardOnAggregationFailure(definition.getDiscardOnAggregationFailure());
+        }
         if (definition.getForceCompletionOnStop() != null) {
             answer.setForceCompletionOnStop(definition.getForceCompletionOnStop());
         }
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnFailureTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnFailureTest.java
new file mode 100644
index 0000000..35c5284
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnFailureTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+public class AggregateDiscardOnFailureTest extends ContextTestSupport {
+
+    @Test
+    public void testAggregateDiscardOnAggregationFailureFirst() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:aggregated");
+        mock.expectedMessageCount(0);
+
+        template.sendBodyAndHeader("direct:start", "Kaboom", "id", 123);
+
+        mock.assertIsSatisfied();
+
+        // send in new groups with the same correlation key, which should not fail
+        mock.reset();
+        mock.expectedBodiesReceived("ABC", "DEF");
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+
+        template.sendBodyAndHeader("direct:start", "D", "id", 456);
+        template.sendBodyAndHeader("direct:start", "E", "id", 456);
+
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        template.sendBodyAndHeader("direct:start", "F", "id", 456);
+
+        mock.assertIsSatisfied();
+    }
+
+    @Test
+    public void testAggregateDiscardOnAggregationFailureMiddle() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:aggregated");
+        mock.expectedMessageCount(0);
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "Kaboom", "id", 123);
+
+        mock.assertIsSatisfied();
+
+        // send in new groups with the same correlation key, which should not fail
+        mock.reset();
+        mock.expectedBodiesReceived("ABC", "DEF");
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+
+        template.sendBodyAndHeader("direct:start", "D", "id", 456);
+        template.sendBodyAndHeader("direct:start", "E", "id", 456);
+
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        template.sendBodyAndHeader("direct:start", "F", "id", 456);
+
+        mock.assertIsSatisfied();
+    }
+
+    @Test
+    public void testAggregateDiscardOnAggregationFailureLast() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:aggregated");
+        mock.expectedMessageCount(0);
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+        template.sendBodyAndHeader("direct:start", "Kaboom", "id", 123);
+
+        mock.assertIsSatisfied();
+
+        // send in new groups with the same correlation key, which should not fail
+        mock.reset();
+        mock.expectedBodiesReceived("ABC", "DEF");
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+
+        template.sendBodyAndHeader("direct:start", "D", "id", 456);
+        template.sendBodyAndHeader("direct:start", "E", "id", 456);
+
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        template.sendBodyAndHeader("direct:start", "F", "id", 456);
+
+        mock.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                from("direct:start")
+                    .aggregate(header("id"), new MyAggregationStrategy())
+                        .completionSize(3).completionTimeout(2000)
+                        // and if an exception happens during aggregation then discard the message
+                        .discardOnAggregationFailure()
+                        .to("mock:aggregated");
+                // END SNIPPET: e1
+            }
+        };
+    }
+
+    private class MyAggregationStrategy implements AggregationStrategy {
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if ("Kaboom".equals(newExchange.getMessage().getBody())) {
+                throw new IllegalArgumentException("Forced");
+            }
+
+            if (oldExchange == null) {
+                return newExchange;
+            }
+
+            Object body = oldExchange.getMessage().getBody(String.class) + newExchange.getMessage().getBody(String.class);
+            oldExchange.getMessage().setBody(body);
+            return oldExchange;
+        }
+    }
+}
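
For reference, below is a minimal standalone sketch (not part of the commit) of how a route could enable the new discardOnAggregationFailure() option outside the test harness. It relies only on the DSL and behavior added above; the endpoint URIs, the "id" correlation header and the ConcatStrategy class are illustrative assumptions, and combining the option with completionFromBatchConsumer() would be rejected by AggregateReifier as shown in the diff.

    import org.apache.camel.AggregationStrategy;
    import org.apache.camel.CamelContext;
    import org.apache.camel.Exchange;
    import org.apache.camel.ProducerTemplate;
    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.impl.DefaultCamelContext;

    public class DiscardOnAggregationFailureExample {

        public static void main(String[] args) throws Exception {
            CamelContext context = new DefaultCamelContext();
            context.addRoutes(new RouteBuilder() {
                @Override
                public void configure() {
                    from("direct:start")
                        .aggregate(header("id"), new ConcatStrategy())
                            .completionSize(3).completionTimeout(2000)
                            // drop the partly aggregated group instead of propagating
                            // the exception thrown by the AggregationStrategy
                            .discardOnAggregationFailure()
                            .to("log:aggregated");
                }
            });
            context.start();

            ProducerTemplate template = context.createProducerTemplate();
            // this message makes the strategy throw, so the group for key 123 is discarded
            template.sendBodyAndHeader("direct:start", "Kaboom", "id", 123);
            // a fresh group with the same key still aggregates and completes normally
            template.sendBodyAndHeader("direct:start", "A", "id", 123);
            template.sendBodyAndHeader("direct:start", "B", "id", 123);
            template.sendBodyAndHeader("direct:start", "C", "id", 123);

            context.stop();
        }

        // illustrative strategy that fails on a poison body and otherwise concatenates
        private static class ConcatStrategy implements AggregationStrategy {
            @Override
            public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                if ("Kaboom".equals(newExchange.getMessage().getBody())) {
                    throw new IllegalArgumentException("Forced");
                }
                if (oldExchange == null) {
                    return newExchange;
                }
                String body = oldExchange.getMessage().getBody(String.class)
                        + newExchange.getMessage().getBody(String.class);
                oldExchange.getMessage().setBody(body);
                return oldExchange;
            }
        }
    }

With the option enabled, the first send only causes the group to be dropped; the following three messages complete by size and are sent to the log endpoint as "ABC".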
