This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new a861f8eeef6 CAMEL-18858: camel-kamelet - Create a copy of exchange
when kamelet is acting as source, so the exchange is faked to be created
directly by the consumer itself, so it originate from the user route, and make
the kamelet as it was just like any other regular Camel component. (#13310)
a861f8eeef6 is described below
commit a861f8eeef69cf5b144feef19982108e43b5a5bf
Author: Claus Ibsen <[email protected]>
AuthorDate: Tue Feb 27 06:35:49 2024 +0100
CAMEL-18858: camel-kamelet - Create a copy of exchange when kamelet is
acting as source, so the exchange is faked to be created directly by the
consumer itself, so it originate from the user route, and make the kamelet as
it was just like any other regular Camel component. (#13310)
---
.../camel/component/kamelet/KameletProducer.java | 45 +++++++++++++---------
.../camel/component/kamelet/KameletReifier.java | 7 +++-
.../org/apache/camel/support/ExchangeHelper.java | 1 +
3 files changed, 33 insertions(+), 20 deletions(-)
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
index 1e2d2960647..68f6a734f77 100644
---
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
@@ -20,6 +20,7 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Route;
+import org.apache.camel.spi.ManagementStrategy;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.support.ExchangeHelper;
@@ -40,6 +41,7 @@ final class KameletProducer extends DefaultAsyncProducer
implements RouteIdAware
private final long timeout;
private final boolean sink;
private String routeId;
+ boolean registerKamelets;
public KameletProducer(KameletEndpoint endpoint, String key) {
super(endpoint);
@@ -51,23 +53,6 @@ final class KameletProducer extends DefaultAsyncProducer
implements RouteIdAware
this.sink =
getEndpoint().getEndpointKey().startsWith("kamelet://sink");
}
- @Override
- public void process(Exchange exchange) throws Exception {
- if (consumer == null || stateCounter != component.getStateCounter()) {
- stateCounter = component.getStateCounter();
- consumer = component.getConsumer(key, block, timeout);
- }
- if (consumer == null) {
- if (endpoint.isFailIfNoConsumers()) {
- throw new KameletConsumerNotAvailableException("No consumers
available on endpoint: " + endpoint, exchange);
- } else {
- LOG.debug("message ignored, no consumers available on
endpoint: {}", endpoint);
- }
- } else {
- consumer.getProcessor().process(exchange);
- }
- }
-
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
@@ -111,8 +96,22 @@ final class KameletProducer extends DefaultAsyncProducer
implements RouteIdAware
}
}
}
- // kamelet producer that calls its kamelet consumer to process
the incoming exchange
- return consumer.getAsyncProcessor().process(exchange,
callback);
+ if (registerKamelets) {
+ // kamelets are first-class registered as route (as old
behavior)
+ return consumer.getAsyncProcessor().process(exchange,
callback);
+ } else {
+ // kamelet producer that calls its kamelet consumer to
process the incoming exchange
+ // create exchange copy to let a new lifecycle originate
from the calling route (not the kamelet route)
+ final Exchange copy =
ExchangeHelper.createCorrelatedCopy(exchange, true, true);
+ // fake copy as being created by the consumer
+
copy.getExchangeExtension().setFromEndpoint(consumer.getEndpoint());
+
copy.getExchangeExtension().setFromRouteId(consumer.getRouteId());
+ return consumer.getAsyncProcessor().process(copy, doneSync
-> {
+ // copy result back after processing is done
+ ExchangeHelper.copyResults(exchange, copy);
+ callback.done(doneSync);
+ });
+ }
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -140,4 +139,12 @@ final class KameletProducer extends DefaultAsyncProducer
implements RouteIdAware
return key;
}
+ @Override
+ protected void doInit() throws Exception {
+ ManagementStrategy ms =
getEndpoint().getCamelContext().getManagementStrategy();
+ if (ms != null && ms.getManagementAgent() != null) {
+ registerKamelets =
ms.getManagementAgent().getRegisterRoutesCreateByKamelet();
+ }
+ }
+
}
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletReifier.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletReifier.java
index 6218ca99170..0252e6f92d8 100644
---
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletReifier.java
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletReifier.java
@@ -20,6 +20,7 @@ import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.model.KameletDefinition;
import org.apache.camel.reifier.ProcessorReifier;
+import org.apache.camel.support.PluginHelper;
public class KameletReifier extends ProcessorReifier<KameletDefinition> {
@@ -34,6 +35,10 @@ public class KameletReifier extends
ProcessorReifier<KameletDefinition> {
// use an empty noop processor, as there should be a single
processor
processor = new NoopProcessor();
}
- return new KameletProcessor(camelContext,
parseString(definition.getName()), processor);
+ // wrap in uow
+ Processor target = new KameletProcessor(camelContext,
parseString(definition.getName()), processor);
+ target = PluginHelper.getInternalProcessorFactory(camelContext)
+ .addUnitOfWorkProcessorAdvice(camelContext, target, null);
+ return target;
}
}
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index a74eb830b2f..2cdf7071450 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -374,6 +374,7 @@ public final class ExchangeHelper {
resultExtension.setNotifyEvent(sourceExtension.isNotifyEvent());
resultExtension.setRedeliveryExhausted(sourceExtension.isRedeliveryExhausted());
resultExtension.setErrorHandlerHandled(sourceExtension.getErrorHandlerHandled());
+ resultExtension.setFailureHandled(sourceExtension.isFailureHandled());
result.setException(source.getException());
}