This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7ecc31d818277be333fe0d9b317f6bb98e26520d
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Feb 9 13:08:54 2026 +0200

    Revert "[improve] [broker] replace HashMap with inner implementation ConcurrentLongLongPairHashMap in Negative Ack Tracker. (#23582)"
    
    This reverts commit 431c2320d6984a4e8ac405cb3020848053fc86d4.
---
 .../pulsar/client/impl/NegativeAcksTest.java       |  4 +--
 .../pulsar/client/impl/NegativeAcksTracker.java    | 34 +++++-----------------
 .../pulsar/client/impl/ConsumerImplTest.java       |  2 +-
 3 files changed, 11 insertions(+), 29 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index c5aa81e7083..d5819155b80 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -319,7 +319,7 @@ public class NegativeAcksTest extends ProducerConsumerBase {
         // negative topic message id
         consumer.negativeAcknowledge(topicMessageId);
         NegativeAcksTracker negativeAcksTracker = consumer.getNegativeAcksTracker();
-        assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) -1).longValue(), 1L);
+        assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse(-1).intValue(), 1);
         assertEquals(unAckedMessageTracker.size(), 0);
         negativeAcksTracker.close();
         // negative batch message id
@@ -327,7 +327,7 @@ public class NegativeAcksTest extends ProducerConsumerBase {
         consumer.negativeAcknowledge(batchMessageId);
         consumer.negativeAcknowledge(batchMessageId2);
         consumer.negativeAcknowledge(batchMessageId3);
-        assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) -1).longValue(), 1L);
+        assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse(-1).intValue(), 1);
         assertEquals(unAckedMessageTracker.size(), 0);
         negativeAcksTracker.close();
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index e1724ebb85c..d6b86e3593d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -23,23 +23,22 @@ import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.Timeout;
 import io.netty.util.Timer;
 import java.io.Closeable;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.RedeliveryBackoff;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 class NegativeAcksTracker implements Closeable {
     private static final Logger log = LoggerFactory.getLogger(NegativeAcksTracker.class);
 
-    private ConcurrentLongLongPairHashMap nackedMessages = null;
+    private HashMap<MessageId, Long> nackedMessages = null;
 
     private final ConsumerBase<?> consumer;
     private final Timer timer;
@@ -51,7 +50,6 @@ class NegativeAcksTracker implements Closeable {
 
     // Set a min delay to allow for grouping nacks within a single batch
     private static final long MIN_NACK_DELAY_NANOS = TimeUnit.MILLISECONDS.toNanos(100);
-    private static final long NON_PARTITIONED_TOPIC_PARTITION_INDEX = Long.MAX_VALUE;
 
     public NegativeAcksTracker(ConsumerBase<?> consumer, ConsumerConfigurationData<?> conf) {
         this.consumer = consumer;
@@ -77,21 +75,15 @@ class NegativeAcksTracker implements Closeable {
         // Group all the nacked messages into one single re-delivery request
         Set<MessageId> messagesToRedeliver = new HashSet<>();
         long now = System.nanoTime();
-        nackedMessages.forEach((ledgerId, entryId, partitionIndex, timestamp) -> {
+        nackedMessages.forEach((msgId, timestamp) -> {
             if (timestamp < now) {
-                MessageId msgId = new MessageIdImpl(ledgerId, entryId,
-                        // need to covert non-partitioned topic partition index to -1
-                        (int) (partitionIndex == NON_PARTITIONED_TOPIC_PARTITION_INDEX ? -1 : partitionIndex));
                 addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer);
                 messagesToRedeliver.add(msgId);
             }
         });
 
         if (!messagesToRedeliver.isEmpty()) {
-            for (MessageId messageId : messagesToRedeliver) {
-                nackedMessages.remove(((MessageIdImpl) messageId).getLedgerId(),
-                        ((MessageIdImpl) messageId).getEntryId());
-            }
+            messagesToRedeliver.forEach(nackedMessages::remove);
             consumer.onNegativeAcksSend(messagesToRedeliver);
             log.info("[{}] {} messages will be re-delivered", consumer, messagesToRedeliver.size());
             consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
@@ -110,10 +102,7 @@ class NegativeAcksTracker implements Closeable {
 
     private synchronized void add(MessageId messageId, int redeliveryCount) {
         if (nackedMessages == null) {
-            nackedMessages = ConcurrentLongLongPairHashMap.newBuilder()
-                    .autoShrink(true)
-                    .concurrencyLevel(1)
-                    .build();
+            nackedMessages = new HashMap<>();
         }
 
         long backoffNs;
@@ -122,14 +111,7 @@ class NegativeAcksTracker implements Closeable {
         } else {
             backoffNs = nackDelayNanos;
         }
-        MessageIdAdv messageIdAdv = MessageIdAdvUtils.discardBatch(messageId);
-        // ConcurrentLongLongPairHashMap requires the key and value >=0.
-        // partitionIndex is -1 if the message is from a non-partitioned topic, but we don't use
-        // partitionIndex actually, so we can set it to Long.MAX_VALUE in the case of non-partitioned topic to
-        // avoid exception from ConcurrentLongLongPairHashMap.
-        nackedMessages.put(messageIdAdv.getLedgerId(), messageIdAdv.getEntryId(),
-                messageIdAdv.getPartitionIndex() >= 0 ? messageIdAdv.getPartitionIndex() :
-                        NON_PARTITIONED_TOPIC_PARTITION_INDEX, System.nanoTime() + backoffNs);
+        nackedMessages.put(MessageIdAdvUtils.discardBatch(messageId), System.nanoTime() + backoffNs);
 
         if (this.timeout == null) {
             // Schedule a task and group all the redeliveries for same period. Leave a small buffer to allow for
@@ -139,8 +121,8 @@ class NegativeAcksTracker implements Closeable {
     }
 
     @VisibleForTesting
-    Optional<Long> getNackedMessagesCount() {
-        return Optional.ofNullable(nackedMessages).map(ConcurrentLongLongPairHashMap::size);
+    Optional<Integer> getNackedMessagesCount() {
+        return Optional.ofNullable(nackedMessages).map(HashMap::size);
     }
 
     @Override
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index e62958eb968..0c47d17098e 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -208,7 +208,7 @@ public class ConsumerImplTest {
         Exception checkException = null;
         try {
             if (consumer != null) {
-                consumer.negativeAcknowledge(new MessageIdImpl(0, 0, -1));
+                consumer.negativeAcknowledge(new MessageIdImpl(-1, -1, -1));
                 consumer.close();
             }
         } catch (Exception e) {

Reply via email to