This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-4.10.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.10.x by this push:
new e4d0d6e50dd CAMEL-21671: camel-core: Split/Multicast EIP: Sync
processing in parallel mode to avoid unbounded thread use, but to respect the
thread pool limits from the EIP (#17331)
e4d0d6e50dd is described below
commit e4d0d6e50dd059b1c7434224008c211a9fffc4e4
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Mar 3 10:23:28 2025 +0000
CAMEL-21671: camel-core: Split/Multicast EIP: Sync processing in parallel
mode to avoid unbounded thread use, but to respect the thread pool limits from
the EIP (#17331)
* CAMEL-21671: camel-core: Split/Multicast EIP: Sync processing in parallel
mode to avoid unbounded thread use, but to respect the thread pool limits from
the EIP. Thanks to Soheila Esmaeili for unit test.
---
.../org/apache/camel/catalog/models/multicast.json | 2 +-
.../apache/camel/catalog/models/recipientList.json | 2 +-
.../org/apache/camel/catalog/models/split.json | 2 +-
.../apache/camel/catalog/schemas/camel-spring.xsd | 17 ++-
.../META-INF/org/apache/camel/model/multicast.json | 2 +-
.../org/apache/camel/model/recipientList.json | 2 +-
.../META-INF/org/apache/camel/model/split.json | 2 +-
.../apache/camel/model/MulticastDefinition.java | 15 +++
.../camel/model/RecipientListDefinition.java | 15 +++
.../org/apache/camel/model/SplitDefinition.java | 15 +++
.../apache/camel/processor/MulticastProcessor.java | 19 +++-
.../SplitterParallelAsyncProcessorIssueTest.java | 122 +++++++++++++++++++++
.../AsyncEndpointMulticastSynchronousTest.java | 67 +++++++++++
.../ROOT/pages/camel-4x-upgrade-guide-4_10.adoc | 13 ++-
.../dsl/yaml/deserializers/ModelDeserializers.java | 6 +-
.../generated/resources/schema/camelYamlDsl.json | 6 +-
16 files changed, 287 insertions(+), 20 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/multicast.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/multicast.json
index 0d22e833805..48f563f8eff 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/multicast.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/multicast.json
@@ -19,7 +19,7 @@
"aggregationStrategyMethodName": { "index": 4, "kind": "attribute",
"displayName": "Aggregation Strategy Method Name", "group": "advanced",
"label": "advanced", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "This option can be used to explicit declare the method name to
use, when using POJOs as the AggregationStrategy." },
"aggregationStrategyMethodAllowNull": { "index": 5, "kind": "attribute",
"displayName": "Aggregation Strategy Method Allow Null", "group": "advanced",
"label": "advanced", "required": false, "type": "boolean", "javaType":
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "If this option is false then the
aggregate method is not used if there was no data to enrich. If this option is
true then null values is used as the [...]
"parallelAggregate": { "index": 6, "kind": "attribute", "displayName":
"Parallel Aggregate", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": true,
"autowired": false, "secret": false, "defaultValue": false, "description": "If
enabled then the aggregate method on AggregationStrategy can be called
concurrently. Notice that this would require the implementation of
AggregationStrategy to be implemented as thre [...]
- "parallelProcessing": { "index": 7, "kind": "attribute", "displayName":
"Parallel Processing", "group": "common", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "If enabled then sending
messages to the multicasts occurs concurrently. Note the caller thread will
still wait until all messages has been fully processed, before it continues.
Its only the sending and proc [...]
+ "parallelProcessing": { "index": 7, "kind": "attribute", "displayName":
"Parallel Processing", "group": "common", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "If enabled then sending
messages to the multicasts occurs concurrently. Note the caller thread will
still wait until all messages has been fully processed, before it continues.
Its only the sending and proc [...]
"synchronous": { "index": 8, "kind": "attribute", "displayName":
"Synchronous", "group": "common", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Sets whether
synchronous processing should be strictly used. When enabled then the same
thread is used to continue routing after the multicast is complete, even if
parallel processing is enabled." },
"streaming": { "index": 9, "kind": "attribute", "displayName":
"Streaming", "group": "common", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "If enabled then Camel
will process replies out-of-order, eg in the order they come back. If disabled,
Camel will process replies in the same order as defined by the multicast." },
"stopOnException": { "index": 10, "kind": "attribute", "displayName":
"Stop On Exception", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false, "description":
"Will now stop further processing if an exception or failure occurred during
processing of an org.apache.camel.Exchange and the caused exception will be
thrown. Will also stop if processin [...]
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/recipientList.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/recipientList.json
index 024240e23b3..a0248c8757b 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/recipientList.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/recipientList.json
@@ -21,7 +21,7 @@
"aggregationStrategyMethodName": { "index": 6, "kind": "attribute",
"displayName": "Aggregation Strategy Method Name", "group": "advanced",
"label": "advanced", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "This option can be used to explicit declare the method name to
use, when using POJOs as the AggregationStrategy." },
"aggregationStrategyMethodAllowNull": { "index": 7, "kind": "attribute",
"displayName": "Aggregation Strategy Method Allow Null", "group": "advanced",
"label": "advanced", "required": false, "type": "boolean", "javaType":
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "If this option is false then the
aggregate method is not used if there was no data to enrich. If this option is
true then null values is used as the [...]
"parallelAggregate": { "index": 8, "kind": "attribute", "displayName":
"Parallel Aggregate", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": true,
"autowired": false, "secret": false, "defaultValue": false, "description": "If
enabled then the aggregate method on AggregationStrategy can be called
concurrently. Notice that this would require the implementation of
AggregationStrategy to be implemented as thre [...]
- "parallelProcessing": { "index": 9, "kind": "attribute", "displayName":
"Parallel Processing", "group": "common", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "If enabled then sending
messages to the recipients occurs concurrently. Note the caller thread will
still wait until all messages has been fully processed, before it continues.
Its only the sending and proc [...]
+ "parallelProcessing": { "index": 9, "kind": "attribute", "displayName":
"Parallel Processing", "group": "common", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "If enabled then sending
messages to the recipients occurs concurrently. Note the caller thread will
still wait until all messages has been fully processed, before it continues.
Its only the sending and proc [...]
"synchronous": { "index": 10, "kind": "attribute", "displayName":
"Synchronous", "group": "common", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Sets whether
synchronous processing should be strictly used. When enabled then the same
thread is used to continue routing after the recipient list is complete, even
if parallel processing is enabled." },
"timeout": { "index": 11, "kind": "attribute", "displayName": "Timeout",
"group": "common", "required": false, "type": "duration", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "0", "description": "Sets a total timeout specified in millis,
when using parallel processing. If the Recipient List hasn't been able to send
and process all replies within the given timeframe, then the timeout triggers
and the Recipient List breaks o [...]
"executorService": { "index": 12, "kind": "attribute", "displayName":
"Executor Service", "group": "advanced", "label": "advanced", "required":
false, "type": "object", "javaType": "java.util.concurrent.ExecutorService",
"deprecated": false, "autowired": false, "secret": false, "description": "To
use a custom Thread Pool to be used for parallel processing. Notice if you set
this option, then parallel processing is automatic implied, and you do not have
to enable that option as well." },
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/split.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/split.json
index 816bb7504f5..f62ad40e890 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/split.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/split.json
@@ -21,7 +21,7 @@
"aggregationStrategyMethodName": { "index": 6, "kind": "attribute",
"displayName": "Aggregation Strategy Method Name", "group": "advanced",
"label": "advanced", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "This option can be used to explicit declare the method name to
use, when using POJOs as the AggregationStrategy." },
"aggregationStrategyMethodAllowNull": { "index": 7, "kind": "attribute",
"displayName": "Aggregation Strategy Method Allow Null", "group": "advanced",
"label": "advanced", "required": false, "type": "boolean", "javaType":
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "If this option is false then the
aggregate method is not used if there was no data to enrich. If this option is
true then null values is used as the [...]
"parallelAggregate": { "index": 8, "kind": "attribute", "displayName":
"Parallel Aggregate", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": true,
"autowired": false, "secret": false, "defaultValue": false, "description": "If
enabled then the aggregate method on AggregationStrategy can be called
concurrently. Notice that this would require the implementation of
AggregationStrategy to be implemented as thre [...]
- "parallelProcessing": { "index": 9, "kind": "attribute", "displayName":
"Parallel Processing", "group": "common", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "If enabled then
processing each split messages occurs concurrently. Note the caller thread will
still wait until all messages has been fully processed, before it continues.
It's only processing the sub mess [...]
+ "parallelProcessing": { "index": 9, "kind": "attribute", "displayName":
"Parallel Processing", "group": "common", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "If enabled then
processing each split messages occurs concurrently. Note the caller thread will
still wait until all messages has been fully processed, before it continues.
It's only processing the sub mess [...]
"synchronous": { "index": 10, "kind": "attribute", "displayName":
"Synchronous", "group": "common", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Sets whether
synchronous processing should be strictly used. When enabled then the same
thread is used to continue routing after the split is complete, even if
parallel processing is enabled." },
"streaming": { "index": 11, "kind": "attribute", "displayName":
"Streaming", "group": "common", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "When in streaming mode,
then the splitter splits the original message on-demand, and each split message
is processed one by one. This reduces memory usage as the splitter do not split
all the messages first, but then we do n [...]
"stopOnException": { "index": 12, "kind": "attribute", "displayName":
"Stop On Exception", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false, "description":
"Will now stop further processing if an exception or failure occurred during
processing of an org.apache.camel.Exchange and the caused exception will be
thrown. Will also stop if processin [...]
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
index 06008c197e7..438dd6ef9d1 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
@@ -10952,7 +10952,11 @@ If enabled then sending messages to the multicasts
occurs concurrently. Note the
messages has been fully processed, before it continues. Its only the sending
and processing the replies from the
multicasts which happens concurrently. When parallel processing is enabled,
then the Camel routing engin will continue
processing using last used thread from the parallel thread pool. However, if
you want to use the original thread that
-called the multicast, then make sure to enable the synchronous option as well.
Default value: false
+called the multicast, then make sure to enable the synchronous option as well.
In parallel processing mode, you may want
+to also synchronous = true to force this EIP to process the sub-tasks using
the upper bounds of the thread-pool. If
+using synchronous = false then Camel will allow its reactive routing engine to
use as many threads as possible, which
+may be available due to sub-tasks using other thread-pools such as
CompletableFuture.runAsync or others. Default value:
+false
]]>
</xs:documentation>
</xs:annotation>
@@ -11992,7 +11996,11 @@ If enabled then sending messages to the recipients
occurs concurrently. Note the
messages has been fully processed, before it continues. Its only the sending
and processing the replies from the
recipients which happens concurrently. When parallel processing is enabled,
then the Camel routing engin will continue
processing using last used thread from the parallel thread pool. However, if
you want to use the original thread that
-called the recipient list, then make sure to enable the synchronous option as
well. Default value: false
+called the recipient list, then make sure to enable the synchronous option as
well. In parallel processing mode, you may
+want to also synchronous = true to force this EIP to process the sub-tasks
using the upper bounds of the thread-pool. If
+using synchronous = false then Camel will allow its reactive routing engine to
use as many threads as possible, which
+may be available due to sub-tasks using other thread-pools such as
CompletableFuture.runAsync or others. Default value:
+false
]]>
</xs:documentation>
</xs:annotation>
@@ -13619,7 +13627,10 @@ If enabled then processing each split messages occurs
concurrently. Note the cal
messages has been fully processed, before it continues. It's only processing
the sub messages from the splitter which
happens concurrently. When parallel processing is enabled, then the Camel
routing engin will continue processing using
last used thread from the parallel thread pool. However, if you want to use
the original thread that called the
-splitter, then make sure to enable the synchronous option as well. Default
value: false
+splitter, then make sure to enable the synchronous option as well. In parallel
processing mode, you may want to also
+synchronous = true to force this EIP to process the sub-tasks using the upper
bounds of the thread-pool. If using
+synchronous = false then Camel will allow its reactive routing engine to use
as many threads as possible, which may be
+available due to sub-tasks using other thread-pools such as
CompletableFuture.runAsync or others. Default value: false
]]>
</xs:documentation>
</xs:annotation>
diff --git
a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/multicast.json
b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/multicast.json
index 0d22e833805..48f563f8eff 100644
---
a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/multicast.json
+++
b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/multicast.json
@@ -19,7 +19,7 @@
"aggregationStrategyMethodName": { "index": 4, "kind": "attribute",
"displayName": "Aggregation Strategy Method Name", "group": "advanced",
"label": "advanced", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "This option can be used to explicit declare the method name to
use, when using POJOs as the AggregationStrategy." },
"aggregationStrategyMethodAllowNull": { "index": 5, "kind": "attribute",
"displayName": "Aggregation Strategy Method Allow Null", "group": "advanced",
"label": "advanced", "required": false, "type": "boolean", "javaType":
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "If this option is false then the
aggregate method is not used if there was no data to enrich. If this option is
true then null values is used as the [...]
"parallelAggregate": { "index": 6, "kind": "attribute", "displayName":
"Parallel Aggregate", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": true,
"autowired": false, "secret": false, "defaultValue": false, "description": "If
enabled then the aggregate method on AggregationStrategy can be called
concurrently. Notice that this would require the implementation of
AggregationStrategy to be implemented as thre [...]
- "parallelProcessing": { "index": 7, "kind": "attribute", "displayName":
"Parallel Processing", "group": "common", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "If enabled then sending
messages to the multicasts occurs concurrently. Note the caller thread will
still wait until all messages has been fully processed, before it continues.
Its only the sending and proc [...]
+ "parallelProcessing": { "index": 7, "kind": "attribute", "displayName":
"Parallel Processing", "group": "common", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "If enabled then sending
messages to the multicasts occurs concurrently. Note the caller thread will
still wait until all messages has been fully processed, before it continues.
Its only the sending and proc [...]
"synchronous": { "index": 8, "kind": "attribute", "displayName":
"Synchronous", "group": "common", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Sets whether
synchronous processing should be strictly used. When enabled then the same
thread is used to continue routing after the multicast is complete, even if
parallel processing is enabled." },
"streaming": { "index": 9, "kind": "attribute", "displayName":
"Streaming", "group": "common", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "If enabled then Camel
will process replies out-of-order, eg in the order they come back. If disabled,
Camel will process replies in the same order as defined by the multicast." },
"stopOnException": { "index": 10, "kind": "attribute", "displayName":
"Stop On Exception", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false, "description":
"Will now stop further processing if an exception or failure occurred during
processing of an org.apache.camel.Exchange and the caused exception will be
thrown. Will also stop if processin [...]
diff --git
a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/recipientList.json
b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/recipientList.json
index 024240e23b3..a0248c8757b 100644
---
a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/recipientList.json
+++
b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/recipientList.json
@@ -21,7 +21,7 @@
"aggregationStrategyMethodName": { "index": 6, "kind": "attribute",
"displayName": "Aggregation Strategy Method Name", "group": "advanced",
"label": "advanced", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "This option can be used to explicit declare the method name to
use, when using POJOs as the AggregationStrategy." },
"aggregationStrategyMethodAllowNull": { "index": 7, "kind": "attribute",
"displayName": "Aggregation Strategy Method Allow Null", "group": "advanced",
"label": "advanced", "required": false, "type": "boolean", "javaType":
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "If this option is false then the
aggregate method is not used if there was no data to enrich. If this option is
true then null values is used as the [...]
"parallelAggregate": { "index": 8, "kind": "attribute", "displayName":
"Parallel Aggregate", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": true,
"autowired": false, "secret": false, "defaultValue": false, "description": "If
enabled then the aggregate method on AggregationStrategy can be called
concurrently. Notice that this would require the implementation of
AggregationStrategy to be implemented as thre [...]
- "parallelProcessing": { "index": 9, "kind": "attribute", "displayName":
"Parallel Processing", "group": "common", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "If enabled then sending
messages to the recipients occurs concurrently. Note the caller thread will
still wait until all messages has been fully processed, before it continues.
Its only the sending and proc [...]
+ "parallelProcessing": { "index": 9, "kind": "attribute", "displayName":
"Parallel Processing", "group": "common", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "If enabled then sending
messages to the recipients occurs concurrently. Note the caller thread will
still wait until all messages has been fully processed, before it continues.
Its only the sending and proc [...]
"synchronous": { "index": 10, "kind": "attribute", "displayName":
"Synchronous", "group": "common", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Sets whether
synchronous processing should be strictly used. When enabled then the same
thread is used to continue routing after the recipient list is complete, even
if parallel processing is enabled." },
"timeout": { "index": 11, "kind": "attribute", "displayName": "Timeout",
"group": "common", "required": false, "type": "duration", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "0", "description": "Sets a total timeout specified in millis,
when using parallel processing. If the Recipient List hasn't been able to send
and process all replies within the given timeframe, then the timeout triggers
and the Recipient List breaks o [...]
"executorService": { "index": 12, "kind": "attribute", "displayName":
"Executor Service", "group": "advanced", "label": "advanced", "required":
false, "type": "object", "javaType": "java.util.concurrent.ExecutorService",
"deprecated": false, "autowired": false, "secret": false, "description": "To
use a custom Thread Pool to be used for parallel processing. Notice if you set
this option, then parallel processing is automatic implied, and you do not have
to enable that option as well." },
diff --git
a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/split.json
b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/split.json
index 816bb7504f5..f62ad40e890 100644
---
a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/split.json
+++
b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/split.json
@@ -21,7 +21,7 @@
"aggregationStrategyMethodName": { "index": 6, "kind": "attribute",
"displayName": "Aggregation Strategy Method Name", "group": "advanced",
"label": "advanced", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "This option can be used to explicit declare the method name to
use, when using POJOs as the AggregationStrategy." },
"aggregationStrategyMethodAllowNull": { "index": 7, "kind": "attribute",
"displayName": "Aggregation Strategy Method Allow Null", "group": "advanced",
"label": "advanced", "required": false, "type": "boolean", "javaType":
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "If this option is false then the
aggregate method is not used if there was no data to enrich. If this option is
true then null values is used as the [...]
"parallelAggregate": { "index": 8, "kind": "attribute", "displayName":
"Parallel Aggregate", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": true,
"autowired": false, "secret": false, "defaultValue": false, "description": "If
enabled then the aggregate method on AggregationStrategy can be called
concurrently. Notice that this would require the implementation of
AggregationStrategy to be implemented as thre [...]
- "parallelProcessing": { "index": 9, "kind": "attribute", "displayName":
"Parallel Processing", "group": "common", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "If enabled then
processing each split messages occurs concurrently. Note the caller thread will
still wait until all messages has been fully processed, before it continues.
It's only processing the sub mess [...]
+ "parallelProcessing": { "index": 9, "kind": "attribute", "displayName":
"Parallel Processing", "group": "common", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "If enabled then
processing each split messages occurs concurrently. Note the caller thread will
still wait until all messages has been fully processed, before it continues.
It's only processing the sub mess [...]
"synchronous": { "index": 10, "kind": "attribute", "displayName":
"Synchronous", "group": "common", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Sets whether
synchronous processing should be strictly used. When enabled then the same
thread is used to continue routing after the split is complete, even if
parallel processing is enabled." },
"streaming": { "index": 11, "kind": "attribute", "displayName":
"Streaming", "group": "common", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "When in streaming mode,
then the splitter splits the original message on-demand, and each split message
is processed one by one. This reduces memory usage as the splitter do not split
all the messages first, but then we do n [...]
"stopOnException": { "index": 12, "kind": "attribute", "displayName":
"Stop On Exception", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false, "description":
"Will now stop further processing if an exception or failure occurred during
processing of an org.apache.camel.Exchange and the caused exception will be
thrown. Will also stop if processin [...]
diff --git
a/core/camel-core-model/src/main/java/org/apache/camel/model/MulticastDefinition.java
b/core/camel-core-model/src/main/java/org/apache/camel/model/MulticastDefinition.java
index 6f5c5276e2c..49d6ec063f8 100644
---
a/core/camel-core-model/src/main/java/org/apache/camel/model/MulticastDefinition.java
+++
b/core/camel-core-model/src/main/java/org/apache/camel/model/MulticastDefinition.java
@@ -208,6 +208,11 @@ public class MulticastDefinition extends
OutputDefinition<MulticastDefinition>
* from the parallel thread pool. However, if you want to use the original
thread that called the multicast, then
* make sure to enable the synchronous option as well.
*
+ * In parallel processing mode, you may want to also synchronous = true to
force this EIP to process the sub-tasks
+ * using the upper bounds of the thread-pool. If using synchronous = false
then Camel will allow its reactive
+ * routing engine to use as many threads as possible, which may be
available due to sub-tasks using other
+ * thread-pools such as CompletableFuture.runAsync or others.
+ *
* @return the builder
*/
public MulticastDefinition parallelProcessing() {
@@ -224,6 +229,11 @@ public class MulticastDefinition extends
OutputDefinition<MulticastDefinition>
* from the parallel thread pool. However, if you want to use the original
thread that called the multicast, then
* make sure to enable the synchronous option as well.
*
+ * In parallel processing mode, you may want to also synchronous = true to
force this EIP to process the sub-tasks
+ * using the upper bounds of the thread-pool. If using synchronous = false
then Camel will allow its reactive
+ * routing engine to use as many threads as possible, which may be
available due to sub-tasks using other
+ * thread-pools such as CompletableFuture.runAsync or others.
+ *
* @return the builder
*/
public MulticastDefinition parallelProcessing(String parallelProcessing) {
@@ -240,6 +250,11 @@ public class MulticastDefinition extends
OutputDefinition<MulticastDefinition>
* from the parallel thread pool. However, if you want to use the original
thread that called the multicast, then
* make sure to enable the synchronous option as well.
*
+ * In parallel processing mode, you may want to also synchronous = true to
force this EIP to process the sub-tasks
+ * using the upper bounds of the thread-pool. If using synchronous = false
then Camel will allow its reactive
+ * routing engine to use as many threads as possible, which may be
available due to sub-tasks using other
+ * thread-pools such as CompletableFuture.runAsync or others.
+ *
* @return the builder
*/
public MulticastDefinition parallelProcessing(boolean parallelProcessing) {
diff --git
a/core/camel-core-model/src/main/java/org/apache/camel/model/RecipientListDefinition.java
b/core/camel-core-model/src/main/java/org/apache/camel/model/RecipientListDefinition.java
index cb394224180..e253e8490d4 100644
---
a/core/camel-core-model/src/main/java/org/apache/camel/model/RecipientListDefinition.java
+++
b/core/camel-core-model/src/main/java/org/apache/camel/model/RecipientListDefinition.java
@@ -232,6 +232,11 @@ public class RecipientListDefinition<Type extends
ProcessorDefinition<Type>> ext
* from the parallel thread pool. However, if you want to use the original
thread that called the recipient list,
* then make sure to enable the synchronous option as well.
*
+ * In parallel processing mode, you may want to also synchronous = true to
force this EIP to process the sub-tasks
+ * using the upper bounds of the thread-pool. If using synchronous = false
then Camel will allow its reactive
+ * routing engine to use as many threads as possible, which may be
available due to sub-tasks using other
+ * thread-pools such as CompletableFuture.runAsync or others.
+ *
* @return the builder
*/
public RecipientListDefinition<Type> parallelProcessing() {
@@ -248,6 +253,11 @@ public class RecipientListDefinition<Type extends
ProcessorDefinition<Type>> ext
* from the parallel thread pool. However, if you want to use the original
thread that called the recipient list,
* then make sure to enable the synchronous option as well.
*
+ * In parallel processing mode, you may want to also synchronous = true to
force this EIP to process the sub-tasks
+ * using the upper bounds of the thread-pool. If using synchronous = false
then Camel will allow its reactive
+ * routing engine to use as many threads as possible, which may be
available due to sub-tasks using other
+ * thread-pools such as CompletableFuture.runAsync or others.
+ *
* @return the builder
*/
public RecipientListDefinition<Type> parallelProcessing(String
parallelProcessing) {
@@ -264,6 +274,11 @@ public class RecipientListDefinition<Type extends
ProcessorDefinition<Type>> ext
* from the parallel thread pool. However, if you want to use the original
thread that called the recipient list,
* then make sure to enable the synchronous option as well.
*
+ * In parallel processing mode, you may want to also synchronous = true to
force this EIP to process the sub-tasks
+ * using the upper bounds of the thread-pool. If using synchronous = false
then Camel will allow its reactive
+ * routing engine to use as many threads as possible, which may be
available due to sub-tasks using other
+ * thread-pools such as CompletableFuture.runAsync or others.
+ *
* @return the builder
*/
public RecipientListDefinition<Type> parallelProcessing(boolean
parallelProcessing) {
diff --git
a/core/camel-core-model/src/main/java/org/apache/camel/model/SplitDefinition.java
b/core/camel-core-model/src/main/java/org/apache/camel/model/SplitDefinition.java
index 193a91ba96d..3934f60447b 100644
---
a/core/camel-core-model/src/main/java/org/apache/camel/model/SplitDefinition.java
+++
b/core/camel-core-model/src/main/java/org/apache/camel/model/SplitDefinition.java
@@ -229,6 +229,11 @@ public class SplitDefinition extends OutputExpressionNode
implements ExecutorSer
* from the parallel thread pool. However, if you want to use the original
thread that called the splitter, then
* make sure to enable the synchronous option as well.
*
+ * In parallel processing mode, you may want to also synchronous = true to
force this EIP to process the sub-tasks
+ * using the upper bounds of the thread-pool. If using synchronous = false
then Camel will allow its reactive
+ * routing engine to use as many threads as possible, which may be
available due to sub-tasks using other
+ * thread-pools such as CompletableFuture.runAsync or others.
+ *
* @return the builder
*/
public SplitDefinition parallelProcessing() {
@@ -244,6 +249,11 @@ public class SplitDefinition extends OutputExpressionNode
implements ExecutorSer
* from the parallel thread pool. However, if you want to use the original
thread that called the splitter, then
* make sure to enable the synchronous option as well.
*
+ * In parallel processing mode, you may want to also synchronous = true to
force this EIP to process the sub-tasks
+ * using the upper bounds of the thread-pool. If using synchronous = false
then Camel will allow its reactive
+ * routing engine to use as many threads as possible, which may be
available due to sub-tasks using other
+ * thread-pools such as CompletableFuture.runAsync or others.
+ *
* @return the builder
*/
public SplitDefinition parallelProcessing(boolean parallelProcessing) {
@@ -259,6 +269,11 @@ public class SplitDefinition extends OutputExpressionNode
implements ExecutorSer
* from the parallel thread pool. However, if you want to use the original
thread that called the splitter, then
* make sure to enable the synchronous option as well.
*
+ * In parallel processing mode, you may want to also synchronous = true to
force this EIP to process the sub-tasks
+ * using the upper bounds of the thread-pool. If using synchronous = false
then Camel will allow its reactive
+ * routing engine to use as many threads as possible, which may be
available due to sub-tasks using other
+ * thread-pools such as CompletableFuture.runAsync or others.
+ *
* @return the builder
*/
public SplitDefinition parallelProcessing(String parallelProcessing) {
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 3fd2143fdd8..7ec13021502 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -604,8 +604,7 @@ public class MulticastProcessor extends
AsyncProcessorSupport
// compute time taken if sending to another endpoint
StopWatch watch = beforeSend(pair);
- AsyncProcessor async =
AsyncProcessorConverterHelper.convert(pair.getProcessor());
- async.process(exchange, doneSync -> {
+ AsyncCallback taskCallback = (doneSync) -> {
afterSend(pair, watch);
// Decide whether to continue with the multicast or
not; similar logic to the Pipeline
@@ -640,7 +639,21 @@ public class MulticastProcessor extends
AsyncProcessorSupport
if (hasNext && !isParallelProcessing()) {
schedule(this);
}
- });
+ };
+
+ AsyncProcessor async =
AsyncProcessorConverterHelper.convert(pair.getProcessor());
+ if (synchronous) {
+ // force synchronous processing using await manager
+ // to restrict total number of threads to be bound by
the thread-pool of this EIP,
+ // as otherwise in case of async processing then other
thread pools can cause
+ // unbounded thread use that cannot be controlled by
Camel
+ awaitManager.process(async, exchange);
+ taskCallback.done(true);
+ } else {
+ // async processing in reactive-mode which can use as
many threads as possible
+ // if the downstream processors are async and use
different threads
+ async.process(exchange, taskCallback);
+ }
});
// after submitting this pair then move on to the next pair
(if in parallel mode)
if (hasNext && isParallelProcessing()) {
diff --git
a/core/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelAsyncProcessorIssueTest.java
b/core/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelAsyncProcessorIssueTest.java
new file mode 100644
index 00000000000..fbc2331ddcd
--- /dev/null
+++
b/core/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelAsyncProcessorIssueTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.*;
+
+import org.apache.camel.*;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.ThreadPoolProfile;
+import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class SplitterParallelAsyncProcessorIssueTest extends
ContextTestSupport {
+
+ private final Set<String> threads = new HashSet<>();
+
+ @Test
+ public void testSplitParallelProcessingMaxThreads() throws Exception {
+ getMockEndpoint("mock:split").expectedMessageCount(10);
+
+ String xmlBody = "<employees>" +
+ "<employee><id>1</id><name>John</name></employee>" +
+ "<employee><id>2</id><name>Jane</name></employee>" +
+ "<employee><id>3</id><name>Jim</name></employee>" +
+ "<employee><id>4</id><name>Jack</name></employee>" +
+ "<employee><id>5</id><name>Jill</name></employee>" +
+ "<employee><id>6</id><name>opi</name></employee>" +
+ "<employee><id>7</id><name>ds</name></employee>" +
+ "<employee><id>8</id><name>hhh</name></employee>" +
+ "<employee><id>9</id><name>fki</name></employee>" +
+ "<employee><id>10</id><name>abc</name></employee>" +
+ "</employees> ";
+
+ template.sendBody("direct:start", xmlBody);
+
+ assertMockEndpointsSatisfied();
+
+ log.info("{} Threads in use: {}", threads.size(), threads);
+
+ // 2 from split EIP and 2 from the async processor that uses the JDK
ForJoinPool
+ Assertions.assertTrue(threads.size() <= 4, "Should not use more than 4
threads, was: " + threads.size());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // only use 2 threads at-most
+ ThreadPoolProfile myThreadPoolProfile = new
ThreadPoolProfile("threadPoolProfile");
+ myThreadPoolProfile.setMaxPoolSize(2);
+ myThreadPoolProfile.setPoolSize(2);
+ myThreadPoolProfile.setMaxQueueSize(2);
+
myThreadPoolProfile.setRejectedPolicy(ThreadPoolRejectedPolicy.Abort);
+
+
getContext().getExecutorServiceManager().setDefaultThreadPoolProfile(myThreadPoolProfile);
+
+ AsyncProcessor asyncProcessor = new AsyncProcessor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ }
+
+ @Override
+ public boolean process(Exchange exchange, AsyncCallback
callback) {
+ // Run the processing in a separate thread
+ CompletableFuture.runAsync(() -> {
+ threads.add(Thread.currentThread().getName());
+ try {
+ // Simulate some asynchronous processing
+ Thread.sleep(250);
+ exchange.getIn().setBody("Processed
asynchronously");
+ } catch (InterruptedException e) {
+ exchange.setException(e);
+ } finally {
+ // Signal completion
+ callback.done(false);
+ }
+ });
+
+ // Return false to indicate that processing is not
complete yet
+ return false;
+ }
+
+ @Override
+ public CompletableFuture<Exchange> processAsync(Exchange
exchange) {
+ return null;
+ }
+ };
+
+ from("direct:start")
+ .split()
+ .xpath("/employees/employee")
+
.parallelProcessing().stopOnException().timeout(10000).executorService("threadPoolProfile").synchronous()
+ .process(e ->
threads.add(Thread.currentThread().getName()))
+ .process(asyncProcessor)
+ .process(e ->
threads.add(Thread.currentThread().getName()))
+ .delay(250)
+ .end()
+ .to("mock:split");
+ }
+ };
+ }
+
+}
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointMulticastSynchronousTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointMulticastSynchronousTest.java
new file mode 100644
index 00000000000..8f07e18aa1b
--- /dev/null
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointMulticastSynchronousTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AsyncEndpointMulticastSynchronousTest extends ContextTestSupport {
+
+ private static String beforeThreadName;
+ private static String afterThreadName;
+
+ @Test
+ public void testAsyncEndpoint() {
+ getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+ getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+
+ String reply = template.requestBody("direct:start", "Hello Camel",
String.class);
+ assertEquals("Bye Camel", reply);
+
+ assertTrue(beforeThreadName.equalsIgnoreCase(afterThreadName), "Should
use same threads");
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ context.addComponent("async", new MyAsyncComponent());
+
+
from("direct:start").to("mock:before").to("log:before").process(new Processor()
{
+ public void process(Exchange exchange) {
+ beforeThreadName = Thread.currentThread().getName();
+ }
+ }).multicast().synchronous().to("async:hi:moon",
"direct:foo").end().process(new Processor() {
+ public void process(Exchange exchange) {
+ afterThreadName = Thread.currentThread().getName();
+ }
+ }).to("log:after").to("mock:after").to("mock:result");
+
+ from("direct:foo").setBody(constant("Bye Camel"));
+ }
+ };
+ }
+
+}
diff --git
a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_10.adoc
b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_10.adoc
index d35d0e50dca..9127eb867c8 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_10.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_10.adoc
@@ -4,8 +4,17 @@ This document is for helping you upgrade your Apache Camel
application
from Camel 4.x to 4.y. For example, if you are upgrading Camel 4.0 to 4.2,
then you should follow the guides
from both 4.0 to 4.1 and 4.1 to 4.2.
-<<<<<<< HEAD
-=======
+== Upgrading from 4.10.1 to 4.10.2
+
+=== Recipient List, Split and Multicast EIP
+
+In parallel processing mode, you can also enable `synchronous=true` to force
these EIPs to process
+the sub-tasks using the upper bounds of the thread-pool. If using
`synchronous=false` then Camel
+will allow its reactive routing engine to use as many threads as possible,
which may be available
+due to sub-tasks using other thread-pools such as `CompletableFuture.runAsync`
or others.
+
+Setting `synchronous=true` is the same behaviour is in Camel 2 which did not
have the reactive routing engine.
+
== Upgrading from 4.10.0 to 4.10.1
=== camel-api
diff --git
a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
index 5fbdc56df6f..ea018fb0655 100644
---
a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
+++
b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
@@ -10136,7 +10136,7 @@ public final class ModelDeserializers extends
YamlDeserializerSupport {
@YamlProperty(name = "id", type = "string", description =
"Sets the id of this node", displayName = "Id"),
@YamlProperty(name = "onPrepare", type = "string",
description = "Uses the Processor when preparing the org.apache.camel.Exchange
to be send. This can be used to deep-clone messages that should be send, or any
custom logic needed before the exchange is send.", displayName = "On Prepare"),
@YamlProperty(name = "parallelAggregate", type =
"boolean", deprecated = true, description = "If enabled then the aggregate
method on AggregationStrategy can be called concurrently. Notice that this
would require the implementation of AggregationStrategy to be implemented as
thread-safe. By default this is false meaning that Camel synchronizes the call
to the aggregate method. Though in some use-cases this can be used to archive
higher performance when the Aggregation [...]
- @YamlProperty(name = "parallelProcessing", type =
"boolean", description = "If enabled then sending messages to the multicasts
occurs concurrently. Note the caller thread will still wait until all messages
has been fully processed, before it continues. Its only the sending and
processing the replies from the multicasts which happens concurrently. When
parallel processing is enabled, then the Camel routing engin will continue
processing using last used thread from the [...]
+ @YamlProperty(name = "parallelProcessing", type =
"boolean", description = "If enabled then sending messages to the multicasts
occurs concurrently. Note the caller thread will still wait until all messages
has been fully processed, before it continues. Its only the sending and
processing the replies from the multicasts which happens concurrently. When
parallel processing is enabled, then the Camel routing engin will continue
processing using last used thread from the [...]
@YamlProperty(name = "shareUnitOfWork", type = "boolean",
description = "Shares the org.apache.camel.spi.UnitOfWork with the parent and
each of the sub messages. Multicast will by default not share unit of work
between the parent exchange and each multicasted exchange. This means each sub
exchange has its own individual unit of work.", displayName = "Share Unit Of
Work"),
@YamlProperty(name = "steps", type =
"array:org.apache.camel.model.ProcessorDefinition"),
@YamlProperty(name = "stopOnException", type = "boolean",
description = "Will now stop further processing if an exception or failure
occurred during processing of an org.apache.camel.Exchange and the caused
exception will be thrown. Will also stop if processing the exchange failed (has
a fault message) or an exception was thrown and handled by the error handler
(such as using onException). In all situations the multicast will stop further
processing. This is the same [...]
@@ -13030,7 +13030,7 @@ public final class ModelDeserializers extends
YamlDeserializerSupport {
@YamlProperty(name = "ignoreInvalidEndpoints", type =
"boolean", description = "Ignore the invalidate endpoint exception when try to
create a producer with that endpoint", displayName = "Ignore Invalid
Endpoints"),
@YamlProperty(name = "onPrepare", type = "string",
description = "Uses the Processor when preparing the org.apache.camel.Exchange
to be used send. This can be used to deep-clone messages that should be send,
or any custom logic needed before the exchange is send.", displayName = "On
Prepare"),
@YamlProperty(name = "parallelAggregate", type =
"boolean", deprecated = true, description = "If enabled then the aggregate
method on AggregationStrategy can be called concurrently. Notice that this
would require the implementation of AggregationStrategy to be implemented as
thread-safe. By default this is false meaning that Camel synchronizes the call
to the aggregate method. Though in some use-cases this can be used to archive
higher performance when the Aggregation [...]
- @YamlProperty(name = "parallelProcessing", type =
"boolean", description = "If enabled then sending messages to the recipients
occurs concurrently. Note the caller thread will still wait until all messages
has been fully processed, before it continues. Its only the sending and
processing the replies from the recipients which happens concurrently. When
parallel processing is enabled, then the Camel routing engin will continue
processing using last used thread from the [...]
+ @YamlProperty(name = "parallelProcessing", type =
"boolean", description = "If enabled then sending messages to the recipients
occurs concurrently. Note the caller thread will still wait until all messages
has been fully processed, before it continues. Its only the sending and
processing the replies from the recipients which happens concurrently. When
parallel processing is enabled, then the Camel routing engin will continue
processing using last used thread from the [...]
@YamlProperty(name = "shareUnitOfWork", type = "boolean",
description = "Shares the org.apache.camel.spi.UnitOfWork with the parent and
each of the sub messages. Recipient List will by default not share unit of work
between the parent exchange and each recipient exchange. This means each sub
exchange has its own individual unit of work.", displayName = "Share Unit Of
Work"),
@YamlProperty(name = "stopOnException", type = "boolean",
description = "Will now stop further processing if an exception or failure
occurred during processing of an org.apache.camel.Exchange and the caused
exception will be thrown. Will also stop if processing the exchange failed (has
a fault message) or an exception was thrown and handled by the error handler
(such as using onException). In all situations the recipient list will stop
further processing. This is the [...]
@YamlProperty(name = "streaming", type = "boolean",
description = "If enabled then Camel will process replies out-of-order, eg in
the order they come back. If disabled, Camel will process replies in the same
order as defined by the recipient list.", displayName = "Streaming"),
@@ -17422,7 +17422,7 @@ public final class ModelDeserializers extends
YamlDeserializerSupport {
@YamlProperty(name = "id", type = "string", description =
"Sets the id of this node", displayName = "Id"),
@YamlProperty(name = "onPrepare", type = "string",
description = "Uses the Processor when preparing the org.apache.camel.Exchange
to be sent. This can be used to deep-clone messages that should be sent, or any
custom logic needed before the exchange is sent.", displayName = "On Prepare"),
@YamlProperty(name = "parallelAggregate", type =
"boolean", deprecated = true, description = "If enabled then the aggregate
method on AggregationStrategy can be called concurrently. Notice that this
would require the implementation of AggregationStrategy to be implemented as
thread-safe. By default this is false meaning that Camel synchronizes the call
to the aggregate method. Though in some use-cases this can be used to archive
higher performance when the Aggregation [...]
- @YamlProperty(name = "parallelProcessing", type =
"boolean", description = "If enabled then processing each split messages occurs
concurrently. Note the caller thread will still wait until all messages has
been fully processed, before it continues. It's only processing the sub
messages from the splitter which happens concurrently. When parallel processing
is enabled, then the Camel routing engin will continue processing using last
used thread from the parallel thread [...]
+ @YamlProperty(name = "parallelProcessing", type =
"boolean", description = "If enabled then processing each split messages occurs
concurrently. Note the caller thread will still wait until all messages has
been fully processed, before it continues. It's only processing the sub
messages from the splitter which happens concurrently. When parallel processing
is enabled, then the Camel routing engin will continue processing using last
used thread from the parallel thread [...]
@YamlProperty(name = "shareUnitOfWork", type = "boolean",
description = "Shares the org.apache.camel.spi.UnitOfWork with the parent and
each of the sub messages. Splitter will by default not share unit of work
between the parent exchange and each split exchange. This means each split
exchange has its own individual unit of work.", displayName = "Share Unit Of
Work"),
@YamlProperty(name = "steps", type =
"array:org.apache.camel.model.ProcessorDefinition"),
@YamlProperty(name = "stopOnException", type = "boolean",
description = "Will now stop further processing if an exception or failure
occurred during processing of an org.apache.camel.Exchange and the caused
exception will be thrown. Will also stop if processing the exchange failed (has
a fault message) or an exception was thrown and handled by the error handler
(such as using onException). In all situations the splitter will stop further
processing. This is the same b [...]
diff --git
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json
index 7deecd56aaf..28d783304bc 100644
---
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json
+++
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json
@@ -3383,7 +3383,7 @@
"parallelProcessing" : {
"type" : "boolean",
"title" : "Parallel Processing",
- "description" : "If enabled then sending messages to the
multicasts occurs concurrently. Note the caller thread will still wait until
all messages has been fully processed, before it continues. Its only the
sending and processing the replies from the multicasts which happens
concurrently. When parallel processing is enabled, then the Camel routing engin
will continue processing using last used thread from the parallel thread pool.
However, if you want to use the original thre [...]
+ "description" : "If enabled then sending messages to the
multicasts occurs concurrently. Note the caller thread will still wait until
all messages has been fully processed, before it continues. Its only the
sending and processing the replies from the multicasts which happens
concurrently. When parallel processing is enabled, then the Camel routing engin
will continue processing using last used thread from the parallel thread pool.
However, if you want to use the original thre [...]
},
"shareUnitOfWork" : {
"type" : "boolean",
@@ -4484,7 +4484,7 @@
"parallelProcessing" : {
"type" : "boolean",
"title" : "Parallel Processing",
- "description" : "If enabled then sending messages to the
recipients occurs concurrently. Note the caller thread will still wait until
all messages has been fully processed, before it continues. Its only the
sending and processing the replies from the recipients which happens
concurrently. When parallel processing is enabled, then the Camel routing engin
will continue processing using last used thread from the parallel thread pool.
However, if you want to use the original thre [...]
+ "description" : "If enabled then sending messages to the
recipients occurs concurrently. Note the caller thread will still wait until
all messages has been fully processed, before it continues. Its only the
sending and processing the replies from the recipients which happens
concurrently. When parallel processing is enabled, then the Camel routing engin
will continue processing using last used thread from the parallel thread pool.
However, if you want to use the original thre [...]
},
"shareUnitOfWork" : {
"type" : "boolean",
@@ -6794,7 +6794,7 @@
"parallelProcessing" : {
"type" : "boolean",
"title" : "Parallel Processing",
- "description" : "If enabled then processing each split messages
occurs concurrently. Note the caller thread will still wait until all messages
has been fully processed, before it continues. It's only processing the sub
messages from the splitter which happens concurrently. When parallel processing
is enabled, then the Camel routing engin will continue processing using last
used thread from the parallel thread pool. However, if you want to use the
original thread that called t [...]
+ "description" : "If enabled then processing each split messages
occurs concurrently. Note the caller thread will still wait until all messages
has been fully processed, before it continues. It's only processing the sub
messages from the splitter which happens concurrently. When parallel processing
is enabled, then the Camel routing engin will continue processing using last
used thread from the parallel thread pool. However, if you want to use the
original thread that called t [...]
},
"shareUnitOfWork" : {
"type" : "boolean",