This is an automated email from the ASF dual-hosted git repository.
thetumbled pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f199e8805f5 [improve][client] PIP-393: Support configuring
NegativeAckPrecisionBitCnt while building consumer. (#23804)
f199e8805f5 is described below
commit f199e8805f517373dbcc0f0c4a132218ecc24f0a
Author: Wenzhi Feng <[email protected]>
AuthorDate: Fri Jan 3 17:56:33 2025 +0800
[improve][client] PIP-393: Support configuring NegativeAckPrecisionBitCnt
while building consumer. (#23804)
---
.../pulsar/client/impl/NegativeAcksTest.java | 47 ++++++++++++++++++++++
.../apache/pulsar/client/api/ConsumerBuilder.java | 13 ++++++
.../pulsar/client/impl/ConsumerBuilderImpl.java | 7 ++++
3 files changed, 67 insertions(+)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index f8bc30f0966..7ab3e545e98 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -545,4 +545,51 @@ public class NegativeAcksTest extends ProducerConsumerBase
{
consumer.close();
admin.topics().deletePartitionedTopic("persistent://public/default/" +
topic);
}
+
+ @DataProvider(name = "negativeAckPrecisionBitCnt")
+ public Object[][] negativeAckPrecisionBitCnt() {
+ return new Object[][]{
+ {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}, {11}, {12}
+ };
+ }
+
+ /**
+ * When negativeAckPrecisionBitCnt is greater than 0, the lower bits of
the redelivery time will be truncated
+ * to reduce the memory occupation. If set to k, the redelivery time will
be bucketed by 2^k ms, so
+ * the redelivery may happen earlier (but not later) than the expected time,
by no more than 2^k ms.
+ * @throws Exception if an error occurs
+ */
+ @Test(dataProvider = "negativeAckPrecisionBitCnt")
+ public void testConfigureNegativeAckPrecisionBitCnt(int
negativeAckPrecisionBitCnt) throws Exception {
+ String topic =
BrokerTestUtil.newUniqueName("testConfigureNegativeAckPrecisionBitCnt");
+ long timeDeviation = 1L << negativeAckPrecisionBitCnt;
+ long delayInMs = 2000;
+
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub1")
+ .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+ .subscriptionType(SubscriptionType.Shared)
+ .negativeAckRedeliveryDelay(delayInMs, TimeUnit.MILLISECONDS)
+
.negativeAckRedeliveryDelayPrecision(negativeAckPrecisionBitCnt)
+ .subscribe();
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+ producer.sendAsync("test-0");
+ producer.flush();
+
+ // receive the message and negative ack
+ consumer.negativeAcknowledge(consumer.receive());
+ long expectedTime = System.currentTimeMillis() + delayInMs;
+
+ // receive the redelivered message and calculate the time deviation
+ // assert that the redelivery time is no earlier than the `expected
time - timeDeviation`
+ Message<String> msg1 = consumer.receive();
+ assertTrue(System.currentTimeMillis() >= expectedTime - timeDeviation);
+ assertNotNull(msg1);
+ }
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 3ce12b7741a..ed77652c823 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -243,6 +243,19 @@ public interface ConsumerBuilder<T> extends Cloneable {
*/
ConsumerBuilder<T> negativeAckRedeliveryDelay(long redeliveryDelay,
TimeUnit timeUnit);
+ /**
+ * Sets the redelivery time precision bit count. The lower bits of the
redelivery time will be
+ * trimmed to reduce the memory occupation. The default value is 8, which
means the redelivery time
+ * will be bucketed by 256 ms, so the redelivery may happen earlier (but not
later) than the expected time,
+ * but by no more than 256 ms. If set to k, the redelivery time will be
bucketed by 2^k ms.
+ * If the value is 0, the redelivery time will be accurate to ms.
+ *
+ * @param negativeAckPrecisionBitCount
+ * The redelivery time precision bit count; must not be negative.
+ * @return the consumer builder instance
+ */
+ ConsumerBuilder<T> negativeAckRedeliveryDelayPrecision(int
negativeAckPrecisionBitCount);
+
/**
* Select the subscription type to be used when subscribing to a topic.
*
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 35f772028f1..478f93b56a0 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -281,6 +281,13 @@ public class ConsumerBuilderImpl<T> implements
ConsumerBuilder<T> {
return this;
}
+ @Override
+ public ConsumerBuilder<T> negativeAckRedeliveryDelayPrecision(int
negativeAckPrecisionBitCount) {
+ checkArgument(negativeAckPrecisionBitCount >= 0,
"negativeAckPrecisionBitCount needs to be >= 0");
+ conf.setNegativeAckPrecisionBitCnt(negativeAckPrecisionBitCount);
+ return this;
+ }
+
@Override
public ConsumerBuilder<T> subscriptionType(@NonNull SubscriptionType
subscriptionType) {
conf.setSubscriptionType(subscriptionType);