This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch poll-dyn
in repository https://gitbox.apache.org/repos/asf/camel.git

commit bfe8c9afb9a00d21215496946685dac381016d26
Author: Claus Ibsen <[email protected]>
AuthorDate: Fri Feb 7 16:51:25 2025 +0100

    CAMEL-21733: camel-core - Poll EIP to support DynamicAware to reuse endpoints during dynamic poll EIP
---
 .../org/apache/camel/model/pollEnrich.json         |  3 +-
 .../apache/camel/model/PollEnrichDefinition.java   | 22 ++++++++
 .../org/apache/camel/processor/PollEnricher.java   | 59 ++++++++++++++++++++--
 .../apache/camel/reifier/PollEnrichReifier.java    |  3 ++
 .../java/org/apache/camel/xml/in/ModelParser.java  |  1 +
 .../java/org/apache/camel/xml/out/ModelWriter.java |  1 +
 .../org/apache/camel/yaml/out/ModelWriter.java     |  1 +
 7 files changed, 85 insertions(+), 5 deletions(-)

diff --git a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/pollEnrich.json b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/pollEnrich.json
index 85409de3ace..5644c4a8a54 100644
--- a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/pollEnrich.json
+++ b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/pollEnrich.json
@@ -24,7 +24,8 @@
     "timeout": { "index": 9, "kind": "attribute", "displayName": "Timeout", 
"group": "common", "required": false, "type": "duration", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "-1", "description": "Timeout in millis when polling from the 
external service. The timeout has influence about the poll enrich behavior. It 
basically operations in three different modes: negative value - Waits until a 
message is available and then ret [...]
     "cacheSize": { "index": 10, "kind": "attribute", "displayName": "Cache 
Size", "group": "advanced", "label": "advanced", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "description": "Sets the maximum size used by the 
org.apache.camel.spi.ConsumerCache which is used to cache and reuse consumers 
when uris are reused. Beware that when using dynamic endpoints then it affects 
how well the cache can be utiliz [...]
     "ignoreInvalidEndpoint": { "index": 11, "kind": "attribute", 
"displayName": "Ignore Invalid Endpoint", "group": "advanced", "label": 
"advanced", "required": false, "type": "boolean", "javaType": 
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Ignore the invalidate endpoint exception 
when try to create a producer with that endpoint" },
-    "autoStartComponents": { "index": 12, "kind": "attribute", "displayName": 
"Auto Start Components", "group": "advanced", "label": "advanced", "required": 
false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": true, "description": 
"Whether to auto startup components when poll enricher is starting up." }
+    "allowOptimisedComponents": { "index": 12, "kind": "attribute", 
"displayName": "Allow Optimised Components", "group": "advanced", "label": 
"advanced", "required": false, "type": "boolean", "javaType": 
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": true, "description": "Whether to allow components to optimise 
if they are org.apache.camel.spi.SendDynamicAware ." },
+    "autoStartComponents": { "index": 13, "kind": "attribute", "displayName": 
"Auto Start Components", "group": "advanced", "label": "advanced", "required": 
false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": true, "description": 
"Whether to auto startup components when poll enricher is starting up." }
   },
   "exchangeProperties": {
     "CamelToEndpoint": { "index": 0, "kind": "exchangeProperty", 
"displayName": "To Endpoint", "label": "producer", "required": false, 
"javaType": "String", "deprecated": false, "autowired": false, "secret": false, 
"description": "Endpoint URI where this Exchange is being sent to" }
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
index db1cb2ac36a..a13f9df7f69 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
@@ -65,6 +65,9 @@ public class PollEnrichDefinition extends ExpressionNode
     private String ignoreInvalidEndpoint;
     @XmlAttribute
     @Metadata(label = "advanced", defaultValue = "true", javaType = 
"java.lang.Boolean")
+    private String allowOptimisedComponents;
+    @XmlAttribute
+    @Metadata(label = "advanced", defaultValue = "true", javaType = 
"java.lang.Boolean")
     private String autoStartComponents;
 
     public PollEnrichDefinition() {
@@ -86,6 +89,7 @@ public class PollEnrichDefinition extends ExpressionNode
         this.timeout = source.timeout;
         this.cacheSize = source.cacheSize;
         this.ignoreInvalidEndpoint = source.ignoreInvalidEndpoint;
+        this.allowOptimisedComponents = source.allowOptimisedComponents;
         this.autoStartComponents = source.autoStartComponents;
     }
 
@@ -276,6 +280,16 @@ public class PollEnrichDefinition extends ExpressionNode
         return this;
     }
 
+    /**
+     * Whether to allow components to optimise if they are {@link 
org.apache.camel.spi.SendDynamicAware}.
+     *
+     * @return the builder
+     */
+    public PollEnrichDefinition allowOptimisedComponents(String 
allowOptimisedComponents) {
+        setAllowOptimisedComponents(allowOptimisedComponents);
+        return this;
+    }
+
     // Properties
     // 
-------------------------------------------------------------------------
 
@@ -374,6 +388,14 @@ public class PollEnrichDefinition extends ExpressionNode
         this.autoStartComponents = autoStartComponents;
     }
 
+    public String getAllowOptimisedComponents() {
+        return allowOptimisedComponents;
+    }
+
+    public void setAllowOptimisedComponents(String allowOptimisedComponents) {
+        this.allowOptimisedComponents = allowOptimisedComponents;
+    }
+
     @Override
     public PollEnrichDefinition copyDefinition() {
         return new PollEnrichDefinition(this);
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
index 0d3fa37fd95..04736e0a3c2 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -23,6 +23,7 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -36,6 +37,7 @@ import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.HeadersMapFactory;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.RouteIdAware;
+import org.apache.camel.spi.SendDynamicAware;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
 import org.apache.camel.support.DefaultConsumer;
@@ -44,6 +46,7 @@ import org.apache.camel.support.EventDrivenPollingConsumer;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.cache.DefaultConsumerCache;
 import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,10 +67,11 @@ public class PollEnricher extends AsyncProcessorSupport 
implements IdAware, Rout
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PollEnricher.class);
 
+    private SendDynamicAware dynamicAware;
+    private volatile String scheme;
     private CamelContext camelContext;
     private ConsumerCache consumerCache;
     private HeadersMapFactory headersMapFactory;
-    private volatile String scheme;
     private String id;
     private String routeId;
     private String variableReceive;
@@ -79,6 +83,7 @@ public class PollEnricher extends AsyncProcessorSupport 
implements IdAware, Rout
     private int cacheSize;
     private boolean ignoreInvalidEndpoint;
     private boolean autoStartupComponents = true;
+    private boolean allowOptimisedComponents = true;
 
     /**
      * Creates a new {@link PollEnricher}.
@@ -135,6 +140,10 @@ public class PollEnricher extends AsyncProcessorSupport 
implements IdAware, Rout
         this.routeId = routeId;
     }
 
+    public SendDynamicAware getDynamicAware() {
+        return dynamicAware;
+    }
+
     public String getUri() {
         return uri;
     }
@@ -215,6 +224,14 @@ public class PollEnricher extends AsyncProcessorSupport 
implements IdAware, Rout
         this.autoStartupComponents = autoStartupComponents;
     }
 
+    public boolean isAllowOptimisedComponents() {
+        return allowOptimisedComponents;
+    }
+
+    public void setAllowOptimisedComponents(boolean allowOptimisedComponents) {
+        this.allowOptimisedComponents = allowOptimisedComponents;
+    }
+
     /**
      * Enriches the input data (<code>exchange</code>) by first obtaining 
additional data from an endpoint represented
      * by an endpoint <code>producer</code> and second by aggregating input 
data and additional data. Aggregation of
@@ -470,9 +487,43 @@ public class PollEnricher extends AsyncProcessorSupport 
implements IdAware, Rout
             scheme = ExchangeHelper.resolveScheme(u);
         }
 
+        if (isAllowOptimisedComponents() && uri != null) {
+            try {
+                if (scheme != null) {
+                    // find out if the component can be optimised for 
send-dynamic
+                    SendDynamicAwareResolver resolver = new 
SendDynamicAwareResolver();
+                    dynamicAware = resolver.resolve(camelContext, scheme);
+                    if (dynamicAware == null) {
+                        // okay fallback and try with default component name
+                        Component comp = camelContext.getComponent(scheme, 
false, isAutoStartupComponents());
+                        if (comp != null) {
+                            String defaultScheme = comp.getDefaultName();
+                            if (!scheme.equals(defaultScheme)) {
+                                dynamicAware = resolver.resolve(camelContext, 
defaultScheme);
+                                dynamicAware.setScheme(scheme);
+                            }
+                        }
+                    }
+                    if (dynamicAware != null) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Detected SendDynamicAware component: {} 
optimising poll: {}", scheme,
+                                    URISupport.sanitizeUri(uri));
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                // ignore
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(
+                            "Error creating optimised SendDynamicAwareResolver 
for uri: {} due to {}. This exception is ignored",
+                            URISupport.sanitizeUri(uri), e.getMessage(), e);
+                }
+            }
+        }
+
         headersMapFactory = 
camelContext.getCamelContextExtension().getHeadersMapFactory();
 
-        ServiceHelper.initService(consumerCache, aggregationStrategy);
+        ServiceHelper.initService(consumerCache, aggregationStrategy, 
dynamicAware);
     }
 
     @Override
@@ -482,12 +533,12 @@ public class PollEnricher extends AsyncProcessorSupport 
implements IdAware, Rout
             camelContext.getComponent(scheme);
         }
 
-        ServiceHelper.startService(consumerCache, aggregationStrategy);
+        ServiceHelper.startService(consumerCache, aggregationStrategy, 
dynamicAware);
     }
 
     @Override
     protected void doStop() throws Exception {
-        ServiceHelper.stopService(aggregationStrategy, consumerCache);
+        ServiceHelper.stopService(aggregationStrategy, consumerCache, 
dynamicAware);
     }
 
     @Override
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java
index 2fb2f72a281..10616641780 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java
@@ -73,6 +73,9 @@ public class PollEnrichReifier extends 
ProcessorReifier<PollEnrichDefinition> {
         if (definition.getAutoStartComponents() != null) {
             
enricher.setAutoStartupComponents(parseBoolean(definition.getAutoStartComponents(),
 true));
         }
+        if (definition.getAllowOptimisedComponents() != null) {
+            
enricher.setAllowOptimisedComponents(parseBoolean(definition.getAllowOptimisedComponents(),
 true));
+        }
 
         return enricher;
     }
diff --git a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
index e0aa413639d..c0b72173aa4 100644
--- a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
+++ b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
@@ -672,6 +672,7 @@ public class ModelParser extends BaseParser {
                 case "aggregationStrategy": def.setAggregationStrategy(val); 
yield true;
                 case "aggregationStrategyMethodAllowNull": 
def.setAggregationStrategyMethodAllowNull(val); yield true;
                 case "aggregationStrategyMethodName": 
def.setAggregationStrategyMethodName(val); yield true;
+                case "allowOptimisedComponents": 
def.setAllowOptimisedComponents(val); yield true;
                 case "autoStartComponents": def.setAutoStartComponents(val); 
yield true;
                 case "cacheSize": def.setCacheSize(val); yield true;
                 case "ignoreInvalidEndpoint": 
def.setIgnoreInvalidEndpoint(val); yield true;
diff --git a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/out/ModelWriter.java b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/out/ModelWriter.java
index 08d86619ceb..a0b81f396b9 100644
--- a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/out/ModelWriter.java
+++ b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/out/ModelWriter.java
@@ -1371,6 +1371,7 @@ public class ModelWriter extends BaseWriter {
         doWriteAttribute("aggregationStrategy", def.getAggregationStrategy(), 
null);
         doWriteAttribute("ignoreInvalidEndpoint", 
def.getIgnoreInvalidEndpoint(), null);
         doWriteAttribute("autoStartComponents", def.getAutoStartComponents(), 
"true");
+        doWriteAttribute("allowOptimisedComponents", 
def.getAllowOptimisedComponents(), "true");
         doWriteAttribute("aggregateOnException", 
def.getAggregateOnException(), null);
         doWriteAttribute("aggregationStrategyMethodName", 
def.getAggregationStrategyMethodName(), null);
         doWriteAttribute("timeout", def.getTimeout(), "-1");
diff --git a/core/camel-yaml-io/src/generated/java/org/apache/camel/yaml/out/ModelWriter.java b/core/camel-yaml-io/src/generated/java/org/apache/camel/yaml/out/ModelWriter.java
index 5175a869386..89d590b00c7 100644
--- a/core/camel-yaml-io/src/generated/java/org/apache/camel/yaml/out/ModelWriter.java
+++ b/core/camel-yaml-io/src/generated/java/org/apache/camel/yaml/out/ModelWriter.java
@@ -1371,6 +1371,7 @@ public class ModelWriter extends BaseWriter {
         doWriteAttribute("aggregationStrategy", def.getAggregationStrategy(), 
null);
         doWriteAttribute("ignoreInvalidEndpoint", 
def.getIgnoreInvalidEndpoint(), null);
         doWriteAttribute("autoStartComponents", def.getAutoStartComponents(), 
"true");
+        doWriteAttribute("allowOptimisedComponents", 
def.getAllowOptimisedComponents(), "true");
         doWriteAttribute("aggregateOnException", 
def.getAggregateOnException(), null);
         doWriteAttribute("aggregationStrategyMethodName", 
def.getAggregationStrategyMethodName(), null);
         doWriteAttribute("timeout", def.getTimeout(), "-1");

Reply via email to