This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b1bf0022443f110ccbd5a726118e2fa21ad41b93 Author: Jiwei Guo <[email protected]> AuthorDate: Mon Feb 14 11:18:11 2022 +0800 Fix PersistentAcknowledgmentsGroupingTracker set bitSet issue. (#14260) When a consumer sets `enableBatchIndexAcknowledgment=true`, the client executes PersistentAcknowledgmentsGroupingTracker#doIndividualBatchAckAsync: https://github.com/apache/pulsar/blob/8928c3496a61c588b50461d6adaab089dd421619/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java#L357-L372 There is an error on line 367; it should be `value.set(0, batchMessageId.getBatchSize());`. However, batchMessageId.getBatchSize() always returns acker.getBatchSize(): https://github.com/apache/pulsar/blob/8928c3496a61c588b50461d6adaab089dd421619/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java#L137-L139 If the check on line 362 is false, the BatchMessageIdImpl only has a BatchMessageAckerDisabled acker, whose batch size is always 0. So I have added `getOriginalBatchSize` to return the user-specified batch size. Also, when the log on line 556 is printed, `pendingIndividualBatchIndexAcks` is always empty. 
Should replace with `entriesToAck` (cherry picked from commit 816eaed900bbff1a8514f349cd60e439c6db97bc) --- .../pulsar/client/impl/BatchMessageIdImpl.java | 4 +++ .../PersistentAcknowledgmentsGroupingTracker.java | 2 +- .../impl/AcknowledgementsGroupingTrackerTest.java | 41 ++++++++++++++++++++-- 3 files changed, 43 insertions(+), 4 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java index fd8ea72..75ab3a8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java @@ -138,6 +138,10 @@ public class BatchMessageIdImpl extends MessageIdImpl { return acker.getBatchSize(); } + public int getOriginalBatchSize() { + return this.batchSize; + } + public MessageIdImpl prevBatchMessageId() { return new MessageIdImpl( ledgerId, entryId - 1, partitionIndex); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index aa65c61..e737a2a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -363,7 +363,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments value = ConcurrentBitSetRecyclable.create(batchMessageId.getAcker().getBitSet()); } else { value = ConcurrentBitSetRecyclable.create(); - value.set(0, batchMessageId.getBatchIndex()); + value.set(0, batchMessageId.getOriginalBatchSize()); } return value; }); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java index 9632a88..c0b952a 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java @@ -22,22 +22,27 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; - import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; - +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.BitSet; import java.util.Collections; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; - +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.util.TimedCompletableFuture; import org.apache.pulsar.common.api.proto.CommandAck.AckType; +import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.common.api.proto.ProtocolVersion; import org.testng.annotations.AfterClass; @@ -381,6 +386,36 @@ public class AcknowledgementsGroupingTrackerTest { tracker.close(); } + @Test + public void testDoIndividualBatchAckAsync() throws Exception{ + ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>(); + AcknowledgmentsGroupingTracker tracker = new 
PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup); + MessageId messageId1 = new BatchMessageIdImpl(5, 1, 0, 3, 10, BatchMessageAckerDisabled.INSTANCE); + BitSet bitSet = new BitSet(20); + for(int i = 0; i < 20; i ++) { + bitSet.set(i, true); + } + MessageId messageId2 = new BatchMessageIdImpl(3, 2, 0, 5, 20, BatchMessageAcker.newAcker(bitSet)); + Method doIndividualBatchAckAsync = PersistentAcknowledgmentsGroupingTracker.class + .getDeclaredMethod("doIndividualBatchAckAsync", BatchMessageIdImpl.class); + doIndividualBatchAckAsync.setAccessible(true); + doIndividualBatchAckAsync.invoke(tracker, messageId1); + doIndividualBatchAckAsync.invoke(tracker, messageId2); + Field pendingIndividualBatchIndexAcks = PersistentAcknowledgmentsGroupingTracker.class.getDeclaredField("pendingIndividualBatchIndexAcks"); + pendingIndividualBatchIndexAcks.setAccessible(true); + ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable> batchIndexAcks = + (ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable>) pendingIndividualBatchIndexAcks.get(tracker); + MessageIdImpl position1 = new MessageIdImpl(5, 1, 0); + MessageIdImpl position2 = new MessageIdImpl(3, 2, 0); + assertTrue(batchIndexAcks.containsKey(position1)); + assertNotNull(batchIndexAcks.get(position1)); + assertEquals(batchIndexAcks.get(position1).cardinality(), 9); + assertTrue(batchIndexAcks.containsKey(position2)); + assertNotNull(batchIndexAcks.get(position2)); + assertEquals(batchIndexAcks.get(position2).cardinality(), 19); + tracker.close(); + } + public class ClientCnxTest extends ClientCnx { public ClientCnxTest(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
