nodece commented on code in PR #23600:
URL: https://github.com/apache/pulsar/pull/23600#discussion_r1849729196


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java:
##########
@@ -155,6 +155,16 @@ public class ConsumerConfigurationData<T> implements 
Serializable, Cloneable {
     )
     private long negativeAckRedeliveryDelayMicros = 
TimeUnit.MINUTES.toMicros(1);
 
+    @ApiModelProperty(
+            name = "negativeAckPrecisionBitCnt",
+            value = "The redelivery time precision bit count. The lower bits 
of the redelivery time will be"
+                    + "trimmed to reduce the memory occupation.\nThe default 
value is 8, which means the"
+                    + "redelivery time will be bucketed by 256ms, the 
redelivery time could be earlier(no later)"
+                    + "than the expected time, but no more than 256ms. \nIf 
set to k, the redelivery time will be"
+                    + "bucketed by 2^k ms.\nIf the value is 0, the redelivery 
time will be accurate to ms."
+    )
+    private int negativeAckPrecisionBitCnt = 8;

Review Comment:
   Could you consider 0 as the default value? Users may wish to submit later, 
which is consistent with previous behavior.
   
   



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java:
##########
@@ -311,19 +312,61 @@ public void testNegativeAcksDeleteFromUnackedTracker() 
throws Exception {
         // negative topic message id
         consumer.negativeAcknowledge(topicMessageId);
         NegativeAcksTracker negativeAcksTracker = 
consumer.getNegativeAcksTracker();
-        
assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) 
-1).longValue(), 1L);
+        assertEquals(negativeAcksTracker.getNackedMessagesCount(), 1L);
         assertEquals(unAckedMessageTracker.size(), 0);
         negativeAcksTracker.close();
         // negative batch message id
         unAckedMessageTracker.add(messageId);
         consumer.negativeAcknowledge(batchMessageId);
         consumer.negativeAcknowledge(batchMessageId2);
         consumer.negativeAcknowledge(batchMessageId3);
-        
assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) 
-1).longValue(), 1L);
+        assertEquals(negativeAcksTracker.getNackedMessagesCount(), 1L);
         assertEquals(unAckedMessageTracker.size(), 0);
         negativeAcksTracker.close();
     }
 
+    /**
+     * If we nack multiple messages in the same batch with different 
redelivery delays, the messages should be redelivered
+     * with the correct delay. However, all messages are redelivered at the 
same time.
+     * @throws Exception
+     */
+    @Test
+    public void testNegativeAcksWithBatch() throws Exception {
+        cleanup();
+        conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
+        setup();
+        String topic = 
BrokerTestUtil.newUniqueName("testNegativeAcksWithBatch");
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub1")
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+                .subscriptionType(SubscriptionType.Shared)
+                .enableBatchIndexAcknowledgment(true)
+                .negativeAckRedeliveryDelay(3, TimeUnit.SECONDS)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)

Review Comment:
   ```suggestion
                   .topic(topic)
                   .enableBatching(true)
                   .batchingMaxPublishDelay(1, TimeUnit.HOURS)
                   .batchingMaxMessages(2)
   ```



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java:
##########
@@ -108,39 +134,56 @@ public synchronized void add(Message<?> message) {
         add(message.getMessageId(), message.getRedeliveryCount());
     }
 
+    static long trimLowerBit(long timestamp, int bits) {
+        return timestamp & (-1L << bits);
+    }
+
     private synchronized void add(MessageId messageId, int redeliveryCount) {
         if (nackedMessages == null) {
-            nackedMessages = ConcurrentLongLongPairHashMap.newBuilder()
-                    .autoShrink(true)
-                    .concurrencyLevel(1)
-                    .build();
+            nackedMessages = new Long2ObjectAVLTreeMap<>();
         }
 
-        long backoffNs;
+        long backoffMs;

Review Comment:
   Could you use nanoseconds? Then you need to use `System.nanoTime()` instead 
of `System.currentTimeMillis()`, the `System.nanoTime()` is quickly based on 
JVM.



##########
pulsar-client/pom.xml:
##########
@@ -207,6 +207,16 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.roaringbitmap</groupId>
+      <artifactId>RoaringBitmap</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil</artifactId>
+    </dependency>

Review Comment:
   It looks like you should use the `fastutil-core` instead of `fastutil`, and 
then we can make a new PR to reduce the `fastutil-core` size.
   
   And then use `<include>it.unimi.dsi:fastutil-core</include>`.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to