This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 7ecc31d818277be333fe0d9b317f6bb98e26520d Author: Lari Hotari <[email protected]> AuthorDate: Mon Feb 9 13:08:54 2026 +0200 Revert "[improve] [broker] replace HashMap with inner implementation ConcurrentLongLongPairHashMap in Negative Ack Tracker. (#23582)" This reverts commit 431c2320d6984a4e8ac405cb3020848053fc86d4. --- .../pulsar/client/impl/NegativeAcksTest.java | 4 +-- .../pulsar/client/impl/NegativeAcksTracker.java | 34 +++++----------------- .../pulsar/client/impl/ConsumerImplTest.java | 2 +- 3 files changed, 11 insertions(+), 29 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index c5aa81e7083..d5819155b80 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -319,7 +319,7 @@ public class NegativeAcksTest extends ProducerConsumerBase { // negative topic message id consumer.negativeAcknowledge(topicMessageId); NegativeAcksTracker negativeAcksTracker = consumer.getNegativeAcksTracker(); - assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) -1).longValue(), 1L); + assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse(-1).intValue(), 1); assertEquals(unAckedMessageTracker.size(), 0); negativeAcksTracker.close(); // negative batch message id @@ -327,7 +327,7 @@ public class NegativeAcksTest extends ProducerConsumerBase { consumer.negativeAcknowledge(batchMessageId); consumer.negativeAcknowledge(batchMessageId2); consumer.negativeAcknowledge(batchMessageId3); - assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) -1).longValue(), 1L); + assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse(-1).intValue(), 1); assertEquals(unAckedMessageTracker.size(), 0); negativeAcksTracker.close(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java index e1724ebb85c..d6b86e3593d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java @@ -23,23 +23,22 @@ import com.google.common.annotations.VisibleForTesting; import io.netty.util.Timeout; import io.netty.util.Timer; import java.io.Closeable; +import java.util.HashMap; import java.util.HashSet; import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.RedeliveryBackoff; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; -import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; class NegativeAcksTracker implements Closeable { private static final Logger log = LoggerFactory.getLogger(NegativeAcksTracker.class); - private ConcurrentLongLongPairHashMap nackedMessages = null; + private HashMap<MessageId, Long> nackedMessages = null; private final ConsumerBase<?> consumer; private final Timer timer; @@ -51,7 +50,6 @@ class NegativeAcksTracker implements Closeable { // Set a min delay to allow for grouping nacks within a single batch private static final long MIN_NACK_DELAY_NANOS = TimeUnit.MILLISECONDS.toNanos(100); - private static final long NON_PARTITIONED_TOPIC_PARTITION_INDEX = Long.MAX_VALUE; public NegativeAcksTracker(ConsumerBase<?> consumer, ConsumerConfigurationData<?> conf) { this.consumer = consumer; @@ -77,21 +75,15 @@ class NegativeAcksTracker implements Closeable { // Group all the nacked messages into one single re-delivery request Set<MessageId> messagesToRedeliver = new HashSet<>(); long now = System.nanoTime(); - nackedMessages.forEach((ledgerId, entryId, partitionIndex, timestamp) -> { + nackedMessages.forEach((msgId, timestamp) -> { if (timestamp < now) { - MessageId msgId = new MessageIdImpl(ledgerId, entryId, - // need to covert non-partitioned topic partition index to -1 - (int) (partitionIndex == NON_PARTITIONED_TOPIC_PARTITION_INDEX ? -1 : partitionIndex)); addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer); messagesToRedeliver.add(msgId); } }); if (!messagesToRedeliver.isEmpty()) { - for (MessageId messageId : messagesToRedeliver) { - nackedMessages.remove(((MessageIdImpl) messageId).getLedgerId(), - ((MessageIdImpl) messageId).getEntryId()); - } + messagesToRedeliver.forEach(nackedMessages::remove); consumer.onNegativeAcksSend(messagesToRedeliver); log.info("[{}] {} messages will be re-delivered", consumer, messagesToRedeliver.size()); consumer.redeliverUnacknowledgedMessages(messagesToRedeliver); @@ -110,10 +102,7 @@ class NegativeAcksTracker implements Closeable { private synchronized void add(MessageId messageId, int redeliveryCount) { if (nackedMessages == null) { - nackedMessages = ConcurrentLongLongPairHashMap.newBuilder() - .autoShrink(true) - .concurrencyLevel(1) - .build(); + nackedMessages = new HashMap<>(); } long backoffNs; @@ -122,14 +111,7 @@ class NegativeAcksTracker implements Closeable { } else { backoffNs = nackDelayNanos; } - MessageIdAdv messageIdAdv = MessageIdAdvUtils.discardBatch(messageId); - // ConcurrentLongLongPairHashMap requires the key and value >=0. - // partitionIndex is -1 if the message is from a non-partitioned topic, but we don't use - // partitionIndex actually, so we can set it to Long.MAX_VALUE in the case of non-partitioned topic to - // avoid exception from ConcurrentLongLongPairHashMap. - nackedMessages.put(messageIdAdv.getLedgerId(), messageIdAdv.getEntryId(), - messageIdAdv.getPartitionIndex() >= 0 ? messageIdAdv.getPartitionIndex() : - NON_PARTITIONED_TOPIC_PARTITION_INDEX, System.nanoTime() + backoffNs); + nackedMessages.put(MessageIdAdvUtils.discardBatch(messageId), System.nanoTime() + backoffNs); if (this.timeout == null) { // Schedule a task and group all the redeliveries for same period. Leave a small buffer to allow for @@ -139,8 +121,8 @@ class NegativeAcksTracker implements Closeable { } @VisibleForTesting - Optional<Long> getNackedMessagesCount() { - return Optional.ofNullable(nackedMessages).map(ConcurrentLongLongPairHashMap::size); + Optional<Integer> getNackedMessagesCount() { + return Optional.ofNullable(nackedMessages).map(HashMap::size); } @Override diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java index e62958eb968..0c47d17098e 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java @@ -208,7 +208,7 @@ public class ConsumerImplTest { Exception checkException = null; try { if (consumer != null) { - consumer.negativeAcknowledge(new MessageIdImpl(0, 0, -1)); + consumer.negativeAcknowledge(new MessageIdImpl(-1, -1, -1)); consumer.close(); } } catch (Exception e) {
