This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new f7dd31cd6c3a CAMEL-22947 - Camel-Google-pubsub: consumer does not
properly trigger ACK/NACK callbacks and lacks deliveryAttempt visibility
(#21212)
f7dd31cd6c3a is described below
commit f7dd31cd6c3a80e414a7a9b7a21df3f73e2f88ff
Author: Andrea Cosentino <[email protected]>
AuthorDate: Mon Feb 2 12:18:25 2026 +0100
CAMEL-22947 - Camel-Google-pubsub: consumer does not properly trigger
ACK/NACK callbacks and lacks deliveryAttempt visibility (#21212)
Signed-off-by: Andrea Cosentino <[email protected]>
---
.../camel/catalog/components/google-pubsub.json | 3 +-
.../component/google/pubsub/google-pubsub.json | 3 +-
.../src/main/docs/google-pubsub-component.adoc | 28 ++++
.../google/pubsub/GooglePubsubConstants.java | 6 +
.../google/pubsub/GooglePubsubConsumer.java | 24 ++-
.../pubsub/consumer/CamelMessageReceiver.java | 23 ++-
.../pubsub/integration/AckNackCallbackIT.java | 171 +++++++++++++++++++++
.../dsl/GooglePubsubEndpointBuilderFactory.java | 15 ++
8 files changed, 269 insertions(+), 4 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json
index bf38f55fcef1..76b522d4f8f0 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json
@@ -42,7 +42,8 @@
"CamelGooglePubsubPublishTime": { "index": 2, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "com.google.protobuf.Timestamp", "deprecated": false,
"deprecationNote": "", "autowired": false, "secret": false, "description": "The
time at which the message was published", "constantName":
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#PUBLISH_TIME" },
"CamelGooglePubsubAttributes": { "index": 3, "kind": "header",
"displayName": "", "group": "common", "label": "", "required": false,
"javaType": "Map<String, String>", "deprecated": true, "deprecationNote": "",
"autowired": false, "secret": false, "description": "The attributes of the
message.", "constantName":
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#ATTRIBUTES" },
"CamelGooglePubsubOrderingKey": { "index": 4, "kind": "header",
"displayName": "", "group": "producer", "label": "producer", "required": false,
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "If non-empty, identifies related
messages for which publish order should be respected.", "constantName":
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#ORDERING_KEY" },
- "CamelGooglePubsubAcknowledge": { "index": 5, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType":
"org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "Can be used to manually acknowledge or
negative-acknowledge a message when ackMode=NONE.", "constantName":
"org.apache.camel.component.google.pubsub.GooglePub [...]
+ "CamelGooglePubsubAcknowledge": { "index": 5, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType":
"org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "Can be used to manually acknowledge or
negative-acknowledge a message when ackMode=NONE.", "constantName":
"org.apache.camel.component.google.pubsub.GooglePub [...]
+ "CamelGooglePubsubDeliveryAttempt": { "index": 6, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The delivery attempt counter received
from PubSub. This is the approximate number of times the message has been
delivered. This will be 1 for the first delivery. This feature requires a
dead-letter policy to be configure [...]
},
"properties": {
"projectId": { "index": 0, "kind": "path", "displayName": "Project Id",
"group": "common", "label": "common", "required": true, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "description": "The Google Cloud PubSub
Project Id" },
diff --git
a/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json
b/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json
index bf38f55fcef1..76b522d4f8f0 100644
---
a/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json
+++
b/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json
@@ -42,7 +42,8 @@
"CamelGooglePubsubPublishTime": { "index": 2, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "com.google.protobuf.Timestamp", "deprecated": false,
"deprecationNote": "", "autowired": false, "secret": false, "description": "The
time at which the message was published", "constantName":
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#PUBLISH_TIME" },
"CamelGooglePubsubAttributes": { "index": 3, "kind": "header",
"displayName": "", "group": "common", "label": "", "required": false,
"javaType": "Map<String, String>", "deprecated": true, "deprecationNote": "",
"autowired": false, "secret": false, "description": "The attributes of the
message.", "constantName":
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#ATTRIBUTES" },
"CamelGooglePubsubOrderingKey": { "index": 4, "kind": "header",
"displayName": "", "group": "producer", "label": "producer", "required": false,
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "If non-empty, identifies related
messages for which publish order should be respected.", "constantName":
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#ORDERING_KEY" },
- "CamelGooglePubsubAcknowledge": { "index": 5, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType":
"org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "Can be used to manually acknowledge or
negative-acknowledge a message when ackMode=NONE.", "constantName":
"org.apache.camel.component.google.pubsub.GooglePub [...]
+ "CamelGooglePubsubAcknowledge": { "index": 5, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType":
"org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "Can be used to manually acknowledge or
negative-acknowledge a message when ackMode=NONE.", "constantName":
"org.apache.camel.component.google.pubsub.GooglePub [...]
+ "CamelGooglePubsubDeliveryAttempt": { "index": 6, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The delivery attempt counter received
from PubSub. This is the approximate number of times the message has been
delivered. This will be 1 for the first delivery. This feature requires a
dead-letter policy to be configure [...]
},
"properties": {
"projectId": { "index": 0, "kind": "path", "displayName": "Project Id",
"group": "common", "label": "common", "required": true, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "description": "The Google Cloud PubSub
Project Id" },
diff --git
a/components/camel-google/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc
b/components/camel-google/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc
index 1223abf76150..68ab8eb44d91 100644
---
a/components/camel-google/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc
+++
b/components/camel-google/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc
@@ -88,6 +88,34 @@ To ack/nack the message the component uses Acknowledgement
ID stored as header `
If the header is removed or tampered with, the ack will fail and the message
will be redelivered
again after the ack deadline.
+=== Delivery Attempts and Dead Letter Queues
+
+When a subscription has a
https://cloud.google.com/pubsub/docs/dead-letter-topics[dead-letter policy]
configured,
+the component exposes the delivery attempt count via the header
`GooglePubsubConstants.DELIVERY_ATTEMPT`.
+This header contains an `Integer` representing the approximate number of times
the message has been delivered.
+The first delivery will have a value of `1`.
+
+NOTE: The delivery attempt header is only available when the subscription has
a dead-letter policy configured.
+Without a dead-letter policy, the header will not be set.
+
+This allows routes to implement custom retry logic based on the delivery
attempt count:
+
+[source,java]
+----
+from("google-pubsub:{{project.name}}:{{subscription.name}}")
+ .process(exchange -> {
+ Integer deliveryAttempt =
exchange.getIn().getHeader(GooglePubsubConstants.DELIVERY_ATTEMPT,
Integer.class);
+ if (deliveryAttempt != null && deliveryAttempt > 3) {
+ // Custom handling for messages that have been redelivered
multiple times
+ log.warn("Message has been delivered {} times", deliveryAttempt);
+ }
+ // Process the message
+ });
+----
+
+With a dead-letter policy, after the configured maximum delivery attempts are
exceeded,
+the message will automatically be forwarded to the dead-letter topic by Google
PubSub.
+
=== Message Body
The consumer endpoint returns the content of the message as `byte[]`. Exactly
as the underlying system sends it.
diff --git
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
index d5eb0937a23d..df0c5e350502 100644
---
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
+++
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
@@ -39,6 +39,12 @@ public final class GooglePubsubConstants {
"message when ackMode=NONE.",
javaType =
"org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge")
public static final String GOOGLE_PUBSUB_ACKNOWLEDGE =
"CamelGooglePubsubAcknowledge";
+ @Metadata(label = "consumer",
+ description = "The delivery attempt counter received from
PubSub. This is the approximate number of times " +
+ "the message has been delivered. This will be 1
for the first delivery. This feature requires " +
+ "a dead-letter policy to be configured on the
subscription.",
+ javaType = "Integer")
+ public static final String DELIVERY_ATTEMPT =
"CamelGooglePubsubDeliveryAttempt";
@Deprecated(since = "4.15")
public static final String RESERVED_GOOGLE_CLIENT_ATTRIBUTE_PREFIX =
"goog";
diff --git
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
index 0848f88f6c98..1f13e556cf50 100644
---
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
+++
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
@@ -45,7 +45,9 @@ import
org.apache.camel.component.google.pubsub.consumer.AcknowledgeSync;
import org.apache.camel.component.google.pubsub.consumer.CamelMessageReceiver;
import
org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge;
import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.support.UnitOfWorkHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -207,6 +209,12 @@ public class GooglePubsubConsumer extends DefaultConsumer {
// Deprecated: replaced by headerFilterStrategy
exchange.getIn().setHeader(GooglePubsubConstants.ATTRIBUTES,
pubsubMessage.getAttributesMap());
+ // Delivery attempt (requires dead-letter policy on
subscription)
+ int deliveryAttempt = message.getDeliveryAttempt();
+ if (deliveryAttempt > 0) {
+
exchange.getIn().setHeader(GooglePubsubConstants.DELIVERY_ATTEMPT,
deliveryAttempt);
+ }
+
//existing subscriber can not be propagated, because
it will be closed at the end of this block
//subscriber will be created at the moment of use
// (see
https://issues.apache.org/jira/browse/CAMEL-18447)
@@ -231,7 +239,21 @@ public class GooglePubsubConsumer extends DefaultConsumer {
try {
processor.process(exchange);
} catch (Exception e) {
- getExceptionHandler().handleException(e);
+ exchange.setException(e);
+ }
+
+ // Handle exception if one occurred
+ if (exchange.getException() != null) {
+ getExceptionHandler().handleException("Error
processing exchange", exchange,
+ exchange.getException());
+ }
+
+ // Execute synchronization callbacks (ACK/NACK) based
on exchange status
+ // This is required because we are directly calling
processor.process() outside of the normal
+ // Camel routing engine, so we must manually trigger
the OnCompletion callbacks
+ if (endpoint.getAckMode() !=
GooglePubsubConstants.AckMode.NONE) {
+ List<Synchronization> synchronizations =
exchange.getExchangeExtension().handoverCompletions();
+ UnitOfWorkHelper.doneSynchronizations(exchange,
synchronizations);
}
}
} catch (CancellationException e) {
diff --git
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
index ae412505d3d2..0967baa5a164 100644
---
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
+++
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
@@ -16,8 +16,11 @@
*/
package org.apache.camel.component.google.pubsub.consumer;
+import java.util.List;
+
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.base.Strings;
import com.google.pubsub.v1.PubsubMessage;
import org.apache.camel.Exchange;
@@ -26,6 +29,8 @@ import
org.apache.camel.component.google.pubsub.GooglePubsubConstants;
import org.apache.camel.component.google.pubsub.GooglePubsubConsumer;
import org.apache.camel.component.google.pubsub.GooglePubsubEndpoint;
import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.UnitOfWorkHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,6 +69,12 @@ public class CamelMessageReceiver implements MessageReceiver
{
// Deprecated: replaced by headerFilterStrategy
exchange.getIn().setHeader(GooglePubsubConstants.ATTRIBUTES,
pubsubMessage.getAttributesMap());
+ // Delivery attempt (requires dead-letter policy on subscription)
+ Integer deliveryAttempt = Subscriber.getDeliveryAttempt(pubsubMessage);
+ if (deliveryAttempt != null) {
+ exchange.getIn().setHeader(GooglePubsubConstants.DELIVERY_ATTEMPT,
deliveryAttempt);
+ }
+
GooglePubsubAcknowledge acknowledge = new
AcknowledgeAsync(ackReplyConsumer);
if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) {
exchange.getExchangeExtension().addOnCompletion(new
AcknowledgeCompletion(acknowledge));
@@ -86,8 +97,18 @@ public class CamelMessageReceiver implements MessageReceiver
{
} catch (Exception e) {
exchange.setException(e);
}
+
+ // Handle exception if one occurred
if (exchange.getException() != null) {
-
consumer.getExceptionHandler().handleException(exchange.getException());
+ consumer.getExceptionHandler().handleException("Error processing
exchange", exchange, exchange.getException());
+ }
+
+ // Execute synchronization callbacks (ACK/NACK) based on exchange
status
+ // This is required because we are directly calling
processor.process() outside of the normal
+ // Camel routing engine, so we must manually trigger the OnCompletion
callbacks
+ if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) {
+ List<Synchronization> synchronizations =
exchange.getExchangeExtension().handoverCompletions();
+ UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations);
}
}
diff --git
a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/AckNackCallbackIT.java
b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/AckNackCallbackIT.java
new file mode 100644
index 000000000000..ac51fa948abd
--- /dev/null
+++
b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/AckNackCallbackIT.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.pubsub.integration;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.google.pubsub.PubsubTestSupport;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test that validates the ACK/NACK callback behavior for
synchronous pull.
+ *
+ * This test ensures that: (1) messages are properly NACK'd on failure
(exception thrown) and redelivered, and (2) messages are
+ * properly ACK'd on successful processing.
+ *
+ * The fix in CamelMessageReceiver and GooglePubsubConsumer ensures that
OnCompletion synchronization callbacks are
+ * properly triggered after message processing, which was previously not
happening.
+ */
+public class AckNackCallbackIT extends PubsubTestSupport {
+ private static final Logger LOG =
LoggerFactory.getLogger(AckNackCallbackIT.class);
+
+ private static final String TOPIC_NAME = "ackNackCallbackTopic";
+ private static final String SUBSCRIPTION_NAME = "ackNackCallbackSub";
+
+ // Counter to track how many times a message has been processed
+ private static final AtomicInteger processCount = new AtomicInteger(0);
+
+ // Flag to control failure behavior - fail on first attempt only
+ private static volatile boolean shouldFail = false;
+
+ @EndpointInject("direct:in")
+ private Endpoint directIn;
+
+ @EndpointInject("google-pubsub:{{project.id}}:" + TOPIC_NAME)
+ private Endpoint pubsubTopic;
+
+ @EndpointInject("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME +
"?synchronousPull=true")
+ private Endpoint pubsubSubscription;
+
+ @EndpointInject("mock:result")
+ private MockEndpoint result;
+
+ @Produce("direct:in")
+ private ProducerTemplate producer;
+
+ @Override
+ public void createTopicSubscription() {
+ // Create topic/subscription pair with short ack deadline for faster
test execution
+ createTopicSubscriptionPair(TOPIC_NAME, SUBSCRIPTION_NAME, 10);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ // Producer route
+ from(directIn).routeId("Producer").to(pubsubTopic);
+
+ // Consumer route - can be configured to fail on first attempt
+ from(pubsubSubscription)
+ .routeId("Consumer")
+ .autoStartup(true)
+ .process(exchange -> {
+ int count = processCount.incrementAndGet();
+ LOG.info("Consumer processing attempt #{}", count);
+
+ if (shouldFail && count == 1) {
+ LOG.info("Consumer throwing exception on first
attempt (will trigger NACK)");
+ throw new RuntimeException("Simulated failure
on first attempt");
+ }
+ LOG.info("Consumer processing successful on
attempt #{}", count);
+ })
+ .to(result);
+ }
+ };
+ }
+
+ /**
+ * Test that validates both NACK/redelivery and ACK behavior.
+ *
+ * Stage 1: Send a message that fails on the first attempt. The message is
received and processed (first attempt); an
+ * exception is thrown and a NACK is sent; PubSub redelivers the message; it is
received and processed successfully (second
+ * attempt); an ACK is sent and the message is removed from the subscription.
+ *
+ * Stage 2: Send a message that succeeds on the first attempt. The message is
received and processed successfully, and an ACK
+ * is sent immediately with no redelivery.
+ */
+ @Test
+ public void testAckNackCallbackBehavior() throws Exception {
+ // === Stage 1: Test NACK and redelivery ===
+ LOG.info("Stage 1: Testing NACK on failure and redelivery");
+
+ processCount.set(0);
+ shouldFail = true;
+
+ Exchange failExchange = new DefaultExchange(context);
+ failExchange.getIn().setBody("Test message for NACK/redelivery: " +
failExchange.getExchangeId());
+
+ // We expect the message to eventually succeed after redelivery
+ result.expectedMessageCount(1);
+
+ // Send the message
+ producer.send(failExchange);
+
+ // Wait for the message to be processed (may take up to ack deadline +
processing time)
+ result.assertIsSatisfied(15000);
+
+ // Verify that the message was processed at least twice (first attempt
failed, second succeeded)
+ int nackProcessCount = processCount.get();
+ LOG.info("Stage 1: Consumer processed message {} times",
nackProcessCount);
+ assertTrue(nackProcessCount >= 2,
+ "Message should have been processed at least twice due to
NACK/redelivery, but was processed "
+ + nackProcessCount + " times");
+
+ // === Stage 2: Test immediate ACK on success ===
+ LOG.info("Stage 2: Testing immediate ACK on success");
+
+ result.reset();
+ processCount.set(0);
+ shouldFail = false;
+
+ Exchange successExchange = new DefaultExchange(context);
+ successExchange.getIn().setBody("Test message for immediate ACK: " +
successExchange.getExchangeId());
+
+ result.expectedMessageCount(1);
+
+ // Send the message
+ producer.send(successExchange);
+
+ // Wait for message to be processed
+ result.assertIsSatisfied(5000);
+
+ // Wait a bit more to ensure no redelivery happens
+ Thread.sleep(3000);
+
+ // Verify message was processed exactly once (properly ACK'd, no
redelivery)
+ int ackProcessCount = processCount.get();
+ LOG.info("Stage 2: Consumer processed message {} times (expecting 1)",
ackProcessCount);
+ assertTrue(ackProcessCount == 1,
+ "Message should have been processed exactly once (properly
ACK'd), but was processed "
+ + ackProcessCount + " times");
+
+ LOG.info("All stages passed - ACK/NACK callback behavior is working
correctly");
+ }
+}
diff --git
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java
index d6ee6fa5a903..3763dcbeb773 100644
---
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java
+++
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java
@@ -1081,6 +1081,21 @@ public interface GooglePubsubEndpointBuilderFactory {
public String googlePubsubAcknowledge() {
return "CamelGooglePubsubAcknowledge";
}
+ /**
+ * The delivery attempt counter received from PubSub. This is the
+ * approximate number of times the message has been delivered. This
will
+ * be 1 for the first delivery. This feature requires a dead-letter
+ * policy to be configured on the subscription.
+ *
+ * The option is a: {@code Integer} type.
+ *
+ * Group: consumer
+ *
+ * @return the name of the header {@code GooglePubsubDeliveryAttempt}.
+ */
+ public String googlePubsubDeliveryAttempt() {
+ return "CamelGooglePubsubDeliveryAttempt";
+ }
}
static GooglePubsubEndpointBuilder endpointBuilder(String componentName,
String path) {
class GooglePubsubEndpointBuilderImpl extends AbstractEndpointBuilder
implements GooglePubsubEndpointBuilder, AdvancedGooglePubsubEndpointBuilder {