This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e0b37b4f2c4 [improve][client] PIP-468: Rename V5 ackTimeout to
processingTimeout(ProcessingTimeoutPolicy) (#25649)
e0b37b4f2c4 is described below
commit e0b37b4f2c4f638d202102a9a86c6f6b6ca5d4df
Author: Matteo Merli <[email protected]>
AuthorDate: Fri May 1 11:00:07 2026 -0700
[improve][client] PIP-468: Rename V5 ackTimeout to
processingTimeout(ProcessingTimeoutPolicy) (#25649)
---
...meoutTest.java => V5ProcessingTimeoutTest.java} | 26 ++++----
.../pulsar/client/api/v5/QueueConsumerBuilder.java | 22 +++----
.../api/v5/config/ProcessingTimeoutPolicy.java | 72 ++++++++++++++++++++++
.../org/apache/pulsar/client/api/v5/Examples.java | 3 +-
.../client/impl/v5/QueueConsumerBuilderV5.java | 27 ++++----
5 files changed, 110 insertions(+), 40 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AckTimeoutTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProcessingTimeoutTest.java
similarity index 63%
rename from
pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AckTimeoutTest.java
rename to
pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProcessingTimeoutTest.java
index 3d5513f6270..ef74bb05a8f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AckTimeoutTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProcessingTimeoutTest.java
@@ -22,31 +22,32 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import java.time.Duration;
import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.ProcessingTimeoutPolicy;
import org.apache.pulsar.client.api.v5.schema.Schema;
import org.testng.annotations.Test;
/**
- * Coverage for {@link QueueConsumerBuilder#ackTimeout(Duration)}: when a
consumer
- * receives a message but doesn't ack within the configured timeout, the broker
- * redelivers it.
+ * Coverage for {@link
QueueConsumerBuilder#processingTimeout(ProcessingTimeoutPolicy)}:
+ * when a consumer receives a message but doesn't ack within the configured
processing
+ * timeout, the client gives up and asks the broker to redeliver it.
*/
-public class V5AckTimeoutTest extends V5ClientBaseTest {
+public class V5ProcessingTimeoutTest extends V5ClientBaseTest {
@Test
- public void testUnackedMessageIsRedeliveredAfterAckTimeout() throws
Exception {
+ public void testUnackedMessageIsRedeliveredAfterProcessingTimeout() throws
Exception {
String topic = newScalableTopic(1);
@Cleanup
Producer<String> producer = v5Client.newProducer(Schema.string())
.topic(topic)
.create();
- // Default ack timeout is disabled (or 60s); use a tight one so the
test stays fast.
- // Pulsar enforces a minimum of 1s on ackTimeout.
+ // The processing timeout is disabled by default; use a tight one so the test
+ // stays fast. Pulsar enforces a minimum of 1s on the processing timeout.
@Cleanup
QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
.topic(topic)
- .subscriptionName("ack-timeout-sub")
- .ackTimeout(Duration.ofSeconds(1))
+ .subscriptionName("processing-timeout-sub")
+
.processingTimeout(ProcessingTimeoutPolicy.of(Duration.ofSeconds(1)))
.subscribe();
producer.newMessage().value("once").send();
@@ -56,10 +57,11 @@ public class V5AckTimeoutTest extends V5ClientBaseTest {
assertNotNull(first);
assertEquals(first.value(), "once");
- // The broker's ack-timeout sweeper runs at ackTimeout/2 cadence, so
wait
- // generously past 1s for the redelivery to fire.
+ // The client's ack-timeout sweeper runs at processingTimeout/2
cadence; wait
+ // generously past 1s for the redelivery request to be sent and the
redelivery
+ // to land.
Message<String> redelivered = consumer.receive(Duration.ofSeconds(10));
- assertNotNull(redelivered, "ack-timeout did not trigger redelivery");
+ assertNotNull(redelivered, "processing-timeout did not trigger
redelivery");
assertEquals(redelivered.value(), "once");
consumer.acknowledge(redelivered.id());
}
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java
index a3f728e127e..a6249efaca3 100644
---
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java
@@ -26,6 +26,7 @@ import java.util.regex.Pattern;
import org.apache.pulsar.client.api.v5.config.BackoffPolicy;
import org.apache.pulsar.client.api.v5.config.DeadLetterPolicy;
import org.apache.pulsar.client.api.v5.config.EncryptionPolicy;
+import org.apache.pulsar.client.api.v5.config.ProcessingTimeoutPolicy;
import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
/**
@@ -140,13 +141,16 @@ public interface QueueConsumerBuilder<T> {
// --- Acknowledgment ---
/**
- * If a message is not acknowledged within this duration, it is
automatically redelivered.
- * Set to zero to disable.
+ * Optional safety net for slow / stalled consumers: see
+ * {@link ProcessingTimeoutPolicy} for the full semantics. The policy
bundles the
+ * timeout itself with an optional redelivery backoff. Disabled by default.
*
- * @param timeout the ack timeout duration
+ * @param policy timeout + redelivery-backoff configuration
* @return this builder instance for chaining
+ * @see ProcessingTimeoutPolicy#of(Duration)
+ * @see ProcessingTimeoutPolicy#of(Duration, BackoffPolicy)
*/
- QueueConsumerBuilder<T> ackTimeout(Duration timeout);
+ QueueConsumerBuilder<T> processingTimeout(ProcessingTimeoutPolicy policy);
/**
* How frequently acknowledgments are flushed to the broker.
@@ -176,16 +180,6 @@ public interface QueueConsumerBuilder<T> {
*/
QueueConsumerBuilder<T> negativeAckRedeliveryBackoff(BackoffPolicy
backoff);
- /**
- * Backoff strategy for redelivery after ack timeout.
- *
- * @param backoff the backoff policy to use for ack timeout redelivery
- * @return this builder instance for chaining
- * @see BackoffPolicy#fixed(Duration)
- * @see BackoffPolicy#exponential(Duration, Duration)
- */
- QueueConsumerBuilder<T> ackTimeoutRedeliveryBackoff(BackoffPolicy backoff);
-
// --- Dead letter queue ---
/**
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ProcessingTimeoutPolicy.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ProcessingTimeoutPolicy.java
new file mode 100644
index 00000000000..0cda6c3ee04
--- /dev/null
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ProcessingTimeoutPolicy.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5.config;
+
+import java.time.Duration;
+
+/**
+ * Optional safety net for slow / stalled queue consumers: if the application
doesn't
+ * process and acknowledge a delivered message within {@code timeout}, the
<em>client</em>
+ * gives up on that delivery and asks the broker to redeliver it (to this
consumer or,
+ * on a Shared subscription, to another consumer in the group). The
bookkeeping is
+ * client-side — the client tracks pending acks and, on timeout, sends a
+ * {@code redeliverUnacknowledgedMessages} request to the broker.
+ *
+ * <p>{@code redeliveryBackoff} controls the cadence of those redeliveries —
{@code null}
+ * means "redeliver immediately on the next sweep", which is the historical
default.
+ *
+ * <p>Disabled by default. Pass to
+ * {@link
org.apache.pulsar.client.api.v5.QueueConsumerBuilder#processingTimeout(ProcessingTimeoutPolicy)}
+ * when the application's processing time is bounded and you want stalled
deliveries to
+ * be reattempted automatically.
+ *
+ * @param timeout how long the client waits for the application to ack a
+ * delivery before requesting redelivery. {@link Duration#ZERO}
+ * disables the timeout; must not be {@code null} or negative.
+ * @param redeliveryBackoff optional backoff applied between redeliveries.
May be
+ * {@code null} for the default (no extra delay).
+ */
+public record ProcessingTimeoutPolicy(
+ Duration timeout,
+ BackoffPolicy redeliveryBackoff
+) {
+ public ProcessingTimeoutPolicy {
+ if (timeout == null) {
+ throw new IllegalArgumentException("timeout must not be null");
+ }
+ if (timeout.isNegative()) {
+ throw new IllegalArgumentException("timeout must not be negative");
+ }
+ }
+
+ /**
+ * Create a policy with just a timeout and no redelivery backoff — timed-out
+ * deliveries are re-requested on the client's next sweep, with no extra delay.
+ */
+ public static ProcessingTimeoutPolicy of(Duration timeout) {
+ return new ProcessingTimeoutPolicy(timeout, null);
+ }
+
+ /**
+ * Create a policy with a timeout and an explicit redelivery backoff.
+ */
+ public static ProcessingTimeoutPolicy of(Duration timeout, BackoffPolicy
redeliveryBackoff) {
+ return new ProcessingTimeoutPolicy(timeout, redeliveryBackoff);
+ }
+}
diff --git
a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
index 22de52c7cb2..2cc17d67b68 100644
---
a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
+++
b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.v5.config.CompressionType;
import org.apache.pulsar.client.api.v5.config.ConnectionPolicy;
import org.apache.pulsar.client.api.v5.config.DeadLetterPolicy;
import org.apache.pulsar.client.api.v5.config.MemorySize;
+import org.apache.pulsar.client.api.v5.config.ProcessingTimeoutPolicy;
import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.v5.config.TlsPolicy;
import org.apache.pulsar.client.api.v5.schema.Schema;
@@ -276,7 +277,7 @@ public class Examples {
try (var consumer = client.newQueueConsumer(Schema.json(Order.class))
.topic("orders")
.subscriptionName("order-processor")
- .ackTimeout(Duration.ofSeconds(30))
+
.processingTimeout(ProcessingTimeoutPolicy.of(Duration.ofSeconds(30)))
.negativeAckRedeliveryBackoff(
BackoffPolicy.exponential(Duration.ofSeconds(1),
Duration.ofMinutes(5)))
.deadLetterPolicy(DeadLetterPolicy.of(5))
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
index 3592faf312e..a762964c7d7 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.client.api.v5.QueueConsumerBuilder;
import org.apache.pulsar.client.api.v5.config.BackoffPolicy;
import org.apache.pulsar.client.api.v5.config.DeadLetterPolicy;
import org.apache.pulsar.client.api.v5.config.EncryptionPolicy;
+import org.apache.pulsar.client.api.v5.config.ProcessingTimeoutPolicy;
import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.v5.schema.Schema;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -149,8 +150,19 @@ final class QueueConsumerBuilderV5<T> implements
QueueConsumerBuilder<T> {
}
@Override
- public QueueConsumerBuilderV5<T> ackTimeout(Duration timeout) {
- conf.setAckTimeoutMillis(timeout.toMillis());
+ public QueueConsumerBuilderV5<T> processingTimeout(ProcessingTimeoutPolicy
policy) {
+ conf.setAckTimeoutMillis(policy.timeout().toMillis());
+ BackoffPolicy backoff = policy.redeliveryBackoff();
+ if (backoff != null) {
+ conf.setAckTimeoutRedeliveryBackoff(
+
org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff.builder()
+ .minDelayMs(backoff.initialInterval().toMillis())
+ .maxDelayMs(backoff.maxInterval().toMillis())
+ .multiplier(backoff.multiplier())
+ .build());
+ } else {
+ conf.setAckTimeoutRedeliveryBackoff(null);
+ }
return this;
}
@@ -177,17 +189,6 @@ final class QueueConsumerBuilderV5<T> implements
QueueConsumerBuilder<T> {
return this;
}
- @Override
- public QueueConsumerBuilderV5<T> ackTimeoutRedeliveryBackoff(BackoffPolicy
backoff) {
- conf.setAckTimeoutRedeliveryBackoff(
-
org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff.builder()
- .minDelayMs(backoff.initialInterval().toMillis())
- .maxDelayMs(backoff.maxInterval().toMillis())
- .multiplier(backoff.multiplier())
- .build());
- return this;
- }
-
@Override
public QueueConsumerBuilderV5<T> deadLetterPolicy(DeadLetterPolicy policy)
{
var builder = org.apache.pulsar.client.api.DeadLetterPolicy.builder()