This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 22069491a9cff3947a538fb6b9c4169fa9a85f7b Author: Oneby Wang <[email protected]> AuthorDate: Wed Feb 4 18:46:37 2026 +0800 [fix][client] Fix race condition between isDuplicate() and flushAsync() method in PersistentAcknowledgmentsGroupingTracker due to incorrect use Netty Recycler (#25208) (cherry picked from commit 5aab2f00f37c56303cb4efb86b3728d055eed7b1) --- .../apache/pulsar/client/impl/ConsumerImpl.java | 11 +++-- .../PersistentAcknowledgmentsGroupingTracker.java | 25 +++++------ .../impl/AcknowledgementsGroupingTrackerTest.java | 50 ++++++++++++++++++++++ .../apache/pulsar/common/protocol/Commands.java | 11 +++-- .../collections/ConcurrentBitSetRecyclable.java | 1 + 5 files changed, 74 insertions(+), 24 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 81a421c9b3c..c1e8df4d35c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -136,7 +136,7 @@ import org.apache.pulsar.common.util.ExceptionHandler; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.SafeCollectionUtils; import org.apache.pulsar.common.util.collections.BitSetRecyclable; -import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable; +import org.apache.pulsar.common.util.collections.ConcurrentBitSet; import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -3158,7 +3158,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } else { if (Commands.peerSupportsMultiMessageAcknowledgment( getClientCnx().getRemoteEndpointProtocolVersion())) { - List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = + List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck = new ArrayList<>(chunkMsgIds.length); for (MessageIdImpl cMsgId : chunkMsgIds) { if (cMsgId != null && chunkMsgIds.length > 1) { @@ -3196,7 +3196,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } private ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID txnID, - List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries, + List<Triple<Long, Long, ConcurrentBitSet>> entries, long requestID) { BaseCommand cmd = newMultiMessageAckCommon(entries); cmd.getAck() @@ -3215,7 +3215,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } }; - private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries) { + private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, ConcurrentBitSet>> entries) { BaseCommand cmd = LOCAL_BASE_COMMAND.get() .clear() .setType(BaseCommand.Type.ACK); @@ -3224,7 +3224,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle for (int i = 0; i < entriesCount; i++) { long ledgerId = entries.get(i).getLeft(); long entryId = entries.get(i).getMiddle(); - ConcurrentBitSetRecyclable bitSet = entries.get(i).getRight(); + ConcurrentBitSet bitSet = entries.get(i).getRight(); MessageIdData msgId = ack.addMessageId() .setLedgerId(ledgerId) .setEntryId(entryId); @@ -3233,7 +3233,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle for (int j = 0; j < ackSet.length; j++) { msgId.addAckSet(ackSet[j]); } - bitSet.recycle(); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index b814d261fd7..0598dc4fb36 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -51,7 +51,7 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.BitSetRecyclable; -import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable; +import org.apache.pulsar.common.util.collections.ConcurrentBitSet; import org.jspecify.annotations.Nullable; /** @@ -83,7 +83,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments */ private final ConcurrentSkipListSet<MessageIdAdv> pendingIndividualAcks; @VisibleForTesting - final ConcurrentSkipListMap<MessageIdAdv, ConcurrentBitSetRecyclable> pendingIndividualBatchIndexAcks; + final ConcurrentSkipListMap<MessageIdAdv, ConcurrentBitSet> pendingIndividualBatchIndexAcks; private final ScheduledFuture<?> scheduledTask; private final boolean batchIndexAckEnabled; @@ -133,7 +133,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments return true; } if (messageIdAdv.getBatchIndex() >= 0) { - ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.get(key); + ConcurrentBitSet bitSet = pendingIndividualBatchIndexAcks.get(key); return bitSet != null && !bitSet.get(messageIdAdv.getBatchIndex()); } return false; @@ -327,21 +327,22 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments @VisibleForTesting CompletableFuture<Void> doIndividualBatchAckAsync(MessageIdAdv msgId) { - ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent( + ConcurrentBitSet bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent( MessageIdAdvUtils.discardBatch(msgId), __ -> { final BitSet ackSet = msgId.getAckSet(); - final ConcurrentBitSetRecyclable value; + final ConcurrentBitSet value; if (ackSet != null) { synchronized (ackSet) { if (!ackSet.isEmpty()) { - value = ConcurrentBitSetRecyclable.create(ackSet); + value = new ConcurrentBitSet(); + value.or(ackSet); } else { - value = ConcurrentBitSetRecyclable.create(); + value = new ConcurrentBitSet(); value.set(0, msgId.getBatchSize()); } } } else { - value = ConcurrentBitSetRecyclable.create(); + value = new ConcurrentBitSet(); value.set(0, msgId.getBatchSize()); } return value; @@ -445,7 +446,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments } // Flush all individual acks - List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = + List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck = new ArrayList<>(pendingIndividualAcks.size() + pendingIndividualBatchIndexAcks.size()); if (!pendingIndividualAcks.isEmpty()) { if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) { @@ -487,7 +488,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments } while (true) { - Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable> entry = + Map.Entry<MessageIdAdv, ConcurrentBitSet> entry = pendingIndividualBatchIndexAcks.pollFirstEntry(); if (entry == null) { // The entry has been removed in a different thread @@ -539,7 +540,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments // cumulative ack chunk by the last messageId if (chunkMsgIds != null && ackType != AckType.Cumulative) { if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) { - List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = new ArrayList<>(chunkMsgIds.length); + List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck = new ArrayList<>(chunkMsgIds.length); for (MessageIdImpl cMsgId : chunkMsgIds) { if (cMsgId != null && chunkMsgIds.length > 1) { entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null)); @@ -568,7 +569,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments long entryId, BitSetRecyclable ackSet, AckType ackType, Map<String, Long> properties, boolean flush, TimedCompletableFuture<Void> timedCompletableFuture, - List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck) { + List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck) { if (consumer.isAckReceiptEnabled()) { final long requestId = consumer.getClient().newRequestId(); final ByteBuf cmd; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java index 8c7a605cfea..76f2f45a030 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java @@ -31,8 +31,10 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; +import java.util.ArrayList; import java.util.BitSet; import java.util.Collections; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -406,6 +408,54 @@ public class AcknowledgementsGroupingTrackerTest { tracker.close(); } + @Test + public void testDoIndividualBatchAckNeverAffectIsDuplicate() throws Exception { + ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>(); + conf.setMaxAcknowledgmentGroupSize(1); + PersistentAcknowledgmentsGroupingTracker tracker = + new PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup); + + BatchMessageIdImpl batchMessageId0 = new BatchMessageIdImpl(5, 1, 0, 0, 10, null); + BatchMessageIdImpl batchMessageId1 = new BatchMessageIdImpl(5, 1, 0, 1, 10, null); + + int loops = 10000; + int addAcknowledgmentThreadCount = 10; + List<Thread> addAcknowledgmentThreads = new ArrayList<>(addAcknowledgmentThreadCount); + for (int i = 0; i < addAcknowledgmentThreadCount; i++) { + Thread addAcknowledgmentThread = new Thread(() -> { + for (int j = 0; j < loops; j++) { + tracker.addAcknowledgment(batchMessageId0, AckType.Individual, Collections.emptyMap()); + } + }, "doIndividualBatchAck-thread-" + i); + addAcknowledgmentThread.start(); + addAcknowledgmentThreads.add(addAcknowledgmentThread); + } + + int isDuplicateThreadCount = 10; + AtomicBoolean assertResult = new AtomicBoolean(); + List<Thread> isDuplicateThreads = new ArrayList<>(isDuplicateThreadCount); + for (int i = 0; i < isDuplicateThreadCount; i++) { + Thread isDuplicateThread = new Thread(() -> { + for (int j = 0; j < loops; j++) { + boolean duplicate = tracker.isDuplicate(batchMessageId1); + assertResult.set(assertResult.get() || duplicate); + } + }, "isDuplicate-thread-" + i); + isDuplicateThread.start(); + isDuplicateThreads.add(isDuplicateThread); + } + + for (Thread addAcknowledgmentThread : addAcknowledgmentThreads) { + addAcknowledgmentThread.join(); + } + + for (Thread isDuplicateThread : isDuplicateThreads) { + isDuplicateThread.join(); + } + + assertFalse(assertResult.get()); + } + public class ClientCnxTest extends ClientCnx { public ClientCnxTest(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index c6912ae4fe1..eaba323eec5 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -109,7 +109,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.collections.BitSetRecyclable; -import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable; +import org.apache.pulsar.common.util.collections.ConcurrentBitSet; @UtilityClass @Slf4j @@ -1020,7 +1020,7 @@ public class Commands { } public static ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID txnID, - List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries) { + List<Triple<Long, Long, ConcurrentBitSet>> entries) { BaseCommand cmd = newMultiMessageAckCommon(entries); cmd.getAck() .setConsumerId(consumerId) @@ -1030,14 +1030,14 @@ public class Commands { return serializeWithSize(cmd); } - private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries) { + private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, ConcurrentBitSet>> entries) { BaseCommand cmd = localCmd(Type.ACK); CommandAck ack = cmd.setAck(); int entriesCount = entries.size(); for (int i = 0; i < entriesCount; i++) { long ledgerId = entries.get(i).getLeft(); long entryId = entries.get(i).getMiddle(); - ConcurrentBitSetRecyclable bitSet = entries.get(i).getRight(); + ConcurrentBitSet bitSet = entries.get(i).getRight(); MessageIdData msgId = ack.addMessageId() .setLedgerId(ledgerId) .setEntryId(entryId); @@ -1046,7 +1046,6 @@ public class Commands { for (int j = 0; j < ackSet.length; j++) { msgId.addAckSet(ackSet[j]); } - bitSet.recycle(); } } @@ -1054,7 +1053,7 @@ public class Commands { } public static ByteBuf newMultiMessageAck(long consumerId, - List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries, + List<Triple<Long, Long, ConcurrentBitSet>> entries, long requestId) { BaseCommand cmd = newMultiMessageAckCommon(entries); cmd.getAck() diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java index 0ba409b2d7d..d29e4b8240f 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java @@ -27,6 +27,7 @@ import java.util.BitSet; /** * Safe multithreaded version of {@code BitSet} and leverage netty recycler. */ +@Deprecated @EqualsAndHashCode(callSuper = true) public class ConcurrentBitSetRecyclable extends ConcurrentBitSet {
