This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-4.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.4.x by this push:
new 63bd587da52 CAMEL-20614: deep-copy output processors during
instantiation of a route template (#13824)
63bd587da52 is described below
commit 63bd587da526c7f681dcef6aba322ae4e275f3d4
Author: Bartosz Popiela <[email protected]>
AuthorDate: Thu Apr 18 13:39:58 2024 +0200
CAMEL-20614: deep-copy output processors during instantiation of a route
template (#13824)
* CAMEL-20614: deep-copy output processors during instantiation of a route
template
When multiple threads try to instantiate and send an exchange to the same
kamelet in parallel,
org.apache.camel.component.kamelet.KameletConsumerNotAvailableException may be
thrown because the underlying RouteTemplateDefinition is shallow-copied and
changes to the RouteDefinition are reflected in the RouteTemplateDefinition.
CAMEL-20614: deep-copy output processors during instantiation of a route
template
When multiple threads try to instantiate and send an exchange to the same
kamelet in parallel,
org.apache.camel.component.kamelet.KameletConsumerNotAvailableException may be
thrown because the underlying RouteTemplateDefinition is shallow-copied and
changes to the RouteDefinition are reflected in the RouteTemplateDefinition.
* CAMEL-20545: Using replaceFromWith with camel-test and having route
templates can lead to duplicate consumer on startup error. (#13485)
(cherry picked from commit 8aab61a7a286f6c0ed34433c068ca065084dc67d)
* CAMEL-20614: add shallowCopy as per code review comment
* CAMEL-20614: update access modifiers of copy constructors to be protected
as per code review comment
* CAMEL-20614: add unit tests
---------
Co-authored-by: Claus Ibsen <[email protected]>
---
.../kamelet/KameletMultiThreadedTest.java | 70 ++++++++++++++++++++++
...ition.java => CopyableProcessorDefinition.java} | 15 ++---
.../org/apache/camel/model/FromDefinition.java | 16 +++++
.../org/apache/camel/model/NoOutputDefinition.java | 6 ++
.../camel/model/OptionalIdentifiedDefinition.java | 12 ++++
.../apache/camel/model/ProcessorDefinition.java | 11 ++++
.../camel/model/RouteTemplateDefinition.java | 35 +++++++++--
.../org/apache/camel/model/SendDefinition.java | 8 +++
.../java/org/apache/camel/model/ToDefinition.java | 13 +++-
.../apache/camel/model/ToDynamicDefinition.java | 19 +++++-
.../camel/model/RouteTemplateDefinitionTest.java | 63 +++++++++++++++++++
11 files changed, 251 insertions(+), 17 deletions(-)
diff --git
a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletMultiThreadedTest.java
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletMultiThreadedTest.java
new file mode 100644
index 00000000000..73d47e2cd6f
--- /dev/null
+++
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletMultiThreadedTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.camel.component.kamelet.Kamelet.templateToRoute;
+
+public class KameletMultiThreadedTest extends CamelTestSupport {
+
+ @Test
+ public void
createSameKameletTwiceInParallel_KameletConsumerNotAvailableExceptionThrown()
throws InterruptedException {
+ var latch = new CountDownLatch(2);
+ context.addRouteTemplateDefinitionConverter("*", (in, parameters) -> {
+ try {
+ return templateToRoute(in, parameters);
+ } finally {
+ latch.countDown();
+ latch.await();
+ }
+ });
+ getMockEndpoint("mock:foo").expectedMessageCount(2);
+
+ template.sendBody("seda:route", null);
+ template.requestBody("seda:route", ((Object) null));
+
+ MockEndpoint.assertIsSatisfied(context);
+ }
+
+ // **********************************************
+ //
+ // test set-up
+ //
+ // **********************************************
+
+ @Override
+ protected RoutesBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("seda:route?concurrentConsumers=2")
+ .toD("kamelet:-");
+
+ routeTemplate("-"). // This is a workaround for "*" to be
iterated before templateId at
org.apache.camel.impl.DefaultModel#addRouteFromTemplate (line 460)
+ from("kamelet:source")
+ .to("mock:foo");
+ }
+ };
+ }
+}
diff --git
a/core/camel-core-model/src/main/java/org/apache/camel/model/NoOutputDefinition.java
b/core/camel-core-model/src/main/java/org/apache/camel/model/CopyableProcessorDefinition.java
similarity index 70%
copy from
core/camel-core-model/src/main/java/org/apache/camel/model/NoOutputDefinition.java
copy to
core/camel-core-model/src/main/java/org/apache/camel/model/CopyableProcessorDefinition.java
index fdd21dd2e8a..cf770d7515c 100644
---
a/core/camel-core-model/src/main/java/org/apache/camel/model/NoOutputDefinition.java
+++
b/core/camel-core-model/src/main/java/org/apache/camel/model/CopyableProcessorDefinition.java
@@ -16,17 +16,10 @@
*/
package org.apache.camel.model;
-import java.util.Collections;
-import java.util.List;
-
/**
- * Base class for definitions which does not support outputs.
+ * This interface is used to copy {@link ProcessorDefinition
ProcessorDefinitions} during instantiation of a route
+ * template.
*/
-public abstract class NoOutputDefinition<Type extends
ProcessorDefinition<Type>> extends ProcessorDefinition<Type> {
-
- @Override
- public List<ProcessorDefinition<?>> getOutputs() {
- return Collections.emptyList();
- }
-
+interface CopyableProcessorDefinition {
+ ProcessorDefinition<?> copy();
}
diff --git
a/core/camel-core-model/src/main/java/org/apache/camel/model/FromDefinition.java
b/core/camel-core-model/src/main/java/org/apache/camel/model/FromDefinition.java
index dc5d4b95665..4117dc5efc1 100644
---
a/core/camel-core-model/src/main/java/org/apache/camel/model/FromDefinition.java
+++
b/core/camel-core-model/src/main/java/org/apache/camel/model/FromDefinition.java
@@ -66,6 +66,22 @@ public class FromDefinition extends
OptionalIdentifiedDefinition<FromDefinition>
setEndpointConsumerBuilder(endpointConsumerBuilder);
}
+ FromDefinition copy() {
+ FromDefinition copy = new FromDefinition();
+ copy.parent = this.parent;
+ copy.endpoint = this.endpoint;
+ copy.endpointConsumerBuilder = this.endpointConsumerBuilder;
+ copy.uri = this.uri;
+ copy.variableReceive = this.variableReceive;
+ copy.setCamelContext(this.getCamelContext());
+ copy.setId(this.getId());
+ copy.setCustomId(this.getCustomId());
+ copy.setDescription(this.getDescription());
+ copy.setLineNumber(this.getLineNumber());
+ copy.setLocation(this.getLocation());
+ return copy;
+ }
+
@Override
public String toString() {
return "From[" + getLabel() + "]";
diff --git
a/core/camel-core-model/src/main/java/org/apache/camel/model/NoOutputDefinition.java
b/core/camel-core-model/src/main/java/org/apache/camel/model/NoOutputDefinition.java
index fdd21dd2e8a..92dc058a088 100644
---
a/core/camel-core-model/src/main/java/org/apache/camel/model/NoOutputDefinition.java
+++
b/core/camel-core-model/src/main/java/org/apache/camel/model/NoOutputDefinition.java
@@ -29,4 +29,10 @@ public abstract class NoOutputDefinition<Type extends
ProcessorDefinition<Type>>
return Collections.emptyList();
}
+ public NoOutputDefinition() {
+ }
+
+ protected NoOutputDefinition(NoOutputDefinition source) {
+ super(source);
+ }
}
diff --git
a/core/camel-core-model/src/main/java/org/apache/camel/model/OptionalIdentifiedDefinition.java
b/core/camel-core-model/src/main/java/org/apache/camel/model/OptionalIdentifiedDefinition.java
index f8c42b3e246..767f40181b4 100644
---
a/core/camel-core-model/src/main/java/org/apache/camel/model/OptionalIdentifiedDefinition.java
+++
b/core/camel-core-model/src/main/java/org/apache/camel/model/OptionalIdentifiedDefinition.java
@@ -45,6 +45,18 @@ public abstract class OptionalIdentifiedDefinition<T extends
OptionalIdentifiedD
private int lineNumber = -1;
private String location;
+ public OptionalIdentifiedDefinition() {
+ }
+
+ protected OptionalIdentifiedDefinition(OptionalIdentifiedDefinition
source) {
+ this.camelContext = source.camelContext;
+ this.id = source.id;
+ this.customId = source.customId;
+ this.description = source.description;
+ this.lineNumber = source.lineNumber;
+ this.location = source.location;
+ }
+
@Override
public CamelContext getCamelContext() {
return camelContext;
diff --git
a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 8adb212460b..e6c66793487 100644
---
a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++
b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -102,6 +102,17 @@ public abstract class ProcessorDefinition<Type extends
ProcessorDefinition<Type>
index = COUNTER.getAndIncrement();
}
+ protected ProcessorDefinition(ProcessorDefinition source) {
+ super(source);
+ this.disabled = source.disabled;
+ this.inheritErrorHandler = source.inheritErrorHandler;
+ this.blocks.addAll(source.blocks);
+ this.parent = source.parent;
+ this.routeConfiguration = source.routeConfiguration;
+ this.interceptStrategies.addAll(source.interceptStrategies);
+ this.index = source.index;
+ }
+
private static <T extends ExpressionNode> ExpressionClause<T>
createAndSetExpression(T result) {
ExpressionClause<T> clause = new ExpressionClause<>(result);
result.setExpression(clause);
diff --git
a/core/camel-core-model/src/main/java/org/apache/camel/model/RouteTemplateDefinition.java
b/core/camel-core-model/src/main/java/org/apache/camel/model/RouteTemplateDefinition.java
index 44802872d58..1bf2eeae867 100644
---
a/core/camel-core-model/src/main/java/org/apache/camel/model/RouteTemplateDefinition.java
+++
b/core/camel-core-model/src/main/java/org/apache/camel/model/RouteTemplateDefinition.java
@@ -16,7 +16,12 @@
*/
package org.apache.camel.model;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static java.util.Objects.requireNonNullElse;
+
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
@@ -410,15 +415,16 @@ public class RouteTemplateDefinition extends
OptionalIdentifiedDefinition<RouteT
copy.setDelayer(route.getDelayer());
copy.setGroup(route.getGroup());
copy.setInheritErrorHandler(route.isInheritErrorHandler());
- copy.setInput(route.getInput());
+ // make a defensive copy of the input as input can be adviced during
testing or other changes
+ copy.setInput(route.getInput().copy());
copy.setInputType(route.getInputType());
copy.setLogMask(route.getLogMask());
copy.setMessageHistory(route.getMessageHistory());
copy.setOutputType(route.getOutputType());
- copy.setOutputs(route.getOutputs());
- copy.setRoutePolicies(route.getRoutePolicies());
+ copy.setOutputs(copy(route.getOutputs()));
+ copy.setRoutePolicies(shallowCopy(route.getRoutePolicies()));
copy.setRoutePolicyRef(route.getRoutePolicyRef());
- copy.setRouteProperties(route.getRouteProperties());
+ copy.setRouteProperties(shallowCopy(route.getRouteProperties()));
copy.setShutdownRoute(route.getShutdownRoute());
copy.setShutdownRunningTask(route.getShutdownRunningTask());
copy.setStartupOrder(route.getStartupOrder());
@@ -432,6 +438,27 @@ public class RouteTemplateDefinition extends
OptionalIdentifiedDefinition<RouteT
}
copy.setPrecondition(route.getPrecondition());
copy.setRouteConfigurationId(route.getRouteConfigurationId());
+ copy.setTemplateParameters(shallowCopy(route.getTemplateParameters()));
+ return copy;
+ }
+
+ private <T> List<T> shallowCopy(List<T> list) {
+ return (list != null) ? new ArrayList<>(list) : null;
+ }
+
+ private <K, V> Map<K, V> shallowCopy(Map<K, V> map) {
+ return (map != null) ? new HashMap<>(map) : null;
+ }
+
+ private List<ProcessorDefinition<?>> copy(List<ProcessorDefinition<?>>
outputs) {
+ var copy = new ArrayList<ProcessorDefinition<?>>();
+ for (var definition : outputs) {
+ if (definition instanceof CopyableProcessorDefinition copyable) {
+ copy.add(copyable.copy());
+ } else {
+ copy.add(definition);
+ }
+ }
return copy;
}
diff --git
a/core/camel-core-model/src/main/java/org/apache/camel/model/SendDefinition.java
b/core/camel-core-model/src/main/java/org/apache/camel/model/SendDefinition.java
index 21392205b79..5e3a9725ae3 100644
---
a/core/camel-core-model/src/main/java/org/apache/camel/model/SendDefinition.java
+++
b/core/camel-core-model/src/main/java/org/apache/camel/model/SendDefinition.java
@@ -51,6 +51,14 @@ public abstract class SendDefinition<Type extends
ProcessorDefinition<Type>> ext
this.uri = uri;
}
+ protected SendDefinition(SendDefinition source) {
+ super(source);
+ this.endpointUriToString = source.endpointUriToString;
+ this.endpoint = source.endpoint;
+ this.endpointProducerBuilder = source.endpointProducerBuilder;
+ this.uri = source.uri;
+ }
+
@Override
public String getEndpointUri() {
if (endpointProducerBuilder != null) {
diff --git
a/core/camel-core-model/src/main/java/org/apache/camel/model/ToDefinition.java
b/core/camel-core-model/src/main/java/org/apache/camel/model/ToDefinition.java
index fed74435cd4..469ab1ea8b9 100644
---
a/core/camel-core-model/src/main/java/org/apache/camel/model/ToDefinition.java
+++
b/core/camel-core-model/src/main/java/org/apache/camel/model/ToDefinition.java
@@ -33,7 +33,7 @@ import org.apache.camel.spi.Metadata;
@Metadata(label = "eip,routing")
@XmlRootElement(name = "to")
@XmlAccessorType(XmlAccessType.FIELD)
-public class ToDefinition extends SendDefinition<ToDefinition> {
+public class ToDefinition extends SendDefinition<ToDefinition> implements
CopyableProcessorDefinition {
@XmlAttribute
private String variableSend;
@@ -76,6 +76,13 @@ public class ToDefinition extends
SendDefinition<ToDefinition> {
this.pattern = pattern.name();
}
+ protected ToDefinition(ToDefinition source) {
+ super(source);
+ this.variableSend = source.variableSend;
+ this.variableReceive = source.variableReceive;
+ this.pattern = source.pattern;
+ }
+
@Override
public String getShortName() {
return "to";
@@ -128,4 +135,8 @@ public class ToDefinition extends
SendDefinition<ToDefinition> {
public void setVariableReceive(String variableReceive) {
this.variableReceive = variableReceive;
}
+
+ public ToDefinition copy() {
+ return new ToDefinition(this);
+ }
}
diff --git
a/core/camel-core-model/src/main/java/org/apache/camel/model/ToDynamicDefinition.java
b/core/camel-core-model/src/main/java/org/apache/camel/model/ToDynamicDefinition.java
index 3daac2cbf52..b1495e9fa8c 100644
---
a/core/camel-core-model/src/main/java/org/apache/camel/model/ToDynamicDefinition.java
+++
b/core/camel-core-model/src/main/java/org/apache/camel/model/ToDynamicDefinition.java
@@ -34,7 +34,7 @@ import org.apache.camel.spi.Metadata;
@Metadata(label = "eip,routing")
@XmlRootElement(name = "toD")
@XmlAccessorType(XmlAccessType.FIELD)
-public class ToDynamicDefinition extends
NoOutputDefinition<ToDynamicDefinition> {
+public class ToDynamicDefinition extends
NoOutputDefinition<ToDynamicDefinition> implements CopyableProcessorDefinition {
@XmlTransient
protected EndpointProducerBuilder endpointProducerBuilder;
@@ -69,6 +69,19 @@ public class ToDynamicDefinition extends
NoOutputDefinition<ToDynamicDefinition>
this.uri = uri;
}
+ protected ToDynamicDefinition(ToDynamicDefinition source) {
+ super(source);
+ this.endpointProducerBuilder = source.endpointProducerBuilder;
+ this.uri = source.uri;
+ this.variableSend = source.variableSend;
+ this.variableReceive = source.variableReceive;
+ this.pattern = source.pattern;
+ this.cacheSize = source.cacheSize;
+ this.ignoreInvalidEndpoint = source.ignoreInvalidEndpoint;
+ this.allowOptimisedComponents = source.allowOptimisedComponents;
+ this.autoStartComponents = source.autoStartComponents;
+ }
+
@Override
public String getShortName() {
return "toD";
@@ -315,4 +328,8 @@ public class ToDynamicDefinition extends
NoOutputDefinition<ToDynamicDefinition>
public void setAutoStartComponents(String autoStartComponents) {
this.autoStartComponents = autoStartComponents;
}
+
+ public ToDynamicDefinition copy() {
+ return new ToDynamicDefinition(this);
+ }
}
diff --git
a/core/camel-core/src/test/java/org/apache/camel/model/RouteTemplateDefinitionTest.java
b/core/camel-core/src/test/java/org/apache/camel/model/RouteTemplateDefinitionTest.java
new file mode 100644
index 00000000000..0c74b35a181
--- /dev/null
+++
b/core/camel-core/src/test/java/org/apache/camel/model/RouteTemplateDefinitionTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.support.RoutePolicySupport;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+class RouteTemplateDefinitionTest {
+
+ @Test
+ void testDeepCopyMutableProperties() {
+ RouteDefinition route = new RouteDefinition();
+ route.setTemplateParameters(Map.of("parameter", "parameterValue"));
+ route.setRouteProperties(List.of(new PropertyDefinition("property",
"propertyValue")));
+ route.setRoutePolicies(List.of(new RoutePolicySupport() {
+ }));
+ route.setInput(new FromDefinition("direct://fromEndpoint"));
+ route.setOutputs(List.of(new ToDefinition("direct://toEndpoint"), new
SetHeaderDefinition("header", "headerValue")));
+ RouteTemplateDefinition routeTemplate = new RouteTemplateDefinition();
+ routeTemplate.setRoute(route);
+
+ RouteDefinition routeCopy = routeTemplate.asRouteDefinition();
+
+ assertNotSame(route.getTemplateParameters(),
routeCopy.getTemplateParameters());
+ assertEquals(route.getTemplateParameters(),
routeCopy.getTemplateParameters());
+ assertNotSame(route.getRouteProperties(),
routeCopy.getRouteProperties());
+ assertEquals(route.getRouteProperties(),
routeCopy.getRouteProperties());
+ assertNotSame(route.getRoutePolicies(), routeCopy.getRoutePolicies());
+ assertEquals(route.getRoutePolicies(), routeCopy.getRoutePolicies());
+ assertNotSame(route.getInput(), routeCopy.getInput());
+ assertEquals(route.getInput().getUri(), routeCopy.getInput().getUri());
+ assertNotSame(route.getOutputs(), routeCopy.getOutputs());
+ assertEquals(2, routeCopy.getOutputs().size());
+ assertNotSame(route.getOutputs().get(0),
routeCopy.getOutputs().get(0));
+ assertInstanceOf(ToDefinition.class, route.getOutputs().get(0));
+ assertInstanceOf(ToDefinition.class, routeCopy.getOutputs().get(0));
+ assertEquals(((ToDefinition) route.getOutputs().get(0)).getUri(),
+ ((ToDefinition) routeCopy.getOutputs().get(0)).getUri());
+ assertSame(route.getOutputs().get(1), routeCopy.getOutputs().get(1));
+ }
+}