This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch CAMEL-22947 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 157a9798c19163b797b3aa4cf5081f1206bb5af4 Author: Andrea Cosentino <[email protected]> AuthorDate: Mon Feb 2 11:42:37 2026 +0100 CAMEL-22947 - Camel-Google-pubsub: consumer does not properly trigger ACK/NACK callbacks and lacks deliveryAttempt visibility Signed-off-by: Andrea Cosentino <[email protected]> --- .../camel/catalog/components/google-pubsub.json | 3 +- .../component/google/pubsub/google-pubsub.json | 3 +- .../src/main/docs/google-pubsub-component.adoc | 28 ++++ .../google/pubsub/GooglePubsubConstants.java | 6 + .../google/pubsub/GooglePubsubConsumer.java | 24 ++- .../pubsub/consumer/CamelMessageReceiver.java | 23 ++- .../pubsub/integration/AckNackCallbackIT.java | 171 +++++++++++++++++++++ .../dsl/GooglePubsubEndpointBuilderFactory.java | 15 ++ 8 files changed, 269 insertions(+), 4 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json index bf38f55fcef1..76b522d4f8f0 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json @@ -42,7 +42,8 @@ "CamelGooglePubsubPublishTime": { "index": 2, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "com.google.protobuf.Timestamp", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The time at which the message was published", "constantName": "org.apache.camel.component.google.pubsub.GooglePubsubConstants#PUBLISH_TIME" }, "CamelGooglePubsubAttributes": { "index": 3, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "Map<String, String>", "deprecated": true, "deprecationNote": "", "autowired": false, "secret": false, "description": "The attributes of the message.", "constantName": "org.apache.camel.component.google.pubsub.GooglePubsubConstants#ATTRIBUTES" }, "CamelGooglePubsubOrderingKey": { "index": 4, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "If non-empty, identifies related messages for which publish order should be respected.", "constantName": "org.apache.camel.component.google.pubsub.GooglePubsubConstants#ORDERING_KEY" }, - "CamelGooglePubsubAcknowledge": { "index": 5, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Can be used to manually acknowledge or negative-acknowledge a message when ackMode=NONE.", "constantName": "org.apache.camel.component.google.pubsub.GooglePub [...] + "CamelGooglePubsubAcknowledge": { "index": 5, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Can be used to manually acknowledge or negative-acknowledge a message when ackMode=NONE.", "constantName": "org.apache.camel.component.google.pubsub.GooglePub [...] + "CamelGooglePubsubDeliveryAttempt": { "index": 6, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The delivery attempt counter received from PubSub. This is the approximate number of times the message has been delivered. This will be 1 for the first delivery. This feature requires a dead-letter policy to be configure [...] }, "properties": { "projectId": { "index": 0, "kind": "path", "displayName": "Project Id", "group": "common", "label": "common", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The Google Cloud PubSub Project Id" }, diff --git a/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json b/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json index bf38f55fcef1..76b522d4f8f0 100644 --- a/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json +++ b/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json @@ -42,7 +42,8 @@ "CamelGooglePubsubPublishTime": { "index": 2, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "com.google.protobuf.Timestamp", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The time at which the message was published", "constantName": "org.apache.camel.component.google.pubsub.GooglePubsubConstants#PUBLISH_TIME" }, "CamelGooglePubsubAttributes": { "index": 3, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "Map<String, String>", "deprecated": true, "deprecationNote": "", "autowired": false, "secret": false, "description": "The attributes of the message.", "constantName": "org.apache.camel.component.google.pubsub.GooglePubsubConstants#ATTRIBUTES" }, "CamelGooglePubsubOrderingKey": { "index": 4, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "If non-empty, identifies related messages for which publish order should be respected.", "constantName": "org.apache.camel.component.google.pubsub.GooglePubsubConstants#ORDERING_KEY" }, - "CamelGooglePubsubAcknowledge": { "index": 5, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Can be used to manually acknowledge or negative-acknowledge a message when ackMode=NONE.", "constantName": "org.apache.camel.component.google.pubsub.GooglePub [...] + "CamelGooglePubsubAcknowledge": { "index": 5, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Can be used to manually acknowledge or negative-acknowledge a message when ackMode=NONE.", "constantName": "org.apache.camel.component.google.pubsub.GooglePub [...] + "CamelGooglePubsubDeliveryAttempt": { "index": 6, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The delivery attempt counter received from PubSub. This is the approximate number of times the message has been delivered. This will be 1 for the first delivery. This feature requires a dead-letter policy to be configure [...] }, "properties": { "projectId": { "index": 0, "kind": "path", "displayName": "Project Id", "group": "common", "label": "common", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The Google Cloud PubSub Project Id" }, diff --git a/components/camel-google/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc b/components/camel-google/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc index 1223abf76150..68ab8eb44d91 100644 --- a/components/camel-google/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc +++ b/components/camel-google/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc @@ -88,6 +88,34 @@ To ack/nack the message the component uses Acknowledgement ID stored as header ` If the header is removed or tampered with, the ack will fail and the message will be redelivered again after the ack deadline. +=== Delivery Attempts and Dead Letter Queues + +When a subscription has a https://cloud.google.com/pubsub/docs/dead-letter-topics[dead-letter policy] configured, +the component exposes the delivery attempt count via the header `GooglePubsubConstants.DELIVERY_ATTEMPT`. +This header contains an `Integer` representing the approximate number of times the message has been delivered. +The first delivery will have a value of `1`. + +NOTE: The delivery attempt header is only available when the subscription has a dead-letter policy configured. +Without a dead-letter policy, the header will not be set. + +This allows routes to implement custom retry logic based on the delivery attempt count: + +[source,java] +---- +from("google-pubsub:{{project.name}}:{{subscription.name}}") + .process(exchange -> { + Integer deliveryAttempt = exchange.getIn().getHeader(GooglePubsubConstants.DELIVERY_ATTEMPT, Integer.class); + if (deliveryAttempt != null && deliveryAttempt > 3) { + // Custom handling for messages that have been redelivered multiple times + log.warn("Message has been delivered {} times", deliveryAttempt); + } + // Process the message + }); +---- + +With a dead-letter policy, after the configured maximum delivery attempts are exceeded, +the message will automatically be forwarded to the dead-letter topic by Google PubSub. + === Message Body The consumer endpoint returns the content of the message as `byte[]`. Exactly as the underlying system sends it. diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java index d5eb0937a23d..df0c5e350502 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java @@ -39,6 +39,12 @@ public final class GooglePubsubConstants { "message when ackMode=NONE.", javaType = "org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge") public static final String GOOGLE_PUBSUB_ACKNOWLEDGE = "CamelGooglePubsubAcknowledge"; + @Metadata(label = "consumer", + description = "The delivery attempt counter received from PubSub. This is the approximate number of times " + + "the message has been delivered. This will be 1 for the first delivery. This feature requires " + + "a dead-letter policy to be configured on the subscription.", + javaType = "Integer") + public static final String DELIVERY_ATTEMPT = "CamelGooglePubsubDeliveryAttempt"; @Deprecated(since = "4.15") public static final String RESERVED_GOOGLE_CLIENT_ATTRIBUTE_PREFIX = "goog"; diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java index 0848f88f6c98..1f13e556cf50 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java @@ -45,7 +45,9 @@ import org.apache.camel.component.google.pubsub.consumer.AcknowledgeSync; import org.apache.camel.component.google.pubsub.consumer.CamelMessageReceiver; import org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge; import org.apache.camel.spi.HeaderFilterStrategy; +import org.apache.camel.spi.Synchronization; import org.apache.camel.support.DefaultConsumer; +import org.apache.camel.support.UnitOfWorkHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -207,6 +209,12 @@ public class GooglePubsubConsumer extends DefaultConsumer { // Deprecated: replaced by headerFilterStrategy exchange.getIn().setHeader(GooglePubsubConstants.ATTRIBUTES, pubsubMessage.getAttributesMap()); + // Delivery attempt (requires dead-letter policy on subscription) + int deliveryAttempt = message.getDeliveryAttempt(); + if (deliveryAttempt > 0) { + exchange.getIn().setHeader(GooglePubsubConstants.DELIVERY_ATTEMPT, deliveryAttempt); + } + //existing subscriber can not be propagated, because it will be closed at the end of this block //subscriber will be created at the moment of use // (see https://issues.apache.org/jira/browse/CAMEL-18447) @@ -231,7 +239,21 @@ public class GooglePubsubConsumer extends DefaultConsumer { try { processor.process(exchange); } catch (Exception e) { - getExceptionHandler().handleException(e); + exchange.setException(e); + } + + // Handle exception if one occurred + if (exchange.getException() != null) { + getExceptionHandler().handleException("Error processing exchange", exchange, + exchange.getException()); + } + + // Execute synchronization callbacks (ACK/NACK) based on exchange status + // This is required because we are directly calling processor.process() outside of the normal + // Camel routing engine, so we must manually trigger the OnCompletion callbacks + if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) { + List<Synchronization> synchronizations = exchange.getExchangeExtension().handoverCompletions(); + UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations); } } } catch (CancellationException e) { diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java index ae412505d3d2..0967baa5a164 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java @@ -16,8 +16,11 @@ */ package org.apache.camel.component.google.pubsub.consumer; +import java.util.List; + import com.google.cloud.pubsub.v1.AckReplyConsumer; import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; import com.google.common.base.Strings; import com.google.pubsub.v1.PubsubMessage; import org.apache.camel.Exchange; @@ -26,6 +29,8 @@ import org.apache.camel.component.google.pubsub.GooglePubsubConstants; import org.apache.camel.component.google.pubsub.GooglePubsubConsumer; import org.apache.camel.component.google.pubsub.GooglePubsubEndpoint; import org.apache.camel.spi.HeaderFilterStrategy; +import org.apache.camel.spi.Synchronization; +import org.apache.camel.support.UnitOfWorkHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,6 +69,12 @@ public class CamelMessageReceiver implements MessageReceiver { // Deprecated: replaced by headerFilterStrategy exchange.getIn().setHeader(GooglePubsubConstants.ATTRIBUTES, pubsubMessage.getAttributesMap()); + // Delivery attempt (requires dead-letter policy on subscription) + Integer deliveryAttempt = Subscriber.getDeliveryAttempt(pubsubMessage); + if (deliveryAttempt != null) { + exchange.getIn().setHeader(GooglePubsubConstants.DELIVERY_ATTEMPT, deliveryAttempt); + } + GooglePubsubAcknowledge acknowledge = new AcknowledgeAsync(ackReplyConsumer); if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) { exchange.getExchangeExtension().addOnCompletion(new AcknowledgeCompletion(acknowledge)); @@ -86,8 +97,18 @@ public class CamelMessageReceiver implements MessageReceiver { } catch (Exception e) { exchange.setException(e); } + + // Handle exception if one occurred if (exchange.getException() != null) { - consumer.getExceptionHandler().handleException(exchange.getException()); + consumer.getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); + } + + // Execute synchronization callbacks (ACK/NACK) based on exchange status + // This is required because we are directly calling processor.process() outside of the normal + // Camel routing engine, so we must manually trigger the OnCompletion callbacks + if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) { + List<Synchronization> synchronizations = exchange.getExchangeExtension().handoverCompletions(); + UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations); } } diff --git a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/AckNackCallbackIT.java b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/AckNackCallbackIT.java new file mode 100644 index 000000000000..ac51fa948abd --- /dev/null +++ b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/AckNackCallbackIT.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.google.pubsub.integration; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.google.pubsub.PubsubTestSupport; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.support.DefaultExchange; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Integration test that validates the ACK/NACK callback behavior for synchronous pull. + * + * This test ensures that: 1. Messages are properly NACK'd on failure (exception thrown) and redelivered 2. Messages are + * properly ACK'd on successful processing + * + * The fix in CamelMessageReceiver and GooglePubsubConsumer ensures that OnCompletion synchronization callbacks are + * properly triggered after message processing, which was previously not happening. + */ +public class AckNackCallbackIT extends PubsubTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(AckNackCallbackIT.class); + + private static final String TOPIC_NAME = "ackNackCallbackTopic"; + private static final String SUBSCRIPTION_NAME = "ackNackCallbackSub"; + + // Counter to track how many times a message has been processed + private static final AtomicInteger processCount = new AtomicInteger(0); + + // Flag to control failure behavior - fail on first attempt only + private static volatile boolean shouldFail = false; + + @EndpointInject("direct:in") + private Endpoint directIn; + + @EndpointInject("google-pubsub:{{project.id}}:" + TOPIC_NAME) + private Endpoint pubsubTopic; + + @EndpointInject("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME + "?synchronousPull=true") + private Endpoint pubsubSubscription; + + @EndpointInject("mock:result") + private MockEndpoint result; + + @Produce("direct:in") + private ProducerTemplate producer; + + @Override + public void createTopicSubscription() { + // Create topic/subscription pair with short ack deadline for faster test execution + createTopicSubscriptionPair(TOPIC_NAME, SUBSCRIPTION_NAME, 10); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + // Producer route + from(directIn).routeId("Producer").to(pubsubTopic); + + // Consumer route - can be configured to fail on first attempt + from(pubsubSubscription) + .routeId("Consumer") + .autoStartup(true) + .process(exchange -> { + int count = processCount.incrementAndGet(); + LOG.info("Consumer processing attempt #{}", count); + + if (shouldFail && count == 1) { + LOG.info("Consumer throwing exception on first attempt (will trigger NACK)"); + throw new RuntimeException("Simulated failure on first attempt"); + } + LOG.info("Consumer processing successful on attempt #{}", count); + }) + .to(result); + } + }; + } + + /** + * Test that validates both NACK/redelivery and ACK behavior. + * + * Stage 1: Send a message that will fail on first attempt - Message is received and processed (first attempt) - + * Exception thrown, NACK sent - Message redelivered by PubSub - Message received and processed successfully (second + * attempt) - ACK sent, message removed from subscription + * + * Stage 2: Send a message that succeeds on first attempt - Message is received and processed successfully - ACK + * sent immediately, no redelivery + */ + @Test + public void testAckNackCallbackBehavior() throws Exception { + // === Stage 1: Test NACK and redelivery === + LOG.info("Stage 1: Testing NACK on failure and redelivery"); + + processCount.set(0); + shouldFail = true; + + Exchange failExchange = new DefaultExchange(context); + failExchange.getIn().setBody("Test message for NACK/redelivery: " + failExchange.getExchangeId()); + + // We expect the message to eventually succeed after redelivery + result.expectedMessageCount(1); + + // Send the message + producer.send(failExchange); + + // Wait for the message to be processed (may take up to ack deadline + processing time) + result.assertIsSatisfied(15000); + + // Verify that the message was processed at least twice (first attempt failed, second succeeded) + int nackProcessCount = processCount.get(); + LOG.info("Stage 1: Consumer processed message {} times", nackProcessCount); + assertTrue(nackProcessCount >= 2, + "Message should have been processed at least twice due to NACK/redelivery, but was processed " + + nackProcessCount + " times"); + + // === Stage 2: Test immediate ACK on success === + LOG.info("Stage 2: Testing immediate ACK on success"); + + result.reset(); + processCount.set(0); + shouldFail = false; + + Exchange successExchange = new DefaultExchange(context); + successExchange.getIn().setBody("Test message for immediate ACK: " + successExchange.getExchangeId()); + + result.expectedMessageCount(1); + + // Send the message + producer.send(successExchange); + + // Wait for message to be processed + result.assertIsSatisfied(5000); + + // Wait a bit more to ensure no redelivery happens + Thread.sleep(3000); + + // Verify message was processed exactly once (properly ACK'd, no redelivery) + int ackProcessCount = processCount.get(); + LOG.info("Stage 2: Consumer processed message {} times (expecting 1)", ackProcessCount); + assertTrue(ackProcessCount == 1, + "Message should have been processed exactly once (properly ACK'd), but was processed " + + ackProcessCount + " times"); + + LOG.info("All stages passed - ACK/NACK callback behavior is working correctly"); + } +} diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java index d6ee6fa5a903..3763dcbeb773 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java @@ -1081,6 +1081,21 @@ public interface GooglePubsubEndpointBuilderFactory { public String googlePubsubAcknowledge() { return "CamelGooglePubsubAcknowledge"; } + /** + * The delivery attempt counter received from PubSub. This is the + * approximate number of times the message has been delivered. This will + * be 1 for the first delivery. This feature requires a dead-letter + * policy to be configured on the subscription. + * + * The option is a: {@code Integer} type. + * + * Group: consumer + * + * @return the name of the header {@code GooglePubsubDeliveryAttempt}. + */ + public String googlePubsubDeliveryAttempt() { + return "CamelGooglePubsubDeliveryAttempt"; + } } static GooglePubsubEndpointBuilder endpointBuilder(String componentName, String path) { class GooglePubsubEndpointBuilderImpl extends AbstractEndpointBuilder implements GooglePubsubEndpointBuilder, AdvancedGooglePubsubEndpointBuilder {
