This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch kamelets-claus in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
commit 2084696ce2da589a94d526721fdc4834da08ff10 Author: Claus Ibsen <[email protected]> AuthorDate: Thu Jul 30 10:33:15 2020 +0200 kamelets: create a camel-kamelet component #375 --- .../camel/component/kamelet/KameletComponent.java | 87 ++++++++++++++++++++++ .../camel/component/kamelet/KameletEndpoint.java | 40 +++------- .../kamelet/KameletAddAfterCamelStartedTest.java | 76 +++++++++++++++++++ 3 files changed, 174 insertions(+), 29 deletions(-) diff --git a/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java b/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java index 05a8abc..6088410 100644 --- a/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java +++ b/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java @@ -16,13 +16,21 @@ */ package org.apache.camel.component.kamelet; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.model.ModelCamelContext; +import org.apache.camel.model.RouteDefinition; +import org.apache.camel.spi.CamelEvent; import org.apache.camel.spi.annotations.Component; import org.apache.camel.support.DefaultComponent; +import org.apache.camel.support.EventNotifierSupport; +import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.StringHelper; @Component(Kamelet.SCHEME) @@ -35,6 +43,9 @@ public class KameletComponent extends DefaultComponent { super(context); } + private volatile RouteTemplateEventNotifier notifier; + private final List<KameletEndpoint> endpoints = new ArrayList<>(); + @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { final String templateId = extractTemplateId(remaining); @@ -104,4 +115,80 @@ public class 
KameletComponent extends DefaultComponent { return properties; } + + @Override + protected void doInit() throws Exception { + super.doInit(); + + if (!getCamelContext().isRunAllowed()) { + // setup event listener which must be started to get triggered during initialization of camel context + notifier = new RouteTemplateEventNotifier(this); + ServiceHelper.startService(notifier); + getCamelContext().getManagementStrategy().addEventNotifier(notifier); + } + } + + @Override + protected void doStop() throws Exception { + if (notifier != null) { + ServiceHelper.stopService(notifier); + getCamelContext().getManagementStrategy().removeEventNotifier(notifier); + notifier = null; + } + super.doStop(); + } + + void onEndpointAdd(KameletEndpoint endpoint) { + if (notifier == null) { + try { + addRouteFromTemplate(endpoint); + } catch (Exception e) { + throw RuntimeCamelException.wrapRuntimeException(e); + } + } else { + // remember endpoints as we defer adding routes for them till later + this.endpoints.add(endpoint); + } + } + + void addRouteFromTemplate(KameletEndpoint endpoint) throws Exception { + ModelCamelContext context = endpoint.getCamelContext().adapt(ModelCamelContext.class); + String id = context.addRouteFromTemplate(endpoint.getRouteId(), endpoint.getTemplateId(), endpoint.getKameletProperties()); + RouteDefinition def = context.getRouteDefinition(id); + if (!def.isPrepared()) { + List<RouteDefinition> list = new ArrayList<>(1); + list.add(def); + context.startRouteDefinitions(list); + } + } + + private static class RouteTemplateEventNotifier extends EventNotifierSupport { + + private final KameletComponent component; + + public RouteTemplateEventNotifier(KameletComponent component) { + this.component = component; + } + + @Override + public void notify(CamelEvent event) throws Exception { + for (KameletEndpoint endpoint : component.endpoints) { + component.addRouteFromTemplate(endpoint); + } + component.endpoints.clear(); + // we were only needed during 
initializing/starting up camel, so remove after use + ServiceHelper.stopService(this); + component.getCamelContext().getManagementStrategy().removeEventNotifier(this); + component.notifier = null; + } + + @Override + public boolean isEnabled(CamelEvent event) { + // we only care about this event during startup as it is triggered when + // all route and route template definitions have been added and prepared + // so this allows us to hook into the right moment + return event instanceof CamelEvent.CamelContextInitializedEvent; + } + + } } diff --git a/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java b/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java index 4300ea6..7609647 100644 --- a/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java +++ b/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java @@ -16,8 +16,6 @@ */ package org.apache.camel.component.kamelet; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import org.apache.camel.AsyncCallback; @@ -27,18 +25,12 @@ import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Producer; -import org.apache.camel.impl.event.CamelContextInitializedEvent; -import org.apache.camel.model.ModelCamelContext; -import org.apache.camel.model.RouteDefinition; -import org.apache.camel.spi.CamelEvent; -import org.apache.camel.spi.EventNotifier; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriPath; import org.apache.camel.support.DefaultAsyncProducer; import org.apache.camel.support.DefaultConsumer; import org.apache.camel.support.DefaultEndpoint; -import org.apache.camel.support.EventNotifierSupport; import org.apache.camel.support.service.ServiceHelper; @UriEndpoint( @@ -74,6 +66,11 @@ public class KameletEndpoint extends DefaultEndpoint { 
this.kameletUri = "direct:" + routeId; } + @Override + public KameletComponent getComponent() { + return (KameletComponent) super.getComponent(); + } + public String getTemplateId() { return templateId; } @@ -82,6 +79,10 @@ public class KameletEndpoint extends DefaultEndpoint { return routeId; } + public Map<String, Object> getKameletProperties() { + return kameletProperties; + } + @Override public Producer createProducer() throws Exception { return new KameletProducer(); @@ -97,27 +98,8 @@ public class KameletEndpoint extends DefaultEndpoint { @Override protected void doInit() throws Exception { super.doInit(); - - // TODO: lets find a nicer way to do this - EventNotifier notifier = new EventNotifierSupport() { - @Override - public void notify(CamelEvent event) throws Exception { - String id = getCamelContext().addRouteFromTemplate(routeId, templateId, kameletProperties); - List<RouteDefinition> list = new ArrayList<>(1); - list.add(getCamelContext().adapt(ModelCamelContext.class).getRouteDefinition(id)); - getCamelContext().adapt(ModelCamelContext.class).startRouteDefinitions(list); - // no longer needed so we can remove ourselves - getCamelContext().getManagementStrategy().removeEventNotifier(this); - } - - @Override - public boolean isEnabled(CamelEvent event) { - return event instanceof CamelContextInitializedEvent; - } - }; - - ServiceHelper.startService(notifier); - getCamelContext().getManagementStrategy().addEventNotifier(notifier); + // only need to add during init phase + getComponent().onEndpointAdd(this); } // ********************************* diff --git a/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletAddAfterCamelStartedTest.java b/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletAddAfterCamelStartedTest.java new file mode 100644 index 0000000..fdc9dc6 --- /dev/null +++ b/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletAddAfterCamelStartedTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet; + +import java.util.UUID; + +import org.apache.camel.CamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultCamelContext; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.assertj.core.api.Assertions.assertThat; + +public class KameletAddAfterCamelStartedTest { + private static final Logger LOGGER = LoggerFactory.getLogger(KameletAddAfterCamelStartedTest.class); + + @Test + public void test() throws Exception { + String body = UUID.randomUUID().toString(); + + CamelContext context = new DefaultCamelContext(); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + routeTemplate("setBody") + .templateParameter("bodyValue") + .from("direct:{{routeId}}") + .setBody().constant("{{bodyValue}}"); + } + }); + + /* + context.addRouteFromTemplate("setBody") + .routeId("test") + .parameter("routeId", "test") + .parameter("bodyValue", body) + .build(); + */ + + // start camel here and add routes with kamelets later + context.start(); + + context.addRoutes(new RouteBuilder() { + @Override + public void 
configure() throws Exception { + // routes + from("direct:template") + .toF("kamelet:setBody/test?bodyValue=%s", body) + .to("log:1"); + } + }); + + assertThat( + context.createFluentProducerTemplate().to("direct:template").withBody("test").request(String.class) + ).isEqualTo(body); + + context.stop(); + } +}
