This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e0b37b4f2c4 [improve][client] PIP-468: Rename V5 ackTimeout to 
processingTimeout(ProcessingTimeoutPolicy) (#25649)
e0b37b4f2c4 is described below

commit e0b37b4f2c4f638d202102a9a86c6f6b6ca5d4df
Author: Matteo Merli <[email protected]>
AuthorDate: Fri May 1 11:00:07 2026 -0700

    [improve][client] PIP-468: Rename V5 ackTimeout to 
processingTimeout(ProcessingTimeoutPolicy) (#25649)
---
 ...meoutTest.java => V5ProcessingTimeoutTest.java} | 26 ++++----
 .../pulsar/client/api/v5/QueueConsumerBuilder.java | 22 +++----
 .../api/v5/config/ProcessingTimeoutPolicy.java     | 72 ++++++++++++++++++++++
 .../org/apache/pulsar/client/api/v5/Examples.java  |  3 +-
 .../client/impl/v5/QueueConsumerBuilderV5.java     | 27 ++++----
 5 files changed, 110 insertions(+), 40 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AckTimeoutTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProcessingTimeoutTest.java
similarity index 63%
rename from 
pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AckTimeoutTest.java
rename to 
pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProcessingTimeoutTest.java
index 3d5513f6270..ef74bb05a8f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AckTimeoutTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProcessingTimeoutTest.java
@@ -22,31 +22,32 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import java.time.Duration;
 import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.ProcessingTimeoutPolicy;
 import org.apache.pulsar.client.api.v5.schema.Schema;
 import org.testng.annotations.Test;
 
 /**
- * Coverage for {@link QueueConsumerBuilder#ackTimeout(Duration)}: when a 
consumer
- * receives a message but doesn't ack within the configured timeout, the broker
- * redelivers it.
+ * Coverage for {@link 
QueueConsumerBuilder#processingTimeout(ProcessingTimeoutPolicy)}:
+ * when a consumer receives a message but doesn't ack within the configured 
processing
+ * timeout, the client gives up and asks the broker to redeliver it.
  */
-public class V5AckTimeoutTest extends V5ClientBaseTest {
+public class V5ProcessingTimeoutTest extends V5ClientBaseTest {
 
     @Test
-    public void testUnackedMessageIsRedeliveredAfterAckTimeout() throws 
Exception {
+    public void testUnackedMessageIsRedeliveredAfterProcessingTimeout() throws 
Exception {
         String topic = newScalableTopic(1);
 
         @Cleanup
         Producer<String> producer = v5Client.newProducer(Schema.string())
                 .topic(topic)
                 .create();
-        // Default ack timeout is disabled (or 60s); use a tight one so the 
test stays fast.
-        // Pulsar enforces a minimum of 1s on ackTimeout.
+        // Default processing timeout is disabled (or 60s); use a tight one so 
the test
+        // stays fast. Pulsar enforces a minimum of 1s on the processing 
timeout.
         @Cleanup
         QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
                 .topic(topic)
-                .subscriptionName("ack-timeout-sub")
-                .ackTimeout(Duration.ofSeconds(1))
+                .subscriptionName("processing-timeout-sub")
+                
.processingTimeout(ProcessingTimeoutPolicy.of(Duration.ofSeconds(1)))
                 .subscribe();
 
         producer.newMessage().value("once").send();
@@ -56,10 +57,11 @@ public class V5AckTimeoutTest extends V5ClientBaseTest {
         assertNotNull(first);
         assertEquals(first.value(), "once");
 
-        // The broker's ack-timeout sweeper runs at ackTimeout/2 cadence, so 
wait
-        // generously past 1s for the redelivery to fire.
+        // The client's ack-timeout sweeper runs at processingTimeout/2 
cadence; wait
+        // generously past 1s for the redelivery request to be sent and the 
redelivery
+        // to land.
         Message<String> redelivered = consumer.receive(Duration.ofSeconds(10));
-        assertNotNull(redelivered, "ack-timeout did not trigger redelivery");
+        assertNotNull(redelivered, "processing-timeout did not trigger 
redelivery");
         assertEquals(redelivered.value(), "once");
         consumer.acknowledge(redelivered.id());
     }
diff --git 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java
 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java
index a3f728e127e..a6249efaca3 100644
--- 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java
+++ 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java
@@ -26,6 +26,7 @@ import java.util.regex.Pattern;
 import org.apache.pulsar.client.api.v5.config.BackoffPolicy;
 import org.apache.pulsar.client.api.v5.config.DeadLetterPolicy;
 import org.apache.pulsar.client.api.v5.config.EncryptionPolicy;
+import org.apache.pulsar.client.api.v5.config.ProcessingTimeoutPolicy;
 import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
 
 /**
@@ -140,13 +141,16 @@ public interface QueueConsumerBuilder<T> {
     // --- Acknowledgment ---
 
     /**
-     * If a message is not acknowledged within this duration, it is 
automatically redelivered.
-     * Set to zero to disable.
+     * Optional safety net for slow / stalled consumers: see
+     * {@link ProcessingTimeoutPolicy} for the full semantics. The policy 
bundles the
+     * timeout itself with an optional redelivery backoff. Disabled by default.
      *
-     * @param timeout the ack timeout duration
+     * @param policy timeout + redelivery-backoff configuration
      * @return this builder instance for chaining
+     * @see ProcessingTimeoutPolicy#of(Duration)
+     * @see ProcessingTimeoutPolicy#of(Duration, BackoffPolicy)
      */
-    QueueConsumerBuilder<T> ackTimeout(Duration timeout);
+    QueueConsumerBuilder<T> processingTimeout(ProcessingTimeoutPolicy policy);
 
     /**
      * How frequently acknowledgments are flushed to the broker.
@@ -176,16 +180,6 @@ public interface QueueConsumerBuilder<T> {
      */
     QueueConsumerBuilder<T> negativeAckRedeliveryBackoff(BackoffPolicy 
backoff);
 
-    /**
-     * Backoff strategy for redelivery after ack timeout.
-     *
-     * @param backoff the backoff policy to use for ack timeout redelivery
-     * @return this builder instance for chaining
-     * @see BackoffPolicy#fixed(Duration)
-     * @see BackoffPolicy#exponential(Duration, Duration)
-     */
-    QueueConsumerBuilder<T> ackTimeoutRedeliveryBackoff(BackoffPolicy backoff);
-
     // --- Dead letter queue ---
 
     /**
diff --git 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ProcessingTimeoutPolicy.java
 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ProcessingTimeoutPolicy.java
new file mode 100644
index 00000000000..0cda6c3ee04
--- /dev/null
+++ 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ProcessingTimeoutPolicy.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5.config;
+
+import java.time.Duration;
+
+/**
+ * Optional safety net for slow / stalled queue consumers: if the application 
doesn't
+ * process and acknowledge a delivered message within {@code timeout}, the 
<em>client</em>
+ * gives up on that delivery and asks the broker to redeliver it (to this 
consumer or,
+ * on a Shared subscription, to another consumer in the group). The 
bookkeeping is
+ * client-side — the client tracks pending acks and, on timeout, sends a
+ * {@code redeliverUnacknowledgedMessages} request to the broker.
+ *
+ * <p>{@code redeliveryBackoff} controls the cadence of those redeliveries — 
{@code null}
+ * means "redeliver immediately on the next sweep", which is the historical 
default.
+ *
+ * <p>Disabled by default. Pass to
+ * {@link 
org.apache.pulsar.client.api.v5.QueueConsumerBuilder#processingTimeout(ProcessingTimeoutPolicy)}
+ * when the application's processing time is bounded and you want stalled 
deliveries to
+ * be reattempted automatically.
+ *
+ * @param timeout            how long the client waits for the application to 
ack a
+ *                           delivery before requesting redelivery. {@link 
Duration#ZERO}
+ *                           disables.
+ * @param redeliveryBackoff  optional backoff applied between redeliveries. 
May be
+ *                           {@code null} for the default (no extra delay).
+ */
+public record ProcessingTimeoutPolicy(
+        Duration timeout,
+        BackoffPolicy redeliveryBackoff
+) {
+    public ProcessingTimeoutPolicy {
+        if (timeout == null) {
+            throw new IllegalArgumentException("timeout must not be null");
+        }
+        if (timeout.isNegative()) {
+            throw new IllegalArgumentException("timeout must not be negative");
+        }
+    }
+
+    /**
+     * Create a policy with just a timeout — the broker uses its default 
redelivery
+     * cadence (no extra backoff between retries).
+     */
+    public static ProcessingTimeoutPolicy of(Duration timeout) {
+        return new ProcessingTimeoutPolicy(timeout, null);
+    }
+
+    /**
+     * Create a policy with a timeout and an explicit redelivery backoff.
+     */
+    public static ProcessingTimeoutPolicy of(Duration timeout, BackoffPolicy 
redeliveryBackoff) {
+        return new ProcessingTimeoutPolicy(timeout, redeliveryBackoff);
+    }
+}
diff --git 
a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
 
b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
index 22de52c7cb2..2cc17d67b68 100644
--- 
a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
+++ 
b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.v5.config.CompressionType;
 import org.apache.pulsar.client.api.v5.config.ConnectionPolicy;
 import org.apache.pulsar.client.api.v5.config.DeadLetterPolicy;
 import org.apache.pulsar.client.api.v5.config.MemorySize;
+import org.apache.pulsar.client.api.v5.config.ProcessingTimeoutPolicy;
 import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.v5.config.TlsPolicy;
 import org.apache.pulsar.client.api.v5.schema.Schema;
@@ -276,7 +277,7 @@ public class Examples {
         try (var consumer = client.newQueueConsumer(Schema.json(Order.class))
                 .topic("orders")
                 .subscriptionName("order-processor")
-                .ackTimeout(Duration.ofSeconds(30))
+                
.processingTimeout(ProcessingTimeoutPolicy.of(Duration.ofSeconds(30)))
                 .negativeAckRedeliveryBackoff(
                         BackoffPolicy.exponential(Duration.ofSeconds(1), 
Duration.ofMinutes(5)))
                 .deadLetterPolicy(DeadLetterPolicy.of(5))
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
index 3592faf312e..a762964c7d7 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.client.api.v5.QueueConsumerBuilder;
 import org.apache.pulsar.client.api.v5.config.BackoffPolicy;
 import org.apache.pulsar.client.api.v5.config.DeadLetterPolicy;
 import org.apache.pulsar.client.api.v5.config.EncryptionPolicy;
+import org.apache.pulsar.client.api.v5.config.ProcessingTimeoutPolicy;
 import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.v5.schema.Schema;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -149,8 +150,19 @@ final class QueueConsumerBuilderV5<T> implements 
QueueConsumerBuilder<T> {
     }
 
     @Override
-    public QueueConsumerBuilderV5<T> ackTimeout(Duration timeout) {
-        conf.setAckTimeoutMillis(timeout.toMillis());
+    public QueueConsumerBuilderV5<T> processingTimeout(ProcessingTimeoutPolicy 
policy) {
+        conf.setAckTimeoutMillis(policy.timeout().toMillis());
+        BackoffPolicy backoff = policy.redeliveryBackoff();
+        if (backoff != null) {
+            conf.setAckTimeoutRedeliveryBackoff(
+                    
org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff.builder()
+                            .minDelayMs(backoff.initialInterval().toMillis())
+                            .maxDelayMs(backoff.maxInterval().toMillis())
+                            .multiplier(backoff.multiplier())
+                            .build());
+        } else {
+            conf.setAckTimeoutRedeliveryBackoff(null);
+        }
         return this;
     }
 
@@ -177,17 +189,6 @@ final class QueueConsumerBuilderV5<T> implements 
QueueConsumerBuilder<T> {
         return this;
     }
 
-    @Override
-    public QueueConsumerBuilderV5<T> ackTimeoutRedeliveryBackoff(BackoffPolicy 
backoff) {
-        conf.setAckTimeoutRedeliveryBackoff(
-                
org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff.builder()
-                        .minDelayMs(backoff.initialInterval().toMillis())
-                        .maxDelayMs(backoff.maxInterval().toMillis())
-                        .multiplier(backoff.multiplier())
-                        .build());
-        return this;
-    }
-
     @Override
     public QueueConsumerBuilderV5<T> deadLetterPolicy(DeadLetterPolicy policy) 
{
         var builder = org.apache.pulsar.client.api.DeadLetterPolicy.builder()

Reply via email to