This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9c50d1801a7c273bbf7416aec823ec7b2da59f50 Author: 萧易客 <[email protected]> AuthorDate: Wed Mar 27 20:12:39 2024 +0800 [fix][client] Consumer lost message ack due to race condition in acknowledge with batch message (#22353) Co-authored-by: Yunze Xu <[email protected]> Co-authored-by: 汪苏诚 <[email protected]> (cherry picked from commit 3fa2ae83312ead38a81fe82bc06c1784e6061d6f) --- .../pulsar/client/impl/MessageIdAdvUtils.java | 19 ++++-- .../PersistentAcknowledgmentsGroupingTracker.java | 18 +++-- .../pulsar/client/impl/MessageIdAdvUtilsTest.java | 76 ++++++++++++++++++++++ .../org/apache/pulsar/client/api/MessageIdAdv.java | 2 + 4 files changed, 106 insertions(+), 9 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java index a0d1446ba3d..f66bb642021 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java @@ -40,6 +40,13 @@ public class MessageIdAdvUtils { && lhs.getBatchIndex() == rhs.getBatchIndex(); } + /** + * Acknowledge batch message. 
+ * + * @param msgId the message id + * @param individual whether to acknowledge the batch message individually + * @return true if the batch message is fully acknowledged + */ static boolean acknowledge(MessageIdAdv msgId, boolean individual) { if (!isBatch(msgId)) { return true; @@ -51,12 +58,14 @@ public class MessageIdAdvUtils { return false; } int batchIndex = msgId.getBatchIndex(); - if (individual) { - ackSet.clear(batchIndex); - } else { - ackSet.clear(0, batchIndex + 1); + synchronized (ackSet) { + if (individual) { + ackSet.clear(batchIndex); + } else { + ackSet.clear(0, batchIndex + 1); + } + return ackSet.isEmpty(); } - return ackSet.isEmpty(); } static boolean isBatch(MessageIdAdv msgId) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index 0cf776aea59..c0ee13b346a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -324,8 +324,15 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments MessageIdAdvUtils.discardBatch(msgId), __ -> { final BitSet ackSet = msgId.getAckSet(); final ConcurrentBitSetRecyclable value; - if (ackSet != null && !ackSet.isEmpty()) { - value = ConcurrentBitSetRecyclable.create(ackSet); + if (ackSet != null) { + synchronized (ackSet) { + if (!ackSet.isEmpty()) { + value = ConcurrentBitSetRecyclable.create(ackSet); + } else { + value = ConcurrentBitSetRecyclable.create(); + value.set(0, msgId.getBatchSize()); + } + } } else { value = ConcurrentBitSetRecyclable.create(); value.set(0, msgId.getBatchSize()); @@ -374,8 +381,11 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments .ConnectException("Consumer connect fail! 
consumer state:" + consumer.getState())); } BitSetRecyclable bitSet; - if (msgId.getAckSet() != null) { - bitSet = BitSetRecyclable.valueOf(msgId.getAckSet().toLongArray()); + BitSet ackSetFromMsgId = msgId.getAckSet(); + if (ackSetFromMsgId != null) { + synchronized (ackSetFromMsgId) { + bitSet = BitSetRecyclable.valueOf(ackSetFromMsgId.toLongArray()); + } } else { bitSet = BitSetRecyclable.create(); bitSet.set(0, batchSize); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdAdvUtilsTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdAdvUtilsTest.java new file mode 100644 index 00000000000..704dfc9cbd7 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdAdvUtilsTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl; + +import static org.testng.Assert.assertEquals; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.BitSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Phaser; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.Cleanup; +import org.apache.pulsar.client.api.MessageIdAdv; +import org.testng.annotations.Test; + +/** + * Unit test for {@link MessageIdAdvUtils}. + */ +public class MessageIdAdvUtilsTest { + + /** + * Calls <code>acknowledge</code> concurrently for every message of one batch (all sharing the same ack set) and verifies that it returns true exactly once. + * + * @see MessageIdAdvUtils#acknowledge(MessageIdAdv, boolean) + * @see MessageIdAdv#getAckSet() + */ + @Test + public void testAcknowledgeIndividualConcurrently() throws InterruptedException { + ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("pulsar-consumer-%d").build(); + @Cleanup("shutdown") + ExecutorService executorService = Executors.newCachedThreadPool(threadFactory); + for (int i = 0; i < 100; i++) { + int batchSize = 32; + BitSet bitSet = new BitSet(batchSize); + bitSet.set(0, batchSize); + AtomicInteger individualAcked = new AtomicInteger(); + Phaser phaser = new Phaser(1); + CountDownLatch finishLatch = new CountDownLatch(batchSize); + for (int batchIndex = 0; batchIndex < batchSize; batchIndex++) { + phaser.register(); + BatchMessageIdImpl messageId = new BatchMessageIdImpl(1, 0, 0, batchIndex, batchSize, bitSet); + executorService.execute(() -> { + try { + phaser.arriveAndAwaitAdvance(); + if (MessageIdAdvUtils.acknowledge(messageId, true)) { + individualAcked.incrementAndGet(); + } + } finally { + finishLatch.countDown(); + } + }); + } + phaser.arriveAndDeregister(); + finishLatch.await(); + assertEquals(individualAcked.get(), 1); + } + } +} diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java b/pulsar-common/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java index 73ecfed0ad0..76d41a7d3d4 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java @@ -75,6 +75,8 @@ public interface MessageIdAdv extends MessageId { * @implNote The message IDs of a batch should share a BitSet. For example, given 3 messages in the same batch whose * size is 3, all message IDs of them should return "111" (i.e. a BitSet whose size is 3 and all bits are 1). If the * 1st message has been acknowledged, the returned BitSet should become "011" (i.e. the 1st bit become 0). + * Because that BitSet may be shared and modified concurrently, callers that read or modify it must hold its monitor, + * i.e. access it only inside {@code synchronized (ackSet) { ... }} on the returned instance. * * @return null if the message is a non-batched message */
