This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-4.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.18.x by this push:
new ed03386dad18 CAMEL-23142 - camel-google-pubsub: Add
maxDeliveryAttempts enforcemen… (#21798)
ed03386dad18 is described below
commit ed03386dad18bc770e7963393943a0c30a749c80
Author: Andrea Cosentino <[email protected]>
AuthorDate: Fri Mar 6 20:48:26 2026 +0100
CAMEL-23142 - camel-google-pubsub: Add maxDeliveryAttempts enforcemen…
(#21798)
* CAMEL-23142 - camel-google-pubsub: Add maxDeliveryAttempts enforcement to
prevent infinite redelivery loops
Signed-off-by: Andrea Cosentino <[email protected]>
* Regen
Signed-off-by: Andrea Cosentino <[email protected]>
---------
Signed-off-by: Andrea Cosentino <[email protected]>
---
.../camel/catalog/components/google-pubsub.json | 25 ++---
.../jbang/camel-jbang-configuration-metadata.json | 2 +
.../pubsub/GooglePubsubEndpointConfigurer.java | 6 ++
.../pubsub/GooglePubsubEndpointUriFactory.java | 3 +-
.../component/google/pubsub/google-pubsub.json | 25 ++---
.../src/main/docs/google-pubsub-component.adoc | 27 +++++
.../google/pubsub/GooglePubsubComponent.java | 15 +++
.../google/pubsub/GooglePubsubConsumer.java | 56 ++++++++++
.../google/pubsub/GooglePubsubEndpoint.java | 24 +++++
.../pubsub/consumer/CamelMessageReceiver.java | 9 ++
.../pubsub/integration/MaxDeliveryAttemptsIT.java | 115 +++++++++++++++++++++
.../PubsubEndpointMaxDeliveryAttemptsTest.java | 77 ++++++++++++++
.../dsl/GooglePubsubEndpointBuilderFactory.java | 46 +++++++++
13 files changed, 405 insertions(+), 25 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json
index 00b807df40db..ac37c22f8741 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json
@@ -54,17 +54,18 @@
"exceptionHandler": { "index": 5, "kind": "parameter", "displayName":
"Exception Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.",
"deprecated": false, "autowired": false, "secret": false, "description": "To
let the consumer use a custom ExceptionHandler. Notice if the option
bridgeErrorHandler is enabled then this option is not in use. By def [...]
"exchangePattern": { "index": 6, "kind": "parameter", "displayName":
"Exchange Pattern", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "enum", "javaType":
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ],
"deprecated": false, "autowired": false, "secret": false, "description": "Sets
the exchange pattern when the consumer creates an exchange." },
"maxAckExtensionPeriod": { "index": 7, "kind": "parameter", "displayName":
"Max Ack Extension Period", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "integer", "javaType": "int",
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 3600,
"description": "Set the maximum period a message ack deadline will be extended.
Value in seconds" },
- "maxMessagesPerPoll": { "index": 8, "kind": "parameter", "displayName":
"Max Messages Per Poll", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "integer", "javaType":
"java.lang.Integer", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": 1, "description": "The max number of messages to receive from
the server in a single API call" },
- "synchronousPull": { "index": 9, "kind": "parameter", "displayName":
"Synchronous Pull", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Synchronously pull batches of messages"
},
- "lazyStartProducer": { "index": 10, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
- "messageOrderingEnabled": { "index": 11, "kind": "parameter",
"displayName": "Message Ordering Enabled", "group": "producer (advanced)",
"label": "producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Should message ordering be enabled" },
- "pubsubEndpoint": { "index": 12, "kind": "parameter", "displayName":
"Pubsub Endpoint", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "Pub\/Sub endpoint to use. Required when using message ordering,
and ensures that messages are received in order even when multiple publishers
are used" },
- "retry": { "index": 13, "kind": "parameter", "displayName": "Retry",
"group": "producer (advanced)", "label": "producer,advanced", "required":
false, "type": "object", "javaType":
"com.google.api.gax.retrying.RetrySettings", "deprecated": false, "autowired":
false, "secret": false, "description": "A custom RetrySettings to control how
the publisher handles retry-able failures" },
- "serializer": { "index": 14, "kind": "parameter", "displayName":
"Serializer", "group": "producer (advanced)", "label": "producer,advanced",
"required": false, "type": "object", "javaType":
"org.apache.camel.component.google.pubsub.serializer.GooglePubsubSerializer",
"deprecated": false, "deprecationNote": "", "autowired": true, "secret": false,
"description": "A custom GooglePubsubSerializer to use for serializing message
payloads in the producer" },
- "headerFilterStrategy": { "index": 15, "kind": "parameter", "displayName":
"Header Filter Strategy", "group": "advanced", "label": "advanced", "required":
false, "type": "object", "javaType":
"org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired":
false, "secret": false, "description": "To use a custom HeaderFilterStrategy to
filter headers to and from Camel message." },
- "includeAllGoogleProperties": { "index": 16, "kind": "parameter",
"displayName": "Include All Google Properties", "group": "advanced", "label":
"advanced", "required": false, "type": "boolean", "javaType": "boolean",
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
false, "description": "Whether to include all Google headers when mapping from
Pubsub to Camel Message. Setting this to true will include properties such as
x-goog etc." },
- "loggerId": { "index": 17, "kind": "parameter", "displayName": "Logger
Id", "group": "advanced", "label": "advanced", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": true, "autowired":
false, "secret": false, "description": "To use a custom logger name" },
- "authenticate": { "index": 18, "kind": "parameter", "displayName":
"Authenticate", "group": "security", "label": "security", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": true, "description": "Use Credentials
when interacting with PubSub service (no authentication is required when using
emulator)." },
- "serviceAccountKey": { "index": 19, "kind": "parameter", "displayName":
"Service Account Key", "group": "security", "label": "security", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "The Service account key
that can be used as credentials for the PubSub publisher\/subscriber. It can be
loaded by default from classpath, but you can prefix with classpath:, file:, or
http: to load the resour [...]
+ "maxDeliveryAttempts": { "index": 8, "kind": "parameter", "displayName":
"Max Delivery Attempts", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "integer", "javaType": "int",
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 0,
"description": "The maximum number of delivery attempts for each message. When
set to a positive value, the consumer will automatically nack any message whose
delivery attempt count is greater t [...]
+ "maxMessagesPerPoll": { "index": 9, "kind": "parameter", "displayName":
"Max Messages Per Poll", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "integer", "javaType":
"java.lang.Integer", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": 1, "description": "The max number of messages to receive from
the server in a single API call" },
+ "synchronousPull": { "index": 10, "kind": "parameter", "displayName":
"Synchronous Pull", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Synchronously pull batches of messages"
},
+ "lazyStartProducer": { "index": 11, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
+ "messageOrderingEnabled": { "index": 12, "kind": "parameter",
"displayName": "Message Ordering Enabled", "group": "producer (advanced)",
"label": "producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Should message ordering be enabled" },
+ "pubsubEndpoint": { "index": 13, "kind": "parameter", "displayName":
"Pubsub Endpoint", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "Pub\/Sub endpoint to use. Required when using message ordering,
and ensures that messages are received in order even when multiple publishers
are used" },
+ "retry": { "index": 14, "kind": "parameter", "displayName": "Retry",
"group": "producer (advanced)", "label": "producer,advanced", "required":
false, "type": "object", "javaType":
"com.google.api.gax.retrying.RetrySettings", "deprecated": false, "autowired":
false, "secret": false, "description": "A custom RetrySettings to control how
the publisher handles retry-able failures" },
+ "serializer": { "index": 15, "kind": "parameter", "displayName":
"Serializer", "group": "producer (advanced)", "label": "producer,advanced",
"required": false, "type": "object", "javaType":
"org.apache.camel.component.google.pubsub.serializer.GooglePubsubSerializer",
"deprecated": false, "deprecationNote": "", "autowired": true, "secret": false,
"description": "A custom GooglePubsubSerializer to use for serializing message
payloads in the producer" },
+ "headerFilterStrategy": { "index": 16, "kind": "parameter", "displayName":
"Header Filter Strategy", "group": "advanced", "label": "advanced", "required":
false, "type": "object", "javaType":
"org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired":
false, "secret": false, "description": "To use a custom HeaderFilterStrategy to
filter headers to and from Camel message." },
+ "includeAllGoogleProperties": { "index": 17, "kind": "parameter",
"displayName": "Include All Google Properties", "group": "advanced", "label":
"advanced", "required": false, "type": "boolean", "javaType": "boolean",
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
false, "description": "Whether to include all Google headers when mapping from
Pubsub to Camel Message. Setting this to true will include properties such as
x-goog etc." },
+ "loggerId": { "index": 18, "kind": "parameter", "displayName": "Logger
Id", "group": "advanced", "label": "advanced", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": true, "autowired":
false, "secret": false, "description": "To use a custom logger name" },
+ "authenticate": { "index": 19, "kind": "parameter", "displayName":
"Authenticate", "group": "security", "label": "security", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": true, "description": "Use Credentials
when interacting with PubSub service (no authentication is required when using
emulator)." },
+ "serviceAccountKey": { "index": 20, "kind": "parameter", "displayName":
"Service Account Key", "group": "security", "label": "security", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "The Service account key
that can be used as credentials for the PubSub publisher\/subscriber. It can be
loaded by default from classpath, but you can prefix with classpath:, file:, or
http: to load the resour [...]
}
}
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/jbang/camel-jbang-configuration-metadata.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/jbang/camel-jbang-configuration-metadata.json
index 01c0f4bbcdec..22ca438bb84d 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/jbang/camel-jbang-configuration-metadata.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/jbang/camel-jbang-configuration-metadata.json
@@ -3,6 +3,7 @@
{ "name": "camel.jbang", "description": "Camel JBang configurations" }
],
"properties": [
+ { "name": "camel.jbang.buildTool", "required": false, "description":
"Build tool to use (Maven or Gradle)", "type": "string", "javaType": "String",
"defaultValue": "Maven", "secret": false },
{ "name": "camel.jbang.camel-version", "required": false, "description":
"The version of Apache Camel to use", "type": "string", "javaType": "String",
"secret": false },
{ "name": "camel.jbang.camelSpringBootVersion", "required": false,
"description": "To use a custom Camel version when running or export to Spring
Boot", "label": "spring-boot", "type": "string", "javaType": "String",
"secret": false },
{ "name": "camel.jbang.classpathFiles", "required": false, "description":
"Additional files to add to classpath (Use commas to separate multiple
files).", "type": "string", "javaType": "String", "secret": false },
@@ -16,6 +17,7 @@
{ "name": "camel.jbang.excludes", "required": false, "description":
"Exclude files by name or pattern (Use commas to separate multiple files)",
"type": "string", "javaType": "String", "secret": false },
{ "name": "camel.jbang.exportDir", "required": false, "description":
"Directory where the project will be exported", "type": "string", "javaType":
"String", "defaultValue": ".", "secret": false },
{ "name": "camel.jbang.gav", "required": false, "description": "Maven
coordinate (groupId:artifactId:version)", "type": "string", "javaType":
"String", "secret": false },
+ { "name": "camel.jbang.gradleWrapper", "required": false, "description":
"Include Gradle Wrapper files in the exported project", "type": "boolean",
"javaType": "boolean", "defaultValue": true, "secret": false },
{ "name": "camel.jbang.groovyFiles", "required": false, "description":
"Additional groovy source files to export to src\/main\/resources\/camel-groovy
directory (Use commas to separate multiple files)", "type": "string",
"javaType": "String", "secret": false },
{ "name": "camel.jbang.health", "required": false, "description": "Health
check at \/observe\/health on local HTTP server (port 8080 by default)",
"type": "boolean", "javaType": "boolean", "defaultValue": false, "secret":
false, "deprecated": true },
{ "name": "camel.jbang.ignoreLoadingError", "required": false,
"description": "Whether to ignore route loading and compilation errors (use
this with care!)", "label": "advanced", "type": "boolean", "javaType":
"boolean", "defaultValue": false, "secret": false },
diff --git
a/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointConfigurer.java
b/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointConfigurer.java
index 6f60fab7267e..131534da3d08 100644
---
a/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointConfigurer.java
+++
b/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointConfigurer.java
@@ -44,6 +44,8 @@ public class GooglePubsubEndpointConfigurer extends
PropertyConfigurerSupport im
case "loggerId": target.setLoggerId(property(camelContext,
java.lang.String.class, value)); return true;
case "maxackextensionperiod":
case "maxAckExtensionPeriod":
target.setMaxAckExtensionPeriod(property(camelContext, int.class, value));
return true;
+ case "maxdeliveryattempts":
+ case "maxDeliveryAttempts":
target.setMaxDeliveryAttempts(property(camelContext, int.class, value)); return
true;
case "maxmessagesperpoll":
case "maxMessagesPerPoll":
target.setMaxMessagesPerPoll(property(camelContext, java.lang.Integer.class,
value)); return true;
case "messageorderingenabled":
@@ -89,6 +91,8 @@ public class GooglePubsubEndpointConfigurer extends
PropertyConfigurerSupport im
case "loggerId": return java.lang.String.class;
case "maxackextensionperiod":
case "maxAckExtensionPeriod": return int.class;
+ case "maxdeliveryattempts":
+ case "maxDeliveryAttempts": return int.class;
case "maxmessagesperpoll":
case "maxMessagesPerPoll": return java.lang.Integer.class;
case "messageorderingenabled":
@@ -130,6 +134,8 @@ public class GooglePubsubEndpointConfigurer extends
PropertyConfigurerSupport im
case "loggerId": return target.getLoggerId();
case "maxackextensionperiod":
case "maxAckExtensionPeriod": return target.getMaxAckExtensionPeriod();
+ case "maxdeliveryattempts":
+ case "maxDeliveryAttempts": return target.getMaxDeliveryAttempts();
case "maxmessagesperpoll":
case "maxMessagesPerPoll": return target.getMaxMessagesPerPoll();
case "messageorderingenabled":
diff --git
a/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointUriFactory.java
b/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointUriFactory.java
index 2649a9a6eef3..61b8d3cbb091 100644
---
a/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointUriFactory.java
+++
b/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointUriFactory.java
@@ -23,7 +23,7 @@ public class GooglePubsubEndpointUriFactory extends
org.apache.camel.support.com
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Map<String, String> MULTI_VALUE_PREFIXES;
static {
- Set<String> props = new HashSet<>(20);
+ Set<String> props = new HashSet<>(21);
props.add("ackMode");
props.add("authenticate");
props.add("bridgeErrorHandler");
@@ -36,6 +36,7 @@ public class GooglePubsubEndpointUriFactory extends
org.apache.camel.support.com
props.add("lazyStartProducer");
props.add("loggerId");
props.add("maxAckExtensionPeriod");
+ props.add("maxDeliveryAttempts");
props.add("maxMessagesPerPoll");
props.add("messageOrderingEnabled");
props.add("projectId");
diff --git
a/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json
b/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json
index 00b807df40db..ac37c22f8741 100644
---
a/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json
+++
b/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json
@@ -54,17 +54,18 @@
"exceptionHandler": { "index": 5, "kind": "parameter", "displayName":
"Exception Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.",
"deprecated": false, "autowired": false, "secret": false, "description": "To
let the consumer use a custom ExceptionHandler. Notice if the option
bridgeErrorHandler is enabled then this option is not in use. By def [...]
"exchangePattern": { "index": 6, "kind": "parameter", "displayName":
"Exchange Pattern", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "enum", "javaType":
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ],
"deprecated": false, "autowired": false, "secret": false, "description": "Sets
the exchange pattern when the consumer creates an exchange." },
"maxAckExtensionPeriod": { "index": 7, "kind": "parameter", "displayName":
"Max Ack Extension Period", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "integer", "javaType": "int",
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 3600,
"description": "Set the maximum period a message ack deadline will be extended.
Value in seconds" },
- "maxMessagesPerPoll": { "index": 8, "kind": "parameter", "displayName":
"Max Messages Per Poll", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "integer", "javaType":
"java.lang.Integer", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": 1, "description": "The max number of messages to receive from
the server in a single API call" },
- "synchronousPull": { "index": 9, "kind": "parameter", "displayName":
"Synchronous Pull", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Synchronously pull batches of messages"
},
- "lazyStartProducer": { "index": 10, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
- "messageOrderingEnabled": { "index": 11, "kind": "parameter",
"displayName": "Message Ordering Enabled", "group": "producer (advanced)",
"label": "producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Should message ordering be enabled" },
- "pubsubEndpoint": { "index": 12, "kind": "parameter", "displayName":
"Pubsub Endpoint", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "Pub\/Sub endpoint to use. Required when using message ordering,
and ensures that messages are received in order even when multiple publishers
are used" },
- "retry": { "index": 13, "kind": "parameter", "displayName": "Retry",
"group": "producer (advanced)", "label": "producer,advanced", "required":
false, "type": "object", "javaType":
"com.google.api.gax.retrying.RetrySettings", "deprecated": false, "autowired":
false, "secret": false, "description": "A custom RetrySettings to control how
the publisher handles retry-able failures" },
- "serializer": { "index": 14, "kind": "parameter", "displayName":
"Serializer", "group": "producer (advanced)", "label": "producer,advanced",
"required": false, "type": "object", "javaType":
"org.apache.camel.component.google.pubsub.serializer.GooglePubsubSerializer",
"deprecated": false, "deprecationNote": "", "autowired": true, "secret": false,
"description": "A custom GooglePubsubSerializer to use for serializing message
payloads in the producer" },
- "headerFilterStrategy": { "index": 15, "kind": "parameter", "displayName":
"Header Filter Strategy", "group": "advanced", "label": "advanced", "required":
false, "type": "object", "javaType":
"org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired":
false, "secret": false, "description": "To use a custom HeaderFilterStrategy to
filter headers to and from Camel message." },
- "includeAllGoogleProperties": { "index": 16, "kind": "parameter",
"displayName": "Include All Google Properties", "group": "advanced", "label":
"advanced", "required": false, "type": "boolean", "javaType": "boolean",
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
false, "description": "Whether to include all Google headers when mapping from
Pubsub to Camel Message. Setting this to true will include properties such as
x-goog etc." },
- "loggerId": { "index": 17, "kind": "parameter", "displayName": "Logger
Id", "group": "advanced", "label": "advanced", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": true, "autowired":
false, "secret": false, "description": "To use a custom logger name" },
- "authenticate": { "index": 18, "kind": "parameter", "displayName":
"Authenticate", "group": "security", "label": "security", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": true, "description": "Use Credentials
when interacting with PubSub service (no authentication is required when using
emulator)." },
- "serviceAccountKey": { "index": 19, "kind": "parameter", "displayName":
"Service Account Key", "group": "security", "label": "security", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "The Service account key
that can be used as credentials for the PubSub publisher\/subscriber. It can be
loaded by default from classpath, but you can prefix with classpath:, file:, or
http: to load the resour [...]
+ "maxDeliveryAttempts": { "index": 8, "kind": "parameter", "displayName":
"Max Delivery Attempts", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "integer", "javaType": "int",
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 0,
"description": "The maximum number of delivery attempts for each message. When
set to a positive value, the consumer will automatically nack any message whose
delivery attempt count is greater t [...]
+ "maxMessagesPerPoll": { "index": 9, "kind": "parameter", "displayName":
"Max Messages Per Poll", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "integer", "javaType":
"java.lang.Integer", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": 1, "description": "The max number of messages to receive from
the server in a single API call" },
+ "synchronousPull": { "index": 10, "kind": "parameter", "displayName":
"Synchronous Pull", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Synchronously pull batches of messages"
},
+ "lazyStartProducer": { "index": 11, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
+ "messageOrderingEnabled": { "index": 12, "kind": "parameter",
"displayName": "Message Ordering Enabled", "group": "producer (advanced)",
"label": "producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Should message ordering be enabled" },
+ "pubsubEndpoint": { "index": 13, "kind": "parameter", "displayName":
"Pubsub Endpoint", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "Pub\/Sub endpoint to use. Required when using message ordering,
and ensures that messages are received in order even when multiple publishers
are used" },
+ "retry": { "index": 14, "kind": "parameter", "displayName": "Retry",
"group": "producer (advanced)", "label": "producer,advanced", "required":
false, "type": "object", "javaType":
"com.google.api.gax.retrying.RetrySettings", "deprecated": false, "autowired":
false, "secret": false, "description": "A custom RetrySettings to control how
the publisher handles retry-able failures" },
+ "serializer": { "index": 15, "kind": "parameter", "displayName":
"Serializer", "group": "producer (advanced)", "label": "producer,advanced",
"required": false, "type": "object", "javaType":
"org.apache.camel.component.google.pubsub.serializer.GooglePubsubSerializer",
"deprecated": false, "deprecationNote": "", "autowired": true, "secret": false,
"description": "A custom GooglePubsubSerializer to use for serializing message
payloads in the producer" },
+ "headerFilterStrategy": { "index": 16, "kind": "parameter", "displayName":
"Header Filter Strategy", "group": "advanced", "label": "advanced", "required":
false, "type": "object", "javaType":
"org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired":
false, "secret": false, "description": "To use a custom HeaderFilterStrategy to
filter headers to and from Camel message." },
+ "includeAllGoogleProperties": { "index": 17, "kind": "parameter",
"displayName": "Include All Google Properties", "group": "advanced", "label":
"advanced", "required": false, "type": "boolean", "javaType": "boolean",
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
false, "description": "Whether to include all Google headers when mapping from
Pubsub to Camel Message. Setting this to true will include properties such as
x-goog etc." },
+ "loggerId": { "index": 18, "kind": "parameter", "displayName": "Logger
Id", "group": "advanced", "label": "advanced", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": true, "autowired":
false, "secret": false, "description": "To use a custom logger name" },
+ "authenticate": { "index": 19, "kind": "parameter", "displayName":
"Authenticate", "group": "security", "label": "security", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": true, "description": "Use Credentials
when interacting with PubSub service (no authentication is required when using
emulator)." },
+ "serviceAccountKey": { "index": 20, "kind": "parameter", "displayName":
"Service Account Key", "group": "security", "label": "security", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "The Service account key
that can be used as credentials for the PubSub publisher\/subscriber. It can be
loaded by default from classpath, but you can prefix with classpath:, file:, or
http: to load the resour [...]
}
}
diff --git
a/components/camel-google/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc
b/components/camel-google/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc
index 68ab8eb44d91..d795cf6c2101 100644
---
a/components/camel-google/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc
+++
b/components/camel-google/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc
@@ -116,6 +116,33 @@
from("google-pubsub:{{project.name}}:{{subscription.name}}")
With a dead-letter policy, after the configured maximum delivery attempts are
exceeded,
the message will automatically be forwarded to the dead-letter topic by Google
PubSub.
+==== Automatic Max Delivery Attempts Enforcement
+
+The component can enforce the subscription's `maxDeliveryAttempts` setting at
the consumer level.
+When enabled, messages whose delivery attempt count is greater than or equal
to the configured maximum
+will be automatically nacked without processing, allowing Pub/Sub to route
them to the dead-letter topic.
+This prevents infinite redelivery loops that can occur with short retry delays.
+
+The `maxDeliveryAttempts` value is resolved as follows:
+
+1. If explicitly set via the endpoint option, that value is used.
+2. If not explicitly set, the component attempts to auto-fetch the value from
the subscription's
+ dead-letter policy at consumer startup.
+3. If auto-fetch fails (e.g., insufficient permissions), enforcement is
+ disabled and a warning is logged. If the subscription has no dead-letter
+ policy, enforcement is disabled without a warning.
+4. A value of `0` disables enforcement.
+
+[source,java]
+----
+// Explicit configuration
+from("google-pubsub:{{project.name}}:{{subscription.name}}?maxDeliveryAttempts=5")
+ .to("direct:process");
+
+// Auto-fetch from subscription dead-letter policy (default behavior when not
set)
+from("google-pubsub:{{project.name}}:{{subscription.name}}")
+ .to("direct:process");
+----
+
=== Message Body
The consumer endpoint returns the content of the message as `byte[]`. Exactly
as the underlying system sends it.
diff --git
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java
index fd2fe9c00dff..9a98f73413e2 100644
---
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java
+++
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java
@@ -37,6 +37,8 @@ import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
+import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
@@ -215,6 +217,19 @@ public class GooglePubsubComponent extends
HeaderFilterStrategyComponent {
return builder.build().createStub();
}
+    /**
+     * Creates a {@link SubscriptionAdminClient} for subscription metadata lookups on behalf of the given endpoint.
+     * <p>
+     * When the component-level {@code endpoint} option is set (for example, the address of a Pub/Sub emulator), the
+     * client talks to that address over plaintext gRPC. The channel is built by the client's own transport provider
+     * instead of being wrapped in a {@code FixedTransportChannelProvider}, so that closing the returned client also
+     * shuts the channel down; a fixed provider is not owned by the client and would leave the channel open.
+     * <p>
+     * The caller owns the returned client and is responsible for closing it.
+     *
+     * @param  googlePubsubEndpoint the endpoint whose credential settings are applied to the client
+     * @return                      a new admin client
+     * @throws IOException          if the credentials provider or the client cannot be created
+     */
+    public SubscriptionAdminClient getSubscriptionAdminClient(GooglePubsubEndpoint googlePubsubEndpoint) throws IOException {
+        SubscriptionAdminSettings.Builder builder = SubscriptionAdminSettings.newBuilder();
+
+        if (StringHelper.trimToNull(endpoint) != null) {
+            // Client-owned plaintext channel: it is released together with the client on close()
+            TransportChannelProvider channelProvider = SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder()
+                    .setEndpoint(endpoint)
+                    .setChannelConfigurator(ManagedChannelBuilder::usePlaintext)
+                    .build();
+            builder.setTransportChannelProvider(channelProvider);
+        }
+        builder.setCredentialsProvider(getCredentialsProvider(googlePubsubEndpoint));
+        return SubscriptionAdminClient.create(builder.build());
+    }
+
private CredentialsProvider getCredentialsProvider(GooglePubsubEndpoint
endpoint) throws IOException {
CredentialsProvider credentialsProvider;
diff --git
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
index ef40e7605580..ab01edd20f21 100644
---
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
+++
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
@@ -32,13 +32,16 @@ import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.common.base.Strings;
+import com.google.pubsub.v1.DeadLetterPolicy;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
+import com.google.pubsub.v1.Subscription;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.google.pubsub.consumer.AcknowledgeCompletion;
@@ -64,6 +67,7 @@ public class GooglePubsubConsumer extends DefaultConsumer {
private final List<Subscriber> subscribers;
private final Set<ApiFuture<PullResponse>> pendingSynchronousPullResponses;
private final HeaderFilterStrategy headerFilterStrategy;
+ private int resolvedMaxDeliveryAttempts;
GooglePubsubConsumer(GooglePubsubEndpoint endpoint, Processor processor) {
super(endpoint, processor);
@@ -86,6 +90,13 @@ public class GooglePubsubConsumer extends DefaultConsumer {
super.doStart();
localLog.info("Starting Google PubSub consumer for {}/{}",
endpoint.getProjectId(), endpoint.getDestinationName());
+
+ resolvedMaxDeliveryAttempts = resolveMaxDeliveryAttempts();
+ if (resolvedMaxDeliveryAttempts > 0) {
+ localLog.info("Max delivery attempts enforcement enabled: {} for
subscription {}",
+ resolvedMaxDeliveryAttempts,
endpoint.getDestinationName());
+ }
+
executor = endpoint.createExecutor(this);
for (int i = 0; i < endpoint.getConcurrentConsumers(); i++) {
executor.submit(new SubscriberWrapper());
@@ -128,6 +139,40 @@ public class GooglePubsubConsumer extends DefaultConsumer {
pendingSynchronousPullResponses.clear();
}
+ private int resolveMaxDeliveryAttempts() {
+ if (endpoint.isMaxDeliveryAttemptsExplicitlySet()) {
+ localLog.debug("Using explicitly configured maxDeliveryAttempts:
{}", endpoint.getMaxDeliveryAttempts());
+ return endpoint.getMaxDeliveryAttempts();
+ }
+
+ String subscriptionName =
ProjectSubscriptionName.format(endpoint.getProjectId(),
endpoint.getDestinationName());
+ try (SubscriptionAdminClient adminClient =
endpoint.getComponent().getSubscriptionAdminClient(endpoint)) {
+ Subscription subscription =
adminClient.getSubscription(subscriptionName);
+ if (subscription.hasDeadLetterPolicy()) {
+ DeadLetterPolicy dlp = subscription.getDeadLetterPolicy();
+ int maxAttempts = dlp.getMaxDeliveryAttempts();
+ if (maxAttempts > 0) {
+ localLog.info("Auto-fetched maxDeliveryAttempts={} from
subscription dead-letter policy for {}",
+ maxAttempts, endpoint.getDestinationName());
+ return maxAttempts;
+ }
+ }
+ localLog.debug("No dead-letter policy found on subscription {},
maxDeliveryAttempts enforcement disabled",
+ endpoint.getDestinationName());
+ } catch (Exception e) {
+ localLog.warn("Failed to auto-fetch maxDeliveryAttempts from
subscription {}: {}. "
+ + "Max delivery attempts enforcement will be
disabled. "
+ + "Set the maxDeliveryAttempts endpoint option
explicitly to enable enforcement.",
+ endpoint.getDestinationName(), e.getMessage());
+ localLog.debug("Auto-fetch failure details", e);
+ }
+ return 0;
+ }
+
+    /**
+     * @return the delivery-attempt limit resolved when the consumer started; values that are not positive mean no
+     *         limit is enforced
+     */
+    public int getResolvedMaxDeliveryAttempts() {
+        return this.resolvedMaxDeliveryAttempts;
+    }
+
private class SubscriberWrapper implements Runnable {
private final String subscriptionName;
@@ -259,6 +304,17 @@ public class GooglePubsubConsumer extends DefaultConsumer {
exchange.getIn().setHeader(GooglePubsubConstants.DELIVERY_ATTEMPT,
deliveryAttempt);
}
+ // Enforce maxDeliveryAttempts: nack without
processing if limit reached
+ if (resolvedMaxDeliveryAttempts > 0 && deliveryAttempt
>= resolvedMaxDeliveryAttempts) {
+ localLog.info(
+ "Message {} has reached max delivery
attempts ({}/{}), nacking to route to dead-letter topic",
+ pubsubMessage.getMessageId(),
deliveryAttempt, resolvedMaxDeliveryAttempts);
+ GooglePubsubAcknowledge ack = new AcknowledgeSync(
+ () ->
endpoint.getComponent().getSubscriberStub(endpoint), subscriptionName);
+ ack.nack(exchange);
+ continue;
+ }
+
//existing subscriber can not be propagated, because
it will be closed at the end of this block
//subscriber will be created at the moment of use
// (see
https://issues.apache.org/jira/browse/CAMEL-18447)
diff --git
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
index 2fdec64f2e7c..f0000a0eb7c8 100644
---
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
+++
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
@@ -81,6 +81,17 @@ public class GooglePubsubEndpoint extends DefaultEndpoint
implements EndpointSer
description = "Set the maximum period a message ack deadline
will be extended. Value in seconds",
defaultValue = "3600")
private int maxAckExtensionPeriod = 3600;
+ @UriParam(label = "consumer,advanced", name = "maxDeliveryAttempts",
+ description = "The maximum number of delivery attempts for each
message. "
+ + "When set to a positive value, the consumer will
automatically nack any message whose delivery attempt count "
+ + "is greater than or equal to this value,
allowing Pub/Sub to route it to the dead-letter topic "
+ + "without processing it. This prevents infinite
redelivery loops when short retry delays are configured. "
+ + "If not explicitly set and the subscription has
a dead-letter policy, "
+ + "the value is automatically fetched from the
subscription configuration at consumer startup. "
+ + "Set to 0 to disable enforcement.",
+ defaultValue = "0")
+ private int maxDeliveryAttempts;
+ private boolean maxDeliveryAttemptsExplicitlySet;
@UriParam(label = "producer,advanced",
description = "Should message ordering be enabled")
private boolean messageOrderingEnabled;
@@ -238,6 +249,19 @@ public class GooglePubsubEndpoint extends DefaultEndpoint
implements EndpointSer
this.maxAckExtensionPeriod = maxAckExtensionPeriod;
}
+    /**
+     * @return the configured maximum number of delivery attempts; {@code 0} unless a value has been set
+     */
+    public int getMaxDeliveryAttempts() {
+        return this.maxDeliveryAttempts;
+    }
+
+    /**
+     * Sets the maximum number of delivery attempts and remembers that a value was supplied, so that an explicit
+     * {@code 0} can be told apart from the untouched default.
+     *
+     * @param maxDeliveryAttempts the limit to store
+     */
+    public void setMaxDeliveryAttempts(int maxDeliveryAttempts) {
+        this.maxDeliveryAttemptsExplicitlySet = true;
+        this.maxDeliveryAttempts = maxDeliveryAttempts;
+    }
+
+    /**
+     * @return {@code true} once {@link #setMaxDeliveryAttempts(int)} has been called on this endpoint
+     */
+    public boolean isMaxDeliveryAttemptsExplicitlySet() {
+        return this.maxDeliveryAttemptsExplicitlySet;
+    }
+
public GooglePubsubSerializer getSerializer() {
return serializer;
}
diff --git
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
index 0967baa5a164..c2240b31c24b 100644
---
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
+++
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
@@ -75,6 +75,15 @@ public class CamelMessageReceiver implements MessageReceiver
{
exchange.getIn().setHeader(GooglePubsubConstants.DELIVERY_ATTEMPT,
deliveryAttempt);
}
+ // Enforce maxDeliveryAttempts: nack without processing if limit
reached
+ int maxDeliveryAttempts = consumer.getResolvedMaxDeliveryAttempts();
+ if (maxDeliveryAttempts > 0 && deliveryAttempt != null &&
deliveryAttempt >= maxDeliveryAttempts) {
+ localLog.info("Message {} has reached max delivery attempts
({}/{}), nacking to route to dead-letter topic",
+ pubsubMessage.getMessageId(), deliveryAttempt,
maxDeliveryAttempts);
+ ackReplyConsumer.nack();
+ return;
+ }
+
GooglePubsubAcknowledge acknowledge = new
AcknowledgeAsync(ackReplyConsumer);
if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) {
exchange.getExchangeExtension().addOnCompletion(new
AcknowledgeCompletion(acknowledge));
diff --git
a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/MaxDeliveryAttemptsIT.java
b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/MaxDeliveryAttemptsIT.java
new file mode 100644
index 000000000000..3bfcedb3718b
--- /dev/null
+++
b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/MaxDeliveryAttemptsIT.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.pubsub.integration;
+
+import com.google.pubsub.v1.DeadLetterPolicy;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.Subscription;
+import com.google.pubsub.v1.Topic;
+import com.google.pubsub.v1.TopicName;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.google.pubsub.GooglePubsubConsumer;
+import org.apache.camel.component.google.pubsub.PubsubTestSupport;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class MaxDeliveryAttemptsIT extends PubsubTestSupport {
+
+    private static final String TOPIC_NAME = "camel.max-delivery-topic";
+    private static final String SUBSCRIPTION_NAME = "camel.max-delivery-subscription";
+    private static final String DLQ_TOPIC_NAME = "camel.max-delivery-dlq-topic";
+    private static final String DLQ_SUBSCRIPTION_NAME = "camel.max-delivery-dlq-subscription";
+    private static final int MAX_DELIVERY_ATTEMPTS = 5;
+
+    // The URI carries no maxDeliveryAttempts option, so the consumer has to look the limit up itself
+    @EndpointInject("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME)
+    private Endpoint autoFetchSubscription;
+
+    @EndpointInject("mock:auto-fetch-result")
+    private MockEndpoint autoFetchResult;
+
+    @Produce("google-pubsub:{{project.id}}:" + TOPIC_NAME)
+    private ProducerTemplate producer;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from(autoFetchSubscription)
+                        .routeId("auto-fetch-consumer")
+                        .to(autoFetchResult);
+            }
+        };
+    }
+
+    /**
+     * Creates the main topic/subscription pair and its dead-letter pair. The main subscription carries a dead-letter
+     * policy whose limit is {@link #MAX_DELIVERY_ATTEMPTS}.
+     */
+    @Override
+    public void createTopicSubscription() {
+        String topicPath = TopicName.of(PROJECT_ID, TOPIC_NAME).toString();
+        String dlqTopicPath = TopicName.of(PROJECT_ID, DLQ_TOPIC_NAME).toString();
+        String subscriptionPath = ProjectSubscriptionName.of(PROJECT_ID, SUBSCRIPTION_NAME).toString();
+        String dlqSubscriptionPath = ProjectSubscriptionName.of(PROJECT_ID, DLQ_SUBSCRIPTION_NAME).toString();
+
+        Topic mainTopic = Topic.newBuilder().setName(topicPath).build();
+        Topic deadLetterTopic = Topic.newBuilder().setName(dlqTopicPath).build();
+
+        DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.newBuilder()
+                .setDeadLetterTopic(dlqTopicPath)
+                .setMaxDeliveryAttempts(MAX_DELIVERY_ATTEMPTS)
+                .build();
+
+        Subscription mainSubscription = Subscription.newBuilder()
+                .setName(subscriptionPath)
+                .setTopic(topicPath)
+                .setDeadLetterPolicy(deadLetterPolicy)
+                .build();
+        Subscription deadLetterSubscription = Subscription.newBuilder()
+                .setName(dlqSubscriptionPath)
+                .setTopic(dlqTopicPath)
+                .build();
+
+        // Dead-letter pair first, then the pair whose policy refers to it
+        createTopicSubscriptionPair(deadLetterTopic, deadLetterSubscription);
+        createTopicSubscriptionPair(mainTopic, mainSubscription);
+    }
+
+    /**
+     * Checks two things: the consumer picks up maxDeliveryAttempts from the subscription's dead-letter policy when it
+     * starts, and a freshly published message still reaches the route. On a first delivery the delivery attempt is
+     * either not set or 1, which is below maxDeliveryAttempts=5.
+     */
+    @Test
+    public void testAutoFetchMaxDeliveryAttemptsAndProcessBelowThreshold() throws Exception {
+        // The limit resolved at startup must match the dead-letter policy
+        GooglePubsubConsumer pubsubConsumer
+                = (GooglePubsubConsumer) context.getRoute("auto-fetch-consumer").getConsumer();
+        assertEquals(MAX_DELIVERY_ATTEMPTS, pubsubConsumer.getResolvedMaxDeliveryAttempts(),
+                "Consumer should auto-fetch maxDeliveryAttempts from subscription dead-letter policy");
+
+        // A message below the threshold must be processed as usual
+        autoFetchResult.expectedMessageCount(1);
+
+        Exchange outgoing = new DefaultExchange(context);
+        outgoing.getIn().setBody("test message");
+        producer.send(outgoing);
+
+        autoFetchResult.assertIsSatisfied(5000);
+    }
+}
diff --git
a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/unit/PubsubEndpointMaxDeliveryAttemptsTest.java
b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/unit/PubsubEndpointMaxDeliveryAttemptsTest.java
new file mode 100644
index 000000000000..78fae89145f5
--- /dev/null
+++
b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/unit/PubsubEndpointMaxDeliveryAttemptsTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.pubsub.unit;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.google.pubsub.GooglePubsubEndpoint;
+import org.apache.camel.component.google.pubsub.PubsubTestSupport;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class PubsubEndpointMaxDeliveryAttemptsTest extends PubsubTestSupport {
+
+    private static final String SUBSCRIPTION_WITH_MAX = "test-max-delivery?maxDeliveryAttempts=5";
+    private static final String SUBSCRIPTION_WITHOUT_MAX = "test-no-max-delivery";
+
+    @EndpointInject("google-pubsub://{{project.id}}:" + SUBSCRIPTION_WITH_MAX)
+    private Endpoint endpointWithMax;
+
+    @EndpointInject("google-pubsub://{{project.id}}:" + SUBSCRIPTION_WITHOUT_MAX)
+    private Endpoint endpointWithoutMax;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from(endpointWithMax).to("direct:to1");
+                from(endpointWithoutMax).to("direct:to2");
+            }
+        };
+    }
+
+    /** An endpoint created with the option reports the value and flags it as explicitly set. */
+    @Test
+    public void testMaxDeliveryAttemptsExplicitlySet() {
+        GooglePubsubEndpoint pubsubEndpoint = lookupPubsubEndpoint(SUBSCRIPTION_WITH_MAX);
+
+        assertEquals(5, pubsubEndpoint.getMaxDeliveryAttempts());
+        assertTrue(pubsubEndpoint.isMaxDeliveryAttemptsExplicitlySet());
+    }
+
+    /** An endpoint created without the option reports 0 and is not flagged as explicitly set. */
+    @Test
+    public void testMaxDeliveryAttemptsDefaultValue() {
+        GooglePubsubEndpoint pubsubEndpoint = lookupPubsubEndpoint(SUBSCRIPTION_WITHOUT_MAX);
+
+        assertEquals(0, pubsubEndpoint.getMaxDeliveryAttempts());
+        assertFalse(pubsubEndpoint.isMaxDeliveryAttemptsExplicitlySet());
+    }
+
+    /**
+     * Finds the endpoint registered in the context for the given destination and asserts that it exists and is a
+     * {@link GooglePubsubEndpoint}.
+     */
+    private GooglePubsubEndpoint lookupPubsubEndpoint(String destination) {
+        Endpoint registered
+                = context.hasEndpoint(String.format("google-pubsub:%s:%s", PROJECT_ID, destination));
+        assertNotNull(registered);
+        assertTrue(registered instanceof GooglePubsubEndpoint);
+        return (GooglePubsubEndpoint) registered;
+    }
+}
diff --git
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java
index 3763dcbeb773..0a54df2f5c16 100644
---
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java
+++
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java
@@ -317,6 +317,52 @@ public interface GooglePubsubEndpointBuilderFactory {
doSetProperty("maxAckExtensionPeriod", maxAckExtensionPeriod);
return this;
}
+ /**
+ * The maximum number of delivery attempts for each message. When set
to
+ * a positive value, the consumer will automatically nack any message
+ * whose delivery attempt count is greater than or equal to this value,
+ * allowing Pub/Sub to route it to the dead-letter topic without
+ * processing it. This prevents infinite redelivery loops when short
+ * retry delays are configured. If not explicitly set and the
+ * subscription has a dead-letter policy, the value is automatically
+ * fetched from the subscription configuration at consumer startup. Set
+ * to 0 to disable enforcement.
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Default: 0
+ * Group: consumer (advanced)
+ *
+ * @param maxDeliveryAttempts the value to set
+ * @return the dsl builder
+ */
+ default AdvancedGooglePubsubEndpointConsumerBuilder
maxDeliveryAttempts(int maxDeliveryAttempts) {
+ doSetProperty("maxDeliveryAttempts", maxDeliveryAttempts);
+ return this;
+ }
+ /**
+ * The maximum number of delivery attempts for each message. When set
to
+ * a positive value, the consumer will automatically nack any message
+ * whose delivery attempt count is greater than or equal to this value,
+ * allowing Pub/Sub to route it to the dead-letter topic without
+ * processing it. This prevents infinite redelivery loops when short
+ * retry delays are configured. If not explicitly set and the
+ * subscription has a dead-letter policy, the value is automatically
+ * fetched from the subscription configuration at consumer startup. Set
+ * to 0 to disable enforcement.
+ *
+ * The option will be converted to a <code>int</code> type.
+ *
+ * Default: 0
+ * Group: consumer (advanced)
+ *
+ * @param maxDeliveryAttempts the value to set
+ * @return the dsl builder
+ */
+ default AdvancedGooglePubsubEndpointConsumerBuilder
maxDeliveryAttempts(String maxDeliveryAttempts) {
+ doSetProperty("maxDeliveryAttempts", maxDeliveryAttempts);
+ return this;
+ }
/**
* The max number of messages to receive from the server in a single
API
* call.