This is an automated email from the ASF dual-hosted git repository.

thetumbled pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f199e8805f5 [improve][client] PIP-393: Support configuring 
NegativeAckPrecisionBitCnt while building consumer. (#23804)
f199e8805f5 is described below

commit f199e8805f517373dbcc0f0c4a132218ecc24f0a
Author: Wenzhi Feng <[email protected]>
AuthorDate: Fri Jan 3 17:56:33 2025 +0800

    [improve][client] PIP-393: Support configuring NegativeAckPrecisionBitCnt 
while building consumer. (#23804)
---
 .../pulsar/client/impl/NegativeAcksTest.java       | 47 ++++++++++++++++++++++
 .../apache/pulsar/client/api/ConsumerBuilder.java  | 13 ++++++
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |  7 ++++
 3 files changed, 67 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index f8bc30f0966..7ab3e545e98 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -545,4 +545,51 @@ public class NegativeAcksTest extends ProducerConsumerBase 
{
         consumer.close();
         admin.topics().deletePartitionedTopic("persistent://public/default/" + 
topic);
     }
+
+    @DataProvider(name = "negativeAckPrecisionBitCnt")
+    public Object[][] negativeAckPrecisionBitCnt() {
+        return new Object[][]{
+                {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}, {11}, {12}
+        };
+    }
+
+    /**
+     * When negativeAckPrecisionBitCnt is greater than 0, the lower bits of 
the redelivery time will be truncated
+     * to reduce the memory occupation. If set to k, the redelivery time will 
be bucketed by 2^k ms, resulting in
+     * the redelivery time could be earlier(no later) than the expected time 
no more than 2^k ms.
+     * @throws Exception if an error occurs
+     */
+    @Test(dataProvider = "negativeAckPrecisionBitCnt")
+    public void testConfigureNegativeAckPrecisionBitCnt(int 
negativeAckPrecisionBitCnt) throws Exception {
+        String topic = 
BrokerTestUtil.newUniqueName("testConfigureNegativeAckPrecisionBitCnt");
+        long timeDeviation = 1L << negativeAckPrecisionBitCnt;
+        long delayInMs = 2000;
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub1")
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+                .subscriptionType(SubscriptionType.Shared)
+                .negativeAckRedeliveryDelay(delayInMs, TimeUnit.MILLISECONDS)
+                
.negativeAckRedeliveryDelayPrecision(negativeAckPrecisionBitCnt)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+        producer.sendAsync("test-0");
+        producer.flush();
+
+        // receive the message and negative ack
+        consumer.negativeAcknowledge(consumer.receive());
+        long expectedTime = System.currentTimeMillis() + delayInMs;
+
+        // receive the redelivered message and calculate the time deviation
+        // assert that the redelivery time is no earlier than the `expected 
time - timeDeviation`
+        Message<String> msg1 = consumer.receive();
+        assertTrue(System.currentTimeMillis() >= expectedTime - timeDeviation);
+        assertNotNull(msg1);
+    }
 }
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 3ce12b7741a..ed77652c823 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -243,6 +243,19 @@ public interface ConsumerBuilder<T> extends Cloneable {
      */
     ConsumerBuilder<T> negativeAckRedeliveryDelay(long redeliveryDelay, 
TimeUnit timeUnit);
 
+    /**
+     * Sets the redelivery time precision bit count. The lower bits of the 
redelivery time will be
+     * trimmed to reduce the memory occupation. The default value is 8, which 
means the redelivery time
+     * will be bucketed by 256ms, the redelivery time could be earlier(no 
later) than the expected time,
+     * but no more than 256ms. If set to k, the redelivery time will be 
bucketed by 2^k ms.
+     * If the value is 0, the redelivery time will be accurate to ms.
+     *
+     * @param negativeAckPrecisionBitCnt
+     *            The redelivery time precision bit count.
+     * @return the consumer builder instance
+     */
+    ConsumerBuilder<T> negativeAckRedeliveryDelayPrecision(int 
negativeAckPrecisionBitCount);
+
     /**
      * Select the subscription type to be used when subscribing to a topic.
      *
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 35f772028f1..478f93b56a0 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -281,6 +281,13 @@ public class ConsumerBuilderImpl<T> implements 
ConsumerBuilder<T> {
         return this;
     }
 
+    @Override
+    public ConsumerBuilder<T> negativeAckRedeliveryDelayPrecision(int 
negativeAckPrecisionBitCount) {
+        checkArgument(negativeAckPrecisionBitCount >= 0, 
"negativeAckPrecisionBitCount needs to be >= 0");
+        conf.setNegativeAckPrecisionBitCnt(negativeAckPrecisionBitCount);
+        return this;
+    }
+
     @Override
     public ConsumerBuilder<T> subscriptionType(@NonNull SubscriptionType 
subscriptionType) {
         conf.setSubscriptionType(subscriptionType);

Reply via email to