This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 431c2320d69 [improve] [broker] replace HashMap with inner
implementation ConcurrentLongLongPairHashMap in Negative Ack Tracker. (#23582)
431c2320d69 is described below
commit 431c2320d6984a4e8ac405cb3020848053fc86d4
Author: Wenzhi Feng <[email protected]>
AuthorDate: Wed Nov 13 17:51:55 2024 +0800
[improve] [broker] replace HashMap with inner implementation
ConcurrentLongLongPairHashMap in Negative Ack Tracker. (#23582)
(cherry picked from commit 9d65a85d6fafbc5f5534caef9b20a808cb5e4d26)
---
.../pulsar/client/impl/NegativeAcksTest.java | 4 +--
.../pulsar/client/impl/NegativeAcksTracker.java | 34 +++++++++++++++++-----
.../pulsar/client/impl/ConsumerImplTest.java | 2 +-
3 files changed, 29 insertions(+), 11 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index ee2fbce240f..9e8dad44e80 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -312,7 +312,7 @@ public class NegativeAcksTest extends ProducerConsumerBase {
// negative topic message id
consumer.negativeAcknowledge(topicMessageId);
NegativeAcksTracker negativeAcksTracker =
consumer.getNegativeAcksTracker();
-
assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse(-1).intValue(),
1);
+
assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long)
-1).longValue(), 1L);
assertEquals(unAckedMessageTracker.size(), 0);
negativeAcksTracker.close();
// negative batch message id
@@ -320,7 +320,7 @@ public class NegativeAcksTest extends ProducerConsumerBase {
consumer.negativeAcknowledge(batchMessageId);
consumer.negativeAcknowledge(batchMessageId2);
consumer.negativeAcknowledge(batchMessageId3);
-
assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse(-1).intValue(),
1);
+
assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long)
-1).longValue(), 1L);
assertEquals(unAckedMessageTracker.size(), 0);
negativeAcksTracker.close();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index d6b86e3593d..e1724ebb85c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -23,22 +23,23 @@ import com.google.common.annotations.VisibleForTesting;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import java.io.Closeable;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class NegativeAcksTracker implements Closeable {
private static final Logger log =
LoggerFactory.getLogger(NegativeAcksTracker.class);
- private HashMap<MessageId, Long> nackedMessages = null;
+ private ConcurrentLongLongPairHashMap nackedMessages = null;
private final ConsumerBase<?> consumer;
private final Timer timer;
@@ -50,6 +51,7 @@ class NegativeAcksTracker implements Closeable {
// Set a min delay to allow for grouping nacks within a single batch
private static final long MIN_NACK_DELAY_NANOS =
TimeUnit.MILLISECONDS.toNanos(100);
+ private static final long NON_PARTITIONED_TOPIC_PARTITION_INDEX =
Long.MAX_VALUE;
public NegativeAcksTracker(ConsumerBase<?> consumer,
ConsumerConfigurationData<?> conf) {
this.consumer = consumer;
@@ -75,15 +77,21 @@ class NegativeAcksTracker implements Closeable {
// Group all the nacked messages into one single re-delivery request
Set<MessageId> messagesToRedeliver = new HashSet<>();
long now = System.nanoTime();
- nackedMessages.forEach((msgId, timestamp) -> {
+ nackedMessages.forEach((ledgerId, entryId, partitionIndex, timestamp)
-> {
if (timestamp < now) {
+ MessageId msgId = new MessageIdImpl(ledgerId, entryId,
+ // need to convert non-partitioned topic partition
index to -1
+ (int) (partitionIndex ==
NON_PARTITIONED_TOPIC_PARTITION_INDEX ? -1 : partitionIndex));
addChunkedMessageIdsAndRemoveFromSequenceMap(msgId,
messagesToRedeliver, this.consumer);
messagesToRedeliver.add(msgId);
}
});
if (!messagesToRedeliver.isEmpty()) {
- messagesToRedeliver.forEach(nackedMessages::remove);
+ for (MessageId messageId : messagesToRedeliver) {
+ nackedMessages.remove(((MessageIdImpl)
messageId).getLedgerId(),
+ ((MessageIdImpl) messageId).getEntryId());
+ }
consumer.onNegativeAcksSend(messagesToRedeliver);
log.info("[{}] {} messages will be re-delivered", consumer,
messagesToRedeliver.size());
consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
@@ -102,7 +110,10 @@ class NegativeAcksTracker implements Closeable {
private synchronized void add(MessageId messageId, int redeliveryCount) {
if (nackedMessages == null) {
- nackedMessages = new HashMap<>();
+ nackedMessages = ConcurrentLongLongPairHashMap.newBuilder()
+ .autoShrink(true)
+ .concurrencyLevel(1)
+ .build();
}
long backoffNs;
@@ -111,7 +122,14 @@ class NegativeAcksTracker implements Closeable {
} else {
backoffNs = nackDelayNanos;
}
- nackedMessages.put(MessageIdAdvUtils.discardBatch(messageId),
System.nanoTime() + backoffNs);
+ MessageIdAdv messageIdAdv = MessageIdAdvUtils.discardBatch(messageId);
+ // ConcurrentLongLongPairHashMap requires the key and value >=0.
+ // partitionIndex is -1 if the message is from a non-partitioned
topic, but we don't use
+ // partitionIndex actually, so we can set it to Long.MAX_VALUE in the
case of non-partitioned topic to
+ // avoid exception from ConcurrentLongLongPairHashMap.
+ nackedMessages.put(messageIdAdv.getLedgerId(),
messageIdAdv.getEntryId(),
+ messageIdAdv.getPartitionIndex() >= 0 ?
messageIdAdv.getPartitionIndex() :
+ NON_PARTITIONED_TOPIC_PARTITION_INDEX,
System.nanoTime() + backoffNs);
if (this.timeout == null) {
// Schedule a task and group all the redeliveries for same period.
Leave a small buffer to allow for
@@ -121,8 +139,8 @@ class NegativeAcksTracker implements Closeable {
}
@VisibleForTesting
- Optional<Integer> getNackedMessagesCount() {
- return Optional.ofNullable(nackedMessages).map(HashMap::size);
+ Optional<Long> getNackedMessagesCount() {
+ return
Optional.ofNullable(nackedMessages).map(ConcurrentLongLongPairHashMap::size);
}
@Override
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 0c47d17098e..e62958eb968 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -208,7 +208,7 @@ public class ConsumerImplTest {
Exception checkException = null;
try {
if (consumer != null) {
- consumer.negativeAcknowledge(new MessageIdImpl(-1, -1, -1));
+ consumer.negativeAcknowledge(new MessageIdImpl(0, 0, -1));
consumer.close();
}
} catch (Exception e) {