[
https://issues.apache.org/jira/browse/CAMEL-20614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Bartosz Popiela updated CAMEL-20614:
------------------------------------
Description:
When two or more threads try to instantiate a route template / kamelet
parallelly with toD, the following exception is thrown:
{code:java}
Thread[#77,Camel (camel-1) thread #15 - Split,5,main]
org.apache.camel.component.kamelet.KameletConsumerNotAvailableException: No
consumers available on endpoint: kamelet://my-kamelet?someParameter=someValue.
Exchange[17F2F4997569083-000000000000000C]
at
org.apache.camel.component.kamelet.KameletProducer.process(KameletProducer.java:78)
at
org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172)
at
org.apache.camel.processor.errorhandler.NoErrorHandler.process(NoErrorHandler.java:46)
at
org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:383)
at
org.apache.camel.processor.Pipeline$PipelineTask.run(Pipeline.java:102)
at
org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.executeReactiveWork(DefaultReactiveExecutor.java:196)
at
org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:164)
at
org.apache.camel.impl.engine.DefaultReactiveExecutor.schedule(DefaultReactiveExecutor.java:54)
at
org.apache.camel.processor.MulticastProcessor.lambda$schedule$1(MulticastProcessor.java:369)
at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577)
at
java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:317)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1589)
{code}
The sequence of events leading to this exception is as follows:
1. Thread A creates an endpoint with
_org.apache.camel.impl.engine.AbstractCamelContext#doGetEndpoint_
2. Thread A initiates the endpoint with
_org.apache.camel.component.kamelet.KameletEndpoint#doInit_ and sets a key,
e.g. my-kamelet-1
3. Thread A creates a new instance of a route template / kamelet with
{_}org.apache.camel.impl.DefaultModel#addRouteFromTemplate{_}. This method
delegates to _org.apache.camel.model.RouteTemplateDefinition#asRouteDefinition_
which copies RouteDefinition from the template, but {*}it is only a shallow
copy (not a deep copy){*}, so any referenced fields (e.g.
RouteDefinition#input, RouteDefinition#outputs) are shared between threads.
4. Thread A updates the id of RouteDefinition (routeId=1) and URI-s of
RouteDefinition#input (kamelet://source?routeId=1) and RouteDefinition#outputs
(kamelet://sink?routeId=1) (see
{_}org.apache.camel.component.kamelet.Kamelet#templateToRoute{_}) which is also
reflected in the template's RouteDefinition.
5. Thread B executes steps 1-4 and overwrites the URI-s of in the template's
RouteDefinition (my-kamelet-2), hence in the shallow-copied RouteDefinition
referenced by Thread A.
6. Thread A instantiates a Consumer for the URI from RouteDefinition#input, but
because it has been modified by Thread B, it will create a Consumer for
my-kamelet-2 instead of my-kamelet-1.
7. Thread A tries to send an Exchange to the route my-kamelet-1 and creates a
Producer with the same key (my-kamelet-1). This Producer searches for a
Consumer with the same key (my-kamelet-1), but it can't find it because in the
previous step a Consumer with a different key (my-kamelet-2) was created and it
throws
{_}org.apache.camel.component.kamelet.KameletConsumerNotAvailableException{_}.
As a solution _org.apache.camel.component.kamelet.Kamelet#templateToRoute_ can
be synchronised and referenced fields that are modified in this method should
also be copied.
was:
When two or more threads try to instantiate a route template / kamelet
parallelly with toD, the following exception is thrown:
{code:java}
Thread[#77,Camel (camel-1) thread #15 - Split,5,main]
org.apache.camel.component.kamelet.KameletConsumerNotAvailableException: No
consumers available on endpoint: kamelet://my-kamelet?someParameter=someValue.
Exchange[17F2F4997569083-000000000000000C]
at
org.apache.camel.component.kamelet.KameletProducer.process(KameletProducer.java:78)
at
org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172)
at
org.apache.camel.processor.errorhandler.NoErrorHandler.process(NoErrorHandler.java:46)
at
org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:383)
at
org.apache.camel.processor.Pipeline$PipelineTask.run(Pipeline.java:102)
at
org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.executeReactiveWork(DefaultReactiveExecutor.java:196)
at
org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:164)
at
org.apache.camel.impl.engine.DefaultReactiveExecutor.schedule(DefaultReactiveExecutor.java:54)
at
org.apache.camel.processor.MulticastProcessor.lambda$schedule$1(MulticastProcessor.java:369)
at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577)
at
java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:317)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1589)
{code}
The following sequence of events occurs which leads to this exception:
1. Thread A creates an endpoint with
_org.apache.camel.impl.engine.AbstractCamelContext#doGetEndpoint_
2. Thread A initiates the endpoint with
_org.apache.camel.component.kamelet.KameletEndpoint#doInit_ and sets a key,
e.g. my-kamelet-1
3. Thread A creates a new instance of a route template / kamelet with
{_}org.apache.camel.impl.DefaultModel#addRouteFromTemplate{_}. This method
delegates to _org.apache.camel.model.RouteTemplateDefinition#asRouteDefinition_
which copies RouteDefinition of the template, but {*}it is only a shallow copy
(not deep copy){*}, thus a referenced fields (e.g. RouteDefinition#input,
RouteDefinition#outputs) will be shared between threads.
4. Thread A updates the id of RouteDefinition (routeId=1) and URI-s of
RouteDefinition#input (kamelet://source?routeId=1) and RouteDefinition#outputs
(kamelet://sink?routeId=1) (see
{_}org.apache.camel.component.kamelet.Kamelet#templateToRoute{_}) which is also
reflected in the template's RouteDefinition.
5. Thread B executes steps 1-4 and overwrites the URI-s of in the template's
RouteDefinition (my-kamelet-2), hence in shallow-copied RouteDefinition
reference by Thread A.
6. Thread A instantiates a Consumer for the URI of RouteDefinition#input, but
because it has been modified by Thread B, it will create a Consumer for
my-kamelet-2 instead of my-kamelet-1.
7. Thread A tries to send an Exchange to the route my-kamelet-1, thus creates a
Producer with the same key (my-kamelet-1) and this Producer searches for a
Consumer with the same key (my-kamelet-1), but it can't be found because in the
previous step a Consumer with a different key (my-kamelet-2) has been created
and it throws
{_}org.apache.camel.component.kamelet.KameletConsumerNotAvailableException{_}.
As a solution
org.apache.camel.component.kamelet.KameletConsumerNotAvailableException can be
synchronized and referenced fields which are modified should also be copied.
> KameletConsumerNotAvailableException is thrown when two or more threads call
> toD
> --------------------------------------------------------------------------------
>
> Key: CAMEL-20614
> URL: https://issues.apache.org/jira/browse/CAMEL-20614
> Project: Camel
> Issue Type: Bug
> Components: camel-core
> Affects Versions: 3.22.1, 4.4.1
> Reporter: Bartosz Popiela
> Priority: Major
>
> When two or more threads try to instantiate a route template / kamelet
> parallelly with toD, the following exception is thrown:
> {code:java}
> Thread[#77,Camel (camel-1) thread #15 - Split,5,main]
> org.apache.camel.component.kamelet.KameletConsumerNotAvailableException: No
> consumers available on endpoint:
> kamelet://my-kamelet?someParameter=someValue.
> Exchange[17F2F4997569083-000000000000000C]
> at
> org.apache.camel.component.kamelet.KameletProducer.process(KameletProducer.java:78)
> at
> org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172)
> at
> org.apache.camel.processor.errorhandler.NoErrorHandler.process(NoErrorHandler.java:46)
> at
> org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:383)
> at
> org.apache.camel.processor.Pipeline$PipelineTask.run(Pipeline.java:102)
> at
> org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.executeReactiveWork(DefaultReactiveExecutor.java:196)
> at
> org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:164)
> at
> org.apache.camel.impl.engine.DefaultReactiveExecutor.schedule(DefaultReactiveExecutor.java:54)
> at
> org.apache.camel.processor.MulticastProcessor.lambda$schedule$1(MulticastProcessor.java:369)
> at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577)
> at
> java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:317)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
> at java.base/java.lang.Thread.run(Thread.java:1589)
> {code}
> The sequence of events leading to this exception is as follows:
> 1. Thread A creates an endpoint with
> _org.apache.camel.impl.engine.AbstractCamelContext#doGetEndpoint_
> 2. Thread A initiates the endpoint with
> _org.apache.camel.component.kamelet.KameletEndpoint#doInit_ and sets a key,
> e.g. my-kamelet-1
> 3. Thread A creates a new instance of a route template / kamelet with
> {_}org.apache.camel.impl.DefaultModel#addRouteFromTemplate{_}. This method
> delegates to
> _org.apache.camel.model.RouteTemplateDefinition#asRouteDefinition_ which
> copies RouteDefinition from the template, but {*}it is only a shallow copy
> (not a deep copy){*}, so any referenced fields (e.g. RouteDefinition#input,
> RouteDefinition#outputs) are shared between threads.
> 4. Thread A updates the id of RouteDefinition (routeId=1) and URI-s of
> RouteDefinition#input (kamelet://source?routeId=1) and
> RouteDefinition#outputs (kamelet://sink?routeId=1) (see
> {_}org.apache.camel.component.kamelet.Kamelet#templateToRoute{_}) which is
> also reflected in the template's RouteDefinition.
> 5. Thread B executes steps 1-4 and overwrites the URI-s of in the template's
> RouteDefinition (my-kamelet-2), hence in the shallow-copied RouteDefinition
> referenced by Thread A.
> 6. Thread A instantiates a Consumer for the URI from RouteDefinition#input,
> but because it has been modified by Thread B, it will create a Consumer for
> my-kamelet-2 instead of my-kamelet-1.
> 7. Thread A tries to send an Exchange to the route my-kamelet-1 and creates a
> Producer with the same key (my-kamelet-1). This Producer searches for a
> Consumer with the same key (my-kamelet-1), but it can't find it because in
> the previous step a Consumer with a different key (my-kamelet-2) was created
> and it throws
> {_}org.apache.camel.component.kamelet.KameletConsumerNotAvailableException{_}.
> As a solution _org.apache.camel.component.kamelet.Kamelet#templateToRoute_
> can be synchronised and referenced fields that are modified in this method
> should also be copied.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)