This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.x by this push:
     new d299b841993 CAMEL-19478: Add synchronous option to relevant EIPs
d299b841993 is described below

commit d299b84199377b8195fc79ce849650deb3a06615
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Jun 21 11:27:57 2023 +0200

    CAMEL-19478: Add synchronous option to relevant EIPs
---
 .../docs/modules/eips/pages/multicast-eip.adoc     |   7 ++
 .../docs/modules/eips/pages/recipientList-eip.adoc |   7 ++
 .../main/docs/modules/eips/pages/split-eip.adoc    | 115 ++++++++++++++++-----
 .../org/apache/camel/model/multicast.json          |   3 +-
 .../org/apache/camel/model/recipientList.json      |   3 +-
 .../resources/org/apache/camel/model/split.json    |   3 +-
 .../apache/camel/model/MulticastDefinition.java    |  92 +++++++++++++++++
 .../camel/model/RecipientListDefinition.java       |  96 ++++++++++++++++-
 .../org/apache/camel/model/SplitDefinition.java    |  54 ++++++++++
 .../apache/camel/processor/MulticastProcessor.java |  34 ++++++
 .../org/apache/camel/processor/RecipientList.java  |  10 ++
 .../org/apache/camel/reifier/MulticastReifier.java |   2 +
 .../apache/camel/reifier/RecipientListReifier.java |   2 +
 .../org/apache/camel/reifier/SplitReifier.java     |   2 +
 .../MulticastParallelSynchronousTest.java          |  74 +++++++++++++
 .../RecipientListParallelSynchronousTest.java      |  77 ++++++++++++++
 .../processor/SplitParallelSynchronousTest.java    |  68 ++++++++++++
 .../java/org/apache/camel/xml/in/ModelParser.java  |   3 +
 .../dsl/yaml/deserializers/ModelDeserializers.java |  18 ++++
 .../generated/resources/schema/camel-yaml-dsl.json |   9 ++
 .../generated/resources/schema/camelYamlDsl.json   |   9 ++
 21 files changed, 658 insertions(+), 30 deletions(-)
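
Note on the threading behaviour this commit documents. The sketch below is a plain-Java analogy, not Camel's implementation. It assumes a two-thread pool standing in for the parallel thread pool and uses `CompletableFuture` (chosen here purely for illustration) to show the two outcomes: with parallel processing alone, the continuation runs on whichever pool thread finishes last; with the synchronous behaviour, the caller blocks and continues on its own thread. The class name `SynchronousOptionSketch`, the method `continuationThread`, and the `parallel-N` thread names are hypothetical. In the Java DSL the corresponding switch is `multicast().parallelProcessing().synchronous()`, using the builder methods this commit adds to `MulticastDefinition`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; none of these names come from Camel.
class SynchronousOptionSketch {

    /** Runs two parallel tasks and returns the name of the thread that continues afterwards. */
    static String continuationThread(boolean synchronous) throws Exception {
        AtomicInteger counter = new AtomicInteger();
        // Stand-in for the parallel thread pool; threads are named parallel-1, parallel-2.
        ExecutorService pool = Executors.newFixedThreadPool(2,
                r -> new Thread(r, "parallel-" + counter.incrementAndGet()));
        try {
            // Holds the tasks until the continuation is attached, so the outcome is deterministic.
            CountDownLatch release = new CountDownLatch(1);
            CompletableFuture<?>[] parts = new CompletableFuture<?>[2];
            for (int i = 0; i < parts.length; i++) {
                parts[i] = CompletableFuture.runAsync(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }, pool);
            }
            CompletableFuture<Void> all = CompletableFuture.allOf(parts);

            if (synchronous) {
                // Caller waits for all parts, then continues on its own thread.
                release.countDown();
                all.join();
                return Thread.currentThread().getName();
            }

            // Continuation is run by the pool thread that completes the last part.
            CompletableFuture<String> continued =
                    all.thenApply(v -> Thread.currentThread().getName());
            release.countDown();
            return continued.get(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("parallel only:          " + continuationThread(false));
        System.out.println("parallel + synchronous: " + continuationThread(true));
    }
}
```

The latch exists only to make the sketch repeatable: it guarantees the continuation is registered before any task finishes, so the non-synchronous case always continues on a pool thread.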

diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/multicast-eip.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/multicast-eip.adoc
index a8c01165237..a3564b26af5 100644
--- a/core/camel-core-engine/src/main/docs/modules/eips/pages/multicast-eip.adoc
+++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/multicast-eip.adoc
@@ -94,6 +94,13 @@ And in XML:
 </route>
 ----
 
+[IMPORTANT]
+====
+When parallel processing is enabled, the Camel routing engine will continue processing
+using the last used thread from the parallel thread pool. However, if you want to use the original
+thread that called the multicast, then make sure to enable the synchronous option as well.
+====
+
 === Ending a Multicast block
 
 You may want to continue routing the exchange after the Multicast EIP. In Java DSL you need to use `end()`
diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/recipientList-eip.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/recipientList-eip.adoc
index 227d071f47b..dd5525a6ef1 100644
--- a/core/camel-core-engine/src/main/docs/modules/eips/pages/recipientList-eip.adoc
+++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/recipientList-eip.adoc
@@ -164,6 +164,13 @@ And in XML it is an attribute on `<recipientList>`:
 </route>
 ----
 
+[IMPORTANT]
+====
+When parallel processing is enabled, the Camel routing engine will continue processing
+using the last used thread from the parallel thread pool. However, if you want to use the original
+thread that called the recipient list, then make sure to enable the synchronous option as well.
+====
+
 ==== Using custom thread pool
 
 A thread pool is only used for `parallelProcessing`. You supply your own custom thread pool via the `ExecutorServiceStrategy` (see Camel's Threading Model),
diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/split-eip.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/split-eip.adoc
index 362d2482115..ba5f4d5f89d 100644
--- a/core/camel-core-engine/src/main/docs/modules/eips/pages/split-eip.adoc
+++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/split-eip.adoc
@@ -90,6 +90,10 @@ so you could use any of the supported languages such as
 xref:components:languages:simple-language.adoc[Simple], xref:components:languages:xpath-language.adoc[XPath],
 xref:components:languages:jsonpath-language.adoc[JSonPath], xref:components:languages:groovy-language.adoc[Groovy] to perform the split.
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("activemq:my.queue")
@@ -97,8 +101,8 @@ from("activemq:my.queue")
         .to("file:some/directory")
 ----
 
-And in XML:
-
+XML::
++
 [source,xml]
 ----
 <route>
@@ -109,6 +113,7 @@ And in XML:
     </split>
 </route>
 ----
+====
 
 === Splitting the message body
 
@@ -130,6 +135,10 @@ the Split EIP will be split into a single message (the same).
 
 To use this with the splitter you should _just_ use body as the expression:
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("direct:splitUsingBody")
@@ -137,6 +146,8 @@ from("direct:splitUsingBody")
         .to("mock:result");
 ----
 
+XML::
++
 In XML you use xref:components:languages:simple-language.adoc[Simple] to refer to the message body:
 
 [source,xml]
@@ -149,6 +160,7 @@ In XML you use xref:components:languages:simple-language.adoc[Simple] to refer t
   </split>
 </route>
 ----
+====
 
 === Splitting with parallel processing
 
@@ -157,6 +169,10 @@ is processed by its own thread in parallel.
 
 The example below enabled parallel mode:
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("direct:a")
@@ -166,8 +182,8 @@ from("direct:a")
     .to("direct:z");
 ----
 
-And in XML:
-
+XML::
++
 [source,xml]
 ----
 <route>
@@ -180,6 +196,14 @@ And in XML:
     </split>
 </route>
 ----
+====
+
+[IMPORTANT]
+====
+When parallel processing is enabled, the Camel routing engine will continue processing
+using the last used thread from the parallel thread pool. However, if you want to use the original
+thread that called the splitter, then make sure to enable the synchronous option as well.
+====
 
 === Ending a Split block
 
@@ -190,6 +214,10 @@ In the example above then sending to mock:result happens after the Spllit EIP ha
 In other words the message should finish splitting the entire message before the message
 continues.
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("direct:a")
@@ -201,6 +229,8 @@ from("direct:a")
   .to("mock:result");
 ----
 
+XML::
++
 And in XML its intuitive as `</split>` marks the end of the block:
 
 [source,xml]
@@ -216,6 +246,7 @@ And in XML its intuitive as `</split>` marks the end of the block:
     <to uri="mock:result"/>
 </route>
 ----
+====
 
 === What is returned from Split EIP when its complete
 
@@ -230,6 +261,10 @@ as a single response exchange, that becomes the outgoing exchange after the Spli
 
 The example now aggregates with the `MyAggregationStrategy` class:
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("direct:start")
@@ -241,6 +276,8 @@ from("direct:start")
   .to("mock:result");
 ----
 
+XML::
++
 And in XML we can refer to the FQN class name with `#class:` syntax as shown below:
 
 [source,xml]
@@ -256,6 +293,7 @@ And in XML we can refer to the FQN class name with `#class:` syntax as shown bel
     <to uri="mock:result"/>
 </route>
 ----
+====
 
 [NOTE]
 ====
@@ -276,6 +314,10 @@ The Split EIP operates in two modes when splitting:
 
 You can split in streaming mode as shown:
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("direct:streaming")
@@ -283,8 +325,8 @@ from("direct:streaming")
     .to("activemq:my.parts");
 ----
 
-And in XML:
-
+XML::
++
 [source,xml]
 ----
 <route>
@@ -295,10 +337,15 @@ And in XML:
     </split>
 </route>
 ----
+====
 
 You can also supply a custom xref:components:languages:bean-language.adoc[Bean]
 to perform the splitting in streaming mode like this:
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("direct:streaming")
@@ -306,8 +353,8 @@ from("direct:streaming")
     .to("activemq:my.parts")
 ----
 
-And in XML:
-
+XML::
++
 [source,xml]
 ----
 <route>
@@ -318,6 +365,7 @@ And in XML:
     </split>
 </route>
 ----
+====
 
 Then the custom bean could for example be implemented as follows:
 
@@ -393,15 +441,18 @@ Now to split this big file using xref:components:languages:xpath-language.adoc[X
 would cause the entire content to be loaded into memory. So instead we can use the
 xref:components:languages:tokenize-language.adoc[Tokenize] language to do this as follows:
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("file:inbox")
   .split().tokenizeXML("order").streaming()
      .to("activemq:queue:order");
 ----
-
-In XML the route would be as follows:
-
+XML::
++
 [source,xml]
 ----
 <route>
@@ -412,6 +463,7 @@ In XML the route would be as follows:
   </split>
 </route>
 ----
+====
 
 This will split the file using the tag name of the child nodes (more precisely speaking, the local name of the element without its namespace prefix if any),
 which mean it will grab the content between the `<order>` and `</order>` tags (incl. the tags).
@@ -427,6 +479,19 @@ So for example a split message would be structured as follows:
 
 If you want to inherit namespaces from a root/parent tag, then you can do this as well by providing the name of the root/parent tag:
 
+[tabs]
+====
+Java::
++
+[source,java]
+----
+from("file:inbox")
+  .split().tokenizeXML("order", "orders").streaming()
+     .to("activemq:queue:order");
+----
+
+XML::
++
 [source,xml]
 ----
 <route>
@@ -437,15 +502,7 @@ If you want to inherit namespaces from a root/parent tag, then you can do this a
   </split>
 </route>
 ----
-
-And in Java DSL it is done like this:
-
-[source,java]
-----
-from("file:inbox")
-  .split().tokenizeXML("order", "orders").streaming()
-     .to("activemq:queue:order");
-----
+====
 
 You can set `inheritNamsepaceTagName` property to `*` to include the preceding context in each token (i.e., generating each token enclosed in its ancestor elements). It is noted that each token must share the same ancestor elements in this case.
 The above tokenizer works well on simple structures but has some inherent limitations in handling more complex XML structures.
@@ -549,6 +606,10 @@ can be used for grouping N parts together, for example to split big files into c
 
 Doing this is easy as the following example shows:
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("file:inbox")
@@ -556,8 +617,8 @@ from("file:inbox")
      .to("activemq:queue:order");
 ----
 
-And in XML DSL:
-
+XML::
++
 [source,xml]
 ----
 <route>
@@ -568,6 +629,7 @@ And in XML DSL:
   </split>
 </route>
 ----
+====
 
 The `group` value must be a positive number dictating how many elements to combine together in a group.
 Each part will be combined using the token.
@@ -706,6 +768,10 @@ handle it. You can do this by specifying that it should stop in case of an
 exception occurred. This is done by the `stopOnException` option as
 shown below:
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("direct:start")
@@ -716,8 +782,8 @@ from("direct:start")
     .to("direct:cheese");
 ----
 
-And using XML DSL you specify it as follows:
-
+XML::
++
 [source,xml]
 ----
 <route>
@@ -730,6 +796,7 @@ And using XML DSL you specify it as follows:
     <to uri="direct:cheese"/>
 </route>
 ----
+====
 
 In the example above, then `MyProcessor` is causing a failure and throws an exception.
 This means the Split EIP will stop after this, and not split anymore.
diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/multicast.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/multicast.json
index 1d1113310f9..4d58a81816f 100644
--- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/multicast.json
+++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/multicast.json
@@ -16,7 +16,8 @@
     "aggregationStrategyMethodName": { "kind": "attribute", "displayName": 
"Aggregation Strategy Method Name", "label": "advanced", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "This option can be used to 
explicit declare the method name to use, when using POJOs as the 
AggregationStrategy." },
     "aggregationStrategyMethodAllowNull": { "kind": "attribute", 
"displayName": "Aggregation Strategy Method Allow Null", "label": "advanced", 
"required": false, "type": "boolean", "javaType": "java.lang.Boolean", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
false, "description": "If this option is false then the aggregate method is not 
used if there was no data to enrich. If this option is true then null values is 
used as the oldExchange (when no data to enr [...]
     "parallelAggregate": { "kind": "attribute", "displayName": "Parallel 
Aggregate", "label": "advanced", "required": false, "type": "boolean", 
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "If enabled then the 
aggregate method on AggregationStrategy can be called concurrently. Notice that 
this would require the implementation of AggregationStrategy to be implemented 
as thread-safe. By default this is fals [...]
-    "parallelProcessing": { "kind": "attribute", "displayName": "Parallel 
Processing", "required": false, "type": "boolean", "javaType": 
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "If enabled then sending messages to the 
multicasts occurs concurrently. Note the caller thread will still wait until 
all messages has been fully processed, before it continues. Its only the 
sending and processing the replies from the mul [...]
+    "parallelProcessing": { "kind": "attribute", "displayName": "Parallel 
Processing", "required": false, "type": "boolean", "javaType": 
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "If enabled then sending messages to the 
multicasts occurs concurrently. Note the caller thread will still wait until 
all messages has been fully processed, before it continues. Its only the 
sending and processing the replies from the mul [...]
+    "synchronous": { "kind": "attribute", "displayName": "Synchronous", 
"required": false, "type": "boolean", "javaType": "java.lang.Boolean", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
false, "description": "Sets whether synchronous processing should be strictly 
used. When enabled then the same thread is used to continue routing after the 
multicast is complete, even if parallel processing is enabled." },
     "streaming": { "kind": "attribute", "displayName": "Streaming", 
"required": false, "type": "boolean", "javaType": "java.lang.Boolean", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
false, "description": "If enabled then Camel will process replies out-of-order, 
eg in the order they come back. If disabled, Camel will process replies in the 
same order as defined by the multicast." },
     "stopOnException": { "kind": "attribute", "displayName": "Stop On 
Exception", "label": "advanced", "required": false, "type": "boolean", 
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Will now stop further 
processing if an exception or failure occurred during processing of an 
org.apache.camel.Exchange and the caused exception will be thrown. Will also 
stop if processing the exchange failed (has a fault [...]
     "timeout": { "kind": "attribute", "displayName": "Timeout", "label": 
"advanced", "required": false, "type": "duration", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "0", "description": "Sets a total timeout specified in millis, 
when using parallel processing. If the Multicast hasn't been able to send and 
process all replies within the given timeframe, then the timeout triggers and 
the Multicast breaks out and continues. Not [...]
diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/recipientList.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/recipientList.json
index 13b1135ecb2..3580661323a 100644
--- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/recipientList.json
+++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/recipientList.json
@@ -18,7 +18,8 @@
     "aggregationStrategyMethodName": { "kind": "attribute", "displayName": 
"Aggregation Strategy Method Name", "label": "advanced", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "This option can be used to 
explicit declare the method name to use, when using POJOs as the 
AggregationStrategy." },
     "aggregationStrategyMethodAllowNull": { "kind": "attribute", 
"displayName": "Aggregation Strategy Method Allow Null", "label": "advanced", 
"required": false, "type": "boolean", "javaType": "java.lang.Boolean", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
false, "description": "If this option is false then the aggregate method is not 
used if there was no data to enrich. If this option is true then null values is 
used as the oldExchange (when no data to enr [...]
     "parallelAggregate": { "kind": "attribute", "displayName": "Parallel 
Aggregate", "label": "advanced", "required": false, "type": "boolean", 
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "If enabled then the 
aggregate method on AggregationStrategy can be called concurrently. Notice that 
this would require the implementation of AggregationStrategy to be implemented 
as thread-safe. By default this is fals [...]
-    "parallelProcessing": { "kind": "attribute", "displayName": "Parallel 
Processing", "required": false, "type": "boolean", "javaType": 
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "If enabled then sending messages to the 
recipients occurs concurrently. Note the caller thread will still wait until 
all messages has been fully processed, before it continues. Its only the 
sending and processing the replies from the rec [...]
+    "parallelProcessing": { "kind": "attribute", "displayName": "Parallel 
Processing", "required": false, "type": "boolean", "javaType": 
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "If enabled then sending messages to the 
recipients occurs concurrently. Note the caller thread will still wait until 
all messages has been fully processed, before it continues. Its only the 
sending and processing the replies from the rec [...]
+    "synchronous": { "kind": "attribute", "displayName": "Synchronous", 
"required": false, "type": "boolean", "javaType": "java.lang.Boolean", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
false, "description": "Sets whether synchronous processing should be strictly 
used. When enabled then the same thread is used to continue routing after the 
recipient list is complete, even if parallel processing is enabled." },
     "timeout": { "kind": "attribute", "displayName": "Timeout", "required": 
false, "type": "duration", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "0", "description": "Sets 
a total timeout specified in millis, when using parallel processing. If the 
Recipient List hasn't been able to send and process all replies within the 
given timeframe, then the timeout triggers and the Recipient List breaks out 
and continues. Notice if you  [...]
     "executorService": { "kind": "attribute", "displayName": "Executor 
Service", "label": "advanced", "required": false, "type": "object", "javaType": 
"java.util.concurrent.ExecutorService", "deprecated": false, "autowired": 
false, "secret": false, "description": "To use a custom Thread Pool to be used 
for parallel processing. Notice if you set this option, then parallel 
processing is automatic implied, and you do not have to enable that option as 
well." },
     "stopOnException": { "kind": "attribute", "displayName": "Stop On 
Exception", "label": "advanced", "required": false, "type": "boolean", 
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Will now stop further 
processing if an exception or failure occurred during processing of an 
org.apache.camel.Exchange and the caused exception will be thrown. Will also 
stop if processing the exchange failed (has a fault [...]
diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/split.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/split.json
index 5087b2b7ed2..001943dc58d 100644
--- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/split.json
+++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/split.json
@@ -18,7 +18,8 @@
     "aggregationStrategyMethodName": { "kind": "attribute", "displayName": 
"Aggregation Strategy Method Name", "label": "advanced", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "This option can be used to 
explicit declare the method name to use, when using POJOs as the 
AggregationStrategy." },
     "aggregationStrategyMethodAllowNull": { "kind": "attribute", 
"displayName": "Aggregation Strategy Method Allow Null", "label": "advanced", 
"required": false, "type": "boolean", "javaType": "java.lang.Boolean", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
false, "description": "If this option is false then the aggregate method is not 
used if there was no data to enrich. If this option is true then null values is 
used as the oldExchange (when no data to enr [...]
     "parallelAggregate": { "kind": "attribute", "displayName": "Parallel 
Aggregate", "label": "advanced", "required": false, "type": "boolean", 
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "If enabled then the 
aggregate method on AggregationStrategy can be called concurrently. Notice that 
this would require the implementation of AggregationStrategy to be implemented 
as thread-safe. By default this is fals [...]
-    "parallelProcessing": { "kind": "attribute", "displayName": "Parallel 
Processing", "required": false, "type": "boolean", "javaType": 
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "If enabled then processing each split 
messages occurs concurrently. Note the caller thread will still wait until all 
messages has been fully processed, before it continues. It's only processing 
the sub messages from the splitter which ha [...]
+    "parallelProcessing": { "kind": "attribute", "displayName": "Parallel 
Processing", "required": false, "type": "boolean", "javaType": 
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "If enabled then processing each split 
messages occurs concurrently. Note the caller thread will still wait until all 
messages has been fully processed, before it continues. It's only processing 
the sub messages from the splitter which ha [...]
+    "synchronous": { "kind": "attribute", "displayName": "Synchronous", 
"required": false, "type": "boolean", "javaType": "java.lang.Boolean", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
false, "description": "Sets whether synchronous processing should be strictly 
used. When enabled then the same thread is used to continue routing after the 
split is complete, even if parallel processing is enabled." },
     "streaming": { "kind": "attribute", "displayName": "Streaming", 
"required": false, "type": "boolean", "javaType": "java.lang.Boolean", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
false, "description": "When in streaming mode, then the splitter splits the 
original message on-demand, and each split message is processed one by one. 
This reduces memory usage as the splitter do not split all the messages first, 
but then we do not know the total size, and ther [...]
     "stopOnException": { "kind": "attribute", "displayName": "Stop On 
Exception", "label": "advanced", "required": false, "type": "boolean", 
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Will now stop further 
processing if an exception or failure occurred during processing of an 
org.apache.camel.Exchange and the caused exception will be thrown. Will also 
stop if processing the exchange failed (has a fault [...]
     "timeout": { "kind": "attribute", "displayName": "Timeout", "label": 
"advanced", "required": false, "type": "duration", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "0", "description": "Sets a total timeout specified in millis, 
when using parallel processing. If the Splitter hasn't been able to split and 
process all the sub messages within the given timeframe, then the timeout 
triggers and the Splitter breaks out and contin [...]
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/MulticastDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/MulticastDefinition.java
index 91fd7dee5bd..f20522c0fb9 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/MulticastDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/MulticastDefinition.java
@@ -65,6 +65,9 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition>
     private String parallelProcessing;
     @XmlAttribute
     @Metadata(javaType = "java.lang.Boolean")
+    private String synchronous;
+    @XmlAttribute
+    @Metadata(javaType = "java.lang.Boolean")
     private String streaming;
     @XmlAttribute
     @Metadata(label = "advanced", javaType = "java.lang.Boolean")
@@ -176,6 +179,10 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition>
      * until all messages has been fully processed, before it continues. Its only the sending and processing the replies
      * from the multicasts which happens concurrently.
      *
+     * When parallel processing is enabled, the Camel routing engine will continue processing using the last used thread
+     * from the parallel thread pool. However, if you want to use the original thread that called the multicast, then
+     * make sure to enable the synchronous option as well.
+     *
      * @return the builder
      */
     public MulticastDefinition parallelProcessing() {
@@ -188,6 +195,26 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition>
      * until all messages has been fully processed, before it continues. Its only the sending and processing the replies
      * from the multicasts which happens concurrently.
      *
+     * When parallel processing is enabled, the Camel routing engine will continue processing using the last used thread
+     * from the parallel thread pool. However, if you want to use the original thread that called the multicast, then
+     * make sure to enable the synchronous option as well.
+     *
+     * @return the builder
+     */
+    public MulticastDefinition parallelProcessing(String parallelProcessing) {
+        setParallelProcessing(parallelProcessing);
+        return this;
+    }
+
+    /**
+     * If enabled then sending messages to the multicasts occurs concurrently. Note the caller thread will still wait
+     * until all messages has been fully processed, before it continues. Its only the sending and processing the replies
+     * from the multicasts which happens concurrently.
+     *
+     * When parallel processing is enabled, the Camel routing engine will continue processing using the last used thread
+     * from the parallel thread pool. However, if you want to use the original thread that called the multicast, then
+     * make sure to enable the synchronous option as well.
+     *
      * @return the builder
      */
     public MulticastDefinition parallelProcessing(boolean parallelProcessing) {
@@ -195,6 +222,37 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition>
         return this;
     }
 
+    /**
+     * Sets whether synchronous processing should be strictly used. When enabled then the same thread is used to
+     * continue routing after the multicast is complete, even if parallel processing is enabled.
+     *
+     * @return the builder
+     */
+    public MulticastDefinition synchronous() {
+        return synchronous(true);
+    }
+
+    /**
+     * Sets whether synchronous processing should be strictly used. When enabled then the same thread is used to
+     * continue routing after the multicast is complete, even if parallel processing is enabled.
+     *
+     * @return the builder
+     */
+    public MulticastDefinition synchronous(boolean synchronous) {
+        return synchronous(Boolean.toString(synchronous));
+    }
+
+    /**
+     * Sets whether synchronous processing should be strictly used. When enabled then the same thread is used to
+     * continue routing after the multicast is complete, even if parallel processing is enabled.
+     *
+     * @return the builder
+     */
+    public MulticastDefinition synchronous(String synchronous) {
+        setSynchronous(synchronous);
+        return this;
+    }
+
     /**
      * If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would
      * require the implementation of AggregationStrategy to be implemented as thread-safe. By default this is false
@@ -208,6 +266,32 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition>
         return this;
     }
 
+    /**
+     * If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would
+     * require the implementation of AggregationStrategy to be implemented as thread-safe. By default this is false
+     * meaning that Camel synchronizes the call to the aggregate method. Though in some use-cases this can be used to
+     * achieve higher performance when the AggregationStrategy is implemented as thread-safe.
+     *
+     * @return the builder
+     */
+    public MulticastDefinition parallelAggregate(boolean parallelAggregate) {
+        setParallelAggregate(Boolean.toString(parallelAggregate));
+        return this;
+    }
+
+    /**
+     * If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would
+     * require the implementation of AggregationStrategy to be implemented as thread-safe. By default this is false
+     * meaning that Camel synchronizes the call to the aggregate method. Though in some use-cases this can be used to
+     * achieve higher performance when the AggregationStrategy is implemented as thread-safe.
+     *
+     * @return the builder
+     */
+    public MulticastDefinition parallelAggregate(String parallelAggregate) {
+        setParallelAggregate(parallelAggregate);
+        return this;
+    }
+
     /**
      * If enabled then Camel will process replies out-of-order, eg in the order they come back. If disabled, Camel will
      * process replies in the same order as defined by the multicast.
@@ -373,6 +457,14 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition>
         this.parallelProcessing = parallelProcessing;
     }
 
+    public String getSynchronous() {
+        return synchronous;
+    }
+
+    public void setSynchronous(String synchronous) {
+        this.synchronous = synchronous;
+    }
+
     public String getStreaming() {
         return streaming;
     }
diff --git 
a/core/camel-core-model/src/main/java/org/apache/camel/model/RecipientListDefinition.java
 
b/core/camel-core-model/src/main/java/org/apache/camel/model/RecipientListDefinition.java
index 2ebe2567c06..26bf2cd9476 100644
--- 
a/core/camel-core-model/src/main/java/org/apache/camel/model/RecipientListDefinition.java
+++ 
b/core/camel-core-model/src/main/java/org/apache/camel/model/RecipientListDefinition.java
@@ -66,6 +66,9 @@ public class RecipientListDefinition<Type extends 
ProcessorDefinition<Type>> ext
     @Metadata(javaType = "java.lang.Boolean")
     private String parallelProcessing;
     @XmlAttribute
+    @Metadata(javaType = "java.lang.Boolean")
+    private String synchronous;
+    @XmlAttribute
     @Metadata(javaType = "java.time.Duration", defaultValue = "0")
     private String timeout;
     @XmlAttribute
@@ -197,6 +200,10 @@ public class RecipientListDefinition<Type extends 
ProcessorDefinition<Type>> ext
      * until all messages have been fully processed, before it continues. It's 
only the sending and processing the replies
      * from the recipients which happens concurrently.
      *
+     * When parallel processing is enabled, then the Camel routing engine will 
continue processing using the last used thread
+     * from the parallel thread pool. However, if you want to use the original 
thread that called the recipient list,
+     * then make sure to enable the synchronous option as well.
+     *
      * @return the builder
      */
     public RecipientListDefinition<Type> parallelProcessing() {
@@ -209,13 +216,32 @@ public class RecipientListDefinition<Type extends 
ProcessorDefinition<Type>> ext
      * until all messages have been fully processed, before it continues. It's 
only the sending and processing the replies
      * from the recipients which happens concurrently.
      *
+     * When parallel processing is enabled, then the Camel routing engine will 
continue processing using the last used thread
+     * from the parallel thread pool. However, if you want to use the original 
thread that called the recipient list,
+     * then make sure to enable the synchronous option as well.
+     *
      * @return the builder
      */
-    public RecipientListDefinition<Type> parallelProcessing(boolean 
parallelProcessing) {
-        setParallelProcessing(Boolean.toString(parallelProcessing));
+    public RecipientListDefinition<Type> parallelProcessing(String 
parallelProcessing) {
+        setParallelProcessing(parallelProcessing);
         return this;
     }
 
+    /**
+     * If enabled then sending messages to the recipients occurs concurrently. 
Note the caller thread will still wait
+     * until all messages have been fully processed, before it continues. It's 
only the sending and processing the replies
+     * from the recipients which happens concurrently.
+     *
+     * When parallel processing is enabled, then the Camel routing engine will 
continue processing using the last used thread
+     * from the parallel thread pool. However, if you want to use the original 
thread that called the recipient list,
+     * then make sure to enable the synchronous option as well.
+     *
+     * @return the builder
+     */
+    public RecipientListDefinition<Type> parallelProcessing(boolean 
parallelProcessing) {
+        return parallelProcessing(Boolean.toString(parallelProcessing));
+    }
+
     /**
      * If enabled then the aggregate method on AggregationStrategy can be 
called concurrently. Notice that this would
      * require the implementation of AggregationStrategy to be implemented as 
thread-safe. By default this is false
@@ -225,7 +251,63 @@ public class RecipientListDefinition<Type extends 
ProcessorDefinition<Type>> ext
      * @return the builder
      */
     public RecipientListDefinition<Type> parallelAggregate() {
-        setParallelAggregate(Boolean.toString(true));
+        return parallelAggregate(Boolean.toString(true));
+    }
+
+    /**
+     * If enabled then the aggregate method on AggregationStrategy can be 
called concurrently. Notice that this would
+     * require the implementation of AggregationStrategy to be implemented as 
thread-safe. By default this is false
+     * meaning that Camel synchronizes the call to the aggregate method. 
Though in some use-cases this can be used to
+     * achieve higher performance when the AggregationStrategy is implemented 
as thread-safe.
+     *
+     * @return the builder
+     */
+    public RecipientListDefinition<Type> parallelAggregate(boolean 
parallelAggregate) {
+        setParallelAggregate(Boolean.toString(parallelAggregate));
+        return this;
+    }
+
+    /**
+     * If enabled then the aggregate method on AggregationStrategy can be 
called concurrently. Notice that this would
+     * require the implementation of AggregationStrategy to be implemented as 
thread-safe. By default this is false
+     * meaning that Camel synchronizes the call to the aggregate method. 
Though in some use-cases this can be used to
+     * achieve higher performance when the AggregationStrategy is implemented 
as thread-safe.
+     *
+     * @return the builder
+     */
+    public RecipientListDefinition<Type> parallelAggregate(String 
parallelAggregate) {
+        setParallelAggregate(parallelAggregate);
+        return this;
+    }
+
+    /**
+     * Sets whether synchronous processing should be strictly used. When 
enabled then the same thread is used to
+     * continue routing after the recipient list is complete, even if parallel 
processing is enabled.
+     *
+     * @return the builder
+     */
+    public RecipientListDefinition<Type> synchronous() {
+        return synchronous(true);
+    }
+
+    /**
+     * Sets whether synchronous processing should be strictly used. When 
enabled then the same thread is used to
+     * continue routing after the recipient list is complete, even if parallel 
processing is enabled.
+     *
+     * @return the builder
+     */
+    public RecipientListDefinition<Type> synchronous(boolean synchronous) {
+        return synchronous(Boolean.toString(synchronous));
+    }
+
+    /**
+     * Sets whether synchronous processing should be strictly used. When 
enabled then the same thread is used to
+     * continue routing after the recipient list is complete, even if parallel 
processing is enabled.
+     *
+     * @return the builder
+     */
+    public RecipientListDefinition<Type> synchronous(String synchronous) {
+        setSynchronous(synchronous);
         return this;
     }
 
@@ -430,6 +512,14 @@ public class RecipientListDefinition<Type extends 
ProcessorDefinition<Type>> ext
         this.parallelProcessing = parallelProcessing;
     }
 
+    public String getSynchronous() {
+        return synchronous;
+    }
+
+    public void setSynchronous(String synchronous) {
+        this.synchronous = synchronous;
+    }
+
     public String getIgnoreInvalidEndpoints() {
         return ignoreInvalidEndpoints;
     }
diff --git 
a/core/camel-core-model/src/main/java/org/apache/camel/model/SplitDefinition.java
 
b/core/camel-core-model/src/main/java/org/apache/camel/model/SplitDefinition.java
index 8eaf0f59b43..e976072fdff 100644
--- 
a/core/camel-core-model/src/main/java/org/apache/camel/model/SplitDefinition.java
+++ 
b/core/camel-core-model/src/main/java/org/apache/camel/model/SplitDefinition.java
@@ -65,6 +65,9 @@ public class SplitDefinition extends OutputExpressionNode 
implements ExecutorSer
     private String parallelProcessing;
     @XmlAttribute
     @Metadata(javaType = "java.lang.Boolean")
+    private String synchronous;
+    @XmlAttribute
+    @Metadata(javaType = "java.lang.Boolean")
     private String streaming;
     @XmlAttribute
     @Metadata(label = "advanced", javaType = "java.lang.Boolean")
@@ -194,6 +197,10 @@ public class SplitDefinition extends OutputExpressionNode 
implements ExecutorSer
      * all messages has been fully processed, before it continues. It's only 
processing the sub messages from the
      * splitter which happens concurrently.
      *
+     * When parallel processing is enabled, then the Camel routing engine will 
continue processing using the last used thread
+     * from the parallel thread pool. However, if you want to use the original 
thread that called the splitter, then
+     * make sure to enable the synchronous option as well.
+     *
      * @return the builder
      */
     public SplitDefinition parallelProcessing() {
@@ -205,6 +212,10 @@ public class SplitDefinition extends OutputExpressionNode 
implements ExecutorSer
      * all messages has been fully processed, before it continues. It's only 
processing the sub messages from the
      * splitter which happens concurrently.
      *
+     * When parallel processing is enabled, then the Camel routing engine will 
continue processing using the last used thread
+     * from the parallel thread pool. However, if you want to use the original 
thread that called the splitter, then
+     * make sure to enable the synchronous option as well.
+     *
      * @return the builder
      */
     public SplitDefinition parallelProcessing(boolean parallelProcessing) {
@@ -216,6 +227,10 @@ public class SplitDefinition extends OutputExpressionNode 
implements ExecutorSer
      * all messages has been fully processed, before it continues. It's only 
processing the sub messages from the
      * splitter which happens concurrently.
      *
+     * When parallel processing is enabled, then the Camel routing engine will 
continue processing using the last used thread
+     * from the parallel thread pool. However, if you want to use the original 
thread that called the splitter, then
+     * make sure to enable the synchronous option as well.
+     *
      * @return the builder
      */
     public SplitDefinition parallelProcessing(String parallelProcessing) {
@@ -260,6 +275,37 @@ public class SplitDefinition extends OutputExpressionNode 
implements ExecutorSer
         return this;
     }
 
+    /**
+     * Sets whether synchronous processing should be strictly used. When 
enabled then the same thread is used to
+     * continue routing after the split is complete, even if parallel 
processing is enabled.
+     *
+     * @return the builder
+     */
+    public SplitDefinition synchronous() {
+        return synchronous(true);
+    }
+
+    /**
+     * Sets whether synchronous processing should be strictly used. When 
enabled then the same thread is used to
+     * continue routing after the split is complete, even if parallel 
processing is enabled.
+     *
+     * @return the builder
+     */
+    public SplitDefinition synchronous(boolean synchronous) {
+        return synchronous(Boolean.toString(synchronous));
+    }
+
+    /**
+     * Sets whether synchronous processing should be strictly used. When 
enabled then the same thread is used to
+     * continue routing after the split is complete, even if parallel 
processing is enabled.
+     *
+     * @return the builder
+     */
+    public SplitDefinition synchronous(String synchronous) {
+        setSynchronous(synchronous);
+        return this;
+    }
+
     /**
      * When in streaming mode, then the splitter splits the original message 
on-demand, and each split message is
      * processed one by one. This reduces memory usage as the splitter do not 
split all the messages first, but then we
@@ -522,6 +568,14 @@ public class SplitDefinition extends OutputExpressionNode 
implements ExecutorSer
         this.parallelProcessing = parallelProcessing;
     }
 
+    public String getSynchronous() {
+        return synchronous;
+    }
+
+    public void setSynchronous(String synchronous) {
+        this.synchronous = synchronous;
+    }
+
     public String getStreaming() {
         return streaming;
     }
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 0b5f1f6dc33..8ce05283e28 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -55,6 +55,7 @@ import org.apache.camel.Traceable;
 import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy;
 import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
 import org.apache.camel.processor.errorhandler.ErrorHandlerSupport;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.ErrorHandlerAware;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.InternalProcessorFactory;
@@ -149,6 +150,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
 
     protected final Processor onPrepare;
     protected final ProcessorExchangeFactory processorExchangeFactory;
+    private final AsyncProcessorAwaitManager awaitManager;
     private final CamelContext camelContext;
     private final InternalProcessorFactory internalProcessorFactory;
     private final Route route;
@@ -159,6 +161,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
     private Collection<Processor> processors;
     private final AggregationStrategy aggregationStrategy;
     private final boolean parallelProcessing;
+    private boolean synchronous;
     private final boolean streaming;
     private final boolean parallelAggregate;
     private final boolean stopOnException;
@@ -189,6 +192,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
         notNull(camelContext, "camelContext");
         this.camelContext = camelContext;
         this.internalProcessorFactory = 
camelContext.adapt(ExtendedCamelContext.class).getInternalProcessorFactory();
+        this.awaitManager = 
camelContext.adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager();
         this.route = route;
         this.reactiveExecutor = 
camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor();
         this.processors = processors;
@@ -251,6 +255,14 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
         return camelContext;
     }
 
+    public boolean isSynchronous() {
+        return synchronous;
+    }
+
+    public void setSynchronous(boolean synchronous) {
+        this.synchronous = synchronous;
+    }
+
     @Override
     protected void doBuild() throws Exception {
         if (processorExchangeFactory != null) {
@@ -289,6 +301,28 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
 
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
+        if (synchronous) {
+            try {
+                // force synchronous processing using await manager
+                awaitManager.process(new AsyncProcessorSupport() {
+                    @Override
+                    public boolean process(Exchange exchange, AsyncCallback 
callback) {
+                        // must invoke doProcess directly here to avoid 
a recursive call
+                        return doProcess(exchange, callback);
+                    }
+                }, exchange);
+            } catch (Throwable e) {
+                exchange.setException(e);
+            } finally {
+                callback.done(true);
+            }
+            return true;
+        } else {
+            return doProcess(exchange, callback);
+        }
+    }
+
+    protected boolean doProcess(Exchange exchange, AsyncCallback callback) {
         Iterable<ProcessorExchangePair> pairs;
         int size = 0;
         try {
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
index bed81851648..dbccb6cacf7 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -58,6 +58,7 @@ public class RecipientList extends AsyncProcessorSupport 
implements IdAware, Rou
     private final Expression expression;
     private final String delimiter;
     private boolean parallelProcessing;
+    private boolean synchronous;
     private boolean parallelAggregate;
     private boolean stopOnException;
     private boolean ignoreInvalidEndpoints;
@@ -205,6 +206,7 @@ public class RecipientList extends AsyncProcessorSupport 
implements IdAware, Rou
                 camelContext, null, expression, delimiter, producerCache, 
getAggregationStrategy(),
                 isParallelProcessing(), getExecutorService(), 
isShutdownExecutorService(),
                 isStreaming(), isStopOnException(), getTimeout(), 
getOnPrepare(), isShareUnitOfWork(), isParallelAggregate());
+        recipientListProcessor.setSynchronous(synchronous);
         recipientListProcessor.setErrorHandler(errorHandler);
         
recipientListProcessor.setAggregateExecutorService(aggregateExecutorService);
         
recipientListProcessor.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints());
@@ -264,6 +266,14 @@ public class RecipientList extends AsyncProcessorSupport 
implements IdAware, Rou
         this.parallelProcessing = parallelProcessing;
     }
 
+    public boolean isSynchronous() {
+        return synchronous;
+    }
+
+    public void setSynchronous(boolean synchronous) {
+        this.synchronous = synchronous;
+    }
+
     public boolean isParallelAggregate() {
         return parallelAggregate;
     }
diff --git 
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java
 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java
index 2f638e8629e..86d2de72f2a 100644
--- 
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java
+++ 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java
@@ -58,6 +58,7 @@ public class MulticastReifier extends 
ProcessorReifier<MulticastDefinition> {
         final AggregationStrategy strategy = createAggregationStrategy();
 
         boolean isParallelProcessing = 
parseBoolean(definition.getParallelProcessing(), false);
+        boolean isSynchronous = parseBoolean(definition.getSynchronous(), 
false);
         boolean isShareUnitOfWork = 
parseBoolean(definition.getShareUnitOfWork(), false);
         boolean isStreaming = parseBoolean(definition.getStreaming(), false);
         boolean isStopOnException = 
parseBoolean(definition.getStopOnException(), false);
@@ -78,6 +79,7 @@ public class MulticastReifier extends 
ProcessorReifier<MulticastDefinition> {
         MulticastProcessor answer = new MulticastProcessor(
                 camelContext, route, list, strategy, isParallelProcessing, 
threadPool, shutdownThreadPool, isStreaming,
                 isStopOnException, timeout, prepare, isShareUnitOfWork, 
isParallelAggregate);
+        answer.setSynchronous(isSynchronous);
         return answer;
     }
 
diff --git 
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RecipientListReifier.java
 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RecipientListReifier.java
index bfc3ad6f61f..5ced2c43173 100644
--- 
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RecipientListReifier.java
+++ 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RecipientListReifier.java
@@ -46,6 +46,7 @@ public class RecipientListReifier extends 
ProcessorReifier<RecipientListDefiniti
         final Expression expression = 
createExpression(definition.getExpression());
 
         boolean isParallelProcessing = 
parseBoolean(definition.getParallelProcessing(), false);
+        boolean isSynchronous = parseBoolean(definition.getSynchronous(), 
false);
         boolean isStreaming = parseBoolean(definition.getStreaming(), false);
         boolean isParallelAggregate = 
parseBoolean(definition.getParallelAggregate(), false);
         boolean isShareUnitOfWork = 
parseBoolean(definition.getShareUnitOfWork(), false);
@@ -62,6 +63,7 @@ public class RecipientListReifier extends 
ProcessorReifier<RecipientListDefiniti
         answer.setAggregationStrategy(createAggregationStrategy());
         answer.setParallelProcessing(isParallelProcessing);
         answer.setParallelAggregate(isParallelAggregate);
+        answer.setSynchronous(isSynchronous);
         answer.setStreaming(isStreaming);
         answer.setShareUnitOfWork(isShareUnitOfWork);
         answer.setStopOnException(isStopOnException);
diff --git 
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/SplitReifier.java
 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/SplitReifier.java
index 471beee3b01..55fc66c1e4b 100644
--- 
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/SplitReifier.java
+++ 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/SplitReifier.java
@@ -44,6 +44,7 @@ public class SplitReifier extends 
ExpressionReifier<SplitDefinition> {
         final AggregationStrategy strategy = createAggregationStrategy();
 
         boolean isParallelProcessing = 
parseBoolean(definition.getParallelProcessing(), false);
+        boolean isSynchronous = parseBoolean(definition.getSynchronous(), 
false);
         boolean isStreaming = parseBoolean(definition.getStreaming(), false);
         boolean isShareUnitOfWork = 
parseBoolean(definition.getShareUnitOfWork(), false);
         boolean isParallelAggregate = 
parseBoolean(definition.getParallelAggregate(), false);
@@ -75,6 +76,7 @@ public class SplitReifier extends 
ExpressionReifier<SplitDefinition> {
                     threadPool, shutdownThreadPool, isStreaming, 
isStopOnException, timeout, prepare,
                     isShareUnitOfWork, isParallelAggregate);
         }
+        answer.setSynchronous(isSynchronous);
 
         return answer;
     }
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelSynchronousTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelSynchronousTest.java
new file mode 100644
index 00000000000..0b90f83f8e1
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelSynchronousTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class MulticastParallelSynchronousTest extends ContextTestSupport {
+
+    private String before;
+    private String middle;
+    private String middle2;
+    private String after;
+
+    @Test
+    public void testSynchronous() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:end");
+        mock.expectedMessageCount(1);
+
+        template.sendBody("direct:a", "A");
+
+        assertMockEndpointsSatisfied();
+
+        Assertions.assertNotEquals(before, middle);
+        Assertions.assertNotEquals(before, middle2);
+        Assertions.assertNotEquals(after, middle);
+        Assertions.assertNotEquals(after, middle2);
+        Assertions.assertEquals(before, after);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:a")
+                        .process(e -> {
+                            before = Thread.currentThread().getName();
+                        })
+                        .multicast().parallelProcessing().synchronous()
+                        .process(e -> {
+                            middle = Thread.currentThread().getName();
+                        })
+                        .process(e -> {
+                            middle2 = Thread.currentThread().getName();
+                        })
+                        .end()
+                        .process(e -> {
+                            after = Thread.currentThread().getName();
+                        })
+                        .to("mock:end");
+
+            }
+        };
+    }
+
+}
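
Note on the test above: its assertions rely on the caller thread blocking
until the parallel branches finish and then resuming the route itself. The
sketch below reproduces that shape with plain JDK primitives only. It is an
analogy under stated assumptions, not Camel's AsyncProcessorAwaitManager
implementation; the class name SynchronousContinuationSketch and the method
runAndAwait are hypothetical names introduced for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration; not part of the commit and not Camel code.
public class SynchronousContinuationSketch {

    /**
     * Runs the given number of tasks on a small thread pool, blocks the calling
     * thread until all of them have completed, and returns the names of the
     * pool threads that did the work.
     */
    public static List<String> runAndAwait(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<String> workerThreads = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(tasks);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.submit(() -> {
                    // the "middle" step: executed on a pool thread
                    workerThreads.add(Thread.currentThread().getName());
                    done.countDown();
                });
            }
            // the caller waits here, so whatever follows runs on the caller thread
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } finally {
            pool.shutdown();
        }
        return workerThreads;
    }

    public static void main(String[] args) {
        String before = Thread.currentThread().getName();
        List<String> middle = runAndAwait(3);
        String after = Thread.currentThread().getName();
        System.out.println("before=" + before + " after=" + after + " workers=" + middle);
    }
}
```

As in the test, the thread observed before and after the parallel section is
the same, while the work in between is recorded on pool threads.
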
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelSynchronousTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelSynchronousTest.java
new file mode 100644
index 00000000000..7ab24dbbced
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelSynchronousTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class RecipientListParallelSynchronousTest extends ContextTestSupport {
+
+    private String before;
+    private String middle;
+    private String middle2;
+    private String after;
+
+    @Test
+    public void testSynchronous() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:end");
+        mock.expectedMessageCount(1);
+
+        template.sendBodyAndHeader("direct:a", "A", "whereTo", "direct:b,direct:c");
+
+        assertMockEndpointsSatisfied();
+
+        Assertions.assertNotEquals(before, middle);
+        Assertions.assertNotEquals(before, middle2);
+        Assertions.assertNotEquals(after, middle);
+        Assertions.assertNotEquals(after, middle2);
+        Assertions.assertEquals(before, after);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:a")
+                        .process(e -> {
+                            before = Thread.currentThread().getName();
+                        })
+                        .recipientList(header("whereTo")).parallelProcessing().synchronous()
+                        .process(e -> {
+                            after = Thread.currentThread().getName();
+                        })
+                        .to("mock:end");
+
+                from("direct:b")
+                        .process(e -> {
+                            middle = Thread.currentThread().getName();
+                        });
+
+                from("direct:c")
+                        .process(e -> {
+                            middle2 = Thread.currentThread().getName();
+                        });
+
+            }
+        };
+    }
+
+}
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/SplitParallelSynchronousTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/SplitParallelSynchronousTest.java
new file mode 100644
index 00000000000..4bd5a300ea3
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/SplitParallelSynchronousTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class SplitParallelSynchronousTest extends ContextTestSupport {
+
+    private String before;
+    private String middle;
+    private String after;
+
+    @Test
+    public void testSynchronous() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:end");
+        mock.expectedMessageCount(1);
+
+        template.sendBody("direct:a", "A,B,C,D,E");
+
+        assertMockEndpointsSatisfied();
+
+        Assertions.assertNotEquals(before, middle);
+        Assertions.assertNotEquals(after, middle);
+        Assertions.assertEquals(before, after);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:a")
+                        .process(e -> {
+                            before = Thread.currentThread().getName();
+                        })
+                        .split(body().tokenize(",")).parallelProcessing().synchronous()
+                        .process(e -> {
+                            middle = Thread.currentThread().getName();
+                        })
+                        .end()
+                        .process(e -> {
+                            after = Thread.currentThread().getName();
+                        })
+                        .to("mock:end");
+
+            }
+        };
+    }
+
+}
diff --git a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
index cf3b37c9597..cc748b0d14e 100644
--- a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
+++ b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
@@ -606,6 +606,7 @@ public class ModelParser extends BaseParser {
                 case "shareUnitOfWork": def.setShareUnitOfWork(val); break;
                 case "stopOnException": def.setStopOnException(val); break;
                 case "streaming": def.setStreaming(val); break;
+                case "synchronous": def.setSynchronous(val); break;
                 case "timeout": def.setTimeout(val); break;
                 default: return processorDefinitionAttributeHandler().accept(def, key, val);
             }
@@ -818,6 +819,7 @@ public class ModelParser extends BaseParser {
                 case "shareUnitOfWork": def.setShareUnitOfWork(val); break;
                 case "stopOnException": def.setStopOnException(val); break;
                 case "streaming": def.setStreaming(val); break;
+                case "synchronous": def.setSynchronous(val); break;
                 case "timeout": def.setTimeout(val); break;
                 default: return processorDefinitionAttributeHandler().accept(def, key, val);
             }
@@ -1343,6 +1345,7 @@ public class ModelParser extends BaseParser {
                 case "shareUnitOfWork": def.setShareUnitOfWork(val); break;
                 case "stopOnException": def.setStopOnException(val); break;
                 case "streaming": def.setStreaming(val); break;
+                case "synchronous": def.setSynchronous(val); break;
                 case "timeout": def.setTimeout(val); break;
                 default: return processorDefinitionAttributeHandler().accept(def, key, val);
             }
diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
index fbff89f0b11..42f77213767 100644
--- a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
+++ b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
@@ -8879,6 +8879,7 @@ public final class ModelDeserializers extends YamlDeserializerSupport {
                     @YamlProperty(name = "steps", type = "array:org.apache.camel.model.ProcessorDefinition"),
                     @YamlProperty(name = "stop-on-exception", type = "boolean"),
                     @YamlProperty(name = "streaming", type = "boolean"),
+                    @YamlProperty(name = "synchronous", type = "boolean"),
                     @YamlProperty(name = "timeout", type = "string")
             }
     )
@@ -8956,6 +8957,11 @@ public final class ModelDeserializers extends YamlDeserializerSupport {
                     target.setStreaming(val);
                     break;
                 }
+                case "synchronous": {
+                    String val = asText(node);
+                    target.setSynchronous(val);
+                    break;
+                }
                 case "timeout": {
                     String val = asText(node);
                     target.setTimeout(val);
@@ -11365,6 +11371,7 @@ public final class ModelDeserializers extends YamlDeserializerSupport {
                     @YamlProperty(name = "share-unit-of-work", type = "boolean"),
                     @YamlProperty(name = "stop-on-exception", type = "boolean"),
                     @YamlProperty(name = "streaming", type = "boolean"),
+                    @YamlProperty(name = "synchronous", type = "boolean"),
                     @YamlProperty(name = "timeout", type = "string")
             }
     )
@@ -11462,6 +11469,11 @@ public final class ModelDeserializers extends YamlDeserializerSupport {
                     target.setStreaming(val);
                     break;
                 }
+                case "synchronous": {
+                    String val = asText(node);
+                    target.setSynchronous(val);
+                    break;
+                }
                 case "timeout": {
                     String val = asText(node);
                     target.setTimeout(val);
@@ -15286,6 +15298,7 @@ public final class ModelDeserializers extends YamlDeserializerSupport {
                     @YamlProperty(name = "steps", type = "array:org.apache.camel.model.ProcessorDefinition"),
                     @YamlProperty(name = "stop-on-exception", type = "boolean"),
                     @YamlProperty(name = "streaming", type = "boolean"),
+                    @YamlProperty(name = "synchronous", type = "boolean"),
                     @YamlProperty(name = "timeout", type = "string")
             }
     )
@@ -15373,6 +15386,11 @@ public final class ModelDeserializers extends YamlDeserializerSupport {
                     target.setStreaming(val);
                     break;
                 }
+                case "synchronous": {
+                    String val = asText(node);
+                    target.setSynchronous(val);
+                    break;
+                }
                 case "timeout": {
                     String val = asText(node);
                     target.setTimeout(val);
diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camel-yaml-dsl.json b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camel-yaml-dsl.json
index aae0998745d..7f8aeca9036 100644
--- a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camel-yaml-dsl.json
+++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camel-yaml-dsl.json
@@ -1650,6 +1650,9 @@
           "streaming" : {
             "type" : "boolean"
           },
+          "synchronous" : {
+            "type" : "boolean"
+          },
           "timeout" : {
             "type" : "string"
           }
@@ -2115,6 +2118,9 @@
           "streaming" : {
             "type" : "boolean"
           },
+          "synchronous" : {
+            "type" : "boolean"
+          },
           "timeout" : {
             "type" : "string"
           }
@@ -3015,6 +3021,9 @@
           "streaming" : {
             "type" : "boolean"
           },
+          "synchronous" : {
+            "type" : "boolean"
+          },
           "timeout" : {
             "type" : "string"
           }
diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json
index c60ab670796..e8701676be3 100644
--- a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json
+++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json
@@ -1554,6 +1554,9 @@
           "streaming" : {
             "type" : "boolean"
           },
+          "synchronous" : {
+            "type" : "boolean"
+          },
           "timeout" : {
             "type" : "string"
           }
@@ -2019,6 +2022,9 @@
           "streaming" : {
             "type" : "boolean"
           },
+          "synchronous" : {
+            "type" : "boolean"
+          },
           "timeout" : {
             "type" : "string"
           }
@@ -2919,6 +2925,9 @@
           "streaming" : {
             "type" : "boolean"
           },
+          "synchronous" : {
+            "type" : "boolean"
+          },
           "timeout" : {
             "type" : "string"
           }
