This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9c50d1801a7c273bbf7416aec823ec7b2da59f50
Author: 萧易客 <[email protected]>
AuthorDate: Wed Mar 27 20:12:39 2024 +0800

    [fix][client] Consumer lost message ack due to race condition in 
acknowledge with batch message (#22353)
    
    Co-authored-by: Yunze Xu <[email protected]>
    Co-authored-by: 汪苏诚 <[email protected]>
    (cherry picked from commit 3fa2ae83312ead38a81fe82bc06c1784e6061d6f)
---
 .../pulsar/client/impl/MessageIdAdvUtils.java      | 19 ++++--
 .../PersistentAcknowledgmentsGroupingTracker.java  | 18 +++--
 .../pulsar/client/impl/MessageIdAdvUtilsTest.java  | 76 ++++++++++++++++++++++
 .../org/apache/pulsar/client/api/MessageIdAdv.java |  2 +
 4 files changed, 106 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java
index a0d1446ba3d..f66bb642021 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java
@@ -40,6 +40,13 @@ public class MessageIdAdvUtils {
                 && lhs.getBatchIndex() == rhs.getBatchIndex();
     }
 
+    /**
+     * Acknowledge batch message.
+     *
+     * @param msgId     the message id
+     * @param individual whether to acknowledge only this batch index rather than all indexes up to and including it
+     * @return true if the message is not part of a batch or the whole batch is now acknowledged
+     */
     static boolean acknowledge(MessageIdAdv msgId, boolean individual) {
         if (!isBatch(msgId)) {
             return true;
@@ -51,12 +58,14 @@ public class MessageIdAdvUtils {
             return false;
         }
         int batchIndex = msgId.getBatchIndex();
-        if (individual) {
-            ackSet.clear(batchIndex);
-        } else {
-            ackSet.clear(0, batchIndex + 1);
+        synchronized (ackSet) {
+            if (individual) {
+                ackSet.clear(batchIndex);
+            } else {
+                ackSet.clear(0, batchIndex + 1);
+            }
+            return ackSet.isEmpty();
         }
-        return ackSet.isEmpty();
     }
 
     static boolean isBatch(MessageIdAdv msgId) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 0cf776aea59..c0ee13b346a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -324,8 +324,15 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
                 MessageIdAdvUtils.discardBatch(msgId), __ -> {
                     final BitSet ackSet = msgId.getAckSet();
                     final ConcurrentBitSetRecyclable value;
-                    if (ackSet != null && !ackSet.isEmpty()) {
-                        value = ConcurrentBitSetRecyclable.create(ackSet);
+                    if (ackSet != null) {
+                        synchronized (ackSet) {
+                            if (!ackSet.isEmpty()) {
+                                value = 
ConcurrentBitSetRecyclable.create(ackSet);
+                            } else {
+                                value = ConcurrentBitSetRecyclable.create();
+                                value.set(0, msgId.getBatchSize());
+                            }
+                        }
                     } else {
                         value = ConcurrentBitSetRecyclable.create();
                         value.set(0, msgId.getBatchSize());
@@ -374,8 +381,11 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
                     .ConnectException("Consumer connect fail! consumer state:" 
+ consumer.getState()));
         }
         BitSetRecyclable bitSet;
-        if (msgId.getAckSet() != null) {
-            bitSet = BitSetRecyclable.valueOf(msgId.getAckSet().toLongArray());
+        BitSet ackSetFromMsgId = msgId.getAckSet();
+        if (ackSetFromMsgId != null) {
+            synchronized (ackSetFromMsgId) {
+                bitSet = 
BitSetRecyclable.valueOf(ackSetFromMsgId.toLongArray());
+            }
         } else {
             bitSet = BitSetRecyclable.create();
             bitSet.set(0, batchSize);
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdAdvUtilsTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdAdvUtilsTest.java
new file mode 100644
index 00000000000..704dfc9cbd7
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdAdvUtilsTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.BitSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.MessageIdAdv;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test for {@link MessageIdAdvUtils}.
+ */
+public class MessageIdAdvUtilsTest {
+
+    /**
+     * Calls <code>acknowledge</code> concurrently on one batch and verifies that it returns true exactly once.
+     *
+     * @see MessageIdAdvUtils#acknowledge(MessageIdAdv, boolean)
+     * @see MessageIdAdv#getAckSet()
+     */
+    @Test
+    public void testAcknowledgeIndividualConcurrently() throws 
InterruptedException {
+        ThreadFactory threadFactory = new 
ThreadFactoryBuilder().setNameFormat("pulsar-consumer-%d").build();
+        @Cleanup("shutdown")
+        ExecutorService executorService = 
Executors.newCachedThreadPool(threadFactory);
+        for (int i = 0; i < 100; i++) {
+            int batchSize = 32;
+            BitSet bitSet = new BitSet(batchSize);
+            bitSet.set(0, batchSize);
+            AtomicInteger individualAcked = new AtomicInteger();
+            Phaser phaser = new Phaser(1);
+            CountDownLatch finishLatch = new CountDownLatch(batchSize);
+            for (int batchIndex = 0; batchIndex < batchSize; batchIndex++) {
+                phaser.register();
+                BatchMessageIdImpl messageId = new BatchMessageIdImpl(1, 0, 0, 
batchIndex, batchSize, bitSet);
+                executorService.execute(() -> {
+                    try {
+                        phaser.arriveAndAwaitAdvance();
+                        if (MessageIdAdvUtils.acknowledge(messageId, true)) {
+                            individualAcked.incrementAndGet();
+                        }
+                    } finally {
+                        finishLatch.countDown();
+                    }
+                });
+            }
+            phaser.arriveAndDeregister();
+            finishLatch.await();
+            assertEquals(individualAcked.get(), 1);
+        }
+    }
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java 
b/pulsar-common/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java
index 73ecfed0ad0..76d41a7d3d4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java
@@ -75,6 +75,8 @@ public interface MessageIdAdv extends MessageId {
      * @implNote The message IDs of a batch should share a BitSet. For 
example, given 3 messages in the same batch whose
      * size is 3, all message IDs of them should return "111" (i.e. a BitSet 
whose size is 3 and all bits are 1). If the
      * 1st message has been acknowledged, the returned BitSet should become 
"011" (i.e. the 1st bit become 0).
+     * Callers that read or write the returned BitSet must do so while synchronized on that BitSet, because it is
+     * shared by every message ID of the batch and may be modified concurrently.
      *
      * @return null if the message is a non-batched message
      */

Reply via email to