This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 557b7ff166a CAMEL-21675: camel-kamelet - Allow to call another kamelet
from within a kamelet
557b7ff166a is described below
commit 557b7ff166a754dcdd34997d56648de8f78b5dcb
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Jan 29 11:22:04 2025 +0100
CAMEL-21675: camel-kamelet - Allow to call another kamelet from within a
kamelet
---
.../apache/camel/component/kamelet/Kamelet.java | 3 +-
.../component/properties/PropertiesComponent.java | 17 +-
.../org/apache/camel/impl/DefaultCamelContext.java | 213 +++++++++---------
.../org/apache/camel/support/DefaultRegistry.java | 28 +--
.../camel/dsl/yaml/CallNestedKameletTest.groovy | 36 +++
.../dsl/yaml/PipeLoaderErrorHandlerTest.groovy | 242 +++++++++++++++++++++
.../apache/camel/dsl/yaml/PipeLoaderTest.groovy | 155 -------------
.../kamelets/prefix-and-upper-action.kamelet.yaml | 46 ++++
.../resources/kamelets/upper-action.kamelet.yaml | 35 +++
.../src/test/resources/routes2/routes.yaml | 22 ++
10 files changed, 512 insertions(+), 285 deletions(-)
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
index f9a642074df..46e1b10f30f 100644
---
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
@@ -217,7 +217,8 @@ public final class Kamelet {
wrap = pro == null ||
ProcessorDefinitionHelper.shouldWrapInErrorHandler(def.getCamelContext(), pro,
null,
pro.getInheritErrorHandler());
}
- if (wrap && parent != null) {
+ if (wrap && parent != null && parent.isKamelet() == null) {
+ // inherit the parent's error handler only when the parent is not itself a kamelet
def.setErrorHandlerFactory(parent.getErrorHandlerFactory().cloneBuilder());
} else {
def.setErrorHandlerFactory(new NoErrorHandlerBuilder());
diff --git
a/core/camel-base/src/main/java/org/apache/camel/component/properties/PropertiesComponent.java
b/core/camel-base/src/main/java/org/apache/camel/component/properties/PropertiesComponent.java
index ab3e2a6bac1..9215e412547 100644
---
a/core/camel-base/src/main/java/org/apache/camel/component/properties/PropertiesComponent.java
+++
b/core/camel-base/src/main/java/org/apache/camel/component/properties/PropertiesComponent.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
+import java.util.Stack;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -123,8 +124,7 @@ public class PropertiesComponent extends ServiceSupport
private boolean defaultFallbackEnabled = true;
private Properties initialProperties;
private Properties overrideProperties;
- private final ThreadLocal<Properties> localProperties = new
ThreadLocal<>();
- private volatile boolean localPropertiesEnabled;
+ private final Stack<Properties> localProperties = new Stack<>();;
private int systemPropertiesMode = SYSTEM_PROPERTIES_MODE_OVERRIDE;
private int environmentVariableMode = ENVIRONMENT_VARIABLES_MODE_OVERRIDE;
private boolean autoDiscoverPropertiesSources = true;
@@ -570,11 +570,9 @@ public class PropertiesComponent extends ServiceSupport
@Override
public void setLocalProperties(Properties localProperties) {
if (localProperties != null) {
- this.localProperties.set(localProperties);
- this.localPropertiesEnabled = true;
- } else {
- this.localProperties.remove();
- this.localPropertiesEnabled = false;
+ this.localProperties.push(localProperties);
+ } else if (!this.localProperties.isEmpty()) {
+ this.localProperties.pop();
}
}
@@ -583,7 +581,10 @@ public class PropertiesComponent extends ServiceSupport
* currently in use.
*/
public Properties getLocalProperties() {
- return localPropertiesEnabled ? localProperties.get() : null;
+ if (localProperties.isEmpty()) {
+ return null;
+ }
+ return localProperties.peek();
}
@Override
diff --git
a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index 6b381c80369..87675091e80 100644
---
a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++
b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -629,125 +629,128 @@ public class DefaultCamelContext extends
SimpleCamelContext implements ModelCame
RouteDefinitionHelper.forceAssignIds(getCamelContextReference(),
routeDefinitions);
List<RouteDefinition> routeDefinitionsToRemove = null;
for (RouteDefinition routeDefinition : routeDefinitions) {
- // assign ids to the routes and validate that the id's is all
unique
- String duplicate =
RouteDefinitionHelper.validateUniqueIds(routeDefinition, routeDefinitions,
- routeDefinition.getNodePrefixId());
- if (duplicate != null) {
- throw new FailedToStartRouteException(
- routeDefinition.getId(),
- "duplicate id detected: " + duplicate + ". Please
correct ids to be unique among all your routes.");
- }
+ try {
+ // assign ids to the routes and validate that the ids are
all unique
+ String duplicate =
RouteDefinitionHelper.validateUniqueIds(routeDefinition, routeDefinitions,
+ routeDefinition.getNodePrefixId());
+ if (duplicate != null) {
+ throw new FailedToStartRouteException(
+ routeDefinition.getId(),
+ "duplicate id detected: " + duplicate
+ + ". Please correct
ids to be unique among all your routes.");
+ }
- // if the route definition was created via a route template
then we need to prepare its parameters when the route is being created and
started
- if (routeDefinition.isTemplate() != null &&
routeDefinition.isTemplate()
- && routeDefinition.getTemplateParameters() != null) {
+ // if the route definition was created via a route
template then we need to prepare its parameters when the route is being created
and started
+ if (routeDefinition.isTemplate() != null &&
routeDefinition.isTemplate()
+ && routeDefinition.getTemplateParameters() !=
null) {
- // apply configurer if any present
- if
(routeDefinition.getRouteTemplateContext().getConfigurer() != null) {
-
routeDefinition.getRouteTemplateContext().getConfigurer()
-
.accept(routeDefinition.getRouteTemplateContext());
- }
+ // apply configurer if any present
+ if
(routeDefinition.getRouteTemplateContext().getConfigurer() != null) {
+
routeDefinition.getRouteTemplateContext().getConfigurer()
+
.accept(routeDefinition.getRouteTemplateContext());
+ }
- // copy parameters/bean repository to not cause side effect
- Map<String, Object> params = new
HashMap<>(routeDefinition.getTemplateParameters());
- LocalBeanRegistry bbr
- = (LocalBeanRegistry)
routeDefinition.getRouteTemplateContext().getLocalBeanRepository();
- LocalBeanRegistry bbrCopy = new LocalBeanRegistry();
-
- // make all bean in the bean repository use unique keys
(need to add uuid counter)
- // so when the route template is used again to create
another route, then there is
- // no side-effect from previously used values that Camel
may use in its endpoint
- // registry and elsewhere
- if (bbr != null && !bbr.isEmpty()) {
- for (Map.Entry<String, Object> param :
params.entrySet()) {
- Object value = param.getValue();
- if (value instanceof String oldKey) {
- boolean clash = bbr.keys().stream().anyMatch(k
-> k.equals(oldKey));
- if (clash) {
- String newKey = oldKey + "-" +
UUID.generateUuid();
- LOG.debug(
- "Route: {} re-assigning local-bean
id: {} to: {} to ensure ids are globally unique",
- routeDefinition.getId(), oldKey,
newKey);
- bbrCopy.put(newKey, bbr.remove(oldKey));
- param.setValue(newKey);
+ // copy parameters/bean repository to not cause side
effect
+ Map<String, Object> params = new
HashMap<>(routeDefinition.getTemplateParameters());
+ LocalBeanRegistry bbr
+ = (LocalBeanRegistry)
routeDefinition.getRouteTemplateContext().getLocalBeanRepository();
+ LocalBeanRegistry bbrCopy = new LocalBeanRegistry();
+
+ // make all beans in the bean repository use unique
keys (need to add uuid counter)
+ // so when the route template is used again to create
another route, then there is
+ // no side-effect from previously used values that
Camel may use in its endpoint
+ // registry and elsewhere
+ if (bbr != null && !bbr.isEmpty()) {
+ for (Map.Entry<String, Object> param :
params.entrySet()) {
+ Object value = param.getValue();
+ if (value instanceof String oldKey) {
+ boolean clash =
bbr.keys().stream().anyMatch(k -> k.equals(oldKey));
+ if (clash) {
+ String newKey = oldKey + "-" +
UUID.generateUuid();
+ LOG.debug(
+ "Route: {} re-assigning
local-bean id: {} to: {} to ensure ids are globally unique",
+ routeDefinition.getId(),
oldKey, newKey);
+ bbrCopy.put(newKey,
bbr.remove(oldKey));
+ param.setValue(newKey);
+ }
}
}
- }
- // the remainder of the local beans must also have
their ids made global unique
- for (Map.Entry<String, Map<Class<?>, Object>> entry :
bbr.entrySet()) {
- String oldKey = entry.getKey();
- String newKey = oldKey + "-" + UUID.generateUuid();
- LOG.debug(
- "Route: {} re-assigning local-bean id: {}
to: {} to ensure ids are globally unique",
- routeDefinition.getId(), oldKey, newKey);
- bbrCopy.put(newKey, entry.getValue());
- if (!params.containsKey(oldKey)) {
- // if a bean was bound as local bean with a
key and it was not defined as template parameter
- // then store it as if it was a template
parameter with same key=value which allows us
- // to use this local bean in the route without
any problem such as:
- // to("bean:{{myBean}}")
- // and myBean is the local bean id.
- params.put(oldKey, newKey);
+ // the remainder of the local beans must also have
their ids made globally unique
+ for (Map.Entry<String, Map<Class<?>, Object>>
entry : bbr.entrySet()) {
+ String oldKey = entry.getKey();
+ String newKey = oldKey + "-" +
UUID.generateUuid();
+ LOG.debug(
+ "Route: {} re-assigning local-bean id:
{} to: {} to ensure ids are globally unique",
+ routeDefinition.getId(), oldKey,
newKey);
+ bbrCopy.put(newKey, entry.getValue());
+ if (!params.containsKey(oldKey)) {
+ // if a bean was bound as local bean with
a key and it was not defined as template parameter
+ // then store it as if it was a template
parameter with same key=value which allows us
+ // to use this local bean in the route
without any problem such as:
+ // to("bean:{{myBean}}")
+ // and myBean is the local bean id.
+ params.put(oldKey, newKey);
+ }
}
}
- }
- OrderedLocationProperties prop = new
OrderedLocationProperties();
- if (routeDefinition.getTemplateDefaultParameters() !=
null) {
- // need to keep track if a parameter is set as default
value or end user configured value
- params.forEach((k, v) -> {
- Object dv =
routeDefinition.getTemplateDefaultParameters().get(k);
- prop.put(routeDefinition.getLocation(), k, v, dv);
- });
- } else {
- prop.putAll(routeDefinition.getLocation(), params);
- }
- pc.setLocalProperties(prop);
+ OrderedLocationProperties prop = new
OrderedLocationProperties();
+ if (routeDefinition.getTemplateDefaultParameters() !=
null) {
+ // need to keep track if a parameter is set as
default value or end user configured value
+ params.forEach((k, v) -> {
+ Object dv =
routeDefinition.getTemplateDefaultParameters().get(k);
+ prop.put(routeDefinition.getLocation(), k, v,
dv);
+ });
+ } else {
+ prop.putAll(routeDefinition.getLocation(), params);
+ }
+ pc.setLocalProperties(prop);
- // we need to shadow the bean registry on the CamelContext
with the local beans from the route template context
- if (localBeans != null) {
- localBeans.setLocalBeanRepository(bbrCopy);
- }
+ // we need to shadow the bean registry on the
CamelContext with the local beans from the route template context
+ if (localBeans != null) {
+ localBeans.setLocalBeanRepository(bbrCopy);
+ }
- // need to reset auto assigned ids, so there is no clash
when creating routes
-
ProcessorDefinitionHelper.resetAllAutoAssignedNodeIds(routeDefinition);
- // must re-init parent when created from a template
- RouteDefinitionHelper.initParent(routeDefinition);
- }
- // Check if the route is included
- if (includedRoute(routeDefinition)) {
- // must ensure route is prepared, before we can start it
- if (!routeDefinition.isPrepared()) {
-
RouteDefinitionHelper.prepareRoute(getCamelContextReference(), routeDefinition);
- routeDefinition.markPrepared();
+ // need to reset auto assigned ids, so there is no
clash when creating routes
+
ProcessorDefinitionHelper.resetAllAutoAssignedNodeIds(routeDefinition);
+ // must re-init parent when created from a template
+ RouteDefinitionHelper.initParent(routeDefinition);
}
- // force the creation of ids on all nodes in the route
- RouteDefinitionHelper.forceAssignIds(this,
routeDefinition.getInput());
- RouteDefinitionHelper.forceAssignIds(this,
routeDefinition);
+ // Check if the route is included
+ if (includedRoute(routeDefinition)) {
+ // must ensure route is prepared, before we can start
it
+ if (!routeDefinition.isPrepared()) {
+
RouteDefinitionHelper.prepareRoute(getCamelContextReference(), routeDefinition);
+ routeDefinition.markPrepared();
+ }
+ // force the creation of ids on all nodes in the route
+ RouteDefinitionHelper.forceAssignIds(this,
routeDefinition.getInput());
+ RouteDefinitionHelper.forceAssignIds(this,
routeDefinition);
- StartupStepRecorder recorder
- =
getCamelContextReference().getCamelContextExtension().getStartupStepRecorder();
- StartupStep step = recorder.beginStep(Route.class,
routeDefinition.getRouteId(), "Create Route");
+ StartupStepRecorder recorder
+ =
getCamelContextReference().getCamelContextExtension().getStartupStepRecorder();
+ StartupStep step = recorder.beginStep(Route.class,
routeDefinition.getRouteId(), "Create Route");
-
getCamelContextExtension().createRoute(routeDefinition.getRouteId());
+
getCamelContextExtension().createRoute(routeDefinition.getRouteId());
- Route route =
model.getModelReifierFactory().createRoute(this, routeDefinition);
- recorder.endStep(step);
+ Route route =
model.getModelReifierFactory().createRoute(this, routeDefinition);
+ recorder.endStep(step);
- RouteService routeService = new RouteService(route);
- startRouteService(routeService, true);
- } else {
- // Add the definition to the list of definitions to remove
as the route is excluded
- if (routeDefinitionsToRemove == null) {
- routeDefinitionsToRemove = new
ArrayList<>(routeDefinitions.size());
+ RouteService routeService = new RouteService(route);
+ startRouteService(routeService, true);
+ } else {
+ // Add the definition to the list of definitions to
remove as the route is excluded
+ if (routeDefinitionsToRemove == null) {
+ routeDefinitionsToRemove = new
ArrayList<>(routeDefinitions.size());
+ }
+ routeDefinitionsToRemove.add(routeDefinition);
+ }
+ } finally {
+ // clear local after the route is created via the reifier
+ pc.setLocalProperties(null);
+ if (localBeans != null) {
+ localBeans.setLocalBeanRepository(null);
}
- routeDefinitionsToRemove.add(routeDefinition);
- }
-
- // clear local after the route is created via the reifier
- pc.setLocalProperties(null);
- if (localBeans != null) {
- localBeans.setLocalBeanRepository(null);
}
}
if (routeDefinitionsToRemove != null) {
@@ -759,10 +762,6 @@ public class DefaultCamelContext extends
SimpleCamelContext implements ModelCame
setStartingRoutes(false);
}
getCamelContextExtension().createRoute(null);
- pc.setLocalProperties(null);
- if (localBeans != null) {
- localBeans.setLocalBeanRepository(null);
- }
}
}
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultRegistry.java
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultRegistry.java
index 4b08844eb57..182c53e6808 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultRegistry.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultRegistry.java
@@ -26,6 +26,7 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Stack;
import java.util.function.Supplier;
import org.apache.camel.CamelContext;
@@ -57,8 +58,7 @@ public class DefaultRegistry extends ServiceSupport
implements Registry, LocalBe
private static final Logger LOG =
LoggerFactory.getLogger(DefaultRegistry.class);
protected CamelContext camelContext;
- protected final ThreadLocal<BeanRepository> localRepository = new
ThreadLocal<>();
- protected volatile boolean localRepositoryEnabled; // flag to keep track
if local is in use or not
+ protected final Stack<BeanRepository> localRepository = new Stack<>();;
protected List<BeanRepository> repositories;
protected Registry fallbackRegistry = new SimpleRegistry();
protected Registry supplierRegistry = new SupplierRegistry();
@@ -103,21 +103,21 @@ public class DefaultRegistry extends ServiceSupport
implements Registry, LocalBe
*/
public void setLocalBeanRepository(BeanRepository repository) {
if (repository != null) {
- this.localRepository.set(repository);
- this.localRepositoryEnabled = true;
- } else {
- BeanRepository old = this.localRepository.get();
+ this.localRepository.push(repository);
+ } else if (!this.localRepository.isEmpty()) {
+ BeanRepository old = this.localRepository.pop();
if (old != null) {
ServiceHelper.stopService(old);
}
- this.localRepository.remove();
- this.localRepositoryEnabled = false;
}
}
@Override
public BeanRepository getLocalBeanRepository() {
- return localRepositoryEnabled ? localRepository.get() : null;
+ if (localRepository.isEmpty()) {
+ return null;
+ }
+ return localRepository.peek();
}
/**
@@ -257,7 +257,7 @@ public class DefaultRegistry extends ServiceSupport
implements Registry, LocalBe
}
// local repository takes precedence
- BeanRepository local = localRepositoryEnabled ? localRepository.get()
: null;
+ BeanRepository local = getLocalBeanRepository();
if (local != null) {
answer = local.lookupByName(name);
if (answer != null) {
@@ -298,7 +298,7 @@ public class DefaultRegistry extends ServiceSupport
implements Registry, LocalBe
}
// local repository takes precedence
- BeanRepository local = localRepositoryEnabled ? localRepository.get()
: null;
+ BeanRepository local = getLocalBeanRepository();
if (local != null) {
answer = local.lookupByNameAndType(name, type);
if (answer != null) {
@@ -329,7 +329,7 @@ public class DefaultRegistry extends ServiceSupport
implements Registry, LocalBe
Map<String, T> answer = new LinkedHashMap<>();
// local repository takes precedence
- BeanRepository local = localRepositoryEnabled ? localRepository.get()
: null;
+ BeanRepository local = getLocalBeanRepository();
if (local != null) {
Map<String, T> found = local.findByTypeWithName(type);
if (found != null && !found.isEmpty()) {
@@ -363,7 +363,7 @@ public class DefaultRegistry extends ServiceSupport
implements Registry, LocalBe
Set<T> answer = new LinkedHashSet<>();
// local repository takes precedence
- BeanRepository local = localRepositoryEnabled ? localRepository.get()
: null;
+ BeanRepository local = getLocalBeanRepository();
if (local != null) {
Set<T> found = local.findByType(type);
if (found != null && !found.isEmpty()) {
@@ -397,7 +397,7 @@ public class DefaultRegistry extends ServiceSupport
implements Registry, LocalBe
T found = null;
// local repository takes precedence
- BeanRepository local = localRepositoryEnabled ? localRepository.get()
: null;
+ BeanRepository local = getLocalBeanRepository();
if (local != null) {
found = local.findSingleByType(type);
}
diff --git
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/CallNestedKameletTest.groovy
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/CallNestedKameletTest.groovy
new file mode 100644
index 00000000000..b51fc54c9e7
--- /dev/null
+++
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/CallNestedKameletTest.groovy
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.dsl.yaml
+
+import org.apache.camel.component.mock.MockEndpoint
+import org.apache.camel.main.Main
+import spock.lang.Specification
+
+class CallNestedKameletTest extends Specification {
+
+ def 'call kamelet from within a kamelet'() {
+ given:
+ def main = new Main()
+ main.configure().withRoutesIncludePattern('routes2/*.yaml')
+ when:
+ main.start()
+ main.camelContext.getEndpoint("mock:result",
MockEndpoint.class).expectedBodiesReceived("HELLO WORLD")
+ main.camelTemplate.sendBody("direct:start", "world")
+ then:
+ main.camelContext.getEndpoint("mock:result",
MockEndpoint.class).assertIsSatisfied()
+ }
+}
diff --git
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/PipeLoaderErrorHandlerTest.groovy
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/PipeLoaderErrorHandlerTest.groovy
new file mode 100644
index 00000000000..5f068ae0f5f
--- /dev/null
+++
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/PipeLoaderErrorHandlerTest.groovy
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.dsl.yaml
+
+import org.apache.camel.Exchange
+import org.apache.camel.Processor
+import org.apache.camel.component.mock.MockEndpoint
+import org.apache.camel.dsl.yaml.support.YamlTestSupport
+import org.apache.camel.model.ToDefinition
+import org.apache.camel.model.errorhandler.DeadLetterChannelDefinition
+import org.apache.camel.model.errorhandler.DefaultErrorHandlerDefinition
+import org.apache.camel.model.errorhandler.NoErrorHandlerDefinition
+
+class PipeLoaderErrorHandlerTest extends YamlTestSupport {
+ @Override
+ def doSetup() {
+ context.start()
+ }
+
+ def "Pipe with kamelet error handler"() {
+ when:
+
+ // stub kafka for testing as it requires to setup connection to a real
kafka broker
+ context.addComponent("kafka", context.getComponent("stub"))
+
+ loadBindings('''
+ apiVersion: camel.apache.org/v1
+ kind: Pipe
+ metadata:
+ name: timer-event-source
+ spec:
+ source:
+ ref:
+ kind: Kamelet
+ apiVersion: camel.apache.org/v1
+ name: timer-source
+ properties:
+ message: "Hello world!"
+ sink:
+ ref:
+ kind: Kamelet
+ apiVersion: camel.apache.org/v1
+ name: log-sink
+ errorHandler:
+ sink:
+ endpoint:
+ ref:
+ kind: Kamelet
+ apiVersion: camel.apache.org/v1
+ name: error-handler
+ properties:
+ log-message: "ERROR!"
+ kafka-brokers: my-broker
+ kafka-topic: my-first-test
+ kafka-service-account-id: scott
+ kafka-service-account-secret: tiger
+ parameters:
+ maximumRedeliveries: 1
+ redeliveryDelay: 2000
+ ''')
+ then:
+ context.routeDefinitions.size() == 4
+
+ with (context.routeDefinitions[0]) {
+ errorHandlerFactory != null
+ errorHandlerFactory instanceof DeadLetterChannelDefinition
+ var eh = errorHandlerFactory as DeadLetterChannelDefinition
+ eh.deadLetterUri ==
'kamelet:error-handler?kafkaTopic=my-first-test&logMessage=ERROR%21&kafkaServiceAccountId=scott&kafkaBrokers=my-broker&kafkaServiceAccountSecret=tiger'
+ eh.redeliveryPolicy.maximumRedeliveries == "1"
+ eh.redeliveryPolicy.redeliveryDelay == "2000"
+ routeId == 'timer-event-source'
+ input.endpointUri == 'kamelet:timer-source?message=Hello+world%21'
+ outputs.size() == 1
+ with (outputs[0], ToDefinition) {
+ endpointUri == 'kamelet:log-sink'
+ }
+ }
+ }
+
+ def "Pipe with error handler move to dlq"() {
+ when:
+
+ context.registry.bind 'chaos', new Processor() {
+ @Override
+ void process(Exchange exchange) throws Exception {
+ throw new IllegalArgumentException("Forced")
+ }
+ }
+
+ loadBindings('''
+ apiVersion: camel.apache.org/v1
+ kind: Pipe
+ metadata:
+ name: timer-event-source
+ spec:
+ source:
+ ref:
+ kind: Kamelet
+ apiVersion: camel.apache.org/v1
+ name: timer-source
+ properties:
+ message: "Hello world!"
+ steps:
+ - uri: bean:chaos
+ sink:
+ ref:
+ kind: Kamelet
+ apiVersion: camel.apache.org/v1
+ name: log-sink
+ errorHandler:
+ sink:
+ endpoint:
+ uri: mock:dead
+ parameters:
+ maximumRedeliveries: 3
+ redeliveryDelay: 100
+ ''')
+ then:
+ context.routeDefinitions.size() == 3
+
+ MockEndpoint mock = context.getEndpoint("mock:dead",
MockEndpoint.class)
+ mock.expectedMinimumMessageCount(1)
+
+ mock.assertIsSatisfied()
+
+ with (context.routeDefinitions[0]) {
+ errorHandlerFactory != null
+ errorHandlerFactory instanceof DeadLetterChannelDefinition
+ var eh = errorHandlerFactory as DeadLetterChannelDefinition
+ eh.deadLetterUri == 'mock:dead'
+ eh.redeliveryPolicy.maximumRedeliveries == "3"
+ eh.redeliveryPolicy.redeliveryDelay == "100"
+ }
+ }
+
+ def "Pipe with log error handler"() {
+ when:
+
+ // stub kafka for testing as it requires to setup connection to a real
kafka broker
+ context.addComponent("kafka", context.getComponent("stub"))
+
+ loadBindings('''
+ apiVersion: camel.apache.org/v1
+ kind: Pipe
+ metadata:
+ name: timer-event-source
+ spec:
+ source:
+ ref:
+ kind: Kamelet
+ apiVersion: camel.apache.org/v1
+ name: timer-source
+ properties:
+ message: "Hello world!"
+ sink:
+ ref:
+ kind: Kamelet
+ apiVersion: camel.apache.org/v1
+ name: log-sink
+ errorHandler:
+ log:
+ parameters:
+ use-original-message: true
+ maximumRedeliveries: 1
+ redeliveryDelay: 2000
+ ''')
+ then:
+ context.routeDefinitions.size() == 3
+
+ with (context.routeDefinitions[0]) {
+ errorHandlerFactory != null
+ errorHandlerFactory instanceof DefaultErrorHandlerDefinition
+ var eh = errorHandlerFactory as DefaultErrorHandlerDefinition
+ eh.redeliveryPolicy.maximumRedeliveries == "1"
+ eh.redeliveryPolicy.redeliveryDelay == "2000"
+ eh.getUseOriginalMessage() == "true"
+ routeId == 'timer-event-source'
+ input.endpointUri == 'kamelet:timer-source?message=Hello+world%21'
+ outputs.size() == 1
+ with (outputs[0], ToDefinition) {
+ endpointUri == 'kamelet:log-sink'
+ }
+ }
+ }
+
+ def "Pipe with none error handler"() {
+ when:
+
+ // stub kafka for testing as it requires to setup connection to a real
kafka broker
+ context.addComponent("kafka", context.getComponent("stub"))
+
+ loadBindings('''
+ apiVersion: camel.apache.org/v1
+ kind: Pipe
+ metadata:
+ name: timer-event-source
+ spec:
+ source:
+ ref:
+ kind: Kamelet
+ apiVersion: camel.apache.org/v1
+ name: timer-source
+ properties:
+ message: "Hello world!"
+ sink:
+ ref:
+ kind: Kamelet
+ apiVersion: camel.apache.org/v1
+ name: log-sink
+ errorHandler:
+ none:
+ ''')
+ then:
+ context.routeDefinitions.size() == 3
+
+ with (context.routeDefinitions[0]) {
+ errorHandlerFactory != null
+ errorHandlerFactory instanceof NoErrorHandlerDefinition
+ routeId == 'timer-event-source'
+ input.endpointUri == 'kamelet:timer-source?message=Hello+world%21'
+ outputs.size() == 1
+ with (outputs[0], ToDefinition) {
+ endpointUri == 'kamelet:log-sink'
+ }
+ }
+ }
+
+}
diff --git
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/PipeLoaderTest.groovy
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/PipeLoaderTest.groovy
index d36f9f5ae7f..d7126afae54 100644
---
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/PipeLoaderTest.groovy
+++
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/PipeLoaderTest.groovy
@@ -16,17 +16,10 @@
*/
package org.apache.camel.dsl.yaml
-import org.apache.camel.Exchange
-import org.apache.camel.Processor
-import org.apache.camel.component.mock.MockEndpoint
import org.apache.camel.dsl.yaml.support.YamlTestSupport
import org.apache.camel.model.KameletDefinition
import org.apache.camel.model.ToDefinition
import org.apache.camel.model.TransformDefinition
-import org.apache.camel.model.errorhandler.DeadLetterChannelDefinition
-import org.apache.camel.model.errorhandler.DefaultErrorHandlerDefinition
-import org.apache.camel.model.errorhandler.NoErrorHandlerDefinition
-import org.junit.jupiter.api.Disabled
class PipeLoaderTest extends YamlTestSupport {
@Override
@@ -371,154 +364,6 @@ class PipeLoaderTest extends YamlTestSupport {
}
}*/
- def "Pipe with error handler move to dlq"() {
- when:
-
- context.registry.bind 'chaos', new Processor() {
- @Override
- void process(Exchange exchange) throws Exception {
- throw new IllegalArgumentException("Forced")
- }
- }
-
- loadBindings('''
- apiVersion: camel.apache.org/v1
- kind: Pipe
- metadata:
- name: timer-event-source
- spec:
- source:
- ref:
- kind: Kamelet
- apiVersion: camel.apache.org/v1
- name: timer-source
- properties:
- message: "Hello world!"
- steps:
- - uri: bean:chaos
- sink:
- ref:
- kind: Kamelet
- apiVersion: camel.apache.org/v1
- name: log-sink
- errorHandler:
- sink:
- endpoint:
- uri: mock:dead
- parameters:
- maximumRedeliveries: 3
- redeliveryDelay: 100
- ''')
- then:
- context.routeDefinitions.size() == 3
-
- MockEndpoint mock = context.getEndpoint("mock:dead",
MockEndpoint.class)
- mock.expectedMinimumMessageCount(1)
-
- mock.assertIsSatisfied()
-
- with (context.routeDefinitions[0]) {
- errorHandlerFactory != null
- errorHandlerFactory instanceof DeadLetterChannelDefinition
- var eh = errorHandlerFactory as DeadLetterChannelDefinition
- eh.deadLetterUri == 'mock:dead'
- eh.redeliveryPolicy.maximumRedeliveries == "3"
- eh.redeliveryPolicy.redeliveryDelay == "100"
- }
- }
-
- def "Pipe with log error handler"() {
- when:
-
- // stub kafka for testing as it requires to setup connection to a real
kafka broker
- context.addComponent("kafka", context.getComponent("stub"))
-
- loadBindings('''
- apiVersion: camel.apache.org/v1
- kind: Pipe
- metadata:
- name: timer-event-source
- spec:
- source:
- ref:
- kind: Kamelet
- apiVersion: camel.apache.org/v1
- name: timer-source
- properties:
- message: "Hello world!"
- sink:
- ref:
- kind: Kamelet
- apiVersion: camel.apache.org/v1
- name: log-sink
- errorHandler:
- log:
- parameters:
- use-original-message: true
- maximumRedeliveries: 1
- redeliveryDelay: 2000
- ''')
- then:
- context.routeDefinitions.size() == 3
-
- with (context.routeDefinitions[0]) {
- errorHandlerFactory != null
- errorHandlerFactory instanceof DefaultErrorHandlerDefinition
- var eh = errorHandlerFactory as DefaultErrorHandlerDefinition
- eh.redeliveryPolicy.maximumRedeliveries == "1"
- eh.redeliveryPolicy.redeliveryDelay == "2000"
- eh.getUseOriginalMessage() == "true"
- routeId == 'timer-event-source'
- input.endpointUri == 'kamelet:timer-source?message=Hello+world%21'
- outputs.size() == 1
- with (outputs[0], ToDefinition) {
- endpointUri == 'kamelet:log-sink'
- }
- }
- }
-
- def "Pipe with none error handler"() {
- when:
-
- // stub kafka for testing as it requires to setup connection to a real
kafka broker
- context.addComponent("kafka", context.getComponent("stub"))
-
- loadBindings('''
- apiVersion: camel.apache.org/v1
- kind: Pipe
- metadata:
- name: timer-event-source
- spec:
- source:
- ref:
- kind: Kamelet
- apiVersion: camel.apache.org/v1
- name: timer-source
- properties:
- message: "Hello world!"
- sink:
- ref:
- kind: Kamelet
- apiVersion: camel.apache.org/v1
- name: log-sink
- errorHandler:
- none:
- ''')
- then:
- context.routeDefinitions.size() == 3
-
- with (context.routeDefinitions[0]) {
- errorHandlerFactory != null
- errorHandlerFactory instanceof NoErrorHandlerDefinition
- routeId == 'timer-event-source'
- input.endpointUri == 'kamelet:timer-source?message=Hello+world%21'
- outputs.size() == 1
- with (outputs[0], ToDefinition) {
- endpointUri == 'kamelet:log-sink'
- }
- }
- }
-
def "Pipe from kamelet to knative channel"() {
when:
diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/resources/kamelets/prefix-and-upper-action.kamelet.yaml b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/resources/kamelets/prefix-and-upper-action.kamelet.yaml
new file mode 100644
index 00000000000..4b1b50b86ed
--- /dev/null
+++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/resources/kamelets/prefix-and-upper-action.kamelet.yaml
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+apiVersion: camel.apache.org/v1
+kind: Kamelet  # action kamelet: delegates to the prefix-action kamelet, then upper-cases the body
+metadata:
+  name: prefix-and-upper-action
+  labels:
+    camel.apache.org/kamelet.type: "step"
+spec:
+  definition:
+    title: "Greeting"
+    description: "A greeting message"
+    required:
+      - greeting
+    properties:
+      greeting:  # name must match the 'required' entry above and the {{greeting}} placeholder in the template
+        title: Greeting
+        description: The greeting message
+        type: string
+  types:
+    in:
+      mediaType: text/plain
+    out:
+      mediaType: text/plain
+  template:
+    from:
+      uri: "kamelet:source"
+      steps:
+        - to: "kamelet:prefix-action?prefix={{greeting}}"  # nested kamelet call; passes 'greeting' as its 'prefix'
+        - set-body:
+            simple: "${body.toUpperCase()}"
\ No newline at end of file
diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/resources/kamelets/upper-action.kamelet.yaml b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/resources/kamelets/upper-action.kamelet.yaml
new file mode 100644
index 00000000000..c245e500486
--- /dev/null
+++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/resources/kamelets/upper-action.kamelet.yaml
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+apiVersion: camel.apache.org/v1
+kind: Kamelet  # action kamelet: replaces the message body with its upper-cased form
+metadata:
+  name: upper-action
+  labels:
+    camel.apache.org/kamelet.type: "step"
+spec:
+  types:
+    in:
+      mediaType: text/plain
+    out:
+      mediaType: text/plain
+  template:
+    from:
+      uri: "kamelet:source"
+      steps:
+        - set-body:
+            simple: "${body.toUpperCase()}"
\ No newline at end of file
diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/resources/routes2/routes.yaml b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/resources/routes2/routes.yaml
new file mode 100644
index 00000000000..6edda91519b
--- /dev/null
+++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/resources/routes2/routes.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+- from:
+    uri: "direct:start"
+    steps:
+      - to: "kamelet:prefix-and-upper-action?greeting=hello"  # supplies the kamelet's 'greeting' parameter
+      - to: "mock:result"
\ No newline at end of file