This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3fa2ae83312 [fix][client] Consumer lost message ack due to race
condition in acknowledge with batch message (#22353)
3fa2ae83312 is described below
commit 3fa2ae83312ead38a81fe82bc06c1784e6061d6f
Author: 萧易客 <[email protected]>
AuthorDate: Wed Mar 27 20:12:39 2024 +0800
[fix][client] Consumer lost message ack due to race condition in
acknowledge with batch message (#22353)
Co-authored-by: Yunze Xu <[email protected]>
Co-authored-by: 汪苏诚 <[email protected]>
---
.../pulsar/client/impl/MessageIdAdvUtils.java | 19 ++++--
.../PersistentAcknowledgmentsGroupingTracker.java | 18 +++--
.../pulsar/client/impl/MessageIdAdvUtilsTest.java | 76 ++++++++++++++++++++++
.../org/apache/pulsar/client/api/MessageIdAdv.java | 2 +
4 files changed, 106 insertions(+), 9 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java
index a0d1446ba3d..f66bb642021 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java
@@ -40,6 +40,13 @@ public class MessageIdAdvUtils {
&& lhs.getBatchIndex() == rhs.getBatchIndex();
}
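+ /**
+ * Clears this message's bit(s) in the ack set shared by its batch, while synchronized on that ack set.
+ *
+ * @param msgId the message id
+ * @param individual true to clear only this message's batch index; false to clear indexes 0 through batchIndex
+ * @return true if the message is not batched or the shared ack set is now empty (every batch message acknowledged)
+ */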
static boolean acknowledge(MessageIdAdv msgId, boolean individual) {
if (!isBatch(msgId)) {
return true;
@@ -51,12 +58,14 @@ public class MessageIdAdvUtils {
return false;
}
int batchIndex = msgId.getBatchIndex();
- if (individual) {
- ackSet.clear(batchIndex);
- } else {
- ackSet.clear(0, batchIndex + 1);
+ synchronized (ackSet) {
+ if (individual) {
+ ackSet.clear(batchIndex);
+ } else {
+ ackSet.clear(0, batchIndex + 1);
+ }
+ return ackSet.isEmpty();
}
- return ackSet.isEmpty();
}
static boolean isBatch(MessageIdAdv msgId) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 0cf776aea59..c0ee13b346a 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -324,8 +324,15 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
MessageIdAdvUtils.discardBatch(msgId), __ -> {
final BitSet ackSet = msgId.getAckSet();
final ConcurrentBitSetRecyclable value;
- if (ackSet != null && !ackSet.isEmpty()) {
- value = ConcurrentBitSetRecyclable.create(ackSet);
+ if (ackSet != null) {
+ synchronized (ackSet) {
+ if (!ackSet.isEmpty()) {
+ value =
ConcurrentBitSetRecyclable.create(ackSet);
+ } else {
+ value = ConcurrentBitSetRecyclable.create();
+ value.set(0, msgId.getBatchSize());
+ }
+ }
} else {
value = ConcurrentBitSetRecyclable.create();
value.set(0, msgId.getBatchSize());
@@ -374,8 +381,11 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
.ConnectException("Consumer connect fail! consumer state:"
+ consumer.getState()));
}
BitSetRecyclable bitSet;
- if (msgId.getAckSet() != null) {
- bitSet = BitSetRecyclable.valueOf(msgId.getAckSet().toLongArray());
+ BitSet ackSetFromMsgId = msgId.getAckSet();
+ if (ackSetFromMsgId != null) {
+ synchronized (ackSetFromMsgId) {
+ bitSet =
BitSetRecyclable.valueOf(ackSetFromMsgId.toLongArray());
+ }
} else {
bitSet = BitSetRecyclable.create();
bitSet.set(0, batchSize);
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdAdvUtilsTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdAdvUtilsTest.java
new file mode 100644
index 00000000000..704dfc9cbd7
--- /dev/null
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdAdvUtilsTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.BitSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.MessageIdAdv;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test for {@link MessageIdAdvUtils}.
+ */
+public class MessageIdAdvUtilsTest {
+
+ /**
+ * Acknowledges all IDs of one batch concurrently (released together by a Phaser); exactly one call must return true.
+ *
+ * @see MessageIdAdvUtils#acknowledge(MessageIdAdv, boolean)
+ * @see MessageIdAdv#getAckSet()
+ */
+ @Test
+ public void testAcknowledgeIndividualConcurrently() throws
InterruptedException {
+ ThreadFactory threadFactory = new
ThreadFactoryBuilder().setNameFormat("pulsar-consumer-%d").build();
+ @Cleanup("shutdown")
+ ExecutorService executorService =
Executors.newCachedThreadPool(threadFactory);
+ for (int i = 0; i < 100; i++) {
+ int batchSize = 32;
+ BitSet bitSet = new BitSet(batchSize);
+ bitSet.set(0, batchSize);
+ AtomicInteger individualAcked = new AtomicInteger();
+ Phaser phaser = new Phaser(1);
+ CountDownLatch finishLatch = new CountDownLatch(batchSize);
+ for (int batchIndex = 0; batchIndex < batchSize; batchIndex++) {
+ phaser.register();
+ BatchMessageIdImpl messageId = new BatchMessageIdImpl(1, 0, 0,
batchIndex, batchSize, bitSet);
+ executorService.execute(() -> {
+ try {
+ phaser.arriveAndAwaitAdvance();
+ if (MessageIdAdvUtils.acknowledge(messageId, true)) {
+ individualAcked.incrementAndGet();
+ }
+ } finally {
+ finishLatch.countDown();
+ }
+ });
+ }
+ phaser.arriveAndDeregister();
+ finishLatch.await();
+ assertEquals(individualAcked.get(), 1);
+ }
+ }
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java
b/pulsar-common/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java
index 73ecfed0ad0..76d41a7d3d4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java
@@ -75,6 +75,8 @@ public interface MessageIdAdv extends MessageId {
* @implNote The message IDs of a batch should share a BitSet. For
example, given 3 messages in the same batch whose
* size is 3, all message IDs of them should return "111" (i.e. a BitSet
whose size is 3 and all bits are 1). If the
* 1st message has been acknowledged, the returned BitSet should become
"011" (i.e. the 1st bit become 0).
+ * Because message IDs of a batch share and concurrently mutate this BitSet, callers must synchronize on the returned
+ * BitSet instance itself (i.e. {@code synchronized (ackSet) { ... }}) around any read or write of it.
*
* @return null if the message is a non-batched message
*/