This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch kamelet-eip in repository https://gitbox.apache.org/repos/asf/camel.git
commit 22f01d82573caa11f16d0eda4ace0b9fe813491d Author: Claus Ibsen <[email protected]> AuthorDate: Tue Apr 13 13:20:24 2021 +0200 CAMEL-16493: Kamelet EIP --- components/camel-kamelet/pom.xml | 3 +- .../apache/camel/component/kamelet/Kamelet.java | 11 +- .../camel/component/kamelet/KameletComponent.java | 18 +++ .../camel/component/kamelet/KameletProcessor.java | 151 +++++++++++++++++++++ .../component/kamelet/KameletProcessorFactory.java | 35 +++++ .../camel/component/kamelet/KameletProducer.java | 20 ++- .../camel/component/kamelet/KameletReifier.java | 35 +++++ .../org/apache/camel/model/KameletDefinition | 18 +++ .../component/kamelet/KameletAggregateTest.java | 71 ++++++++++ .../camel/component/kamelet/KameletBasicTest.java | 2 + .../component/kamelet/KameletEipAggregateTest.java | 70 ++++++++++ .../camel/component/kamelet/KameletEipTest.java | 67 +++++++++ .../services/org/apache/camel/model.properties | 1 + .../resources/org/apache/camel/model/jaxb.index | 1 + .../resources/org/apache/camel/model/kamelet.json | 17 +++ .../org/apache/camel/model/KameletDefinition.java | 63 +++++++++ .../apache/camel/model/ProcessorDefinition.java | 14 ++ .../org/apache/camel/reifier/KameletReifier.java | 36 +++++ .../org/apache/camel/reifier/ProcessorReifier.java | 3 + .../java/org/apache/camel/xml/in/ModelParser.java | 10 ++ .../dsl/yaml/deserializers/ModelDeserializers.java | 49 +++++++ .../deserializers/ModelDeserializersResolver.java | 2 + .../src/generated/resources/camel-yaml-dsl.json | 21 +++ 23 files changed, 714 insertions(+), 4 deletions(-) diff --git a/components/camel-kamelet/pom.xml b/components/camel-kamelet/pom.xml index e799bac..7a47a15 100644 --- a/components/camel-kamelet/pom.xml +++ b/components/camel-kamelet/pom.xml @@ -33,9 +33,10 @@ <description>The Kamelet Component provides support for interacting with the Camel Route Template engine</description> <dependencies> + <dependency> <groupId>org.apache.camel</groupId> - 
<artifactId>camel-support</artifactId> + <artifactId>camel-core-reifier</artifactId> </dependency> <dependency> <groupId>org.apache.camel</groupId> diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java index d5bd8ca..10ba82e 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java @@ -124,16 +124,23 @@ public final class Kamelet { if (def.getInput().getEndpointUri().startsWith("kamelet:source") || def.getInput().getEndpointUri().startsWith("kamelet://source")) { def.setInput(null); - def.setInput(new FromDefinition("kamelet:source?" + PARAM_ROUTE_ID + "=" + rid)); + def.setInput(new FromDefinition("kamelet://source?" + PARAM_ROUTE_ID + "=" + rid)); } + // there must be at least one sink + boolean sink = false; Iterator<ToDefinition> it = filterTypeInOutputs(def.getOutputs(), ToDefinition.class); while (it.hasNext()) { ToDefinition to = it.next(); if (to.getEndpointUri().startsWith("kamelet:sink") || to.getEndpointUri().startsWith("kamelet://sink")) { - to.setUri("kamelet:sink?" + PARAM_ROUTE_ID + "=" + rid); + to.setUri("kamelet://sink?" + PARAM_ROUTE_ID + "=" + rid); + sink = true; } } + if (!sink) { + ToDefinition to = new ToDefinition("kamelet://sink?" 
+ PARAM_ROUTE_ID + "=" + rid); + def.getOutputs().add(to); + } return def; } diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java index 65b9524..7dbb94e 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java @@ -22,10 +22,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; +import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; import org.apache.camel.VetoCamelContextStartException; import org.apache.camel.model.ModelCamelContext; @@ -53,6 +55,9 @@ public class KameletComponent extends DefaultComponent { private final Map<String, KameletConsumer> consumers = new HashMap<>(); private final LifecycleHandler lifecycleHandler = new LifecycleHandler(); + // TODO: + private final Map<String, Processor> callbacks = new ConcurrentHashMap<>(); + // counter that is used for producers to keep track if any consumer was added/removed since they last checked // this is used for optimization to avoid each producer to get consumer for each message processed // (locking via synchronized, and then lookup in the map as the cost) @@ -72,6 +77,18 @@ public class KameletComponent extends DefaultComponent { public KameletComponent() { } + public void pushCallback(String key, Processor callback) { + callbacks.put(key, callback); + } + + public Processor popCallback(String key) { + return callbacks.remove(key); + } + + public Processor getCallback(String key) { + return callbacks.get(key); + } + @Override protected Endpoint 
createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { final String templateId = Kamelet.extractTemplateId(getCamelContext(), remaining, parameters); @@ -310,6 +327,7 @@ public class KameletComponent extends DefaultComponent { ServiceHelper.stopAndShutdownService(consumers); consumers.clear(); + callbacks.clear(); super.doShutdown(); } diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProcessor.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProcessor.java new file mode 100644 index 0000000..699d108 --- /dev/null +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProcessor.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.kamelet; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.Exchange; +import org.apache.camel.ExtendedCamelContext; +import org.apache.camel.Navigate; +import org.apache.camel.Processor; +import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.ReactiveExecutor; +import org.apache.camel.spi.RouteIdAware; +import org.apache.camel.support.AsyncProcessorConverterHelper; +import org.apache.camel.support.AsyncProcessorSupport; +import org.apache.camel.support.service.ServiceHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@ManagedResource(description = "Managed Kamelet Processor") +public class KameletProcessor extends AsyncProcessorSupport + implements CamelContextAware, Navigate<Processor>, org.apache.camel.Traceable, IdAware, RouteIdAware { + + private static final Logger LOG = LoggerFactory.getLogger(KameletProcessor.class); + + private final ReactiveExecutor reactiveExecutor; + private final String name; + private final AsyncProcessor processor; + private KameletProducer producer; + private KameletComponent component; + private CamelContext camelContext; + private String id; + private String routeId; + + public KameletProcessor(CamelContext camelContext, String name, Processor processor) { + this.camelContext = camelContext; + this.name = name; + this.processor = AsyncProcessorConverterHelper.convert(processor); + this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor(); + } + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + @Override + public String getId() { + 
return id; + } + + @Override + public void setId(String id) { + this.id = id; + } + + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + @Override + public boolean process(Exchange exchange, final AsyncCallback callback) { + return producer.process(exchange, callback); + } + + @Override + public List<Processor> next() { + if (!hasNext()) { + return null; + } + List<Processor> answer = new ArrayList<>(); + answer.add(processor); + return answer; + } + + @Override + public boolean hasNext() { + return true; + } + + @Override + public String getTraceLabel() { + return "kamelet"; + } + + @Override + protected void doBuild() throws Exception { + if (component == null) { + component = camelContext.getComponent("kamelet", KameletComponent.class); + } + if (producer == null) { + producer = (KameletProducer) camelContext.getEndpoint("kamelet://" + name).createAsyncProducer(); + } + ServiceHelper.buildService(processor, producer); + + component.pushCallback(producer.getKey(), processor); + } + + @Override + protected void doInit() throws Exception { + ServiceHelper.initService(processor, producer); + } + + @Override + protected void doStart() throws Exception { + ServiceHelper.startService(processor, producer); + } + + @Override + protected void doStop() throws Exception { + ServiceHelper.stopService(processor, producer); + } + + @Override + protected void doShutdown() throws Exception { + ServiceHelper.stopAndShutdownServices(processor, producer); + + component.popCallback(producer.getKey()); + } +} diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProcessorFactory.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProcessorFactory.java new file mode 100644 index 0000000..0acadcb --- /dev/null +++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProcessorFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet; + +import org.apache.camel.Processor; +import org.apache.camel.Route; +import org.apache.camel.model.KameletDefinition; +import org.apache.camel.support.TypedProcessorFactory; + +public class KameletProcessorFactory extends TypedProcessorFactory<KameletDefinition> { + + public KameletProcessorFactory() { + super(KameletDefinition.class); + } + + @Override + public Processor doCreateProcessor(Route route, KameletDefinition definition) throws Exception { + return new KameletReifier(route, definition).createProcessor(); + } + +} diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java index 726c22d..4adc427 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java @@ -17,6 +17,7 @@ package 
org.apache.camel.component.kamelet; import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.support.DefaultAsyncProducer; import org.slf4j.Logger; @@ -34,6 +35,7 @@ final class KameletProducer extends DefaultAsyncProducer { private final String key; private final boolean block; private final long timeout; + private final boolean sink; public KameletProducer(KameletEndpoint endpoint, String key) { super(endpoint); @@ -42,6 +44,7 @@ final class KameletProducer extends DefaultAsyncProducer { this.key = key; this.block = endpoint.isBlock(); this.timeout = endpoint.getTimeout(); + this.sink = getEndpoint().getEndpointKey().startsWith("kamelet://sink"); } @Override @@ -73,11 +76,22 @@ final class KameletProducer extends DefaultAsyncProducer { exchange.setException(new KameletConsumerNotAvailableException( "No consumers available on endpoint: " + endpoint, exchange)); } else { - LOG.debug("message ignored, no consumers available on endpoint: {}", endpoint); + LOG.debug("Exchange ignored, no consumers available on endpoint: {}", endpoint); } callback.done(true); return true; } else { + if (sink) { + // need to execute the callback from the waiting + // TODO: non EIP must also park!!! 
+ AsyncProcessor parked = (AsyncProcessor) component.getCallback(key); + if (parked != null) { + return parked.process(exchange, callback); + } else { + callback.done(true); + return true; + } + } return consumer.getAsyncProcessor().process(exchange, callback); } } catch (Exception e) { @@ -87,4 +101,8 @@ final class KameletProducer extends DefaultAsyncProducer { } } + public String getKey() { + return key; + } + } diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletReifier.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletReifier.java new file mode 100644 index 0000000..1a7083b --- /dev/null +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletReifier.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.kamelet; + +import org.apache.camel.Processor; +import org.apache.camel.Route; +import org.apache.camel.model.KameletDefinition; +import org.apache.camel.reifier.ProcessorReifier; + +public class KameletReifier extends ProcessorReifier<KameletDefinition> { + + public KameletReifier(Route route, KameletDefinition definition) { + super(route, definition); + } + + @Override + public Processor createProcessor() throws Exception { + Processor processor = createChildProcessor(true); + return new KameletProcessor(camelContext, parseString(definition.getName()), processor); + } +} diff --git a/components/camel-kamelet/src/main/resources/META-INF/services/org/apache/camel/model/KameletDefinition b/components/camel-kamelet/src/main/resources/META-INF/services/org/apache/camel/model/KameletDefinition new file mode 100644 index 0000000..3cf1471 --- /dev/null +++ b/components/camel-kamelet/src/main/resources/META-INF/services/org/apache/camel/model/KameletDefinition @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +class=org.apache.camel.component.kamelet.KameletProcessorFactory diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletAggregateTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletAggregateTest.java new file mode 100644 index 0000000..ece1c9b --- /dev/null +++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletAggregateTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.kamelet; + +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.AggregationStrategies; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.apache.http.annotation.Obsolete; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +@Disabled("Should use Kamelet EIP") +public class KameletAggregateTest extends CamelTestSupport { + + @Test + public void testAggregate() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("A,B,C,D,E"); + + template.sendBody("direct:start", "A"); + template.sendBody("direct:start", "B"); + template.sendBody("direct:start", "C"); + template.sendBody("direct:start", "D"); + template.sendBody("direct:start", "E"); + + assertMockEndpointsSatisfied(); + } + + // ********************************************** + // + // test set-up + // + // ********************************************** + + @Obsolete + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + routeTemplate("my-aggregate") + .templateParameter("count") + .from("kamelet:source") + .aggregate(constant(true)) + .completionSize("{{count}}") + .aggregationStrategy(AggregationStrategies.string(",")) + .to("log:aggregate") + .to("kamelet:sink") + .end(); + + from("direct:start") + .to("kamelet:my-aggregate?count=5") + .to("log:info") + .to("mock:result"); + } + }; + } +} diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java index ea17c30..4a89418 100644 --- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java +++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java @@ -22,6 +22,7 @@ import 
org.apache.camel.Exchange; import org.apache.camel.RoutesBuilder; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; @@ -36,6 +37,7 @@ public class KameletBasicTest extends CamelTestSupport { } @Test + @Disabled public void canConsumeFromKamelet() { assertThat( consumer.receiveBody("kamelet:tick", Integer.class)).isEqualTo(1); diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEipAggregateTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEipAggregateTest.java new file mode 100644 index 0000000..b863453 --- /dev/null +++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEipAggregateTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.kamelet; + +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.AggregationStrategies; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.apache.http.annotation.Obsolete; +import org.junit.jupiter.api.Test; + +public class KameletEipAggregateTest extends CamelTestSupport { + + @Test + public void testAggregate() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("A,B,C,D,E"); + + template.sendBody("direct:start", "A"); + template.sendBody("direct:start", "B"); + template.sendBody("direct:start", "C"); + template.sendBody("direct:start", "D"); + template.sendBody("direct:start", "E"); + + assertMockEndpointsSatisfied(); + } + + // ********************************************** + // + // test set-up + // + // ********************************************** + + @Obsolete + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + routeTemplate("my-aggregate") + .templateParameter("count") + .from("kamelet:source") + .aggregate(constant(true)) + .parallelProcessing() + .completionSize("{{count}}") + .aggregationStrategy(AggregationStrategies.string(",")) + .to("log:aggregate") + .to("kamelet:sink") + .end(); + + from("direct:start") + .kamelet("my-aggregate?count=5") + .to("log:info") + .to("mock:result"); + } + }; + } +} diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEipTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEipTest.java new file mode 100644 index 0000000..e9c69a9 --- /dev/null +++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEipTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet; + +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.apache.http.annotation.Obsolete; +import org.junit.jupiter.api.Test; + +public class KameletEipTest extends CamelTestSupport { + + @Test + public void testOne() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("AA"); + + template.sendBody("direct:start", "A"); + + assertMockEndpointsSatisfied(); + } + + @Test + public void testTwo() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("AA", "BB"); + + template.sendBody("direct:start", "A"); + template.sendBody("direct:start", "B"); + + assertMockEndpointsSatisfied(); + } + + // ********************************************** + // + // test set-up + // + // ********************************************** + + @Obsolete + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + routeTemplate("echo") + .from("kamelet:source") + .setBody(body().append(body())); + + from("direct:start") + .kamelet("echo") + .to("mock:result"); + } + }; + } +} diff --git 
a/core/camel-core-model/src/generated/resources/META-INF/services/org/apache/camel/model.properties b/core/camel-core-model/src/generated/resources/META-INF/services/org/apache/camel/model.properties index 74a0127..73ac79f 100644 --- a/core/camel-core-model/src/generated/resources/META-INF/services/org/apache/camel/model.properties +++ b/core/camel-core-model/src/generated/resources/META-INF/services/org/apache/camel/model.properties @@ -77,6 +77,7 @@ joor json jsonApi jsonpath +kamelet kubernetesServiceDiscovery language loadBalance diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/jaxb.index b/core/camel-core-model/src/generated/resources/org/apache/camel/model/jaxb.index index 6f74115..916509a 100644 --- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/jaxb.index +++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/jaxb.index @@ -29,6 +29,7 @@ InputTypeDefinition InterceptDefinition InterceptFromDefinition InterceptSendToEndpointDefinition +KameletDefinition LoadBalanceDefinition LoadBalancerDefinition LogDefinition diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/kamelet.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/kamelet.json new file mode 100644 index 0000000..2ef5d7d --- /dev/null +++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/kamelet.json @@ -0,0 +1,17 @@ +{ + "model": { + "kind": "model", + "name": "kamelet", + "title": "Kamelet", + "deprecated": false, + "label": "eip,routing,kamelet", + "javaType": "org.apache.camel.model.KameletDefinition", + "input": true, + "output": false + }, + "properties": { + "name": { "kind": "attribute", "displayName": "Name", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Name of the Kamelet (route template id) to call Options for the kamelet can be specified using 
uri syntax, eg mynamecount=4&type=gold" }, + "id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" }, + "description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" } + } +} diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/KameletDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/KameletDefinition.java new file mode 100644 index 0000000..5c4d6fc --- /dev/null +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/KameletDefinition.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.model; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.camel.spi.Metadata; + +@Metadata(label = "eip,routing,kamelet") +@XmlRootElement(name = "kamelet") +@XmlAccessorType(XmlAccessType.FIELD) +public class KameletDefinition extends OutputDefinition<KameletDefinition> { + + @XmlAttribute(required = true) + private String name; + + public KameletDefinition() { + } + + @Override + public String toString() { + return "Kamelet[" + getOutputs() + "]"; + } + + @Override + public String getShortName() { + return "kamelet"; + } + + @Override + public String getLabel() { + return "kamelet"; + } + + public String getName() { + return name; + } + + /** + * Name of the Kamelet (route template id) to call. Options for the kamelet can be specified using uri syntax, eg + * myname?count=4&type=gold. + */ + public void setName(String name) { + this.name = name; + } +} diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java index 8820308..0743a44 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ -1282,6 +1282,20 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> } /** + * Creates a Kamelet EIP. + * <p/> + * This requires having camel-kamelet on the classpath. 
+ * + * @return the builder + */ + public KameletDefinition kamelet(String name) { + KameletDefinition answer = new KameletDefinition(); + answer.setName(name); + addOutput(answer); + return answer; + } + + /** * <a href="http://camel.apache.org/load-balancer.html">Load Balancer EIP:</a> Creates a loadbalance * * @return the builder diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/KameletReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/KameletReifier.java new file mode 100644 index 0000000..68d4850 --- /dev/null +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/KameletReifier.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.reifier; + +import org.apache.camel.Processor; +import org.apache.camel.Route; +import org.apache.camel.model.KameletDefinition; +import org.apache.camel.model.ProcessorDefinition; + +public class KameletReifier extends ProcessorReifier<KameletDefinition> { + + public KameletReifier(Route route, ProcessorDefinition<?> definition) { + super(route, KameletDefinition.class.cast(definition)); + } + + @Override + public Processor createProcessor() throws Exception { + throw new IllegalStateException( + "Cannot find camel-kamelet on the classpath."); + } + +} diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java index 0c95252..047eaa8 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java @@ -51,6 +51,7 @@ import org.apache.camel.model.InOutDefinition; import org.apache.camel.model.InterceptDefinition; import org.apache.camel.model.InterceptFromDefinition; import org.apache.camel.model.InterceptSendToEndpointDefinition; +import org.apache.camel.model.KameletDefinition; import org.apache.camel.model.LoadBalanceDefinition; import org.apache.camel.model.LogDefinition; import org.apache.camel.model.LoopDefinition; @@ -207,6 +208,8 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends return new InterceptReifier<>(route, definition); } else if (definition instanceof InterceptSendToEndpointDefinition) { return new InterceptSendToEndpointReifier(route, definition); + } else if (definition instanceof KameletDefinition) { + return new KameletReifier(route, definition); } else if (definition instanceof LoadBalanceDefinition) { return new LoadBalanceReifier(route, definition); } else if (definition instanceof LogDefinition) { diff --git 
a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java index c8c9b03..bc4a6d6 100644 --- a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java +++ b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java @@ -540,6 +540,15 @@ public class ModelParser extends BaseParser { return true; }, outputDefinitionElementHandler(), noValueHandler()); } + protected KameletDefinition doParseKameletDefinition() throws IOException, XmlPullParserException { + return doParse(new KameletDefinition(), (def, key, val) -> { + if ("name".equals(key)) { + def.setName(val); + return true; + } + return processorDefinitionAttributeHandler().accept(def, key, val); + }, outputDefinitionElementHandler(), noValueHandler()); + } protected LoadBalanceDefinition doParseLoadBalanceDefinition() throws IOException, XmlPullParserException { return doParse(new LoadBalanceDefinition(), processorDefinitionAttributeHandler(), (def, key) -> { @@ -2937,6 +2946,7 @@ public class ModelParser extends BaseParser { case "intercept": return doParseInterceptDefinition(); case "interceptFrom": return doParseInterceptFromDefinition(); case "interceptSendToEndpoint": return doParseInterceptSendToEndpointDefinition(); + case "kamelet": return doParseKameletDefinition(); case "loadBalance": return doParseLoadBalanceDefinition(); case "log": return doParseLogDefinition(); case "loop": return doParseLoopDefinition(); diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java index b21ffb1..643afa5 100644 --- a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java +++ 
b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java @@ -35,6 +35,7 @@ import org.apache.camel.model.InputTypeDefinition; import org.apache.camel.model.InterceptDefinition; import org.apache.camel.model.InterceptFromDefinition; import org.apache.camel.model.InterceptSendToEndpointDefinition; +import org.apache.camel.model.KameletDefinition; import org.apache.camel.model.LoadBalanceDefinition; import org.apache.camel.model.LoadBalancerDefinition; import org.apache.camel.model.LogDefinition; @@ -6856,6 +6857,54 @@ public final class ModelDeserializers extends YamlDeserializerSupport { } @YamlType( + types = org.apache.camel.model.KameletDefinition.class, + order = org.apache.camel.dsl.yaml.common.YamlDeserializerResolver.ORDER_LOWEST - 1, + nodes = "kamelet", + properties = { + @YamlProperty(name = "inherit-error-handler", type = "boolean"), + @YamlProperty(name = "name", type = "string", required = true), + @YamlProperty(name = "steps", type = "array:org.apache.camel.model.ProcessorDefinition") + } + ) + public static class KameletDefinitionDeserializer extends YamlDeserializerBase<KameletDefinition> { + public KameletDefinitionDeserializer() { + super(KameletDefinition.class); + } + + @Override + protected KameletDefinition newInstance() { + return new KameletDefinition(); + } + + @Override + protected boolean setProperty(KameletDefinition target, String propertyKey, + String propertyName, Node node) { + switch(propertyKey) { + case "inherit-error-handler": { + String val = asText(node); + target.setInheritErrorHandler(java.lang.Boolean.valueOf(val)); + break; + } + case "name": { + String val = asText(node); + target.setName(val); + break; + } + case "steps": { + for (ProcessorDefinition<?> definition: asFlatList(node, ProcessorDefinition.class)) { + target.addOutput(definition); + } + break; + } + default: { + return false; + } + } + return true; + } + } + + @YamlType( types = 
org.apache.camel.model.cloud.KubernetesServiceCallServiceDiscoveryConfiguration.class, order = org.apache.camel.dsl.yaml.common.YamlDeserializerResolver.ORDER_LOWEST - 1, nodes = "kubernetes-service-discovery", diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializersResolver.java b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializersResolver.java index 0a1741b..513a1a9 100644 --- a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializersResolver.java +++ b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializersResolver.java @@ -167,6 +167,8 @@ public final class ModelDeserializersResolver implements YamlDeserializerResolve case "org.apache.camel.model.dataformat.JsonDataFormat": return new ModelDeserializers.JsonDataFormatDeserializer(); case "jsonpath": return new ModelDeserializers.JsonPathExpressionDeserializer(); case "org.apache.camel.model.language.JsonPathExpression": return new ModelDeserializers.JsonPathExpressionDeserializer(); + case "kamelet": return new ModelDeserializers.KameletDefinitionDeserializer(); + case "org.apache.camel.model.KameletDefinition": return new ModelDeserializers.KameletDefinitionDeserializer(); case "kubernetes-service-discovery": return new ModelDeserializers.KubernetesServiceCallServiceDiscoveryConfigurationDeserializer(); case "org.apache.camel.model.cloud.KubernetesServiceCallServiceDiscoveryConfiguration": return new ModelDeserializers.KubernetesServiceCallServiceDiscoveryConfigurationDeserializer(); case "lzf": return new ModelDeserializers.LZFDataFormatDeserializer(); diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camel-yaml-dsl.json b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camel-yaml-dsl.json 
index 31b288c..5aa0842 100644 --- a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camel-yaml-dsl.json +++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/camel-yaml-dsl.json @@ -62,6 +62,9 @@ "intercept-send-to-endpoint" : { "$ref" : "#/items/definitions/org.apache.camel.model.InterceptSendToEndpointDefinition" }, + "kamelet" : { + "$ref" : "#/items/definitions/org.apache.camel.model.KameletDefinition" + }, "load-balance" : { "$ref" : "#/items/definitions/org.apache.camel.model.LoadBalanceDefinition" }, @@ -1032,6 +1035,24 @@ } ], "required" : [ "uri" ] }, + "org.apache.camel.model.KameletDefinition" : { + "type" : "object", + "properties" : { + "inherit-error-handler" : { + "type" : "boolean" + }, + "name" : { + "type" : "string" + }, + "steps" : { + "type" : "array", + "items" : { + "$ref" : "#/items/definitions/org.apache.camel.model.ProcessorDefinition" + } + } + }, + "required" : [ "name" ] + }, "org.apache.camel.model.LoadBalanceDefinition" : { "type" : "object", "properties" : {
