This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new c0bf3d4  CAMEL-13287: AggregationStrategy - Access original exchange 
in aggregate method for multicast, recipient list and splitter
c0bf3d4 is described below

commit c0bf3d4e73889905605b0ad3331013b2025cea36
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed Aug 7 07:14:01 2019 +0200

    CAMEL-13287: AggregationStrategy - Access original exchange in aggregate 
method for multicast, recipient list and splitter
---
 .../java/org/apache/camel/AggregationStrategy.java | 15 ++++
 .../apache/camel/processor/MulticastProcessor.java | 29 ++++----
 .../ShareUnitOfWorkAggregationStrategy.java        | 10 +++
 .../src/main/docs/eips/multicast-eip.adoc          |  5 ++
 .../src/main/docs/eips/recipientList-eip.adoc      | 25 ++-----
 core/camel-core/src/main/docs/eips/split-eip.adoc  | 17 ++---
 .../camel/builder/AggregationStrategyClause.java   |  5 ++
 ...ticastAggregationStrategyInputExchangeTest.java | 82 ++++++++++++++++++++
 ...ntListAggregationStrategyInputExchangeTest.java | 80 ++++++++++++++++++++
 ...litterAggregationStrategyInputExchangeTest.java | 87 ++++++++++++++++++++++
 10 files changed, 311 insertions(+), 44 deletions(-)

diff --git 
a/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java 
b/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java
index c018c17..6ae3f10 100644
--- a/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java
@@ -59,6 +59,21 @@ public interface AggregationStrategy {
     Exchange aggregate(Exchange oldExchange, Exchange newExchange);
 
     /**
+     * Aggregates an old and new exchange together to create a single combined 
exchange.
+     * <p/>
+     * Important: Only the Multicast, Recipient List and Splitter EIPs support this method 
with access to the input exchange. All other EIPs
+     * do not and use the {@link #aggregate(Exchange, Exchange)} method 
instead.
+     *
+     * @param oldExchange    the oldest exchange (is <tt>null</tt> on first 
aggregation as we only have the new exchange)
+     * @param newExchange    the newest exchange (can be <tt>null</tt> if 
there was no data possible to acquire)
+     * @param inputExchange  the input exchange (input to the EIP)
+     * @return a combined composite of the two exchanges, favor returning the 
<tt>oldExchange</tt> whenever possible
+     */
+    default Exchange aggregate(Exchange oldExchange, Exchange newExchange, 
Exchange inputExchange) {
+        return aggregate(oldExchange, newExchange);
+    }
+
+    /**
      * Indicates if this aggregation strategy uses pre-completion mode.
      * @return <tt>true</tt> if this strategy uses pre-completion mode, or 
<tt>false</tt> otherwise.
      */
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 
b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 64b20c9..9172b76 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -65,10 +65,10 @@ import 
org.apache.camel.util.concurrent.AsyncCompletionService;
 
 import static org.apache.camel.util.ObjectHelper.notNull;
 
-
 /**
  * Implements the Multicast pattern to send a message exchange to a number of
  * endpoints, each endpoint receiving a copy of the message exchange.
+ *
  * @see Pipeline
  */
 public class MulticastProcessor extends AsyncProcessorSupport implements 
Navigate<Processor>, Traceable, IdAware {
@@ -281,7 +281,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
 
         @Override
         public String toString() {
-            return "Step[" + original.getExchangeId() + "," + 
MulticastProcessor.this + "]";
+            return "MulticastTask[" + original.getExchangeId() + "," + 
MulticastProcessor.this + "]";
         }
 
         @Override
@@ -370,7 +370,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
                 try {
                     Exchange exchange;
                     while (!done.get() && (exchange = completion.poll()) != 
null) {
-                        doAggregate(result, exchange);
+                        doAggregate(result, exchange, original);
                         if (nbAggregated.incrementAndGet() >= 
nbExchangeSent.get() && allSent.get()) {
                             doDone(result.get(), true);
                         }
@@ -398,7 +398,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
                                     nbAggregated.getAndIncrement(), 
nbExchangeSent.get(), timeout);
                         }
                         if (exchange != null) {
-                            doAggregate(result, exchange);
+                            doAggregate(result, exchange, original);
                             nbAggregated.incrementAndGet();
                         }
                     }
@@ -544,14 +544,13 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
      *
      * @param result   the current result
      * @param exchange the exchange to be added to the result
-     * @see #doAggregateInternal(AggregationStrategy, AtomicReference, 
org.apache.camel.Exchange)
-     * @see #doAggregateSync(AggregationStrategy, AtomicReference, 
org.apache.camel.Exchange)
+     * @param inputExchange the input exchange that was sent as input to this 
EIP
      */
