nodece commented on code in PR #23600:
URL: https://github.com/apache/pulsar/pull/23600#discussion_r1849729196
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java:
##########
@@ -155,6 +155,16 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
)
private long negativeAckRedeliveryDelayMicros = TimeUnit.MINUTES.toMicros(1);
+ @ApiModelProperty(
+ name = "negativeAckPrecisionBitCnt",
+ value = "The redelivery time precision bit count. The lower bits of the redelivery time will be"
+ + "trimmed to reduce the memory occupation.\nThe default value is 8, which means the"
+ + "redelivery time will be bucketed by 256ms, the redelivery time could be earlier(no later)"
+ + "than the expected time, but no more than 256ms. \nIf set to k, the redelivery time will be"
+ + "bucketed by 2^k ms.\nIf the value is 0, the redelivery time will be accurate to ms."
+ )
+ private int negativeAckPrecisionBitCnt = 8;
Review Comment:
Could you consider using 0 as the default value? Users may prefer that messages are
never redelivered earlier than the configured delay, which is consistent with the previous behavior.
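To make the trade-off concrete, here is a minimal sketch of what the precision setting does to a redelivery timestamp. It reuses the `trimLowerBit` masking from this PR; the class name `NackBucketSketch` and the sample timestamp are hypothetical and only for illustration.

```java
public class NackBucketSketch {
    // Same masking as trimLowerBit in the PR: clear the lowest `bits` bits,
    // which rounds the timestamp down to a multiple of 2^bits.
    static long trimLowerBit(long timestamp, int bits) {
        return timestamp & (-1L << bits);
    }

    public static void main(String[] args) {
        long redeliverAtMs = 1_000_123L; // hypothetical redelivery time in ms

        long bucketed = trimLowerBit(redeliverAtMs, 8); // 256 ms buckets
        long exact = trimLowerBit(redeliverAtMs, 0);    // no trimming

        System.out.println(bucketed);                 // 999936
        System.out.println(redeliverAtMs - bucketed); // 187 (ms earlier than requested)
        System.out.println(exact);                    // 1000123
    }
}
```

With 8 bits the timestamp is rounded down, so redelivery can fire up to 255 ms early; with 0 bits the timestamp is left unchanged.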
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java:
##########
@@ -311,19 +312,61 @@ public void testNegativeAcksDeleteFromUnackedTracker() throws Exception {
// negative topic message id
consumer.negativeAcknowledge(topicMessageId);
NegativeAcksTracker negativeAcksTracker = consumer.getNegativeAcksTracker();
- assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) -1).longValue(), 1L);
+ assertEquals(negativeAcksTracker.getNackedMessagesCount(), 1L);
assertEquals(unAckedMessageTracker.size(), 0);
negativeAcksTracker.close();
// negative batch message id
unAckedMessageTracker.add(messageId);
consumer.negativeAcknowledge(batchMessageId);
consumer.negativeAcknowledge(batchMessageId2);
consumer.negativeAcknowledge(batchMessageId3);
- assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) -1).longValue(), 1L);
+ assertEquals(negativeAcksTracker.getNackedMessagesCount(), 1L);
assertEquals(unAckedMessageTracker.size(), 0);
negativeAcksTracker.close();
}
+ /**
+ * If we nack multiple messages in the same batch with different redelivery delays, the messages should be redelivered
+ * with the correct delay. However, all messages are redelivered at the same time.
+ * @throws Exception
+ */
+ @Test
+ public void testNegativeAcksWithBatch() throws Exception {
+ cleanup();
+ conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
+ setup();
+ String topic = BrokerTestUtil.newUniqueName("testNegativeAcksWithBatch");
+
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub1")
+ .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+ .subscriptionType(SubscriptionType.Shared)
+ .enableBatchIndexAcknowledgment(true)
+ .negativeAckRedeliveryDelay(3, TimeUnit.SECONDS)
+ .subscribe();
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
Review Comment:
```suggestion
.topic(topic)
.enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.HOURS)
.batchingMaxMessages(2)
```
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java:
##########
@@ -108,39 +134,56 @@ public synchronized void add(Message<?> message) {
add(message.getMessageId(), message.getRedeliveryCount());
}
+ static long trimLowerBit(long timestamp, int bits) {
+ return timestamp & (-1L << bits);
+ }
+
private synchronized void add(MessageId messageId, int redeliveryCount) {
if (nackedMessages == null) {
- nackedMessages = ConcurrentLongLongPairHashMap.newBuilder()
- .autoShrink(true)
- .concurrencyLevel(1)
- .build();
+ nackedMessages = new Long2ObjectAVLTreeMap<>();
}
- long backoffNs;
+ long backoffMs;
Review Comment:
Could you use nanoseconds here? That would mean using `System.nanoTime()` instead
of `System.currentTimeMillis()`; `System.nanoTime()` is fast on the JVM.
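A rough sketch of what a `System.nanoTime()`-based deadline could look like. The class and method names here are hypothetical and not part of `NegativeAcksTracker`. Note that `System.nanoTime()` values are only meaningful as differences within one JVM, so they cannot be mixed with wall-clock timestamps.

```java
import java.util.concurrent.TimeUnit;

public class NanoDeadlineSketch {
    // Compute a deadline on the nanoTime() clock. The sum may overflow,
    // which is fine as long as comparisons use subtraction (see isDue).
    static long deadlineNanos(long nowNanos, long delay, TimeUnit unit) {
        return nowNanos + unit.toNanos(delay);
    }

    // Overflow-safe comparison, following the System.nanoTime() Javadoc:
    // compare the difference against zero instead of comparing the values.
    static boolean isDue(long nowNanos, long deadlineNanos) {
        return nowNanos - deadlineNanos >= 0;
    }

    public static void main(String[] args) throws InterruptedException {
        long deadline = deadlineNanos(System.nanoTime(), 50, TimeUnit.MILLISECONDS);
        System.out.println(isDue(System.nanoTime(), deadline)); // almost certainly false: 50 ms have not elapsed
        Thread.sleep(60);
        System.out.println(isDue(System.nanoTime(), deadline)); // true: the sleep outlasts the 50 ms delay
    }
}
```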
##########
pulsar-client/pom.xml:
##########
@@ -207,6 +207,16 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.roaringbitmap</groupId>
+ <artifactId>RoaringBitmap</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil</artifactId>
+ </dependency>
Review Comment:
It looks like you should use `fastutil-core` instead of `fastutil`; we can then
open a new PR to reduce the size of `fastutil-core`.
After that, use `<include>it.unimi.dsi:fastutil-core</include>`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.