This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch poll-dyn in repository https://gitbox.apache.org/repos/asf/camel.git
commit a0832360edf354abe6774e3d60e5f0ffcdda89f9 Author: Claus Ibsen <[email protected]> AuthorDate: Fri Feb 7 19:19:39 2025 +0100 CAMEL-21733: camel-core - Poll EIP to support DynamicAware to reuse endpoints during dynamic poll EIP --- .../org/apache/camel/processor/PollEnricher.java | 101 ++++++++++++++++++--- 1 file changed, 90 insertions(+), 11 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java index 04736e0a3c2..e2aeb8f66b7 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java @@ -31,6 +31,8 @@ import org.apache.camel.ExchangePropertyKey; import org.apache.camel.Expression; import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.PollingConsumer; +import org.apache.camel.Processor; +import org.apache.camel.ResolveEndpointFailedException; import org.apache.camel.spi.ConsumerCache; import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.ExceptionHandler; @@ -257,24 +259,58 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout // use dynamic endpoint so calculate the endpoint to use Object recipient = null; + Processor preAwareProcessor = null; + Processor postAwareProcessor = null; + String staticUri = null; boolean prototype = cacheSize < 0; try { - // favour using expression to compute the recipient endpoint - recipient = expression != null ? expression.evaluate(exchange, Object.class) : uri; - recipient = prepareRecipient(exchange, recipient); - Endpoint existing = getExistingEndpoint(camelContext, recipient); + recipient = expression.evaluate(exchange, Object.class); + if (dynamicAware != null) { + // if its the same scheme as the pre-resolved dynamic aware then we can optimise to use it + String originalUri = uri; + String uri = resolveUri(exchange, recipient); + String scheme = resolveScheme(exchange, uri); + if (dynamicAware.getScheme().equals(scheme)) { + SendDynamicAware.DynamicAwareEntry entry = dynamicAware.prepare(exchange, uri, originalUri); + if (entry != null) { + staticUri = dynamicAware.resolveStaticUri(exchange, entry); + preAwareProcessor = dynamicAware.createPreProcessor(exchange, entry); + postAwareProcessor = dynamicAware.createPostProcessor(exchange, entry); + if (staticUri != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Optimising toD via SendDynamicAware component: {} to use static uri: {}", scheme, + URISupport.sanitizeUri(staticUri)); + } + } + } + } + } + Object targetRecipient = staticUri != null ? staticUri : recipient; + targetRecipient = prepareRecipient(exchange, targetRecipient); + if (targetRecipient == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Send dynamic evaluated as null so cannot send to any endpoint"); + } + // no endpoint to send to, so ignore + callback.done(true); + return true; + } + Endpoint existing = getExistingEndpoint(exchange, targetRecipient); if (existing == null) { - endpoint = resolveEndpoint(camelContext, recipient, prototype); + endpoint = resolveEndpoint(exchange, targetRecipient, prototype); } else { endpoint = existing; // we have an existing endpoint then its not a prototype scope prototype = false; } + // acquire the consumer from the cache consumer = consumerCache.acquirePollingConsumer(endpoint); } catch (Exception e) { if (isIgnoreInvalidEndpoint()) { - LOG.debug("Endpoint uri is invalid: {}. This exception will be ignored.", recipient, e); + if (LOG.isDebugEnabled()) { + LOG.debug("Endpoint uri is invalid: {}. This exception will be ignored.", recipient, e); + } } else { exchange.setException(e); } @@ -284,9 +320,15 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout // grab the real delegate consumer that performs the actual polling final boolean bridgeErrorHandler = isBridgeErrorHandler(consumer); + final Processor preProcessor = preAwareProcessor; + final Processor postProcessor = postAwareProcessor; Exchange resourceExchange; try { + if (preProcessor != null) { + preProcessor.process(exchange); + } + if (timeout < 0) { LOG.debug("Consumer receive: {}", consumer); resourceExchange = consumer.receive(); @@ -310,6 +352,13 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout callback.done(true); return true; } finally { + try { + if (postProcessor != null) { + postProcessor.process(exchange); + } + } catch (Exception e) { + exchange.setException(e); + } // return the consumer back to the cache consumerCache.releasePollingConsumer(endpoint, consumer); // and stop prototype endpoints @@ -427,14 +476,14 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout return ProcessorHelper.prepareRecipient(exchange, recipient); } - protected static Endpoint getExistingEndpoint(CamelContext context, Object recipient) { - return ProcessorHelper.getExistingEndpoint(context, recipient); + protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) { + return ProcessorHelper.getExistingEndpoint(exchange, recipient); } - protected static Endpoint resolveEndpoint(CamelContext camelContext, Object recipient, boolean prototype) { + protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) { return prototype - ? ExchangeHelper.resolvePrototypeEndpoint(camelContext, recipient) - : ExchangeHelper.resolveEndpoint(camelContext, recipient); + ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) + : ExchangeHelper.resolveEndpoint(exchange, recipient); } /** @@ -460,6 +509,36 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout return id; } + protected static String resolveUri(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException { + if (recipient == null) { + return null; + } + + String uri; + // trim strings as end users might have added spaces between separators + if (recipient instanceof String string) { + uri = string.trim(); + } else if (recipient instanceof Endpoint endpoint) { + uri = endpoint.getEndpointKey(); + } else { + // convert to a string type we can work with + uri = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient); + } + + // in case path has property placeholders then try to let property component resolve those + try { + uri = EndpointHelper.resolveEndpointUriPropertyPlaceholders(exchange.getContext(), uri); + } catch (Exception e) { + throw new ResolveEndpointFailedException(uri, e); + } + + return uri; + } + + protected static String resolveScheme(Exchange exchange, String uri) { + return ExchangeHelper.resolveScheme(uri); + } + @Override protected void doBuild() throws Exception { if (consumerCache == null) {
