Author: davsclaus
Date: Sat Apr 7 08:55:48 2012
New Revision: 1310710
URL: http://svn.apache.org/viewvc?rev=1310710&view=rev
Log:
Improved catching errors from aggregation strategy.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java?rev=1310710&r1=1310709&r2=1310710&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
Sat Apr 7 08:55:48 2012
@@ -18,6 +18,7 @@ package org.apache.camel.processor;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Producer;
@@ -160,7 +161,15 @@ public class Enricher extends ServiceSup
// prepare the exchanges for aggregation
ExchangeHelper.prepareAggregation(exchange, resourceExchange);
- Exchange aggregatedExchange =
aggregationStrategy.aggregate(exchange, resourceExchange);
+ // must catch any exception from aggregation
+ Exchange aggregatedExchange;
+ try {
+ aggregatedExchange = aggregationStrategy.aggregate(exchange,
resourceExchange);
+ } catch (Throwable e) {
+ exchange.setException(new CamelExchangeException("Error
occurred during aggregation", exchange, e));
+ callback.done(true);
+ return true;
+ }
if (aggregatedExchange != null) {
// copy aggregation result onto original exchange (preserving
pattern)
copyResultsPreservePattern(exchange, aggregatedExchange);
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1310710&r1=1310709&r2=1310710&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
Sat Apr 7 08:55:48 2012
@@ -649,6 +649,7 @@ public class MulticastProcessor extends
return;
}
+ // must catch any exceptions from aggregation
try {
doAggregate(getAggregationStrategy(subExchange),
result, subExchange);
} catch (Throwable e) {
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java?rev=1310710&r1=1310709&r2=1310710&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
Sat Apr 7 08:55:48 2012
@@ -16,6 +16,7 @@
*/
package org.apache.camel.processor;
+import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
@@ -140,7 +141,13 @@ public class PollEnricher extends Servic
// prepare the exchanges for aggregation
ExchangeHelper.prepareAggregation(exchange, resourceExchange);
- Exchange aggregatedExchange =
aggregationStrategy.aggregate(exchange, resourceExchange);
+ // must catch any exception from aggregation
+ Exchange aggregatedExchange;
+ try {
+ aggregatedExchange = aggregationStrategy.aggregate(exchange,
resourceExchange);
+ } catch (Throwable e) {
+ throw new CamelExchangeException("Error occurred during
aggregation", exchange, e);
+ }
if (aggregatedExchange != null) {
// copy aggregation result onto original exchange (preserving
pattern)
copyResultsPreservePattern(exchange, aggregatedExchange);
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1310710&r1=1310709&r2=1310710&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
Sat Apr 7 08:55:48 2012
@@ -240,7 +240,12 @@ public class AggregateProcessor extends
// prepare the exchanges for aggregation and aggregate it
ExchangeHelper.prepareAggregation(oldExchange, newExchange);
- answer = onAggregation(oldExchange, exchange);
+ // must catch any exception from aggregation
+ try {
+ answer = onAggregation(oldExchange, exchange);
+ } catch (Throwable e) {
+ throw new CamelExchangeException("Error occurred during
aggregation", exchange, e);
+ }
if (answer == null) {
throw new CamelExchangeException("AggregationStrategy " +
aggregationStrategy + " returned null which is not allowed", exchange);
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java?rev=1310710&r1=1310709&r2=1310710&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java
Sat Apr 7 08:55:48 2012
@@ -28,7 +28,9 @@ public interface TimeoutAwareAggregation
// TODO: In Camel 3.0 we should move this to org.apache.camel package
/**
- * A timeout occurred
+ * A timeout occurred.
+ * <p/>
+ * <b>Important: </b> This method must <b>not</b> throw any exceptions.
*
* @param oldExchange the current aggregated exchange, or the original
{@link Exchange} if no aggregation
* has been done before the timeout occurred