This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch poll-dyn
in repository https://gitbox.apache.org/repos/asf/camel.git

commit a0832360edf354abe6774e3d60e5f0ffcdda89f9
Author: Claus Ibsen <[email protected]>
AuthorDate: Fri Feb 7 19:19:39 2025 +0100

    CAMEL-21733: camel-core - Poll EIP to support DynamicAware to reuse endpoints during dynamic poll EIP
---
 .../org/apache/camel/processor/PollEnricher.java   | 101 ++++++++++++++++++---
 1 file changed, 90 insertions(+), 11 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
index 04736e0a3c2..e2aeb8f66b7 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -31,6 +31,8 @@ import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.Expression;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.PollingConsumer;
+import org.apache.camel.Processor;
+import org.apache.camel.ResolveEndpointFailedException;
 import org.apache.camel.spi.ConsumerCache;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.ExceptionHandler;
@@ -257,24 +259,58 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
 
         // use dynamic endpoint so calculate the endpoint to use
         Object recipient = null;
+        Processor preAwareProcessor = null;
+        Processor postAwareProcessor = null;
+        String staticUri = null;
         boolean prototype = cacheSize < 0;
         try {
-            // favour using expression to compute the recipient endpoint
-            recipient = expression != null ? expression.evaluate(exchange, 
Object.class) : uri;
-            recipient = prepareRecipient(exchange, recipient);
-            Endpoint existing = getExistingEndpoint(camelContext, recipient);
+            recipient = expression.evaluate(exchange, Object.class);
+            if (dynamicAware != null) {
+                // if its the same scheme as the pre-resolved dynamic aware 
then we can optimise to use it
+                String originalUri = uri;
+                String uri = resolveUri(exchange, recipient);
+                String scheme = resolveScheme(exchange, uri);
+                if (dynamicAware.getScheme().equals(scheme)) {
+                    SendDynamicAware.DynamicAwareEntry entry = 
dynamicAware.prepare(exchange, uri, originalUri);
+                    if (entry != null) {
+                        staticUri = dynamicAware.resolveStaticUri(exchange, 
entry);
+                        preAwareProcessor = 
dynamicAware.createPreProcessor(exchange, entry);
+                        postAwareProcessor = 
dynamicAware.createPostProcessor(exchange, entry);
+                        if (staticUri != null) {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Optimising toD via SendDynamicAware 
component: {} to use static uri: {}", scheme,
+                                        URISupport.sanitizeUri(staticUri));
+                            }
+                        }
+                    }
+                }
+            }
+            Object targetRecipient = staticUri != null ? staticUri : recipient;
+            targetRecipient = prepareRecipient(exchange, targetRecipient);
+            if (targetRecipient == null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Send dynamic evaluated as null so cannot send 
to any endpoint");
+                }
+                // no endpoint to send to, so ignore
+                callback.done(true);
+                return true;
+            }
+            Endpoint existing = getExistingEndpoint(exchange, targetRecipient);
             if (existing == null) {
-                endpoint = resolveEndpoint(camelContext, recipient, prototype);
+                endpoint = resolveEndpoint(exchange, targetRecipient, 
prototype);
             } else {
                 endpoint = existing;
                 // we have an existing endpoint then its not a prototype scope
                 prototype = false;
             }
+
             // acquire the consumer from the cache
             consumer = consumerCache.acquirePollingConsumer(endpoint);
         } catch (Exception e) {
             if (isIgnoreInvalidEndpoint()) {
-                LOG.debug("Endpoint uri is invalid: {}. This exception will be 
ignored.", recipient, e);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Endpoint uri is invalid: {}. This exception 
will be ignored.", recipient, e);
+                }
             } else {
                 exchange.setException(e);
             }
@@ -284,9 +320,15 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
 
         // grab the real delegate consumer that performs the actual polling
         final boolean bridgeErrorHandler = isBridgeErrorHandler(consumer);
+        final Processor preProcessor = preAwareProcessor;
+        final Processor postProcessor = postAwareProcessor;
 
         Exchange resourceExchange;
         try {
+            if (preProcessor != null) {
+                preProcessor.process(exchange);
+            }
+
             if (timeout < 0) {
                 LOG.debug("Consumer receive: {}", consumer);
                 resourceExchange = consumer.receive();
@@ -310,6 +352,13 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
             callback.done(true);
             return true;
         } finally {
+            try {
+                if (postProcessor != null) {
+                    postProcessor.process(exchange);
+                }
+            } catch (Exception e) {
+                exchange.setException(e);
+            }
             // return the consumer back to the cache
             consumerCache.releasePollingConsumer(endpoint, consumer);
             // and stop prototype endpoints
@@ -427,14 +476,14 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
         return ProcessorHelper.prepareRecipient(exchange, recipient);
     }
 
-    protected static Endpoint getExistingEndpoint(CamelContext context, Object 
recipient) {
-        return ProcessorHelper.getExistingEndpoint(context, recipient);
+    protected static Endpoint getExistingEndpoint(Exchange exchange, Object 
recipient) {
+        return ProcessorHelper.getExistingEndpoint(exchange, recipient);
     }
 
-    protected static Endpoint resolveEndpoint(CamelContext camelContext, 
Object recipient, boolean prototype) {
+    protected static Endpoint resolveEndpoint(Exchange exchange, Object 
recipient, boolean prototype) {
         return prototype
-                ? ExchangeHelper.resolvePrototypeEndpoint(camelContext, 
recipient)
-                : ExchangeHelper.resolveEndpoint(camelContext, recipient);
+                ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient)
+                : ExchangeHelper.resolveEndpoint(exchange, recipient);
     }
 
     /**
@@ -460,6 +509,36 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
         return id;
     }
 
+    protected static String resolveUri(Exchange exchange, Object recipient) 
throws NoTypeConversionAvailableException {
+        if (recipient == null) {
+            return null;
+        }
+
+        String uri;
+        // trim strings as end users might have added spaces between separators
+        if (recipient instanceof String string) {
+            uri = string.trim();
+        } else if (recipient instanceof Endpoint endpoint) {
+            uri = endpoint.getEndpointKey();
+        } else {
+            // convert to a string type we can work with
+            uri = 
exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, 
exchange, recipient);
+        }
+
+        // in case path has property placeholders then try to let property 
component resolve those
+        try {
+            uri = 
EndpointHelper.resolveEndpointUriPropertyPlaceholders(exchange.getContext(), 
uri);
+        } catch (Exception e) {
+            throw new ResolveEndpointFailedException(uri, e);
+        }
+
+        return uri;
+    }
+
+    protected static String resolveScheme(Exchange exchange, String uri) {
+        return ExchangeHelper.resolveScheme(uri);
+    }
+
     @Override
     protected void doBuild() throws Exception {
         if (consumerCache == null) {

Reply via email to