This is an automated email from the ASF dual-hosted git repository.
pcongiusti pushed a commit to branch camel-4.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.14.x by this push:
new 2813c1d9cb1 feat(components): google-pubsub inherit headers
2813c1d9cb1 is described below
commit 2813c1d9cb134bf5b927aa5081720d039ebc5344
Author: Pasquale Congiusti <[email protected]>
AuthorDate: Tue Sep 9 12:41:33 2025 +0200
feat(components): google-pubsub inherit headers
* Add a header filter strategy so that all headers which do not belong to the
protocol pass through to the Camel exchange.
* Deprecate the CamelGooglePubsubAttributes header, which was the mechanism used
so far to pass properties and does not work for third-party applications
external to Camel.
Closes #CAMEL-22403
---
.../camel/catalog/components/google-pubsub.json | 9 +-
.../pubsub/GooglePubsubEndpointConfigurer.java | 6 +
.../pubsub/GooglePubsubEndpointUriFactory.java | 3 +-
.../component/google/pubsub/google-pubsub.json | 9 +-
.../google/pubsub/GooglePubsubConstants.java | 2 +
.../google/pubsub/GooglePubsubConsumer.java | 16 ++-
.../google/pubsub/GooglePubsubEndpoint.java | 20 +++-
.../pubsub/GooglePubsubHeaderFilterStrategy.java | 48 ++++++++
.../google/pubsub/GooglePubsubProducer.java | 18 +++
.../pubsub/consumer/CamelMessageReceiver.java | 16 +++
.../SingleExchangeRoundAllHeadersIT.java | 122 +++++++++++++++++++++
.../dsl/GooglePubsubEndpointBuilderFactory.java | 103 +++++++++++++++++
12 files changed, 358 insertions(+), 14 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json
index ac1b5a25ae9..f6eed89c297 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json
@@ -39,7 +39,7 @@
"CamelGooglePubsubMessageId": { "index": 0, "kind": "header",
"displayName": "", "group": "common", "label": "", "required": false,
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The ID of the message, assigned by the
server when the message is published.", "constantName":
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#MESSAGE_ID" },
"CamelGooglePubsubMsgAckId": { "index": 1, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The ID used to acknowledge the received
message.", "constantName":
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#ACK_ID" },
"CamelGooglePubsubPublishTime": { "index": 2, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "com.google.protobuf.Timestamp", "deprecated": false,
"deprecationNote": "", "autowired": false, "secret": false, "description": "The
time at which the message was published", "constantName":
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#PUBLISH_TIME" },
- "CamelGooglePubsubAttributes": { "index": 3, "kind": "header",
"displayName": "", "group": "common", "label": "", "required": false,
"javaType": "Map<String, String>", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "description": "The attributes of the
message.", "constantName":
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#ATTRIBUTES" },
+ "CamelGooglePubsubAttributes": { "index": 3, "kind": "header",
"displayName": "", "group": "common", "label": "", "required": false,
"javaType": "Map<String, String>", "deprecated": true, "deprecationNote": "",
"autowired": false, "secret": false, "description": "The attributes of the
message.", "constantName":
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#ATTRIBUTES" },
"CamelGooglePubsubOrderingKey": { "index": 4, "kind": "header",
"displayName": "", "group": "producer", "label": "producer", "required": false,
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "If non-empty, identifies related
messages for which publish order should be respected.", "constantName":
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#ORDERING_KEY" },
"CamelGooglePubsubAcknowledge": { "index": 5, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType":
"org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "Can be used to manually acknowledge or
negative-acknowledge a message when ackMode=NONE.", "constantName":
"org.apache.camel.component.google.pubsub.GooglePub [...]
},
@@ -59,8 +59,9 @@
"pubsubEndpoint": { "index": 12, "kind": "parameter", "displayName":
"Pubsub Endpoint", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "Pub\/Sub endpoint to use. Required when using message ordering,
and ensures that messages are received in order even when multiple publishers
are used" },
"retry": { "index": 13, "kind": "parameter", "displayName": "Retry",
"group": "producer (advanced)", "label": "producer,advanced", "required":
false, "type": "object", "javaType":
"com.google.api.gax.retrying.RetrySettings", "deprecated": false, "autowired":
false, "secret": false, "description": "A custom RetrySettings to control how
the publisher handles retry-able failures" },
"serializer": { "index": 14, "kind": "parameter", "displayName":
"Serializer", "group": "producer (advanced)", "label": "producer,advanced",
"required": false, "type": "object", "javaType":
"org.apache.camel.component.google.pubsub.serializer.GooglePubsubSerializer",
"deprecated": false, "deprecationNote": "", "autowired": true, "secret": false,
"description": "A custom GooglePubsubSerializer to use for serializing message
payloads in the producer" },
- "loggerId": { "index": 15, "kind": "parameter", "displayName": "Logger
Id", "group": "advanced", "label": "advanced", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": true, "autowired":
false, "secret": false, "description": "To use a custom logger name" },
- "authenticate": { "index": 16, "kind": "parameter", "displayName":
"Authenticate", "group": "security", "label": "security", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": true, "description": "Use Credentials
when interacting with PubSub service (no authentication is required when using
emulator)." },
- "serviceAccountKey": { "index": 17, "kind": "parameter", "displayName":
"Service Account Key", "group": "security", "label": "security", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "The Service account key
that can be used as credentials for the PubSub publisher\/subscriber. It can be
loaded by default from classpath, but you can prefix with classpath:, file:, or
http: to load the resour [...]
+ "includeAllGoogleProperties": { "index": 15, "kind": "parameter",
"displayName": "Include All Google Properties", "group": "advanced", "label":
"advanced", "required": false, "type": "boolean", "javaType": "boolean",
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
false, "description": "Whether to include all Google headers when mapping from
Pubsub to Camel Message. Setting this to true will include properties such as
x-goog etc." },
+ "loggerId": { "index": 16, "kind": "parameter", "displayName": "Logger
Id", "group": "advanced", "label": "advanced", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": true, "autowired":
false, "secret": false, "description": "To use a custom logger name" },
+ "authenticate": { "index": 17, "kind": "parameter", "displayName":
"Authenticate", "group": "security", "label": "security", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": true, "description": "Use Credentials
when interacting with PubSub service (no authentication is required when using
emulator)." },
+ "serviceAccountKey": { "index": 18, "kind": "parameter", "displayName":
"Service Account Key", "group": "security", "label": "security", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "The Service account key
that can be used as credentials for the PubSub publisher\/subscriber. It can be
loaded by default from classpath, but you can prefix with classpath:, file:, or
http: to load the resour [...]
}
}
diff --git
a/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointConfigurer.java
b/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointConfigurer.java
index c6b2217cbe7..c9b1c82003b 100644
---
a/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointConfigurer.java
+++
b/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointConfigurer.java
@@ -34,6 +34,8 @@ public class GooglePubsubEndpointConfigurer extends
PropertyConfigurerSupport im
case "exceptionHandler":
target.setExceptionHandler(property(camelContext,
org.apache.camel.spi.ExceptionHandler.class, value)); return true;
case "exchangepattern":
case "exchangePattern":
target.setExchangePattern(property(camelContext,
org.apache.camel.ExchangePattern.class, value)); return true;
+ case "includeallgoogleproperties":
+ case "includeAllGoogleProperties":
target.setIncludeAllGoogleProperties(property(camelContext, boolean.class,
value)); return true;
case "lazystartproducer":
case "lazyStartProducer":
target.setLazyStartProducer(property(camelContext, boolean.class, value));
return true;
case "loggerid":
@@ -75,6 +77,8 @@ public class GooglePubsubEndpointConfigurer extends
PropertyConfigurerSupport im
case "exceptionHandler": return
org.apache.camel.spi.ExceptionHandler.class;
case "exchangepattern":
case "exchangePattern": return org.apache.camel.ExchangePattern.class;
+ case "includeallgoogleproperties":
+ case "includeAllGoogleProperties": return boolean.class;
case "lazystartproducer":
case "lazyStartProducer": return boolean.class;
case "loggerid":
@@ -112,6 +116,8 @@ public class GooglePubsubEndpointConfigurer extends
PropertyConfigurerSupport im
case "exceptionHandler": return target.getExceptionHandler();
case "exchangepattern":
case "exchangePattern": return target.getExchangePattern();
+ case "includeallgoogleproperties":
+ case "includeAllGoogleProperties": return
target.isIncludeAllGoogleProperties();
case "lazystartproducer":
case "lazyStartProducer": return target.isLazyStartProducer();
case "loggerid":
diff --git
a/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointUriFactory.java
b/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointUriFactory.java
index 996447b78ee..2621f51a5b9 100644
---
a/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointUriFactory.java
+++
b/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointUriFactory.java
@@ -23,7 +23,7 @@ public class GooglePubsubEndpointUriFactory extends
org.apache.camel.support.com
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Set<String> MULTI_VALUE_PREFIXES;
static {
- Set<String> props = new HashSet<>(18);
+ Set<String> props = new HashSet<>(19);
props.add("ackMode");
props.add("authenticate");
props.add("bridgeErrorHandler");
@@ -31,6 +31,7 @@ public class GooglePubsubEndpointUriFactory extends
org.apache.camel.support.com
props.add("destinationName");
props.add("exceptionHandler");
props.add("exchangePattern");
+ props.add("includeAllGoogleProperties");
props.add("lazyStartProducer");
props.add("loggerId");
props.add("maxAckExtensionPeriod");
diff --git
a/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json
b/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json
index ac1b5a25ae9..f6eed89c297 100644
---
a/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json
+++
b/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json
@@ -39,7 +39,7 @@
"CamelGooglePubsubMessageId": { "index": 0, "kind": "header",
"displayName": "", "group": "common", "label": "", "required": false,
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The ID of the message, assigned by the
server when the message is published.", "constantName":
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#MESSAGE_ID" },
"CamelGooglePubsubMsgAckId": { "index": 1, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The ID used to acknowledge the received
message.", "constantName":
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#ACK_ID" },
"CamelGooglePubsubPublishTime": { "index": 2, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "com.google.protobuf.Timestamp", "deprecated": false,
"deprecationNote": "", "autowired": false, "secret": false, "description": "The
time at which the message was published", "constantName":
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#PUBLISH_TIME" },
- "CamelGooglePubsubAttributes": { "index": 3, "kind": "header",
"displayName": "", "group": "common", "label": "", "required": false,
"javaType": "Map<String, String>", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "description": "The attributes of the
message.", "constantName":
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#ATTRIBUTES" },
+ "CamelGooglePubsubAttributes": { "index": 3, "kind": "header",
"displayName": "", "group": "common", "label": "", "required": false,
"javaType": "Map<String, String>", "deprecated": true, "deprecationNote": "",
"autowired": false, "secret": false, "description": "The attributes of the
message.", "constantName":
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#ATTRIBUTES" },
"CamelGooglePubsubOrderingKey": { "index": 4, "kind": "header",
"displayName": "", "group": "producer", "label": "producer", "required": false,
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "If non-empty, identifies related
messages for which publish order should be respected.", "constantName":
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#ORDERING_KEY" },
"CamelGooglePubsubAcknowledge": { "index": 5, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType":
"org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "Can be used to manually acknowledge or
negative-acknowledge a message when ackMode=NONE.", "constantName":
"org.apache.camel.component.google.pubsub.GooglePub [...]
},
@@ -59,8 +59,9 @@
"pubsubEndpoint": { "index": 12, "kind": "parameter", "displayName":
"Pubsub Endpoint", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "Pub\/Sub endpoint to use. Required when using message ordering,
and ensures that messages are received in order even when multiple publishers
are used" },
"retry": { "index": 13, "kind": "parameter", "displayName": "Retry",
"group": "producer (advanced)", "label": "producer,advanced", "required":
false, "type": "object", "javaType":
"com.google.api.gax.retrying.RetrySettings", "deprecated": false, "autowired":
false, "secret": false, "description": "A custom RetrySettings to control how
the publisher handles retry-able failures" },
"serializer": { "index": 14, "kind": "parameter", "displayName":
"Serializer", "group": "producer (advanced)", "label": "producer,advanced",
"required": false, "type": "object", "javaType":
"org.apache.camel.component.google.pubsub.serializer.GooglePubsubSerializer",
"deprecated": false, "deprecationNote": "", "autowired": true, "secret": false,
"description": "A custom GooglePubsubSerializer to use for serializing message
payloads in the producer" },
- "loggerId": { "index": 15, "kind": "parameter", "displayName": "Logger
Id", "group": "advanced", "label": "advanced", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": true, "autowired":
false, "secret": false, "description": "To use a custom logger name" },
- "authenticate": { "index": 16, "kind": "parameter", "displayName":
"Authenticate", "group": "security", "label": "security", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": true, "description": "Use Credentials
when interacting with PubSub service (no authentication is required when using
emulator)." },
- "serviceAccountKey": { "index": 17, "kind": "parameter", "displayName":
"Service Account Key", "group": "security", "label": "security", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "The Service account key
that can be used as credentials for the PubSub publisher\/subscriber. It can be
loaded by default from classpath, but you can prefix with classpath:, file:, or
http: to load the resour [...]
+ "includeAllGoogleProperties": { "index": 15, "kind": "parameter",
"displayName": "Include All Google Properties", "group": "advanced", "label":
"advanced", "required": false, "type": "boolean", "javaType": "boolean",
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
false, "description": "Whether to include all Google headers when mapping from
Pubsub to Camel Message. Setting this to true will include properties such as
x-goog etc." },
+ "loggerId": { "index": 16, "kind": "parameter", "displayName": "Logger
Id", "group": "advanced", "label": "advanced", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": true, "autowired":
false, "secret": false, "description": "To use a custom logger name" },
+ "authenticate": { "index": 17, "kind": "parameter", "displayName":
"Authenticate", "group": "security", "label": "security", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": true, "description": "Use Credentials
when interacting with PubSub service (no authentication is required when using
emulator)." },
+ "serviceAccountKey": { "index": 18, "kind": "parameter", "displayName":
"Service Account Key", "group": "security", "label": "security", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "The Service account key
that can be used as credentials for the PubSub publisher\/subscriber. It can be
loaded by default from classpath, but you can prefix with classpath:, file:, or
http: to load the resour [...]
}
}
diff --git
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
index efe6974356e..d5eb0937a23 100644
---
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
+++
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
@@ -27,6 +27,7 @@ public final class GooglePubsubConstants {
@Metadata(label = "consumer", description = "The time at which the message
was published",
javaType = "com.google.protobuf.Timestamp")
public static final String PUBLISH_TIME = "CamelGooglePubsubPublishTime";
+ @Deprecated(since = "4.15")
@Metadata(description = "The attributes of the message.", javaType =
"Map<String, String>")
public static final String ATTRIBUTES = "CamelGooglePubsubAttributes";
@Metadata(label = "producer",
@@ -38,6 +39,7 @@ public final class GooglePubsubConstants {
"message when ackMode=NONE.",
javaType =
"org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge")
public static final String GOOGLE_PUBSUB_ACKNOWLEDGE =
"CamelGooglePubsubAcknowledge";
+ @Deprecated(since = "4.15")
public static final String RESERVED_GOOGLE_CLIENT_ATTRIBUTE_PREFIX =
"goog";
public enum AckMode {
diff --git
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
index e83bebe362e..1ca912dcde9 100644
---
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
+++
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
@@ -44,6 +44,7 @@ import
org.apache.camel.component.google.pubsub.consumer.AcknowledgeCompletion;
import org.apache.camel.component.google.pubsub.consumer.AcknowledgeSync;
import org.apache.camel.component.google.pubsub.consumer.CamelMessageReceiver;
import
org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge;
+import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.support.DefaultConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +58,7 @@ public class GooglePubsubConsumer extends DefaultConsumer {
private ExecutorService executor;
private final List<Subscriber> subscribers;
private final Set<ApiFuture<PullResponse>> pendingSynchronousPullResponses;
+ private final HeaderFilterStrategy headerFilterStrategy;
GooglePubsubConsumer(GooglePubsubEndpoint endpoint, Processor processor) {
super(endpoint, processor);
@@ -71,6 +73,7 @@ public class GooglePubsubConsumer extends DefaultConsumer {
}
localLog = LoggerFactory.getLogger(loggerId);
+ headerFilterStrategy = new
GooglePubsubHeaderFilterStrategy(endpoint.isIncludeAllGoogleProperties());
}
@Override
@@ -187,9 +190,11 @@ public class GooglePubsubConsumer extends DefaultConsumer {
Exchange exchange = createExchange(true);
exchange.getIn().setBody(pubsubMessage.getData().toByteArray());
+ // Standard headers
exchange.getIn().setHeader(GooglePubsubConstants.ACK_ID, message.getAckId());
exchange.getIn().setHeader(GooglePubsubConstants.MESSAGE_ID,
pubsubMessage.getMessageId());
exchange.getIn().setHeader(GooglePubsubConstants.PUBLISH_TIME,
pubsubMessage.getPublishTime());
+ // Deprecated: replaced by headerFilterStrategy
exchange.getIn().setHeader(GooglePubsubConstants.ATTRIBUTES,
pubsubMessage.getAttributesMap());
//existing subscriber can not be propagated, because
it will be closed at the end of this block
@@ -197,13 +202,22 @@ public class GooglePubsubConsumer extends DefaultConsumer
{
// (see
https://issues.apache.org/jira/browse/CAMEL-18447)
GooglePubsubAcknowledge acknowledge = new
AcknowledgeSync(
() ->
endpoint.getComponent().getSubscriberStub(endpoint), subscriptionName);
-
if (endpoint.getAckMode() !=
GooglePubsubConstants.AckMode.NONE) {
exchange.getExchangeExtension().addOnCompletion(new
AcknowledgeCompletion(acknowledge));
} else {
exchange.getIn().setHeader(GooglePubsubConstants.GOOGLE_PUBSUB_ACKNOWLEDGE,
acknowledge);
}
+ // Inherit the rest of headers
+ for (String pubSubHeader :
pubsubMessage.getAttributesMap().keySet()) {
+ String value =
pubsubMessage.getAttributesMap().get(pubSubHeader);
+ if (headerFilterStrategy != null
+ &&
headerFilterStrategy.applyFilterToExternalHeaders(pubSubHeader, value,
exchange)) {
+ continue;
+ }
+ exchange.getIn().setHeader(pubSubHeader, value);
+ }
+
try {
processor.process(exchange);
} catch (Exception e) {
diff --git
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
index 62a5d3ae0b3..48f15df8a83 100644
---
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
+++
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
@@ -94,6 +94,10 @@ public class GooglePubsubEndpoint extends DefaultEndpoint
implements EndpointSer
@UriParam(label = "producer,advanced",
description = "A custom RetrySettings to control how the
publisher handles retry-able failures")
private RetrySettings retry;
+ @UriParam(label = "advanced",
+ description = "Whether to include all Google headers when
mapping from Pubsub to Camel Message."
+ + " Setting this to true will include properties
such as x-goog etc.")
+ private boolean includeAllGoogleProperties;
public GooglePubsubEndpoint(String uri, Component component) {
super(uri, component);
@@ -203,12 +207,12 @@ public class GooglePubsubEndpoint extends DefaultEndpoint
implements EndpointSer
this.maxMessagesPerPoll = maxMessagesPerPoll;
}
- public boolean isSynchronousPull() {
- return synchronousPull;
+ public boolean isIncludeAllGoogleProperties() {
+ return includeAllGoogleProperties;
}
- public void setSynchronousPull(Boolean synchronousPull) {
- this.synchronousPull = synchronousPull;
+ public void setIncludeAllGoogleProperties(Boolean
includeAllGoogleProperties) {
+ this.includeAllGoogleProperties = includeAllGoogleProperties;
}
public GooglePubsubConstants.AckMode getAckMode() {
@@ -251,6 +255,14 @@ public class GooglePubsubEndpoint extends DefaultEndpoint
implements EndpointSer
this.retry = retry;
}
+ public boolean isSynchronousPull() {
+ return synchronousPull;
+ }
+
+ public void setSynchronousPull(Boolean synchronousPull) {
+ this.synchronousPull = synchronousPull;
+ }
+
public String getPubsubEndpoint() {
return this.pubsubEndpoint;
}
diff --git
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubHeaderFilterStrategy.java
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubHeaderFilterStrategy.java
new file mode 100644
index 00000000000..7232a5c2c6d
--- /dev/null
+++
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubHeaderFilterStrategy.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.pubsub;
+
+import org.apache.camel.support.DefaultHeaderFilterStrategy;
+
+public class GooglePubsubHeaderFilterStrategy extends
DefaultHeaderFilterStrategy {
+
+ public GooglePubsubHeaderFilterStrategy() {
+ this(false);
+ }
+
+ public GooglePubsubHeaderFilterStrategy(boolean
includeAllGoogleProperties) {
+
setOutFilterStartsWith(DefaultHeaderFilterStrategy.CAMEL_FILTER_STARTS_WITH);
+
setInFilterStartsWith(DefaultHeaderFilterStrategy.CAMEL_FILTER_STARTS_WITH);
+ getOutFilter().add("authorization");
+ if (!includeAllGoogleProperties) {
+ ignoreGoogProperties();
+ }
+ }
+
+ protected void ignoreGoogProperties() {
+ String[] filterStartWith = new
String[DefaultHeaderFilterStrategy.CAMEL_FILTER_STARTS_WITH.length + 2];
+ System.arraycopy(DefaultHeaderFilterStrategy.CAMEL_FILTER_STARTS_WITH,
0,
+ filterStartWith, 0,
DefaultHeaderFilterStrategy.CAMEL_FILTER_STARTS_WITH.length);
+
filterStartWith[DefaultHeaderFilterStrategy.CAMEL_FILTER_STARTS_WITH.length] =
"x-goog";
+
filterStartWith[DefaultHeaderFilterStrategy.CAMEL_FILTER_STARTS_WITH.length +
1] = "X-GOOG";
+ setOutFilterStartsWith(filterStartWith);
+ setInFilterStartsWith(filterStartWith);
+ getOutFilter().add("google-cloud-resource-prefix");
+ getOutFilter().add("grpc-timeout");
+ }
+
+}
diff --git
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubProducer.java
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubProducer.java
index 22b50890773..9c2bc1f7b66 100644
---
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubProducer.java
+++
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubProducer.java
@@ -25,6 +25,7 @@ import com.google.common.base.Strings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import org.apache.camel.Exchange;
+import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.support.DefaultProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +40,7 @@ import static
org.apache.camel.component.google.pubsub.GooglePubsubConstants.RES
public class GooglePubsubProducer extends DefaultProducer {
public Logger logger;
+ private final HeaderFilterStrategy headerFilterStrategy;
public GooglePubsubProducer(GooglePubsubEndpoint endpoint) {
super(endpoint);
@@ -50,6 +52,7 @@ public class GooglePubsubProducer extends DefaultProducer {
}
logger = LoggerFactory.getLogger(loggerId);
+ headerFilterStrategy = new
GooglePubsubHeaderFilterStrategy(endpoint.isIncludeAllGoogleProperties());
}
/**
@@ -101,6 +104,9 @@ public class GooglePubsubProducer extends DefaultProducer {
}
PubsubMessage.Builder messageBuilder =
PubsubMessage.newBuilder().setData(byteString);
+
+ // Deprecated: start
+ // Replaced by headerFilterStrategy
Map<String, String> attributes =
exchange.getIn().getHeader(ATTRIBUTES, Map.class);
if (attributes != null) {
for (Map.Entry<String, String> attribute : attributes.entrySet()) {
@@ -109,11 +115,23 @@ public class GooglePubsubProducer extends DefaultProducer
{
}
}
}
+ // Deprecated: end
+
String orderingKey = exchange.getIn().getHeader(ORDERING_KEY,
String.class);
if (orderingKey != null) {
messageBuilder.setOrderingKey(orderingKey);
}
+ // Inherit the rest of headers as Pubsub attributes, unless the filter strategy rejects them
+ for (String camelHeader : exchange.getIn().getHeaders().keySet()) {
+     String value = exchange.getIn().getHeader(camelHeader, String.class);
+     // putAttributes throws NullPointerException on a null value, so such headers are skipped
+     if (value == null || headerFilterStrategy.applyFilterToExternalHeaders(camelHeader, value, exchange)) {
+         continue;
+     }
+     messageBuilder.putAttributes(camelHeader, value);
+ }
+
PubsubMessage message = messageBuilder.build();
ApiFuture<String> messageIdFuture = publisher.publish(message);
diff --git
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
index 8a123df7236..4169c2630f3 100644
---
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
+++
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
@@ -25,6 +25,8 @@ import org.apache.camel.Processor;
import org.apache.camel.component.google.pubsub.GooglePubsubConstants;
import org.apache.camel.component.google.pubsub.GooglePubsubConsumer;
import org.apache.camel.component.google.pubsub.GooglePubsubEndpoint;
+import
org.apache.camel.component.google.pubsub.GooglePubsubHeaderFilterStrategy;
+import org.apache.camel.spi.HeaderFilterStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,6 +36,7 @@ public class CamelMessageReceiver implements MessageReceiver {
private final GooglePubsubConsumer consumer;
private final GooglePubsubEndpoint endpoint;
private final Processor processor;
+ private final HeaderFilterStrategy headerFilterStrategy;
public CamelMessageReceiver(GooglePubsubConsumer consumer,
GooglePubsubEndpoint endpoint, Processor processor) {
this.consumer = consumer;
@@ -44,6 +47,7 @@ public class CamelMessageReceiver implements MessageReceiver {
loggerId = this.getClass().getName();
}
localLog = LoggerFactory.getLogger(loggerId);
+ headerFilterStrategy = new
GooglePubsubHeaderFilterStrategy(endpoint.isIncludeAllGoogleProperties());
}
@Override
@@ -55,8 +59,10 @@ public class CamelMessageReceiver implements MessageReceiver
{
Exchange exchange = consumer.createExchange(true);
exchange.getIn().setBody(pubsubMessage.getData().toByteArray());
+ // Standard headers
exchange.getIn().setHeader(GooglePubsubConstants.MESSAGE_ID,
pubsubMessage.getMessageId());
exchange.getIn().setHeader(GooglePubsubConstants.PUBLISH_TIME,
pubsubMessage.getPublishTime());
+ // Deprecated: replaced by headerFilterStrategy
exchange.getIn().setHeader(GooglePubsubConstants.ATTRIBUTES,
pubsubMessage.getAttributesMap());
GooglePubsubAcknowledge acknowledge = new
AcknowledgeAsync(ackReplyConsumer);
@@ -66,6 +72,16 @@ public class CamelMessageReceiver implements MessageReceiver
{
exchange.getIn().setHeader(GooglePubsubConstants.GOOGLE_PUBSUB_ACKNOWLEDGE,
acknowledge);
}
+ // Copy each Pubsub attribute onto the Camel message unless the filter strategy rejects it
+ for (String attributeName : pubsubMessage.getAttributesMap().keySet()) {
+     String attributeValue = pubsubMessage.getAttributesMap().get(attributeName);
+     boolean filtered = headerFilterStrategy != null
+             && headerFilterStrategy.applyFilterToExternalHeaders(attributeName, attributeValue, exchange);
+     if (!filtered) {
+         exchange.getIn().setHeader(attributeName, attributeValue);
+     }
+ }
+
try {
processor.process(exchange);
} catch (Exception e) {
diff --git
a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/SingleExchangeRoundAllHeadersIT.java
b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/SingleExchangeRoundAllHeadersIT.java
new file mode 100644
index 00000000000..e401bdc3efe
--- /dev/null
+++
b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/SingleExchangeRoundAllHeadersIT.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.pubsub.integration;
+
+import java.util.List;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.google.pubsub.GooglePubsubConstants;
+import org.apache.camel.component.google.pubsub.PubsubTestSupport;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.Assert.assertNull;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Round-trip integration test for header propagation: a plain Camel header must come back as a header on
+ * the consumed message, whereas a header using the Google reserved prefix (x-goog) must not.
+ */
+public class SingleExchangeRoundAllHeadersIT extends PubsubTestSupport {
+
+    private static final String TOPIC_NAME = "singleSend";
+    private static final String SUBSCRIPTION_NAME = "singleReceive";
+
+    @EndpointInject("direct:from")
+    private Endpoint directIn;
+
+    @EndpointInject("google-pubsub:{{project.id}}:" + TOPIC_NAME)
+    private Endpoint pubsubTopic;
+
+    @EndpointInject("mock:sendResult")
+    private MockEndpoint sendResult;
+
+    @EndpointInject("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME + "?synchronousPull=true")
+    private Endpoint pubsubSubscription;
+
+    @EndpointInject("mock:receiveResult")
+    private MockEndpoint receiveResult;
+
+    @Produce("direct:from")
+    private ProducerTemplate producer;
+
+    @Override
+    public void createTopicSubscription() {
+        createTopicSubscriptionPair(TOPIC_NAME, SUBSCRIPTION_NAME);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from(directIn).routeId("Single_Send").to(pubsubTopic).to(sendResult);
+
+                from(pubsubSubscription).routeId("Single_Receive").to("direct:one");
+
+                from("direct:one").to(receiveResult);
+            }
+        };
+    }
+
+    @Test
+    public void testIncludeHeaders() throws Exception {
+        Exchange exchange = new DefaultExchange(context);
+        String attributeKey = "ATTRIBUTE-TEST-KEY";
+        String attributeValue = "ATTRIBUTE-TEST-VALUE";
+        String hiddenAttributeKey = "x-goog-attr";
+        String hiddenAttributeValue = "ATTRIBUTE-HIDDEN-VALUE";
+
+        exchange.getIn().setBody("Single : " + exchange.getExchangeId());
+        exchange.getIn().setHeader(attributeKey, attributeValue);
+        exchange.getIn().setHeader(hiddenAttributeKey, hiddenAttributeValue);
+
+        receiveResult.expectedMessageCount(1);
+        receiveResult.expectedBodiesReceivedInAnyOrder(exchange.getIn().getBody());
+
+        producer.send(exchange);
+
+        List<Exchange> sentExchanges = sendResult.getExchanges();
+        assertEquals(1, sentExchanges.size(), "Sent exchanges");
+        Exchange sentExchange = sentExchanges.get(0);
+        assertEquals(exchange.getIn().getHeader(GooglePubsubConstants.MESSAGE_ID),
+                sentExchange.getIn().getHeader(GooglePubsubConstants.MESSAGE_ID), "Sent ID");
+
+        receiveResult.assertIsSatisfied(5000);
+
+        List<Exchange> receivedExchanges = receiveResult.getExchanges();
+        // Assert the size rather than non-nullness so that get(0) below is backed by a real check
+        assertEquals(1, receivedExchanges.size(), "Received exchanges");
+        Exchange receivedExchange = receivedExchanges.get(0);
+
+        assertNotNull(receivedExchange.getIn().getHeader(GooglePubsubConstants.MESSAGE_ID), "PUBSUB Message ID Property");
+        assertNotNull(receivedExchange.getIn().getHeader(GooglePubsubConstants.PUBLISH_TIME), "PUBSUB Published Time");
+
+        // The plain header survives the round trip; the x-goog prefixed one must be absent
+        assertEquals(attributeValue, receivedExchange.getIn().getHeader(attributeKey));
+        assertNull(receivedExchange.getIn().getHeader(hiddenAttributeKey));
+        assertEquals(sentExchange.getIn().getHeader(GooglePubsubConstants.MESSAGE_ID),
+                receivedExchange.getIn().getHeader(GooglePubsubConstants.MESSAGE_ID));
+    }
+}
diff --git
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java
index f72cd982651..0cad250eaca 100644
---
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java
+++
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java
@@ -380,6 +380,40 @@ public interface GooglePubsubEndpointBuilderFactory {
doSetProperty("synchronousPull", synchronousPull);
return this;
}
+ /**
+ * Whether to include all Google headers when mapping from Pubsub to
+ * Camel Message. Setting this to true will include properties such as
+ * x-goog etc.
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: advanced
+ *
+ * @param includeAllGoogleProperties the value to set
+ * @return the dsl builder
+ */
+ default AdvancedGooglePubsubEndpointConsumerBuilder
includeAllGoogleProperties(boolean includeAllGoogleProperties) {
+ doSetProperty("includeAllGoogleProperties",
includeAllGoogleProperties);
+ return this;
+ }
+ /**
+ * Whether to include all Google headers when mapping from Pubsub to
+ * Camel Message. Setting this to true will include properties such as
+ * x-goog etc.
+ *
+ * The option will be converted to a <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: advanced
+ *
+ * @param includeAllGoogleProperties the value to set
+ * @return the dsl builder
+ */
+ default AdvancedGooglePubsubEndpointConsumerBuilder
includeAllGoogleProperties(String includeAllGoogleProperties) {
+ doSetProperty("includeAllGoogleProperties",
includeAllGoogleProperties);
+ return this;
+ }
/**
* To use a custom logger name.
*
@@ -622,6 +656,40 @@ public interface GooglePubsubEndpointBuilderFactory {
doSetProperty("serializer", serializer);
return this;
}
+ /**
+ * Whether to include all Google headers when mapping from Pubsub to
+ * Camel Message. Setting this to true will include properties such as
+ * x-goog etc.
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: advanced
+ *
+ * @param includeAllGoogleProperties the value to set
+ * @return the dsl builder
+ */
+ default AdvancedGooglePubsubEndpointProducerBuilder
includeAllGoogleProperties(boolean includeAllGoogleProperties) {
+ doSetProperty("includeAllGoogleProperties",
includeAllGoogleProperties);
+ return this;
+ }
+ /**
+ * Whether to include all Google headers when mapping from Pubsub to
+ * Camel Message. Setting this to true will include properties such as
+ * x-goog etc.
+ *
+ * The option will be converted to a <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: advanced
+ *
+ * @param includeAllGoogleProperties the value to set
+ * @return the dsl builder
+ */
+ default AdvancedGooglePubsubEndpointProducerBuilder
includeAllGoogleProperties(String includeAllGoogleProperties) {
+ doSetProperty("includeAllGoogleProperties",
includeAllGoogleProperties);
+ return this;
+ }
/**
* To use a custom logger name.
*
@@ -712,6 +780,40 @@ public interface GooglePubsubEndpointBuilderFactory {
return (GooglePubsubEndpointBuilder) this;
}
+ /**
+ * Whether to include all Google headers when mapping from Pubsub to
+ * Camel Message. Setting this to true will include properties such as
+ * x-goog etc.
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: advanced
+ *
+ * @param includeAllGoogleProperties the value to set
+ * @return the dsl builder
+ */
+ default AdvancedGooglePubsubEndpointBuilder
includeAllGoogleProperties(boolean includeAllGoogleProperties) {
+ doSetProperty("includeAllGoogleProperties",
includeAllGoogleProperties);
+ return this;
+ }
+ /**
+ * Whether to include all Google headers when mapping from Pubsub to
+ * Camel Message. Setting this to true will include properties such as
+ * x-goog etc.
+ *
+ * The option will be converted to a <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: advanced
+ *
+ * @param includeAllGoogleProperties the value to set
+ * @return the dsl builder
+ */
+ default AdvancedGooglePubsubEndpointBuilder
includeAllGoogleProperties(String includeAllGoogleProperties) {
+ doSetProperty("includeAllGoogleProperties",
includeAllGoogleProperties);
+ return this;
+ }
/**
* To use a custom logger name.
*
@@ -852,6 +954,7 @@ public interface GooglePubsubEndpointBuilderFactory {
*
* @return the name of the header {@code GooglePubsubAttributes}.
*/
+ @Deprecated
public String googlePubsubAttributes() {
return "CamelGooglePubsubAttributes";
}