This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch kamelets-claus in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
commit 18f15c8ae87ce3f9101dfacad958dd5efcc416e9 Author: Claus Ibsen <[email protected]> AuthorDate: Wed Jul 29 16:54:00 2020 +0200 kamelets: polished #375 --- camel-kamelet/pom.xml | 2 +- .../camel/component/kamelet/KameletEndpoint.java | 35 ++++++++++------------ 2 files changed, 16 insertions(+), 21 deletions(-) diff --git a/camel-kamelet/pom.xml b/camel-kamelet/pom.xml index 33e09c3..7083b87 100644 --- a/camel-kamelet/pom.xml +++ b/camel-kamelet/pom.xml @@ -43,7 +43,7 @@ <dependency> <groupId>org.apache.camel</groupId> - <artifactId>camel-core-engine</artifactId> + <artifactId>camel-support</artifactId> </dependency> <dependency> <groupId>org.apache.camel</groupId> diff --git a/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java b/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java index 22cd543..d3f0eac 100644 --- a/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java +++ b/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java @@ -18,6 +18,8 @@ package org.apache.camel.component.kamelet; import java.util.Map; +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProducer; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; @@ -26,9 +28,9 @@ import org.apache.camel.Producer; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriPath; +import org.apache.camel.support.DefaultAsyncProducer; import org.apache.camel.support.DefaultConsumer; import org.apache.camel.support.DefaultEndpoint; -import org.apache.camel.support.DefaultProducer; import org.apache.camel.support.service.ServiceHelper; @UriEndpoint( @@ -81,7 +83,6 @@ public class KameletEndpoint extends DefaultEndpoint { public Consumer createConsumer(Processor processor) throws Exception { Consumer answer = new KemeletConsumer(processor); configureConsumer(answer); - return answer; } @@ -117,52 +118,46 @@ public class KameletEndpoint extends DefaultEndpoint { endpoint = getCamelContext().getEndpoint(kameletUri); consumer = endpoint.createConsumer(getProcessor()); - ServiceHelper.startService(endpoint); - ServiceHelper.startService(consumer); - + ServiceHelper.startService(endpoint, consumer); super.doStart(); } @Override protected void doStop() throws Exception { - ServiceHelper.stopService(endpoint); - ServiceHelper.stopService(consumer); - + ServiceHelper.stopService(consumer, endpoint); super.doStop(); } } - private class KameletProducer extends DefaultProducer { + private class KameletProducer extends DefaultAsyncProducer { private volatile Endpoint endpoint; - private volatile Producer producer; + private volatile AsyncProducer producer; public KameletProducer() { super(KameletEndpoint.this); } @Override - public void process(Exchange exchange) throws Exception { + public boolean process(Exchange exchange, AsyncCallback callback) { if (producer != null) { - producer.process(exchange); + return producer.process(exchange, callback); + } else { + callback.done(true); + return true; } } @Override protected void doStart() throws Exception { endpoint = getCamelContext().getEndpoint(kameletUri); - producer = endpoint.createProducer(); - - ServiceHelper.startService(endpoint); - ServiceHelper.startService(producer); - + producer = endpoint.createAsyncProducer(); + ServiceHelper.startService(endpoint, producer); super.doStart(); } @Override protected void doStop() throws Exception { - ServiceHelper.stopService(endpoint); - ServiceHelper.stopService(producer); - + ServiceHelper.stopService(producer, endpoint); super.doStop(); } }