-    protected void doAggregate(AtomicReference<Exchange> result, Exchange 
exchange) {
+    protected void doAggregate(AtomicReference<Exchange> result, Exchange 
exchange, Exchange inputExchange) {
         if (parallelAggregate) {
-            doAggregateInternal(getAggregationStrategy(exchange), result, 
exchange);
+            doAggregateInternal(getAggregationStrategy(exchange), result, 
exchange, inputExchange);
         } else {
-            doAggregateSync(getAggregationStrategy(exchange), result, 
exchange);
+            doAggregateSync(getAggregationStrategy(exchange), result, 
exchange, inputExchange);
         }
     }
 
@@ -562,10 +561,10 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
      * @param strategy the aggregation strategy to use
      * @param result   the current result
      * @param exchange the exchange to be added to the result
-     * @see #doAggregateInternal(AggregationStrategy, AtomicReference, 
org.apache.camel.Exchange)
+     * @param inputExchange the input exchange that was sent as input to this 
EIP
      */
-    protected synchronized void doAggregateSync(AggregationStrategy strategy, 
AtomicReference<Exchange> result, Exchange exchange) {
-        doAggregateInternal(strategy, result, exchange);
+    private synchronized void doAggregateSync(AggregationStrategy strategy, 
AtomicReference<Exchange> result, Exchange exchange, Exchange inputExchange) {
+        doAggregateInternal(strategy, result, exchange, inputExchange);
     }
 
     /**
@@ -576,14 +575,14 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
      * @param strategy the aggregation strategy to use
      * @param result   the current result
      * @param exchange the exchange to be added to the result
-     * @see #doAggregateSync
+     * @param inputExchange the input exchange that was sent as input to this 
EIP
      */
-    protected void doAggregateInternal(AggregationStrategy strategy, 
AtomicReference<Exchange> result, Exchange exchange) {
+    private void doAggregateInternal(AggregationStrategy strategy, 
AtomicReference<Exchange> result, Exchange exchange, Exchange inputExchange) {
         if (strategy != null) {
             // prepare the exchanges for aggregation
             Exchange oldExchange = result.get();
             ExchangeHelper.prepareAggregation(oldExchange, exchange);
-            result.set(strategy.aggregate(oldExchange, exchange));
+            result.set(strategy.aggregate(oldExchange, exchange, 
inputExchange));
         }
     }
 
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
 
b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
index 5628ed5..b367070 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
@@ -94,6 +94,16 @@ public final class ShareUnitOfWorkAggregationStrategy 
extends ServiceSupport imp
         return answer;
     }
 
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange, 
Exchange inputExchange) {
+        // aggregate using the actual strategy first
+        Exchange answer = strategy.aggregate(oldExchange, newExchange, 
inputExchange);
+        // ensure any errors are propagated from the new exchange to the answer
+        propagateFailure(answer, newExchange);
+
+        return answer;
+    }
+
     protected void propagateFailure(Exchange answer, Exchange newExchange) {
         // if new exchange failed then propagate all the error related 
properties to the answer
         boolean exceptionHandled = 
hasExceptionBeenHandledByErrorHandler(newExchange);
diff --git a/core/camel-core/src/main/docs/eips/multicast-eip.adoc 
b/core/camel-core/src/main/docs/eips/multicast-eip.adoc
index d08b278..2d699e7 100644
--- a/core/camel-core/src/main/docs/eips/multicast-eip.adoc
+++ b/core/camel-core/src/main/docs/eips/multicast-eip.adoc
@@ -59,6 +59,11 @@ from("direct:start")
   .to("mock:result");
 ----
 
+NOTE: The Multicast, Recipient List, and Splitter EIPs have special support 
for using `AggregationStrategy` with
+access to the original input exchange. You may want to use this when you 
aggregate messages and
+there has been a failure in one of the messages, which you then want to enrich 
on the original
+input message and return as response; it's the `aggregate` method with 3 exchange 
parameters.
+
 == Stop processing in case of exception
 
 The mutlicast EIP will by default continue to process
diff --git a/core/camel-core/src/main/docs/eips/recipientList-eip.adoc 
b/core/camel-core/src/main/docs/eips/recipientList-eip.adoc
index 19203fe..da32e34 100644
--- a/core/camel-core/src/main/docs/eips/recipientList-eip.adoc
+++ b/core/camel-core/src/main/docs/eips/recipientList-eip.adoc
@@ -211,8 +211,12 @@ And in XML it is again an attribute on the recipient list 
tag.
 <bean id="myStrategy" class="com.mycompany.MyOwnAggregationStrategy"/>
 ----
 
+NOTE: The Multicast, Recipient List, and Splitter EIPs have special support 
for using `AggregationStrategy` with
+access to the original input exchange. You may want to use this when you 
aggregate messages and
+there has been a failure in one of the messages, which you then want to enrich 
on the original
+input message and return as response; it's the `aggregate` method with 3 exchange 
parameters.
+
 == Knowing which endpoint when using custom AggregationStrategy
-*Available as of Camel 2.12*
 
 When using a custom `AggregationStrategy` then the `aggregate` method is 
always invoked in sequential order
 (also if parallel processing is enabled) of the endpoints the Recipient List 
is using.
@@ -315,23 +319,7 @@ from("direct:c").to("mock:C").setBody(constant("C"));
 This timeout feature is also supported by Splitter and both multicast and 
recipientList.
 ===
 
-By default if a timeout occurs the `AggregationStrategy` is not invoked. 
However you can implement a special version `TimeoutAwareAggregationStrategy`
-
-[source,java]
-----
-public interface TimeoutAwareAggregationStrategy extends AggregationStrategy {
-
-    /**
-     * A timeout occurred
-     *
-     * @param oldExchange  the oldest exchange (is <tt>null</tt> on first 
aggregation as we only have the new exchange)
-     * @param index        the index
-     * @param total        the total
-     * @param timeout      the timeout value in millis
-     */
-    void timeout(Exchange oldExchange, int index, int total, long timeout);
-----
-
+By default if a timeout occurs the `AggregationStrategy` is not invoked. 
However you can implement the `timeout` method.
 This allows you to deal with the timeout in the `AggregationStrategy` if you 
really need to.
 
 [NOTE]
@@ -345,7 +333,6 @@ The remainders will be cancelled. Camel will also only 
invoke the `timeout` meth
 See details at the Multicast EIP
 
 == Using ExchangePattern in recipients
-*Available as of Camel 2.15*
 
 The recipient list will by default use the current Exchange Pattern. Though 
one can imagine use-cases where one wants to send
 a message to a recipient using a different exchange pattern. For example you 
may have a route that initiates as an `InOnly` route,
diff --git a/core/camel-core/src/main/docs/eips/split-eip.adoc 
b/core/camel-core/src/main/docs/eips/split-eip.adoc
index 6109aef..f2ea37b 100644
--- a/core/camel-core/src/main/docs/eips/split-eip.adoc
+++ b/core/camel-core/src/main/docs/eips/split-eip.adoc
@@ -136,6 +136,12 @@ There is a sample on this page (Split aggregate 
request/reply sample).
 Notice its the same strategy as the Aggregate EIP supports.
 This Splitter can be viewed as having a build in light weight Aggregate EIP.
 
+NOTE: The Multicast, Recipient List, and Splitter EIPs have special support 
for using `AggregationStrategy` with
+access to the original input exchange. You may want to use this when you 
aggregate messages and
+there has been a failure in one of the messages, which you then want to enrich 
on the original
+input message and return as response; it's the `aggregate` method with 3 exchange 
parameters.
+
+
 == Parallel execution of distinct parts
 
 If you want to execute all parts in parallel you can use the 
`parallelProcessing` option as show:
@@ -183,8 +189,6 @@ from("direct:streaming")
 
 There are two tokenizers that can be used to tokenize an XML payload. The 
first tokenizer uses the same principle as in the text tokenizer to scan the 
XML payload and extract a sequence of tokens.
 
-*Available as of Camel 2.9*
-
 If you have a big XML payload, from a file source, and want to split it in 
streaming mode, then you can use the Tokenizer language with start/end tokens 
to do this with low memory footprint.
 
 [NOTE]
@@ -258,14 +262,10 @@ from("file:inbox")
      .to("activemq:queue:order");
 ----
 
-Available as of Camel 2.13.1, you can set the above `inheritNamsepaceTagName` 
property to `*` to include the preceding context in each token (i.e., 
generating each token enclosed in its ancestor elements). It is noted that each 
token must share the same ancestor elements in this case.
-
+You can set the above `inheritNamespaceTagName` property to `*` to include the 
preceding context in each token (i.e., generating each token enclosed in its 
ancestor elements). It is noted that each token must share the same ancestor 
elements in this case.
 The above tokenizer works well on simple structures but has some inherent 
limitations in handling more complex XML structures.
 
-*Available as of Camel 2.14*
-
 The second tokenizer uses a StAX parser to overcome these limitations. This 
tokenizer recognizes XML namespaces and also handles simple and complex XML 
structures more naturally and efficiently.
-
 To split using this tokenizer at {urn:shop}order, we can write
 
 [source,java]
@@ -344,7 +344,6 @@ it results in invalid xml snippets after the split. For 
example the snippet coul
 ----
 
 == Splitting files by grouping N lines together
-*Available as of Camel 2.10*
 
 The Tokenizer language has a new option group that allows you to group N parts 
together, for example to split big files into chunks of 1000 lines.
 
@@ -610,12 +609,10 @@ And using XML DSL you specify it as follows:
 ----
 
 == Using onPrepare to execute custom logic when preparing messages
-*Available as of Camel 2.8*
 
 See details at Multicast EIP
 
 == Sharing unit of work
-*Available as of Camel 2.8*
 
 The Splitter will by default not share unit of work between the parent 
exchange and each split exchange.
 This means each sub exchange has its own individual unit of work.
diff --git 
a/core/camel-core/src/main/java/org/apache/camel/builder/AggregationStrategyClause.java
 
b/core/camel-core/src/main/java/org/apache/camel/builder/AggregationStrategyClause.java
index 480c6f2..82a8c68 100644
--- 
a/core/camel-core/src/main/java/org/apache/camel/builder/AggregationStrategyClause.java
+++ 
b/core/camel-core/src/main/java/org/apache/camel/builder/AggregationStrategyClause.java
@@ -37,6 +37,11 @@ public class AggregationStrategyClause<T> implements 
AggregationStrategy {
         return ObjectHelper.notNull(strategy, 
"AggregationStrategy").aggregate(oldExchange, newExchange);
     }
 
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange, 
Exchange inputExchange) {
+        return ObjectHelper.notNull(strategy, 
"AggregationStrategy").aggregate(oldExchange, newExchange, inputExchange);
+    }
+
     // *******************************
     // Exchange
     // *******************************
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/issues/MulticastAggregationStrategyInputExchangeTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/issues/MulticastAggregationStrategyInputExchangeTest.java
new file mode 100644
index 0000000..979a72d
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/issues/MulticastAggregationStrategyInputExchangeTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class MulticastAggregationStrategyInputExchangeTest extends 
ContextTestSupport {
+
+    @Test
+    public void testInputExchange() throws Exception {
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(0);
+
+        Exchange out = template.request("direct:start", p -> 
p.getMessage().setBody("Hello World"));
+        assertNotNull(out);
+        assertEquals("Hello World", out.getMessage().getBody());
+        assertEquals("Forced", out.getMessage().getHeader("FailedDue"));
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").multicast(new MyAggregateBean())
+                    .to("direct:a")
+                    .to("direct:b")
+                .end();
+
+                from("direct:a")
+                        .setHeader("foo", constant("123"))
+                        .transform(constant("A"))
+                        .to("mock:a");
+                from("direct:b")
+                        .setHeader("bar", constant("456"))
+                        .transform(constant("B"))
+                        .throwException(new IllegalArgumentException("Forced"))
+                        .to("mock:b");
+            }
+        };
+    }
+
+    public static class MyAggregateBean implements AggregationStrategy {
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            // NOT in use
+            return null;
+        }
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange, 
Exchange inputExchange) {
+            if (newExchange.isFailed()) {
+                inputExchange.getMessage().setHeader("FailedDue", 
newExchange.getException().getMessage());
+                return inputExchange;
+            }
+            // dont care so much about merging in this unit test
+            return newExchange;
+        }
+    }
+
+}
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/issues/RecipientListAggregationStrategyInputExchangeTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/issues/RecipientListAggregationStrategyInputExchangeTest.java
new file mode 100644
index 0000000..5386ab7
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/issues/RecipientListAggregationStrategyInputExchangeTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class RecipientListAggregationStrategyInputExchangeTest extends 
ContextTestSupport {
+
+    @Test
+    public void testInputExchange() throws Exception {
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(0);
+
+        Exchange out = template.request("direct:start", p -> 
p.getMessage().setBody("Hello World"));
+        assertNotNull(out);
+        assertEquals("Hello World", out.getMessage().getBody());
+        assertEquals("Forced", out.getMessage().getHeader("FailedDue"));
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    
.recipientList(constant("direct:a,direct:b")).aggregationStrategy(new 
MyAggregateBean());
+
+                from("direct:a")
+                        .setHeader("foo", constant("123"))
+                        .transform(constant("A"))
+                        .to("mock:a");
+                from("direct:b")
+                        .setHeader("bar", constant("456"))
+                        .transform(constant("B"))
+                        .throwException(new IllegalArgumentException("Forced"))
+                        .to("mock:b");
+            }
+        };
+    }
+
+    public static class MyAggregateBean implements AggregationStrategy {
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            // NOT in use
+            return null;
+        }
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange, 
Exchange inputExchange) {
+            if (newExchange.isFailed()) {
+                inputExchange.getMessage().setHeader("FailedDue", 
newExchange.getException().getMessage());
+                return inputExchange;
+            }
+            // dont care so much about merging in this unit test
+            return newExchange;
+        }
+    }
+
+}
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/issues/SplitterAggregationStrategyInputExchangeTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/issues/SplitterAggregationStrategyInputExchangeTest.java
new file mode 100644
index 0000000..f6831ea
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/issues/SplitterAggregationStrategyInputExchangeTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class SplitterAggregationStrategyInputExchangeTest extends 
ContextTestSupport {
+
+    @Test
+    public void testInputExchange() throws Exception {
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(0);
+
+        Exchange out = template.request("direct:start", p -> 
p.getMessage().setBody("A,B"));
+        assertNotNull(out);
+        assertEquals("A,B", out.getMessage().getBody());
+        assertEquals("Forced", out.getMessage().getHeader("FailedDue"));
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .split(body(), new MyAggregateBean())
+                        .choice()
+                            .when(body().contains("A"))
+                                .to("direct:a")
+                            .otherwise()
+                                .to("direct:b")
+                        .end()
+                    .end();
+
+                from("direct:a")
+                        .setHeader("foo", constant("123"))
+                        .transform(constant("A"))
+                        .to("mock:a");
+                from("direct:b")
+                        .setHeader("bar", constant("456"))
+                        .transform(constant("B"))
+                        .throwException(new IllegalArgumentException("Forced"))
+                        .to("mock:b");
+            }
+        };
+    }
+
+    public static class MyAggregateBean implements AggregationStrategy {
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            // NOT in use
+            return null;
+        }
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange, 
Exchange inputExchange) {
+            if (newExchange.isFailed()) {
+                inputExchange.getMessage().setHeader("FailedDue", 
newExchange.getException().getMessage());
+                return inputExchange;
+            }
+            // dont care so much about merging in this unit test
+            return newExchange;
+        }
+    }
+
+}

Reply via email to