This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch kamelet-eip in repository https://gitbox.apache.org/repos/asf/camel.git
commit 2a03c829aa84303f89beb906ed9e3966fb202e28 Author: Claus Ibsen <[email protected]> AuthorDate: Tue Apr 13 13:53:46 2021 +0200 CAMEL-16493: Kamelet EIP --- .../camel/component/kamelet/KameletComponent.java | 22 +++++++-------- .../camel/component/kamelet/KameletProcessor.java | 4 +-- .../camel/component/kamelet/KameletProducer.java | 31 +++++++++++++++++----- .../component/kamelet/KameletAggregateTest.java | 3 ++- .../camel/component/kamelet/KameletBasicTest.java | 2 -- .../component/kamelet/KameletConsumeOnlyTest.java | 2 -- .../component/kamelet/KameletEipAggregateTest.java | 2 -- 7 files changed, 40 insertions(+), 26 deletions(-) diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java index 7dbb94e..7f3aaeb 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java @@ -51,12 +51,12 @@ import static org.apache.camel.component.kamelet.Kamelet.PARAM_TEMPLATE_ID; public class KameletComponent extends DefaultComponent { private static final Logger LOGGER = LoggerFactory.getLogger(KameletComponent.class); - // active consumers - private final Map<String, KameletConsumer> consumers = new HashMap<>(); private final LifecycleHandler lifecycleHandler = new LifecycleHandler(); - // TODO: - private final Map<String, Processor> callbacks = new ConcurrentHashMap<>(); + // active consumers + private final Map<String, KameletConsumer> consumers = new HashMap<>(); + // active kamelet EIPs + private final Map<String, Processor> kameletEips = new ConcurrentHashMap<>(); // counter that is used for producers to keep track if any consumer was added/removed since they last checked // this is used for optimization to avoid each producer to get consumer for each message processed @@ -77,16 +77,16 @@ public class KameletComponent extends DefaultComponent { public KameletComponent() { } - public void pushCallback(String key, Processor callback) { - callbacks.put(key, callback); + public void addKameletEip(String key, Processor callback) { + kameletEips.put(key, callback); } - public Processor popCallback(String key) { - return callbacks.remove(key); + public Processor removeKameletEip(String key) { + return kameletEips.remove(key); } - public Processor getCallback(String key) { - return callbacks.get(key); + public Processor getKameletEip(String key) { + return kameletEips.get(key); } @Override @@ -327,7 +327,7 @@ public class KameletComponent extends DefaultComponent { ServiceHelper.stopAndShutdownService(consumers); consumers.clear(); - callbacks.clear(); + kameletEips.clear(); super.doShutdown(); } diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProcessor.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProcessor.java index 699d108..70e2f2d 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProcessor.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProcessor.java @@ -124,7 +124,7 @@ public class KameletProcessor extends AsyncProcessorSupport } ServiceHelper.buildService(processor, producer); - component.pushCallback(producer.getKey(), processor); + component.addKameletEip(producer.getKey(), processor); } @Override @@ -146,6 +146,6 @@ public class KameletProcessor extends AsyncProcessorSupport protected void doShutdown() throws Exception { ServiceHelper.stopAndShutdownServices(processor, producer); - component.popCallback(producer.getKey()); + component.removeKameletEip(producer.getKey()); } } diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java index d76937e..8762f85 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java @@ -19,7 +19,9 @@ package org.apache.camel.component.kamelet; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; +import org.apache.camel.Route; import org.apache.camel.support.DefaultAsyncProducer; +import org.apache.camel.support.ExchangeHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,16 +83,33 @@ final class KameletProducer extends DefaultAsyncProducer { callback.done(true); return true; } else { + // the kamelet producer has multiple purposes at this point + // it is capable of linking the kamelet component with the kamelet EIP + // to ensure the EIP and the component are wired together with their + // kamelet:source and kamelet:sink endpoints so when calling the sink + // then we continue processing the EIP child processors + + // if no EIP is in use, then its _just_ a regular camel component + // with producer and consumers linked together via the component + if (sink) { - // need to execute the callback from the waiting - AsyncProcessor parked = (AsyncProcessor) component.getCallback(key); - if (parked != null) { - return parked.process(exchange, callback); + // when calling a kamelet:sink then lookup any waiting processor + // from the Kamelet EIP to continue routing + AsyncProcessor eip = (AsyncProcessor) component.getKameletEip(key); + if (eip != null) { + return eip.process(exchange, callback); } else { - callback.done(true); - return true; + // if the current route is from a kamelet source then we should + // break out as otherwise we would end up calling ourselves again + Route route = ExchangeHelper.getRoute(exchange); + boolean source = route != null && route.getConsumer() instanceof KameletConsumer; + if (source) { + callback.done(true); + return true; + } } } + // kamelet producer that calls its kamelet consumer to process the incoming exchange return consumer.getAsyncProcessor().process(exchange, callback); } } catch (Exception e) { diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletAggregateTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletAggregateTest.java index cb27980..4f23055 100644 --- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletAggregateTest.java +++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletAggregateTest.java @@ -24,7 +24,7 @@ import org.apache.http.annotation.Obsolete; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -@Disabled("Should use Kamelet EIP") +@Disabled public class KameletAggregateTest extends CamelTestSupport { @Test @@ -62,6 +62,7 @@ public class KameletAggregateTest extends CamelTestSupport { .end(); from("direct:start") + // this is not possible, you must use kamelet EIP instead .to("kamelet:my-aggregate?count=5") .to("log:info") .to("mock:result"); diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java index 52f1239..0920161 100644 --- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java +++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java @@ -22,7 +22,6 @@ import org.apache.camel.Exchange; import org.apache.camel.RoutesBuilder; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.test.junit5.CamelTestSupport; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; @@ -38,7 +37,6 @@ public class KameletBasicTest extends CamelTestSupport { } @Test - @Disabled public void canConsumeFromKamelet() { assertThat( consumer.receiveBody("kamelet:tick", Integer.class)).isEqualTo(1); diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletConsumeOnlyTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletConsumeOnlyTest.java index 15c5d2c..065e7bb 100644 --- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletConsumeOnlyTest.java +++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletConsumeOnlyTest.java @@ -20,12 +20,10 @@ import org.apache.camel.Exchange; import org.apache.camel.RoutesBuilder; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.test.junit5.CamelTestSupport; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; -@Disabled public class KameletConsumeOnlyTest extends CamelTestSupport { @Test diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEipAggregateTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEipAggregateTest.java index 2aa911e..ae1eb8f 100644 --- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEipAggregateTest.java +++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEipAggregateTest.java @@ -20,7 +20,6 @@ import org.apache.camel.RoutesBuilder; import org.apache.camel.builder.AggregationStrategies; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.test.junit5.CamelTestSupport; -import org.apache.http.annotation.Obsolete; import org.junit.jupiter.api.Test; public class KameletEipAggregateTest extends CamelTestSupport { @@ -44,7 +43,6 @@ public class KameletEipAggregateTest extends CamelTestSupport { // // ********************************************** - @Obsolete protected RoutesBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override
