This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 431c2320d69 [improve] [broker] replace HashMap with inner implementation ConcurrentLongLongPairHashMap in Negative Ack Tracker. (#23582)
431c2320d69 is described below

commit 431c2320d6984a4e8ac405cb3020848053fc86d4
Author: Wenzhi Feng <[email protected]>
AuthorDate: Wed Nov 13 17:51:55 2024 +0800

    [improve] [broker] replace HashMap with inner implementation ConcurrentLongLongPairHashMap in Negative Ack Tracker. (#23582)
    
    (cherry picked from commit 9d65a85d6fafbc5f5534caef9b20a808cb5e4d26)
---
 .../pulsar/client/impl/NegativeAcksTest.java       |  4 +--
 .../pulsar/client/impl/NegativeAcksTracker.java    | 34 +++++++++++++++++-----
 .../pulsar/client/impl/ConsumerImplTest.java       |  2 +-
 3 files changed, 29 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index ee2fbce240f..9e8dad44e80 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -312,7 +312,7 @@ public class NegativeAcksTest extends ProducerConsumerBase {
         // negative topic message id
         consumer.negativeAcknowledge(topicMessageId);
         NegativeAcksTracker negativeAcksTracker = 
consumer.getNegativeAcksTracker();
-        
assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse(-1).intValue(),
 1);
+        
assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) 
-1).longValue(), 1L);
         assertEquals(unAckedMessageTracker.size(), 0);
         negativeAcksTracker.close();
         // negative batch message id
@@ -320,7 +320,7 @@ public class NegativeAcksTest extends ProducerConsumerBase {
         consumer.negativeAcknowledge(batchMessageId);
         consumer.negativeAcknowledge(batchMessageId2);
         consumer.negativeAcknowledge(batchMessageId3);
-        
assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse(-1).intValue(),
 1);
+        
assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) 
-1).longValue(), 1L);
         assertEquals(unAckedMessageTracker.size(), 0);
         negativeAcksTracker.close();
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index d6b86e3593d..e1724ebb85c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -23,22 +23,23 @@ import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.Timeout;
 import io.netty.util.Timer;
 import java.io.Closeable;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.RedeliveryBackoff;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 class NegativeAcksTracker implements Closeable {
     private static final Logger log = 
LoggerFactory.getLogger(NegativeAcksTracker.class);
 
-    private HashMap<MessageId, Long> nackedMessages = null;
+    private ConcurrentLongLongPairHashMap nackedMessages = null;
 
     private final ConsumerBase<?> consumer;
     private final Timer timer;
@@ -50,6 +51,7 @@ class NegativeAcksTracker implements Closeable {
 
     // Set a min delay to allow for grouping nacks within a single batch
     private static final long MIN_NACK_DELAY_NANOS = 
TimeUnit.MILLISECONDS.toNanos(100);
+    private static final long NON_PARTITIONED_TOPIC_PARTITION_INDEX = 
Long.MAX_VALUE;
 
     public NegativeAcksTracker(ConsumerBase<?> consumer, 
ConsumerConfigurationData<?> conf) {
         this.consumer = consumer;
@@ -75,15 +77,21 @@ class NegativeAcksTracker implements Closeable {
         // Group all the nacked messages into one single re-delivery request
         Set<MessageId> messagesToRedeliver = new HashSet<>();
         long now = System.nanoTime();
-        nackedMessages.forEach((msgId, timestamp) -> {
+        nackedMessages.forEach((ledgerId, entryId, partitionIndex, timestamp) 
-> {
             if (timestamp < now) {
+                MessageId msgId = new MessageIdImpl(ledgerId, entryId,
+                        // need to convert non-partitioned topic partition index to -1
+                        (int) (partitionIndex == 
NON_PARTITIONED_TOPIC_PARTITION_INDEX ? -1 : partitionIndex));
                 addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, 
messagesToRedeliver, this.consumer);
                 messagesToRedeliver.add(msgId);
             }
         });
 
         if (!messagesToRedeliver.isEmpty()) {
-            messagesToRedeliver.forEach(nackedMessages::remove);
+            for (MessageId messageId : messagesToRedeliver) {
+                nackedMessages.remove(((MessageIdImpl) 
messageId).getLedgerId(),
+                        ((MessageIdImpl) messageId).getEntryId());
+            }
             consumer.onNegativeAcksSend(messagesToRedeliver);
             log.info("[{}] {} messages will be re-delivered", consumer, 
messagesToRedeliver.size());
             consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
@@ -102,7 +110,10 @@ class NegativeAcksTracker implements Closeable {
 
     private synchronized void add(MessageId messageId, int redeliveryCount) {
         if (nackedMessages == null) {
-            nackedMessages = new HashMap<>();
+            nackedMessages = ConcurrentLongLongPairHashMap.newBuilder()
+                    .autoShrink(true)
+                    .concurrencyLevel(1)
+                    .build();
         }
 
         long backoffNs;
@@ -111,7 +122,14 @@ class NegativeAcksTracker implements Closeable {
         } else {
             backoffNs = nackDelayNanos;
         }
-        nackedMessages.put(MessageIdAdvUtils.discardBatch(messageId), 
System.nanoTime() + backoffNs);
+        MessageIdAdv messageIdAdv = MessageIdAdvUtils.discardBatch(messageId);
+        // ConcurrentLongLongPairHashMap requires the key and value >=0.
+        // partitionIndex is -1 if the message is from a non-partitioned 
topic, but we don't use
+        // partitionIndex actually, so we can set it to Long.MAX_VALUE in the 
case of non-partitioned topic to
+        // avoid exception from ConcurrentLongLongPairHashMap.
+        nackedMessages.put(messageIdAdv.getLedgerId(), 
messageIdAdv.getEntryId(),
+                messageIdAdv.getPartitionIndex() >= 0 ? 
messageIdAdv.getPartitionIndex() :
+                        NON_PARTITIONED_TOPIC_PARTITION_INDEX, 
System.nanoTime() + backoffNs);
 
         if (this.timeout == null) {
             // Schedule a task and group all the redeliveries for same period. 
Leave a small buffer to allow for
@@ -121,8 +139,8 @@ class NegativeAcksTracker implements Closeable {
     }
 
     @VisibleForTesting
-    Optional<Integer> getNackedMessagesCount() {
-        return Optional.ofNullable(nackedMessages).map(HashMap::size);
+    Optional<Long> getNackedMessagesCount() {
+        return 
Optional.ofNullable(nackedMessages).map(ConcurrentLongLongPairHashMap::size);
     }
 
     @Override
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 0c47d17098e..e62958eb968 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -208,7 +208,7 @@ public class ConsumerImplTest {
         Exception checkException = null;
         try {
             if (consumer != null) {
-                consumer.negativeAcknowledge(new MessageIdImpl(-1, -1, -1));
+                consumer.negativeAcknowledge(new MessageIdImpl(0, 0, -1));
                 consumer.close();
             }
         } catch (Exception e) {

Reply via email to