This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 14543d3fde9 [fix][client] Avoid recycling the same ConcurrentBitSetRecyclable among different threads (#24725)
14543d3fde9 is described below

commit 14543d3fde935d1b70e3707a8b2c0294eb53dccb
Author: Yunze Xu <xyzinfern...@163.com>
AuthorDate: Thu Sep 11 22:19:35 2025 +0800

    [fix][client] Avoid recycling the same ConcurrentBitSetRecyclable among different threads (#24725)
---
 .../PersistentAcknowledgmentsGroupingTracker.java  | 29 +++++++++++-----------
 .../impl/AcknowledgementsGroupingTrackerTest.java  | 28 ++++++---------------
 2 files changed, 22 insertions(+), 35 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 5f7957d7f1d..b814d261fd7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import static 
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.concurrent.FastThreadLocal;
@@ -26,13 +27,12 @@ import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -82,7 +82,8 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
      * broker.
      */
     private final ConcurrentSkipListSet<MessageIdAdv> pendingIndividualAcks;
-    private final ConcurrentHashMap<MessageIdAdv, ConcurrentBitSetRecyclable> 
pendingIndividualBatchIndexAcks;
+    @VisibleForTesting
+    final ConcurrentSkipListMap<MessageIdAdv, ConcurrentBitSetRecyclable> 
pendingIndividualBatchIndexAcks;
 
     private final ScheduledFuture<?> scheduledTask;
     private final boolean batchIndexAckEnabled;
@@ -92,7 +93,7 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
                                                     EventLoopGroup 
eventLoopGroup) {
         this.consumer = consumer;
         this.pendingIndividualAcks = new ConcurrentSkipListSet<>();
-        this.pendingIndividualBatchIndexAcks = new ConcurrentHashMap<>();
+        this.pendingIndividualBatchIndexAcks = new ConcurrentSkipListMap<>();
         this.acknowledgementGroupTimeMicros = 
conf.getAcknowledgementsGroupTimeMicros();
         this.maxAckGroupSize = conf.getMaxAcknowledgmentGroupSize();
         this.batchIndexAckEnabled = conf.isBatchIndexAckEnabled();
@@ -324,7 +325,8 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
         }
     }
 
-    private CompletableFuture<Void> doIndividualBatchAckAsync(MessageIdAdv 
msgId) {
+    @VisibleForTesting
+    CompletableFuture<Void> doIndividualBatchAckAsync(MessageIdAdv msgId) {
         ConcurrentBitSetRecyclable bitSet = 
pendingIndividualBatchIndexAcks.computeIfAbsent(
                 MessageIdAdvUtils.discardBatch(msgId), __ -> {
                     final BitSet ackSet = msgId.getAckSet();
@@ -484,16 +486,15 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
             }
         }
 
-        if (!pendingIndividualBatchIndexAcks.isEmpty()) {
-            Iterator<Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable>> 
iterator =
-                    pendingIndividualBatchIndexAcks.entrySet().iterator();
-
-            while (iterator.hasNext()) {
-                Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable> entry = 
iterator.next();
-                entriesToAck.add(Triple.of(
-                        entry.getKey().getLedgerId(), 
entry.getKey().getEntryId(), entry.getValue()));
-                iterator.remove();
+        while (true) {
+            Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable> entry =
+                    pendingIndividualBatchIndexAcks.pollFirstEntry();
+            if (entry == null) {
+                // The entry has been removed in a different thread
+                break;
             }
+            entriesToAck.add(Triple.of(
+                    entry.getKey().getLedgerId(), entry.getKey().getEntryId(), 
entry.getValue()));
         }
 
         if (entriesToAck.size() > 0) {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
index bbf12654899..7a8222473a3 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
@@ -31,23 +31,18 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
 import java.util.BitSet;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
 import org.apache.pulsar.client.util.TimedCompletableFuture;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
-import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
@@ -387,27 +382,18 @@ public class AcknowledgementsGroupingTrackerTest {
     }
 
     @Test
-    public void testDoIndividualBatchAckAsync() throws Exception{
+    public void testDoIndividualBatchAckAsync() {
         ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
-        AcknowledgmentsGroupingTracker tracker =
-                new PersistentAcknowledgmentsGroupingTracker(consumer, conf, 
eventLoopGroup);
-        MessageId messageId1 = new BatchMessageIdImpl(5, 1, 0, 3, 10, null);
+        var tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, 
conf, eventLoopGroup);
+        var messageId1 = new BatchMessageIdImpl(5, 1, 0, 3, 10, null);
         BitSet bitSet = new BitSet(20);
         for (int i = 0; i < 20; i++) {
             bitSet.set(i, true);
         }
-        MessageId messageId2 = new BatchMessageIdImpl(3, 2, 0, 5, 20, bitSet);
-        Method doIndividualBatchAckAsync = 
PersistentAcknowledgmentsGroupingTracker.class
-                .getDeclaredMethod("doIndividualBatchAckAsync", 
MessageIdAdv.class);
-        doIndividualBatchAckAsync.setAccessible(true);
-        doIndividualBatchAckAsync.invoke(tracker, messageId1);
-        doIndividualBatchAckAsync.invoke(tracker, messageId2);
-        Field pendingIndividualBatchIndexAcks =
-                
PersistentAcknowledgmentsGroupingTracker.class.getDeclaredField("pendingIndividualBatchIndexAcks");
-        pendingIndividualBatchIndexAcks.setAccessible(true);
-        ConcurrentHashMap<MessageIdAdv, ConcurrentBitSetRecyclable> 
batchIndexAcks =
-                (ConcurrentHashMap<MessageIdAdv, ConcurrentBitSetRecyclable>) 
pendingIndividualBatchIndexAcks
-                        .get(tracker);
+        var messageId2 = new BatchMessageIdImpl(3, 2, 0, 5, 20, bitSet);
+        tracker.doIndividualBatchAckAsync(messageId1);
+        tracker.doIndividualBatchAckAsync(messageId2);
+        var batchIndexAcks = tracker.pendingIndividualBatchIndexAcks;
         MessageIdImpl position1 = new MessageIdImpl(5, 1, 0);
         MessageIdImpl position2 = new MessageIdImpl(3, 2, 0);
         assertTrue(batchIndexAcks.containsKey(position1));

Reply via email to