This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 14543d3fde9 [fix][client] Avoid recycling the same ConcurrentBitSetRecyclable among different threads (#24725) 14543d3fde9 is described below commit 14543d3fde935d1b70e3707a8b2c0294eb53dccb Author: Yunze Xu <xyzinfern...@163.com> AuthorDate: Thu Sep 11 22:19:35 2025 +0800 [fix][client] Avoid recycling the same ConcurrentBitSetRecyclable among different threads (#24725) --- .../PersistentAcknowledgmentsGroupingTracker.java | 29 +++++++++++----------- .../impl/AcknowledgementsGroupingTrackerTest.java | 28 ++++++--------------- 2 files changed, 22 insertions(+), 35 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index 5f7957d7f1d..b814d261fd7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl; import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; +import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.FastThreadLocal; @@ -26,13 +27,12 @@ import java.util.ArrayList; import java.util.BitSet; import java.util.Collections; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -82,7 
+82,8 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments * broker. */ private final ConcurrentSkipListSet<MessageIdAdv> pendingIndividualAcks; - private final ConcurrentHashMap<MessageIdAdv, ConcurrentBitSetRecyclable> pendingIndividualBatchIndexAcks; + @VisibleForTesting + final ConcurrentSkipListMap<MessageIdAdv, ConcurrentBitSetRecyclable> pendingIndividualBatchIndexAcks; private final ScheduledFuture<?> scheduledTask; private final boolean batchIndexAckEnabled; @@ -92,7 +93,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments EventLoopGroup eventLoopGroup) { this.consumer = consumer; this.pendingIndividualAcks = new ConcurrentSkipListSet<>(); - this.pendingIndividualBatchIndexAcks = new ConcurrentHashMap<>(); + this.pendingIndividualBatchIndexAcks = new ConcurrentSkipListMap<>(); this.acknowledgementGroupTimeMicros = conf.getAcknowledgementsGroupTimeMicros(); this.maxAckGroupSize = conf.getMaxAcknowledgmentGroupSize(); this.batchIndexAckEnabled = conf.isBatchIndexAckEnabled(); @@ -324,7 +325,8 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments } } - private CompletableFuture<Void> doIndividualBatchAckAsync(MessageIdAdv msgId) { + @VisibleForTesting + CompletableFuture<Void> doIndividualBatchAckAsync(MessageIdAdv msgId) { ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent( MessageIdAdvUtils.discardBatch(msgId), __ -> { final BitSet ackSet = msgId.getAckSet(); @@ -484,16 +486,15 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments } } - if (!pendingIndividualBatchIndexAcks.isEmpty()) { - Iterator<Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable>> iterator = - pendingIndividualBatchIndexAcks.entrySet().iterator(); - - while (iterator.hasNext()) { - Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable> entry = iterator.next(); - entriesToAck.add(Triple.of( - entry.getKey().getLedgerId(), 
entry.getKey().getEntryId(), entry.getValue())); - iterator.remove(); + while (true) { + Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable> entry = + pendingIndividualBatchIndexAcks.pollFirstEntry(); + if (entry == null) { + // The entry has been removed in a different thread + break; } + entriesToAck.add(Triple.of( + entry.getKey().getLedgerId(), entry.getKey().getEntryId(), entry.getValue())); } if (entriesToAck.size() > 0) { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java index bbf12654899..7a8222473a3 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java @@ -31,23 +31,18 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; -import java.lang.reflect.Field; -import java.lang.reflect.Method; import java.util.BitSet; import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.client.util.TimedCompletableFuture; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.ProtocolVersion; -import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable; import org.testng.annotations.AfterClass; import 
org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; @@ -387,27 +382,18 @@ public class AcknowledgementsGroupingTrackerTest { } @Test - public void testDoIndividualBatchAckAsync() throws Exception{ + public void testDoIndividualBatchAckAsync() { ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>(); - AcknowledgmentsGroupingTracker tracker = - new PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup); - MessageId messageId1 = new BatchMessageIdImpl(5, 1, 0, 3, 10, null); + var tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup); + var messageId1 = new BatchMessageIdImpl(5, 1, 0, 3, 10, null); BitSet bitSet = new BitSet(20); for (int i = 0; i < 20; i++) { bitSet.set(i, true); } - MessageId messageId2 = new BatchMessageIdImpl(3, 2, 0, 5, 20, bitSet); - Method doIndividualBatchAckAsync = PersistentAcknowledgmentsGroupingTracker.class - .getDeclaredMethod("doIndividualBatchAckAsync", MessageIdAdv.class); - doIndividualBatchAckAsync.setAccessible(true); - doIndividualBatchAckAsync.invoke(tracker, messageId1); - doIndividualBatchAckAsync.invoke(tracker, messageId2); - Field pendingIndividualBatchIndexAcks = - PersistentAcknowledgmentsGroupingTracker.class.getDeclaredField("pendingIndividualBatchIndexAcks"); - pendingIndividualBatchIndexAcks.setAccessible(true); - ConcurrentHashMap<MessageIdAdv, ConcurrentBitSetRecyclable> batchIndexAcks = - (ConcurrentHashMap<MessageIdAdv, ConcurrentBitSetRecyclable>) pendingIndividualBatchIndexAcks - .get(tracker); + var messageId2 = new BatchMessageIdImpl(3, 2, 0, 5, 20, bitSet); + tracker.doIndividualBatchAckAsync(messageId1); + tracker.doIndividualBatchAckAsync(messageId2); + var batchIndexAcks = tracker.pendingIndividualBatchIndexAcks; MessageIdImpl position1 = new MessageIdImpl(5, 1, 0); MessageIdImpl position2 = new MessageIdImpl(3, 2, 0); assertTrue(batchIndexAcks.containsKey(position1));