This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch poll-dyn in repository https://gitbox.apache.org/repos/asf/camel.git
commit bfe8c9afb9a00d21215496946685dac381016d26 Author: Claus Ibsen <[email protected]> AuthorDate: Fri Feb 7 16:51:25 2025 +0100 CAMEL-21733: camel-core - Poll EIP to support DynamicAware to reuse endpoints during dynamic poll EIP --- .../org/apache/camel/model/pollEnrich.json | 3 +- .../apache/camel/model/PollEnrichDefinition.java | 22 ++++++++ .../org/apache/camel/processor/PollEnricher.java | 59 ++++++++++++++++++++-- .../apache/camel/reifier/PollEnrichReifier.java | 3 ++ .../java/org/apache/camel/xml/in/ModelParser.java | 1 + .../java/org/apache/camel/xml/out/ModelWriter.java | 1 + .../org/apache/camel/yaml/out/ModelWriter.java | 1 + 7 files changed, 85 insertions(+), 5 deletions(-) diff --git a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/pollEnrich.json b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/pollEnrich.json index 85409de3ace..5644c4a8a54 100644 --- a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/pollEnrich.json +++ b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/pollEnrich.json @@ -24,7 +24,8 @@ "timeout": { "index": 9, "kind": "attribute", "displayName": "Timeout", "group": "common", "required": false, "type": "duration", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "-1", "description": "Timeout in millis when polling from the external service. The timeout has influence about the poll enrich behavior. It basically operations in three different modes: negative value - Waits until a message is available and then ret [...] "cacheSize": { "index": 10, "kind": "attribute", "displayName": "Cache Size", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the maximum size used by the org.apache.camel.spi.ConsumerCache which is used to cache and reuse consumers when uris are reused. Beware that when using dynamic endpoints then it affects how well the cache can be utiliz [...] "ignoreInvalidEndpoint": { "index": 11, "kind": "attribute", "displayName": "Ignore Invalid Endpoint", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Ignore the invalidate endpoint exception when try to create a producer with that endpoint" }, - "autoStartComponents": { "index": 12, "kind": "attribute", "displayName": "Auto Start Components", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether to auto startup components when poll enricher is starting up." } + "allowOptimisedComponents": { "index": 12, "kind": "attribute", "displayName": "Allow Optimised Components", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether to allow components to optimise if they are org.apache.camel.spi.SendDynamicAware ." }, + "autoStartComponents": { "index": 13, "kind": "attribute", "displayName": "Auto Start Components", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether to auto startup components when poll enricher is starting up." } }, "exchangeProperties": { "CamelToEndpoint": { "index": 0, "kind": "exchangeProperty", "displayName": "To Endpoint", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "autowired": false, "secret": false, "description": "Endpoint URI where this Exchange is being sent to" } diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/PollEnrichDefinition.java index db1cb2ac36a..a13f9df7f69 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/PollEnrichDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/PollEnrichDefinition.java @@ -65,6 +65,9 @@ public class PollEnrichDefinition extends ExpressionNode private String ignoreInvalidEndpoint; @XmlAttribute @Metadata(label = "advanced", defaultValue = "true", javaType = "java.lang.Boolean") + private String allowOptimisedComponents; + @XmlAttribute + @Metadata(label = "advanced", defaultValue = "true", javaType = "java.lang.Boolean") private String autoStartComponents; public PollEnrichDefinition() { @@ -86,6 +89,7 @@ public class PollEnrichDefinition extends ExpressionNode this.timeout = source.timeout; this.cacheSize = source.cacheSize; this.ignoreInvalidEndpoint = source.ignoreInvalidEndpoint; + this.allowOptimisedComponents = source.allowOptimisedComponents; this.autoStartComponents = source.autoStartComponents; } @@ -276,6 +280,16 @@ public class PollEnrichDefinition extends ExpressionNode return this; } + /** + * Whether to allow components to optimise if they are {@link org.apache.camel.spi.SendDynamicAware}. + * + * @return the builder + */ + public PollEnrichDefinition allowOptimisedComponents(String allowOptimisedComponents) { + setAllowOptimisedComponents(allowOptimisedComponents); + return this; + } + // Properties // ------------------------------------------------------------------------- @@ -374,6 +388,14 @@ public class PollEnrichDefinition extends ExpressionNode this.autoStartComponents = autoStartComponents; } + public String getAllowOptimisedComponents() { + return allowOptimisedComponents; + } + + public void setAllowOptimisedComponents(String allowOptimisedComponents) { + this.allowOptimisedComponents = allowOptimisedComponents; + } + @Override public PollEnrichDefinition copyDefinition() { return new PollEnrichDefinition(this); diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java index 0d3fa37fd95..04736e0a3c2 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java @@ -23,6 +23,7 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.CamelExchangeException; +import org.apache.camel.Component; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; @@ -36,6 +37,7 @@ import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.HeadersMapFactory; import org.apache.camel.spi.IdAware; import org.apache.camel.spi.RouteIdAware; +import org.apache.camel.spi.SendDynamicAware; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler; import org.apache.camel.support.DefaultConsumer; @@ -44,6 +46,7 @@ import org.apache.camel.support.EventDrivenPollingConsumer; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.cache.DefaultConsumerCache; import org.apache.camel.support.service.ServiceHelper; +import org.apache.camel.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,10 +67,11 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout private static final Logger LOG = LoggerFactory.getLogger(PollEnricher.class); + private SendDynamicAware dynamicAware; + private volatile String scheme; private CamelContext camelContext; private ConsumerCache consumerCache; private HeadersMapFactory headersMapFactory; - private volatile String scheme; private String id; private String routeId; private String variableReceive; @@ -79,6 +83,7 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout private int cacheSize; private boolean ignoreInvalidEndpoint; private boolean autoStartupComponents = true; + private boolean allowOptimisedComponents = true; /** * Creates a new {@link PollEnricher}. @@ -135,6 +140,10 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout this.routeId = routeId; } + public SendDynamicAware getDynamicAware() { + return dynamicAware; + } + public String getUri() { return uri; } @@ -215,6 +224,14 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout this.autoStartupComponents = autoStartupComponents; } + public boolean isAllowOptimisedComponents() { + return allowOptimisedComponents; + } + + public void setAllowOptimisedComponents(boolean allowOptimisedComponents) { + this.allowOptimisedComponents = allowOptimisedComponents; + } + /** * Enriches the input data (<code>exchange</code>) by first obtaining additional data from an endpoint represented * by an endpoint <code>producer</code> and second by aggregating input data and additional data. Aggregation of @@ -470,9 +487,43 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout scheme = ExchangeHelper.resolveScheme(u); } + if (isAllowOptimisedComponents() && uri != null) { + try { + if (scheme != null) { + // find out if the component can be optimised for send-dynamic + SendDynamicAwareResolver resolver = new SendDynamicAwareResolver(); + dynamicAware = resolver.resolve(camelContext, scheme); + if (dynamicAware == null) { + // okay fallback and try with default component name + Component comp = camelContext.getComponent(scheme, false, isAutoStartupComponents()); + if (comp != null) { + String defaultScheme = comp.getDefaultName(); + if (!scheme.equals(defaultScheme)) { + dynamicAware = resolver.resolve(camelContext, defaultScheme); + dynamicAware.setScheme(scheme); + } + } + } + if (dynamicAware != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Detected SendDynamicAware component: {} optimising poll: {}", scheme, + URISupport.sanitizeUri(uri)); + } + } + } + } catch (Exception e) { + // ignore + if (LOG.isDebugEnabled()) { + LOG.debug( + "Error creating optimised SendDynamicAwareResolver for uri: {} due to {}. This exception is ignored", + URISupport.sanitizeUri(uri), e.getMessage(), e); + } + } + } + headersMapFactory = camelContext.getCamelContextExtension().getHeadersMapFactory(); - ServiceHelper.initService(consumerCache, aggregationStrategy); + ServiceHelper.initService(consumerCache, aggregationStrategy, dynamicAware); } @Override @@ -482,12 +533,12 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout camelContext.getComponent(scheme); } - ServiceHelper.startService(consumerCache, aggregationStrategy); + ServiceHelper.startService(consumerCache, aggregationStrategy, dynamicAware); } @Override protected void doStop() throws Exception { - ServiceHelper.stopService(aggregationStrategy, consumerCache); + ServiceHelper.stopService(aggregationStrategy, consumerCache, dynamicAware); } @Override diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java index 2fb2f72a281..10616641780 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java @@ -73,6 +73,9 @@ public class PollEnrichReifier extends ProcessorReifier<PollEnrichDefinition> { if (definition.getAutoStartComponents() != null) { enricher.setAutoStartupComponents(parseBoolean(definition.getAutoStartComponents(), true)); } + if (definition.getAllowOptimisedComponents() != null) { + enricher.setAllowOptimisedComponents(parseBoolean(definition.getAllowOptimisedComponents(), true)); + } return enricher; } diff --git a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java index e0aa413639d..c0b72173aa4 100644 --- a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java +++ b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java @@ -672,6 +672,7 @@ public class ModelParser extends BaseParser { case "aggregationStrategy": def.setAggregationStrategy(val); yield true; case "aggregationStrategyMethodAllowNull": def.setAggregationStrategyMethodAllowNull(val); yield true; case "aggregationStrategyMethodName": def.setAggregationStrategyMethodName(val); yield true; + case "allowOptimisedComponents": def.setAllowOptimisedComponents(val); yield true; case "autoStartComponents": def.setAutoStartComponents(val); yield true; case "cacheSize": def.setCacheSize(val); yield true; case "ignoreInvalidEndpoint": def.setIgnoreInvalidEndpoint(val); yield true; diff --git a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/out/ModelWriter.java b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/out/ModelWriter.java index 08d86619ceb..a0b81f396b9 100644 --- a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/out/ModelWriter.java +++ b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/out/ModelWriter.java @@ -1371,6 +1371,7 @@ public class ModelWriter extends BaseWriter { doWriteAttribute("aggregationStrategy", def.getAggregationStrategy(), null); doWriteAttribute("ignoreInvalidEndpoint", def.getIgnoreInvalidEndpoint(), null); doWriteAttribute("autoStartComponents", def.getAutoStartComponents(), "true"); + doWriteAttribute("allowOptimisedComponents", def.getAllowOptimisedComponents(), "true"); doWriteAttribute("aggregateOnException", def.getAggregateOnException(), null); doWriteAttribute("aggregationStrategyMethodName", def.getAggregationStrategyMethodName(), null); doWriteAttribute("timeout", def.getTimeout(), "-1"); diff --git a/core/camel-yaml-io/src/generated/java/org/apache/camel/yaml/out/ModelWriter.java b/core/camel-yaml-io/src/generated/java/org/apache/camel/yaml/out/ModelWriter.java index 5175a869386..89d590b00c7 100644 --- a/core/camel-yaml-io/src/generated/java/org/apache/camel/yaml/out/ModelWriter.java +++ b/core/camel-yaml-io/src/generated/java/org/apache/camel/yaml/out/ModelWriter.java @@ -1371,6 +1371,7 @@ public class ModelWriter extends BaseWriter { doWriteAttribute("aggregationStrategy", def.getAggregationStrategy(), null); doWriteAttribute("ignoreInvalidEndpoint", def.getIgnoreInvalidEndpoint(), null); doWriteAttribute("autoStartComponents", def.getAutoStartComponents(), "true"); + doWriteAttribute("allowOptimisedComponents", def.getAllowOptimisedComponents(), "true"); doWriteAttribute("aggregateOnException", def.getAggregateOnException(), null); doWriteAttribute("aggregationStrategyMethodName", def.getAggregationStrategyMethodName(), null); doWriteAttribute("timeout", def.getTimeout(), "-1");
