This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-4.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-4.14.x by this push:
     new c2a522f089c5 CAMEL-22947 - Camel-Google-pubsub: consumer does not 
properly trigger ACK/NACK callbacks and lacks deliveryAttempt visibility 
(#21213)
c2a522f089c5 is described below

commit c2a522f089c58e3df8fb24177b9fcd2cf25bbed3
Author: Andrea Cosentino <[email protected]>
AuthorDate: Mon Feb 2 12:19:15 2026 +0100

    CAMEL-22947 - Camel-Google-pubsub: consumer does not properly trigger 
ACK/NACK callbacks and lacks deliveryAttempt visibility (#21213)
    
    Signed-off-by: Andrea Cosentino <[email protected]>
---
 .../camel/catalog/components/google-pubsub.json    |   3 +-
 .../component/google/pubsub/google-pubsub.json     |   3 +-
 .../src/main/docs/google-pubsub-component.adoc     |  28 ++++
 .../google/pubsub/GooglePubsubConstants.java       |   6 +
 .../google/pubsub/GooglePubsubConsumer.java        |  24 ++-
 .../pubsub/consumer/CamelMessageReceiver.java      |  23 ++-
 .../pubsub/integration/AckNackCallbackIT.java      | 171 +++++++++++++++++++++
 .../dsl/GooglePubsubEndpointBuilderFactory.java    |  15 ++
 8 files changed, 269 insertions(+), 4 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json
index 853e005f8298..b378b385d84f 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json
@@ -42,7 +42,8 @@
     "CamelGooglePubsubPublishTime": { "index": 2, "kind": "header", 
"displayName": "", "group": "consumer", "label": "consumer", "required": false, 
"javaType": "com.google.protobuf.Timestamp", "deprecated": false, 
"deprecationNote": "", "autowired": false, "secret": false, "description": "The 
time at which the message was published", "constantName": 
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#PUBLISH_TIME" },
     "CamelGooglePubsubAttributes": { "index": 3, "kind": "header", 
"displayName": "", "group": "common", "label": "", "required": false, 
"javaType": "Map<String, String>", "deprecated": true, "deprecationNote": "", 
"autowired": false, "secret": false, "description": "The attributes of the 
message.", "constantName": 
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#ATTRIBUTES" },
     "CamelGooglePubsubOrderingKey": { "index": 4, "kind": "header", 
"displayName": "", "group": "producer", "label": "producer", "required": false, 
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "If non-empty, identifies related 
messages for which publish order should be respected.", "constantName": 
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#ORDERING_KEY" },
-    "CamelGooglePubsubAcknowledge": { "index": 5, "kind": "header", 
"displayName": "", "group": "consumer", "label": "consumer", "required": false, 
"javaType": 
"org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "Can be used to manually acknowledge or 
negative-acknowledge a message when ackMode=NONE.", "constantName": 
"org.apache.camel.component.google.pubsub.GooglePub [...]
+    "CamelGooglePubsubAcknowledge": { "index": 5, "kind": "header", 
"displayName": "", "group": "consumer", "label": "consumer", "required": false, 
"javaType": 
"org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "Can be used to manually acknowledge or 
negative-acknowledge a message when ackMode=NONE.", "constantName": 
"org.apache.camel.component.google.pubsub.GooglePub [...]
+    "CamelGooglePubsubDeliveryAttempt": { "index": 6, "kind": "header", 
"displayName": "", "group": "consumer", "label": "consumer", "required": false, 
"javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "The delivery attempt counter received 
from PubSub. This is the approximate number of times the message has been 
delivered. This will be 1 for the first delivery. This feature requires a 
dead-letter policy to be configure [...]
   },
   "properties": {
     "projectId": { "index": 0, "kind": "path", "displayName": "Project Id", 
"group": "common", "label": "common", "required": true, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "description": "The Google Cloud PubSub 
Project Id" },
diff --git 
a/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json
 
b/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json
index 853e005f8298..b378b385d84f 100644
--- 
a/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json
+++ 
b/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json
@@ -42,7 +42,8 @@
     "CamelGooglePubsubPublishTime": { "index": 2, "kind": "header", 
"displayName": "", "group": "consumer", "label": "consumer", "required": false, 
"javaType": "com.google.protobuf.Timestamp", "deprecated": false, 
"deprecationNote": "", "autowired": false, "secret": false, "description": "The 
time at which the message was published", "constantName": 
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#PUBLISH_TIME" },
     "CamelGooglePubsubAttributes": { "index": 3, "kind": "header", 
"displayName": "", "group": "common", "label": "", "required": false, 
"javaType": "Map<String, String>", "deprecated": true, "deprecationNote": "", 
"autowired": false, "secret": false, "description": "The attributes of the 
message.", "constantName": 
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#ATTRIBUTES" },
     "CamelGooglePubsubOrderingKey": { "index": 4, "kind": "header", 
"displayName": "", "group": "producer", "label": "producer", "required": false, 
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "If non-empty, identifies related 
messages for which publish order should be respected.", "constantName": 
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#ORDERING_KEY" },
-    "CamelGooglePubsubAcknowledge": { "index": 5, "kind": "header", 
"displayName": "", "group": "consumer", "label": "consumer", "required": false, 
"javaType": 
"org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "Can be used to manually acknowledge or 
negative-acknowledge a message when ackMode=NONE.", "constantName": 
"org.apache.camel.component.google.pubsub.GooglePub [...]
+    "CamelGooglePubsubAcknowledge": { "index": 5, "kind": "header", 
"displayName": "", "group": "consumer", "label": "consumer", "required": false, 
"javaType": 
"org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "Can be used to manually acknowledge or 
negative-acknowledge a message when ackMode=NONE.", "constantName": 
"org.apache.camel.component.google.pubsub.GooglePub [...]
+    "CamelGooglePubsubDeliveryAttempt": { "index": 6, "kind": "header", 
"displayName": "", "group": "consumer", "label": "consumer", "required": false, 
"javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "The delivery attempt counter received 
from PubSub. This is the approximate number of times the message has been 
delivered. This will be 1 for the first delivery. This feature requires a 
dead-letter policy to be configure [...]
   },
   "properties": {
     "projectId": { "index": 0, "kind": "path", "displayName": "Project Id", 
"group": "common", "label": "common", "required": true, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "description": "The Google Cloud PubSub 
Project Id" },
diff --git 
a/components/camel-google/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc
 
b/components/camel-google/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc
index 90dc45cd53a3..66011105caec 100644
--- 
a/components/camel-google/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc
+++ 
b/components/camel-google/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc
@@ -99,6 +99,34 @@ To ack/nack the message the component uses Acknowledgement 
ID stored as header `
 If the header is removed or tampered with, the ack will fail and the message 
will be redelivered
 again after the ack deadline.
 
+=== Delivery Attempts and Dead Letter Queues
+
+When a subscription has a 
https://cloud.google.com/pubsub/docs/dead-letter-topics[dead-letter policy] 
configured,
+the component exposes the delivery attempt count via the header 
`GooglePubsubConstants.DELIVERY_ATTEMPT`.
+This header contains an `Integer` representing the approximate number of times 
the message has been delivered.
+The first delivery will have a value of `1`.
+
+NOTE: The delivery attempt header is only available when the subscription has 
a dead-letter policy configured.
+Without a dead-letter policy, the header will not be set.
+
+This allows routes to implement custom retry logic based on the delivery 
attempt count:
+
+[source,java]
+----
+from("google-pubsub:{{project.name}}:{{subscription.name}}")
+    .process(exchange -> {
+        Integer deliveryAttempt = 
exchange.getIn().getHeader(GooglePubsubConstants.DELIVERY_ATTEMPT, 
Integer.class);
+        if (deliveryAttempt != null && deliveryAttempt > 3) {
+            // Custom handling for messages that have been redelivered 
multiple times
+            log.warn("Message has been delivered {} times", deliveryAttempt);
+        }
+        // Process the message
+    });
+----
+
+With a dead-letter policy, after the configured maximum delivery attempts are 
exceeded,
+the message will automatically be forwarded to the dead-letter topic by Google 
PubSub.
+
 === Message Body
 
 The consumer endpoint returns the content of the message as `byte[]`. Exactly 
as the underlying system sends it.
diff --git 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
index d5eb0937a23d..df0c5e350502 100644
--- 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
+++ 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
@@ -39,6 +39,12 @@ public final class GooglePubsubConstants {
                                                 "message when ackMode=NONE.",
               javaType = 
"org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge")
     public static final String GOOGLE_PUBSUB_ACKNOWLEDGE = 
"CamelGooglePubsubAcknowledge";
+    @Metadata(label = "consumer",
+              description = "The delivery attempt counter received from 
PubSub. This is the approximate number of times " +
+                            "the message has been delivered. This will be 1 
for the first delivery. This feature requires " +
+                            "a dead-letter policy to be configured on the 
subscription.",
+              javaType = "Integer")
+    public static final String DELIVERY_ATTEMPT = 
"CamelGooglePubsubDeliveryAttempt";
     @Deprecated(since = "4.15")
     public static final String RESERVED_GOOGLE_CLIENT_ATTRIBUTE_PREFIX = 
"goog";
 
diff --git 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
index a5973fb574b9..64a52920d29f 100644
--- 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
+++ 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
@@ -45,7 +45,9 @@ import 
org.apache.camel.component.google.pubsub.consumer.AcknowledgeSync;
 import org.apache.camel.component.google.pubsub.consumer.CamelMessageReceiver;
 import 
org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge;
 import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.support.UnitOfWorkHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -197,6 +199,12 @@ public class GooglePubsubConsumer extends DefaultConsumer {
                         // Deprecated:  replaced by headerFilterStrategy
                         
exchange.getIn().setHeader(GooglePubsubConstants.ATTRIBUTES, 
pubsubMessage.getAttributesMap());
 
+                        // Delivery attempt (requires dead-letter policy on 
subscription)
+                        int deliveryAttempt = message.getDeliveryAttempt();
+                        if (deliveryAttempt > 0) {
+                            
exchange.getIn().setHeader(GooglePubsubConstants.DELIVERY_ATTEMPT, 
deliveryAttempt);
+                        }
+
                         //existing subscriber can not be propagated, because 
it will be closed at the end of this block
                         //subscriber will be created at the moment of use
                         // (see  
https://issues.apache.org/jira/browse/CAMEL-18447)
@@ -221,7 +229,21 @@ public class GooglePubsubConsumer extends DefaultConsumer {
                         try {
                             processor.process(exchange);
                         } catch (Exception e) {
-                            getExceptionHandler().handleException(e);
+                            exchange.setException(e);
+                        }
+
+                        // Handle exception if one occurred
+                        if (exchange.getException() != null) {
+                            getExceptionHandler().handleException("Error 
processing exchange", exchange,
+                                    exchange.getException());
+                        }
+
+                        // Execute synchronization callbacks (ACK/NACK) based 
on exchange status
+                        // This is required because we are directly calling 
processor.process() outside of the normal
+                        // Camel routing engine, so we must manually trigger 
the OnCompletion callbacks
+                        if (endpoint.getAckMode() != 
GooglePubsubConstants.AckMode.NONE) {
+                            List<Synchronization> synchronizations = 
exchange.getExchangeExtension().handoverCompletions();
+                            UnitOfWorkHelper.doneSynchronizations(exchange, 
synchronizations);
                         }
                     }
                 } catch (CancellationException e) {
diff --git 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
index ae412505d3d2..0967baa5a164 100644
--- 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
+++ 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
@@ -16,8 +16,11 @@
  */
 package org.apache.camel.component.google.pubsub.consumer;
 
+import java.util.List;
+
 import com.google.cloud.pubsub.v1.AckReplyConsumer;
 import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.Subscriber;
 import com.google.common.base.Strings;
 import com.google.pubsub.v1.PubsubMessage;
 import org.apache.camel.Exchange;
@@ -26,6 +29,8 @@ import 
org.apache.camel.component.google.pubsub.GooglePubsubConstants;
 import org.apache.camel.component.google.pubsub.GooglePubsubConsumer;
 import org.apache.camel.component.google.pubsub.GooglePubsubEndpoint;
 import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.UnitOfWorkHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,6 +69,12 @@ public class CamelMessageReceiver implements MessageReceiver 
{
         // Deprecated: replaced by headerFilterStrategy
         exchange.getIn().setHeader(GooglePubsubConstants.ATTRIBUTES, 
pubsubMessage.getAttributesMap());
 
+        // Delivery attempt (requires dead-letter policy on subscription)
+        Integer deliveryAttempt = Subscriber.getDeliveryAttempt(pubsubMessage);
+        if (deliveryAttempt != null) {
+            exchange.getIn().setHeader(GooglePubsubConstants.DELIVERY_ATTEMPT, 
deliveryAttempt);
+        }
+
         GooglePubsubAcknowledge acknowledge = new 
AcknowledgeAsync(ackReplyConsumer);
         if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) {
             exchange.getExchangeExtension().addOnCompletion(new 
AcknowledgeCompletion(acknowledge));
@@ -86,8 +97,18 @@ public class CamelMessageReceiver implements MessageReceiver 
{
         } catch (Exception e) {
             exchange.setException(e);
         }
+
+        // Handle exception if one occurred
         if (exchange.getException() != null) {
-            
consumer.getExceptionHandler().handleException(exchange.getException());
+            consumer.getExceptionHandler().handleException("Error processing 
exchange", exchange, exchange.getException());
+        }
+
+        // Execute synchronization callbacks (ACK/NACK) based on exchange 
status
+        // This is required because we are directly calling 
processor.process() outside of the normal
+        // Camel routing engine, so we must manually trigger the OnCompletion 
callbacks
+        if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) {
+            List<Synchronization> synchronizations = 
exchange.getExchangeExtension().handoverCompletions();
+            UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations);
         }
     }
 
diff --git 
a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/AckNackCallbackIT.java
 
b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/AckNackCallbackIT.java
new file mode 100644
index 000000000000..ac51fa948abd
--- /dev/null
+++ 
b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/AckNackCallbackIT.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.pubsub.integration;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.google.pubsub.PubsubTestSupport;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test that validates the ACK/NACK callback behavior for 
synchronous pull.
+ *
+ * This test ensures that: 1. Messages are properly NACK'd on failure 
(exception thrown) and redelivered 2. Messages are
+ * properly ACK'd on successful processing
+ *
+ * The fix in CamelMessageReceiver and GooglePubsubConsumer ensures that 
OnCompletion synchronization callbacks are
+ * properly triggered after message processing, which was previously not 
happening.
+ */
+public class AckNackCallbackIT extends PubsubTestSupport {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AckNackCallbackIT.class);
+
+    private static final String TOPIC_NAME = "ackNackCallbackTopic";
+    private static final String SUBSCRIPTION_NAME = "ackNackCallbackSub";
+
+    // Counter to track how many times a message has been processed
+    private static final AtomicInteger processCount = new AtomicInteger(0);
+
+    // Flag to control failure behavior - fail on first attempt only
+    private static volatile boolean shouldFail = false;
+
+    @EndpointInject("direct:in")
+    private Endpoint directIn;
+
+    @EndpointInject("google-pubsub:{{project.id}}:" + TOPIC_NAME)
+    private Endpoint pubsubTopic;
+
+    @EndpointInject("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME + 
"?synchronousPull=true")
+    private Endpoint pubsubSubscription;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint result;
+
+    @Produce("direct:in")
+    private ProducerTemplate producer;
+
+    @Override
+    public void createTopicSubscription() {
+        // Create topic/subscription pair with short ack deadline for faster 
test execution
+        createTopicSubscriptionPair(TOPIC_NAME, SUBSCRIPTION_NAME, 10);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                // Producer route
+                from(directIn).routeId("Producer").to(pubsubTopic);
+
+                // Consumer route - can be configured to fail on first attempt
+                from(pubsubSubscription)
+                        .routeId("Consumer")
+                        .autoStartup(true)
+                        .process(exchange -> {
+                            int count = processCount.incrementAndGet();
+                            LOG.info("Consumer processing attempt #{}", count);
+
+                            if (shouldFail && count == 1) {
+                                LOG.info("Consumer throwing exception on first 
attempt (will trigger NACK)");
+                                throw new RuntimeException("Simulated failure 
on first attempt");
+                            }
+                            LOG.info("Consumer processing successful on 
attempt #{}", count);
+                        })
+                        .to(result);
+            }
+        };
+    }
+
+    /**
+     * Test that validates both NACK/redelivery and ACK behavior.
+     *
+     * Stage 1: Send a message that will fail on first attempt - Message is 
received and processed (first attempt) -
+     * Exception thrown, NACK sent - Message redelivered by PubSub - Message 
received and processed successfully (second
+     * attempt) - ACK sent, message removed from subscription
+     *
+     * Stage 2: Send a message that succeeds on first attempt - Message is 
received and processed successfully - ACK
+     * sent immediately, no redelivery
+     */
+    @Test
+    public void testAckNackCallbackBehavior() throws Exception {
+        // === Stage 1: Test NACK and redelivery ===
+        LOG.info("Stage 1: Testing NACK on failure and redelivery");
+
+        processCount.set(0);
+        shouldFail = true;
+
+        Exchange failExchange = new DefaultExchange(context);
+        failExchange.getIn().setBody("Test message for NACK/redelivery: " + 
failExchange.getExchangeId());
+
+        // We expect the message to eventually succeed after redelivery
+        result.expectedMessageCount(1);
+
+        // Send the message
+        producer.send(failExchange);
+
+        // Wait for the message to be processed (may take up to ack deadline + 
processing time)
+        result.assertIsSatisfied(15000);
+
+        // Verify that the message was processed at least twice (first attempt 
failed, second succeeded)
+        int nackProcessCount = processCount.get();
+        LOG.info("Stage 1: Consumer processed message {} times", 
nackProcessCount);
+        assertTrue(nackProcessCount >= 2,
+                "Message should have been processed at least twice due to 
NACK/redelivery, but was processed "
+                                          + nackProcessCount + " times");
+
+        // === Stage 2: Test immediate ACK on success ===
+        LOG.info("Stage 2: Testing immediate ACK on success");
+
+        result.reset();
+        processCount.set(0);
+        shouldFail = false;
+
+        Exchange successExchange = new DefaultExchange(context);
+        successExchange.getIn().setBody("Test message for immediate ACK: " + 
successExchange.getExchangeId());
+
+        result.expectedMessageCount(1);
+
+        // Send the message
+        producer.send(successExchange);
+
+        // Wait for message to be processed
+        result.assertIsSatisfied(5000);
+
+        // Wait a bit more to ensure no redelivery happens
+        Thread.sleep(3000);
+
+        // Verify message was processed exactly once (properly ACK'd, no 
redelivery)
+        int ackProcessCount = processCount.get();
+        LOG.info("Stage 2: Consumer processed message {} times (expecting 1)", 
ackProcessCount);
+        assertTrue(ackProcessCount == 1,
+                "Message should have been processed exactly once (properly 
ACK'd), but was processed "
+                                         + ackProcessCount + " times");
+
+        LOG.info("All stages passed - ACK/NACK callback behavior is working 
correctly");
+    }
+}
diff --git 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java
 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java
index d6ee6fa5a903..3763dcbeb773 100644
--- 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java
+++ 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java
@@ -1081,6 +1081,21 @@ public interface GooglePubsubEndpointBuilderFactory {
         public String googlePubsubAcknowledge() {
             return "CamelGooglePubsubAcknowledge";
         }
+        /**
+         * The delivery attempt counter received from PubSub. This is the
+         * approximate number of times the message has been delivered. This 
will
+         * be 1 for the first delivery. This feature requires a dead-letter
+         * policy to be configured on the subscription.
+         * 
+         * The option is a: {@code Integer} type.
+         * 
+         * Group: consumer
+         * 
+         * @return the name of the header {@code GooglePubsubDeliveryAttempt}.
+         */
+        public String googlePubsubDeliveryAttempt() {
+            return "CamelGooglePubsubDeliveryAttempt";
+        }
     }
     static GooglePubsubEndpointBuilder endpointBuilder(String componentName, 
String path) {
         class GooglePubsubEndpointBuilderImpl extends AbstractEndpointBuilder 
implements GooglePubsubEndpointBuilder, AdvancedGooglePubsubEndpointBuilder {

Reply via email to