This is an automated email from the ASF dual-hosted git repository. oscerd pushed a commit to branch fix/CAMEL-23630 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 58d3d5d16daffd800025d9bbf6272cd17bfc2910 Author: Andrea Cosentino <[email protected]> AuthorDate: Tue Jun 9 10:40:57 2026 +0200 CAMEL-23630: add HeaderFilterStrategy to camel-dapr Signed-off-by: Andrea Cosentino <[email protected]> --- .../org/apache/camel/catalog/components/dapr.json | 6 +- .../component/dapr/DaprComponentConfigurer.java | 6 ++ .../component/dapr/DaprEndpointConfigurer.java | 6 ++ .../component/dapr/DaprEndpointUriFactory.java | 3 +- .../org/apache/camel/component/dapr/dapr.json | 6 +- .../apache/camel/component/dapr/DaprComponent.java | 18 +++- .../apache/camel/component/dapr/DaprEndpoint.java | 23 +++++- .../component/dapr/DaprHeaderFilterStrategy.java | 31 +++++++ .../dapr/consumer/DaprPubSubConsumer.java | 47 ++++++++--- .../dapr/consumer/DaprPubSubConsumerTest.java | 29 ++++++- .../component/dsl/DaprComponentBuilderFactory.java | 19 +++++ .../endpoint/dsl/DaprEndpointBuilderFactory.java | 96 ++++++++++++++++++++++ 12 files changed, 270 insertions(+), 20 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/dapr.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/dapr.json index 3c4eb4d58ee7..0d00adc9d053 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/dapr.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/dapr.json @@ -62,7 +62,8 @@ "workflowOperation": { "index": 35, "kind": "property", "displayName": "Workflow Operation", "group": "producer", "label": "producer", "required": false, "type": "enum", "javaType": "org.apache.camel.component.dapr.WorkflowOperation", "enum": [ "scheduleNew", "terminate", "purge", "suspend", "resume", "state", "waitForInstanceStart", "waitForInstanceCompletion", "raiseEvent" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "scheduleNew", "configurationClas [...] "workflowStartTime": { "index": 36, "kind": "property", "displayName": "Workflow Start Time", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "java.time.Instant", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.dapr.DaprConfiguration", "configurationField": "configuration", "description": "The start time of the new workflow" }, "workflowVersion": { "index": 37, "kind": "property", "displayName": "Workflow Version", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.dapr.DaprConfiguration", "configurationField": "configuration", "description": "The version of the workflow to start" }, - "autowiredEnabled": { "index": 38, "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching [...] + "autowiredEnabled": { "index": 38, "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching [...] + "headerFilterStrategy": { "index": 39, "kind": "property", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom HeaderFilterStrategy to filter header to and from Camel message." } }, "headers": { "CamelDaprServiceToInvoke": { "index": 0, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Target service to invoke. Can be a Dapr App ID, a named HTTPEndpoint, or a FQDN\/public URL", "constantName": "org.apache.camel.component.dapr.DaprConstants#SERVICE_TO_INVOKE" }, @@ -164,6 +165,7 @@ "workflowOperation": { "index": 36, "kind": "parameter", "displayName": "Workflow Operation", "group": "producer", "label": "producer", "required": false, "type": "enum", "javaType": "org.apache.camel.component.dapr.WorkflowOperation", "enum": [ "scheduleNew", "terminate", "purge", "suspend", "resume", "state", "waitForInstanceStart", "waitForInstanceCompletion", "raiseEvent" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "scheduleNew", "configurationCla [...] "workflowStartTime": { "index": 37, "kind": "parameter", "displayName": "Workflow Start Time", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "java.time.Instant", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.dapr.DaprConfiguration", "configurationField": "configuration", "description": "The start time of the new workflow" }, "workflowVersion": { "index": 38, "kind": "parameter", "displayName": "Workflow Version", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.dapr.DaprConfiguration", "configurationField": "configuration", "description": "The version of the workflow to start" }, - "lazyStartProducer": { "index": 39, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] + "lazyStartProducer": { "index": 39, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] + "headerFilterStrategy": { "index": 40, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom HeaderFilterStrategy to filter header to and from Camel message." } } } diff --git a/components/camel-dapr/src/generated/java/org/apache/camel/component/dapr/DaprComponentConfigurer.java b/components/camel-dapr/src/generated/java/org/apache/camel/component/dapr/DaprComponentConfigurer.java index 71ae7f841ee1..c40e35b61d3a 100644 --- a/components/camel-dapr/src/generated/java/org/apache/camel/component/dapr/DaprComponentConfigurer.java +++ b/components/camel-dapr/src/generated/java/org/apache/camel/component/dapr/DaprComponentConfigurer.java @@ -56,6 +56,8 @@ public class DaprComponentConfigurer extends PropertyConfigurerSupport implement case "expiryInSeconds": getOrCreateConfiguration(target).setExpiryInSeconds(property(camelContext, java.lang.Integer.class, value)); return true; case "getworkflowio": case "getWorkflowIO": getOrCreateConfiguration(target).setGetWorkflowIO(property(camelContext, boolean.class, value)); return true; + case "headerfilterstrategy": + case "headerFilterStrategy": target.setHeaderFilterStrategy(property(camelContext, org.apache.camel.spi.HeaderFilterStrategy.class, value)); return true; case "httpextension": case "httpExtension": getOrCreateConfiguration(target).setHttpExtension(property(camelContext, io.dapr.client.domain.HttpExtension.class, value)); return true; case "key": getOrCreateConfiguration(target).setKey(property(camelContext, java.lang.String.class, value)); return true; @@ -137,6 +139,8 @@ public class DaprComponentConfigurer extends PropertyConfigurerSupport implement case "expiryInSeconds": return java.lang.Integer.class; case "getworkflowio": case "getWorkflowIO": return boolean.class; + case "headerfilterstrategy": + case "headerFilterStrategy": return org.apache.camel.spi.HeaderFilterStrategy.class; case "httpextension": case "httpExtension": return io.dapr.client.domain.HttpExtension.class; case "key": return java.lang.String.class; @@ -214,6 +218,8 @@ public class DaprComponentConfigurer extends PropertyConfigurerSupport implement case "expiryInSeconds": return getOrCreateConfiguration(target).getExpiryInSeconds(); case "getworkflowio": case "getWorkflowIO": return getOrCreateConfiguration(target).isGetWorkflowIO(); + case "headerfilterstrategy": + case "headerFilterStrategy": return target.getHeaderFilterStrategy(); case "httpextension": case "httpExtension": return getOrCreateConfiguration(target).getHttpExtension(); case "key": return getOrCreateConfiguration(target).getKey(); diff --git a/components/camel-dapr/src/generated/java/org/apache/camel/component/dapr/DaprEndpointConfigurer.java b/components/camel-dapr/src/generated/java/org/apache/camel/component/dapr/DaprEndpointConfigurer.java index d300547f6315..d273c56c8742 100644 --- a/components/camel-dapr/src/generated/java/org/apache/camel/component/dapr/DaprEndpointConfigurer.java +++ b/components/camel-dapr/src/generated/java/org/apache/camel/component/dapr/DaprEndpointConfigurer.java @@ -50,6 +50,8 @@ public class DaprEndpointConfigurer extends PropertyConfigurerSupport implements case "expiryInSeconds": target.getConfiguration().setExpiryInSeconds(property(camelContext, java.lang.Integer.class, value)); return true; case "getworkflowio": case "getWorkflowIO": target.getConfiguration().setGetWorkflowIO(property(camelContext, boolean.class, value)); return true; + case "headerfilterstrategy": + case "headerFilterStrategy": target.setHeaderFilterStrategy(property(camelContext, org.apache.camel.spi.HeaderFilterStrategy.class, value)); return true; case "httpextension": case "httpExtension": target.getConfiguration().setHttpExtension(property(camelContext, io.dapr.client.domain.HttpExtension.class, value)); return true; case "key": target.getConfiguration().setKey(property(camelContext, java.lang.String.class, value)); return true; @@ -132,6 +134,8 @@ public class DaprEndpointConfigurer extends PropertyConfigurerSupport implements case "expiryInSeconds": return java.lang.Integer.class; case "getworkflowio": case "getWorkflowIO": return boolean.class; + case "headerfilterstrategy": + case "headerFilterStrategy": return org.apache.camel.spi.HeaderFilterStrategy.class; case "httpextension": case "httpExtension": return io.dapr.client.domain.HttpExtension.class; case "key": return java.lang.String.class; @@ -210,6 +214,8 @@ public class DaprEndpointConfigurer extends PropertyConfigurerSupport implements case "expiryInSeconds": return target.getConfiguration().getExpiryInSeconds(); case "getworkflowio": case "getWorkflowIO": return target.getConfiguration().isGetWorkflowIO(); + case "headerfilterstrategy": + case "headerFilterStrategy": return target.getHeaderFilterStrategy(); case "httpextension": case "httpExtension": return target.getConfiguration().getHttpExtension(); case "key": return target.getConfiguration().getKey(); diff --git a/components/camel-dapr/src/generated/java/org/apache/camel/component/dapr/DaprEndpointUriFactory.java b/components/camel-dapr/src/generated/java/org/apache/camel/component/dapr/DaprEndpointUriFactory.java index a47043def60f..678143c8cdf3 100644 --- a/components/camel-dapr/src/generated/java/org/apache/camel/component/dapr/DaprEndpointUriFactory.java +++ b/components/camel-dapr/src/generated/java/org/apache/camel/component/dapr/DaprEndpointUriFactory.java @@ -24,7 +24,7 @@ public class DaprEndpointUriFactory extends org.apache.camel.support.component.E private static final Set<String> ENDPOINT_IDENTITY_PROPERTY_NAMES; private static final Map<String, String> MULTI_VALUE_PREFIXES; static { - Set<String> props = new HashSet<>(40); + Set<String> props = new HashSet<>(41); props.add("bindingName"); props.add("bindingOperation"); props.add("bridgeErrorHandler"); @@ -40,6 +40,7 @@ public class DaprEndpointUriFactory extends org.apache.camel.support.component.E props.add("exchangePattern"); props.add("expiryInSeconds"); props.add("getWorkflowIO"); + props.add("headerFilterStrategy"); props.add("httpExtension"); props.add("key"); props.add("lazyStartProducer"); diff --git a/components/camel-dapr/src/generated/resources/META-INF/org/apache/camel/component/dapr/dapr.json b/components/camel-dapr/src/generated/resources/META-INF/org/apache/camel/component/dapr/dapr.json index 3c4eb4d58ee7..0d00adc9d053 100644 --- a/components/camel-dapr/src/generated/resources/META-INF/org/apache/camel/component/dapr/dapr.json +++ b/components/camel-dapr/src/generated/resources/META-INF/org/apache/camel/component/dapr/dapr.json @@ -62,7 +62,8 @@ "workflowOperation": { "index": 35, "kind": "property", "displayName": "Workflow Operation", "group": "producer", "label": "producer", "required": false, "type": "enum", "javaType": "org.apache.camel.component.dapr.WorkflowOperation", "enum": [ "scheduleNew", "terminate", "purge", "suspend", "resume", "state", "waitForInstanceStart", "waitForInstanceCompletion", "raiseEvent" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "scheduleNew", "configurationClas [...] "workflowStartTime": { "index": 36, "kind": "property", "displayName": "Workflow Start Time", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "java.time.Instant", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.dapr.DaprConfiguration", "configurationField": "configuration", "description": "The start time of the new workflow" }, "workflowVersion": { "index": 37, "kind": "property", "displayName": "Workflow Version", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.dapr.DaprConfiguration", "configurationField": "configuration", "description": "The version of the workflow to start" }, - "autowiredEnabled": { "index": 38, "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching [...] + "autowiredEnabled": { "index": 38, "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching [...] + "headerFilterStrategy": { "index": 39, "kind": "property", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom HeaderFilterStrategy to filter header to and from Camel message." } }, "headers": { "CamelDaprServiceToInvoke": { "index": 0, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Target service to invoke. Can be a Dapr App ID, a named HTTPEndpoint, or a FQDN\/public URL", "constantName": "org.apache.camel.component.dapr.DaprConstants#SERVICE_TO_INVOKE" }, @@ -164,6 +165,7 @@ "workflowOperation": { "index": 36, "kind": "parameter", "displayName": "Workflow Operation", "group": "producer", "label": "producer", "required": false, "type": "enum", "javaType": "org.apache.camel.component.dapr.WorkflowOperation", "enum": [ "scheduleNew", "terminate", "purge", "suspend", "resume", "state", "waitForInstanceStart", "waitForInstanceCompletion", "raiseEvent" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "scheduleNew", "configurationCla [...] "workflowStartTime": { "index": 37, "kind": "parameter", "displayName": "Workflow Start Time", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "java.time.Instant", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.dapr.DaprConfiguration", "configurationField": "configuration", "description": "The start time of the new workflow" }, "workflowVersion": { "index": 38, "kind": "parameter", "displayName": "Workflow Version", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.dapr.DaprConfiguration", "configurationField": "configuration", "description": "The version of the workflow to start" }, - "lazyStartProducer": { "index": 39, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] + "lazyStartProducer": { "index": 39, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] + "headerFilterStrategy": { "index": 40, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom HeaderFilterStrategy to filter header to and from Camel message." } } } diff --git a/components/camel-dapr/src/main/java/org/apache/camel/component/dapr/DaprComponent.java b/components/camel-dapr/src/main/java/org/apache/camel/component/dapr/DaprComponent.java index 81ee92d12815..bb75babcd452 100644 --- a/components/camel-dapr/src/main/java/org/apache/camel/component/dapr/DaprComponent.java +++ b/components/camel-dapr/src/main/java/org/apache/camel/component/dapr/DaprComponent.java @@ -19,6 +19,7 @@ package org.apache.camel.component.dapr; import java.util.Map; import org.apache.camel.Endpoint; +import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.annotations.Component; import org.apache.camel.support.DefaultComponent; @@ -28,6 +29,9 @@ public class DaprComponent extends DefaultComponent { @Metadata private DaprConfiguration configuration = new DaprConfiguration(); + @Metadata(label = "advanced", + description = "To use a custom HeaderFilterStrategy to filter header to and from Camel message.") + private HeaderFilterStrategy headerFilterStrategy; protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { if (remaining == null || remaining.isBlank()) { @@ -37,7 +41,8 @@ public class DaprComponent extends DefaultComponent { final DaprConfiguration config = this.configuration != null ? this.configuration.copy() : new DaprConfiguration(); config.setOperation(DaprOperation.valueOf(remaining)); - Endpoint endpoint = new DaprEndpoint(uri, this, config); + DaprEndpoint endpoint = new DaprEndpoint(uri, this, config); + endpoint.setHeaderFilterStrategy(headerFilterStrategy); setProperties(endpoint, parameters); return endpoint; @@ -53,4 +58,15 @@ public class DaprComponent extends DefaultComponent { public void setConfiguration(DaprConfiguration configuration) { this.configuration = configuration; } + + public HeaderFilterStrategy getHeaderFilterStrategy() { + return headerFilterStrategy; + } + + /** + * To use a custom {@link org.apache.camel.spi.HeaderFilterStrategy} to filter header to and from Camel message. + */ + public void setHeaderFilterStrategy(HeaderFilterStrategy headerFilterStrategy) { + this.headerFilterStrategy = headerFilterStrategy; + } } diff --git a/components/camel-dapr/src/main/java/org/apache/camel/component/dapr/DaprEndpoint.java b/components/camel-dapr/src/main/java/org/apache/camel/component/dapr/DaprEndpoint.java index 28ef1ce0d384..2b658452f32e 100644 --- a/components/camel-dapr/src/main/java/org/apache/camel/component/dapr/DaprEndpoint.java +++ b/components/camel-dapr/src/main/java/org/apache/camel/component/dapr/DaprEndpoint.java @@ -26,6 +26,8 @@ import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.component.dapr.consumer.DaprConfigurationConsumer; import org.apache.camel.component.dapr.consumer.DaprPubSubConsumer; +import org.apache.camel.spi.HeaderFilterStrategy; +import org.apache.camel.spi.HeaderFilterStrategyAware; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.camel.support.DefaultEndpoint; @@ -35,10 +37,13 @@ import org.apache.camel.support.DefaultEndpoint; */ @UriEndpoint(firstVersion = "4.12.0", scheme = "dapr", title = "Dapr", syntax = "dapr:operation", category = { Category.CLOUD, Category.SAAS }, headersClass = DaprConstants.class) -public class DaprEndpoint extends DefaultEndpoint { +public class DaprEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware { @UriParam private DaprConfiguration configuration; + @UriParam(label = "advanced", + description = "To use a custom HeaderFilterStrategy to filter header to and from Camel message.") + private HeaderFilterStrategy headerFilterStrategy; private DaprClient client; private DaprPreviewClient previewClient; private DaprWorkflowClient workflowClient; @@ -89,6 +94,22 @@ public class DaprEndpoint extends DefaultEndpoint { this.configuration = config; } + @Override + public HeaderFilterStrategy getHeaderFilterStrategy() { + if (headerFilterStrategy == null) { + headerFilterStrategy = new DaprHeaderFilterStrategy(); + } + return headerFilterStrategy; + } + + /** + * To use a custom {@link org.apache.camel.spi.HeaderFilterStrategy} to filter header to and from Camel message. + */ + @Override + public void setHeaderFilterStrategy(HeaderFilterStrategy headerFilterStrategy) { + this.headerFilterStrategy = headerFilterStrategy; + } + /** * The DaprClient */ diff --git a/components/camel-dapr/src/main/java/org/apache/camel/component/dapr/DaprHeaderFilterStrategy.java b/components/camel-dapr/src/main/java/org/apache/camel/component/dapr/DaprHeaderFilterStrategy.java new file mode 100644 index 000000000000..0b3d3f5a8fc9 --- /dev/null +++ b/components/camel-dapr/src/main/java/org/apache/camel/component/dapr/DaprHeaderFilterStrategy.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.dapr; + +import org.apache.camel.support.DefaultHeaderFilterStrategy; + +/** + * Default header filter strategy for Dapr endpoints. + */ +public class DaprHeaderFilterStrategy extends DefaultHeaderFilterStrategy { + + public DaprHeaderFilterStrategy() { + setLowerCase(true); + setOutFilterStartsWith(CAMEL_FILTER_STARTS_WITH); + setInFilterStartsWith(CAMEL_FILTER_STARTS_WITH); + } +} diff --git a/components/camel-dapr/src/main/java/org/apache/camel/component/dapr/consumer/DaprPubSubConsumer.java b/components/camel-dapr/src/main/java/org/apache/camel/component/dapr/consumer/DaprPubSubConsumer.java index 087e73f5d3a9..0b49b1a2bd1b 100644 --- a/components/camel-dapr/src/main/java/org/apache/camel/component/dapr/consumer/DaprPubSubConsumer.java +++ b/components/camel-dapr/src/main/java/org/apache/camel/component/dapr/consumer/DaprPubSubConsumer.java @@ -102,17 +102,42 @@ public class DaprPubSubConsumer extends DefaultConsumer { message.setBody(cloudEvent.getData()); // set headers - message.setHeader(DaprConstants.PUBSUB_NAME, cloudEvent.getPubsubName()); - message.setHeader(DaprConstants.TOPIC, cloudEvent.getTopic()); - message.setHeader(DaprConstants.ID, cloudEvent.getId()); - message.setHeader(DaprConstants.SOURCE, cloudEvent.getSource()); - message.setHeader(DaprConstants.TYPE, cloudEvent.getType()); - message.setHeader(DaprConstants.SPECIFIC_VERSION, cloudEvent.getSpecversion()); - message.setHeader(DaprConstants.DATA_CONTENT_TYPE, cloudEvent.getDatacontenttype()); - message.setHeader(DaprConstants.BINARY_DATA, cloudEvent.getBinaryData()); - message.setHeader(DaprConstants.TIME, cloudEvent.getTime()); - message.setHeader(DaprConstants.TRACE_PARENT, cloudEvent.getTraceParent()); - message.setHeader(DaprConstants.TRACE_STATE, cloudEvent.getTraceState()); + if (!getEndpoint().getHeaderFilterStrategy().applyFilterToExternalHeaders(DaprConstants.ID, cloudEvent.getId(), + exchange)) { + message.setHeader(DaprConstants.ID, cloudEvent.getId()); + } + if (!getEndpoint().getHeaderFilterStrategy().applyFilterToExternalHeaders(DaprConstants.SOURCE, cloudEvent.getSource(), + exchange)) { + message.setHeader(DaprConstants.SOURCE, cloudEvent.getSource()); + } + if (!getEndpoint().getHeaderFilterStrategy().applyFilterToExternalHeaders(DaprConstants.TYPE, cloudEvent.getType(), + exchange)) { + message.setHeader(DaprConstants.TYPE, cloudEvent.getType()); + } + if (!getEndpoint().getHeaderFilterStrategy().applyFilterToExternalHeaders(DaprConstants.SPECIFIC_VERSION, + cloudEvent.getSpecversion(), exchange)) { + message.setHeader(DaprConstants.SPECIFIC_VERSION, cloudEvent.getSpecversion()); + } + if (!getEndpoint().getHeaderFilterStrategy().applyFilterToExternalHeaders(DaprConstants.DATA_CONTENT_TYPE, + cloudEvent.getDatacontenttype(), exchange)) { + message.setHeader(DaprConstants.DATA_CONTENT_TYPE, cloudEvent.getDatacontenttype()); + } + if (!getEndpoint().getHeaderFilterStrategy().applyFilterToExternalHeaders(DaprConstants.BINARY_DATA, + cloudEvent.getBinaryData(), exchange)) { + message.setHeader(DaprConstants.BINARY_DATA, cloudEvent.getBinaryData()); + } + if (!getEndpoint().getHeaderFilterStrategy().applyFilterToExternalHeaders(DaprConstants.TIME, cloudEvent.getTime(), + exchange)) { + message.setHeader(DaprConstants.TIME, cloudEvent.getTime()); + } + if (!getEndpoint().getHeaderFilterStrategy().applyFilterToExternalHeaders(DaprConstants.TRACE_PARENT, + cloudEvent.getTraceParent(), exchange)) { + message.setHeader(DaprConstants.TRACE_PARENT, cloudEvent.getTraceParent()); + } + if (!getEndpoint().getHeaderFilterStrategy().applyFilterToExternalHeaders(DaprConstants.TRACE_STATE, + cloudEvent.getTraceState(), exchange)) { + message.setHeader(DaprConstants.TRACE_STATE, cloudEvent.getTraceState()); + } return exchange; } diff --git a/components/camel-dapr/src/test/java/org/apache/camel/component/dapr/consumer/DaprPubSubConsumerTest.java b/components/camel-dapr/src/test/java/org/apache/camel/component/dapr/consumer/DaprPubSubConsumerTest.java index 36d3a9e24cde..8ce122b02ec2 100644 --- a/components/camel-dapr/src/test/java/org/apache/camel/component/dapr/consumer/DaprPubSubConsumerTest.java +++ b/components/camel-dapr/src/test/java/org/apache/camel/component/dapr/consumer/DaprPubSubConsumerTest.java @@ -33,7 +33,9 @@ import org.apache.camel.ExtendedCamelContext; import org.apache.camel.component.dapr.DaprConfiguration; import org.apache.camel.component.dapr.DaprConstants; import org.apache.camel.component.dapr.DaprEndpoint; +import org.apache.camel.component.dapr.DaprHeaderFilterStrategy; import org.apache.camel.spi.ExchangeFactory; +import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.support.DefaultExchange; import org.apache.camel.test.junit6.CamelTestSupport; import org.junit.jupiter.api.BeforeEach; @@ -43,6 +45,7 @@ import org.mockito.ArgumentCaptor; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; @@ -78,6 +81,7 @@ public class DaprPubSubConsumerTest extends CamelTestSupport { .thenAnswer(inv -> DefaultExchange.newFromEndpoint(inv.getArgument(0))); when(endpoint.getCamelContext()).thenReturn(context); when(endpoint.getConfiguration()).thenReturn(configuration); + when(endpoint.getHeaderFilterStrategy()).thenReturn(new DaprHeaderFilterStrategy()); when(configuration.getPubSubName()).thenReturn("testPubSub"); when(configuration.getTopic()).thenReturn("testTopic"); when(configuration.getPreviewClient()).thenReturn(mockClient); @@ -131,8 +135,8 @@ public class DaprPubSubConsumerTest extends CamelTestSupport { String body = new String(exchange.getIn().getBody(byte[].class), StandardCharsets.UTF_8); assertEquals(payload, body); - assertEquals(pubSubName, exchange.getIn().getHeader(DaprConstants.PUBSUB_NAME)); - assertEquals(topic, exchange.getIn().getHeader(DaprConstants.TOPIC)); + assertNull(exchange.getIn().getHeader(DaprConstants.PUBSUB_NAME)); + assertNull(exchange.getIn().getHeader(DaprConstants.TOPIC)); assertEquals(id, exchange.getIn().getHeader(DaprConstants.ID)); assertEquals(ver, exchange.getIn().getHeader(DaprConstants.SPECIFIC_VERSION)); assertArrayEquals(payload.getBytes(), (byte[]) exchange.getIn().getHeader(DaprConstants.BINARY_DATA)); @@ -145,4 +149,25 @@ public class DaprPubSubConsumerTest extends CamelTestSupport { verify(mockSubscription).close(); verify(mockClient).close(); } + + @Test + void testConsumerWithHeaderFilterStrategy() throws Exception { + HeaderFilterStrategy strategy = mock(HeaderFilterStrategy.class); + when(endpoint.getHeaderFilterStrategy()).thenReturn(strategy); + when(strategy.applyFilterToExternalHeaders(anyString(), any(), any())).thenReturn(true); + + consumer.doStart(); + + CloudEvent<byte[]> cloudEvent = mock(CloudEvent.class); + when(cloudEvent.getData()).thenReturn("test".getBytes()); + + listenerCaptor.getValue().onEvent(cloudEvent).block(); + + verify(processor).process(exchangeCaptor.capture(), any()); + + Exchange exchange = exchangeCaptor.getValue(); + assertNotNull(exchange); + // All headers filtered + assertNull(exchange.getIn().getHeader(DaprConstants.ID)); + } } diff --git a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DaprComponentBuilderFactory.java b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DaprComponentBuilderFactory.java index 560e4d71cb36..577a8863ade8 100644 --- a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DaprComponentBuilderFactory.java +++ b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DaprComponentBuilderFactory.java @@ -701,6 +701,24 @@ public interface DaprComponentBuilderFactory { doSetProperty("autowiredEnabled", autowiredEnabled); return this; } + + /** + * To use a custom HeaderFilterStrategy to filter header to and from + * Camel message. + * + * The option is a: + * <code>org.apache.camel.spi.HeaderFilterStrategy</code> + * type. + * + * Group: advanced + * + * @param headerFilterStrategy the value to set + * @return the dsl builder + */ + default DaprComponentBuilder headerFilterStrategy(org.apache.camel.spi.HeaderFilterStrategy headerFilterStrategy) { + doSetProperty("headerFilterStrategy", headerFilterStrategy); + return this; + } } class DaprComponentBuilderImpl @@ -761,6 +779,7 @@ public interface DaprComponentBuilderFactory { case "workflowStartTime": getOrCreateConfiguration((DaprComponent) component).setWorkflowStartTime((java.time.Instant) value); return true; case "workflowVersion": getOrCreateConfiguration((DaprComponent) component).setWorkflowVersion((java.lang.String) value); return true; case "autowiredEnabled": ((DaprComponent) component).setAutowiredEnabled((boolean) value); return true; + case "headerFilterStrategy": ((DaprComponent) component).setHeaderFilterStrategy((org.apache.camel.spi.HeaderFilterStrategy) value); return true; default: return false; } } diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DaprEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DaprEndpointBuilderFactory.java index 545c7b589b7b..84b62b01e2c9 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DaprEndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DaprEndpointBuilderFactory.java @@ -303,6 +303,38 @@ public interface DaprEndpointBuilderFactory { doSetProperty("exchangePattern", exchangePattern); return this; } + /** + * To use a custom HeaderFilterStrategy to filter header to and from + * Camel message. + * + * The option is a: + * <code>org.apache.camel.spi.HeaderFilterStrategy</code> type. + * + * Group: advanced + * + * @param headerFilterStrategy the value to set + * @return the dsl builder + */ + default AdvancedDaprEndpointConsumerBuilder headerFilterStrategy(org.apache.camel.spi.HeaderFilterStrategy headerFilterStrategy) { + doSetProperty("headerFilterStrategy", headerFilterStrategy); + return this; + } + /** + * To use a custom HeaderFilterStrategy to filter header to and from + * Camel message. + * + * The option will be converted to a + * <code>org.apache.camel.spi.HeaderFilterStrategy</code> type. + * + * Group: advanced + * + * @param headerFilterStrategy the value to set + * @return the dsl builder + */ + default AdvancedDaprEndpointConsumerBuilder headerFilterStrategy(String headerFilterStrategy) { + doSetProperty("headerFilterStrategy", headerFilterStrategy); + return this; + } } /** @@ -1094,6 +1126,38 @@ public interface DaprEndpointBuilderFactory { doSetProperty("lazyStartProducer", lazyStartProducer); return this; } + /** + * To use a custom HeaderFilterStrategy to filter header to and from + * Camel message. + * + * The option is a: + * <code>org.apache.camel.spi.HeaderFilterStrategy</code> type. + * + * Group: advanced + * + * @param headerFilterStrategy the value to set + * @return the dsl builder + */ + default AdvancedDaprEndpointProducerBuilder headerFilterStrategy(org.apache.camel.spi.HeaderFilterStrategy headerFilterStrategy) { + doSetProperty("headerFilterStrategy", headerFilterStrategy); + return this; + } + /** + * To use a custom HeaderFilterStrategy to filter header to and from + * Camel message. + * + * The option will be converted to a + * <code>org.apache.camel.spi.HeaderFilterStrategy</code> type. + * + * Group: advanced + * + * @param headerFilterStrategy the value to set + * @return the dsl builder + */ + default AdvancedDaprEndpointProducerBuilder headerFilterStrategy(String headerFilterStrategy) { + doSetProperty("headerFilterStrategy", headerFilterStrategy); + return this; + } } /** @@ -1252,6 +1316,38 @@ public interface DaprEndpointBuilderFactory { return (DaprEndpointBuilder) this; } + /** + * To use a custom HeaderFilterStrategy to filter header to and from + * Camel message. + * + * The option is a: + * <code>org.apache.camel.spi.HeaderFilterStrategy</code> type. + * + * Group: advanced + * + * @param headerFilterStrategy the value to set + * @return the dsl builder + */ + default AdvancedDaprEndpointBuilder headerFilterStrategy(org.apache.camel.spi.HeaderFilterStrategy headerFilterStrategy) { + doSetProperty("headerFilterStrategy", headerFilterStrategy); + return this; + } + /** + * To use a custom HeaderFilterStrategy to filter header to and from + * Camel message. + * + * The option will be converted to a + * <code>org.apache.camel.spi.HeaderFilterStrategy</code> type. + * + * Group: advanced + * + * @param headerFilterStrategy the value to set + * @return the dsl builder + */ + default AdvancedDaprEndpointBuilder headerFilterStrategy(String headerFilterStrategy) { + doSetProperty("headerFilterStrategy", headerFilterStrategy); + return this; + } } public interface DaprBuilders {
