This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 64598b40db3 [fix] [client] call redeliver 1 msg but did 2 msgs (#23943)
64598b40db3 is described below
commit 64598b40db3c60ae0d548c5742ed28df2edec28d
Author: fengyubiao <[email protected]>
AuthorDate: Mon Feb 10 09:44:07 2025 +0800
[fix] [client] call redeliver 1 msg but did 2 msgs (#23943)
(cherry picked from commit 7a79c78f8e6f4b52f13be1c6441f4b007d9a00fe)
---
.../BatchMessageWithBatchIndexLevelTest.java | 62 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 31 ++++-------
.../collections/GrowableArrayBlockingQueue.java | 8 +++
3 files changed, 81 insertions(+), 20 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
index 8e902d5d1e7..52147f74f4a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
@@ -59,6 +59,7 @@ import
org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Slf4j
@@ -137,6 +138,67 @@ public class BatchMessageWithBatchIndexLevelTest extends
BatchMessageTest {
});
}
+ @DataProvider
+ public Object[][] enabledBatchSend() {
+ return new Object[][] {
+ {false},
+ {true}
+ };
+ }
+
+ @Test(dataProvider = "enabledBatchSend")
+ @SneakyThrows
+ public void testBatchMessageNAck(boolean enabledBatchSend) {
+ final String topicName =
BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp");
+ final String subscriptionName = "s1";
+ ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>)
pulsarClient.newConsumer().topic(topicName)
+ .subscriptionName(subscriptionName)
+ .receiverQueueSize(21)
+ .subscriptionType(SubscriptionType.Shared)
+ .enableBatchIndexAcknowledgment(true)
+ .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
+ .subscribe();
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+ .batchingMaxMessages(20)
+ .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+ .enableBatching(enabledBatchSend)
+ .create();
+ final PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
+ final PersistentDispatcherMultipleConsumers dispatcher =
+ (PersistentDispatcherMultipleConsumers)
topic.getSubscription(subscriptionName).getDispatcher();
+
+ // Send messages: 20 * 2.
+ for (int i = 0; i < 40; i++) {
+ byte[] message = ("batch-message-" + i).getBytes();
+ if (i == 19 || i == 39) {
+ producer.newMessage().value(message).send();
+ } else {
+ producer.newMessage().value(message).sendAsync();
+ }
+ }
+ Awaitility.await().untilAsserted(() -> {
+ if (enabledBatchSend) {
+ assertEquals(consumer.numMessagesInQueue(), 40);
+ } else {
+ assertEquals(consumer.numMessagesInQueue(), 21);
+ }
+ });
+
+ // Negative ack and verify result.
+ Message<byte[]> receive1 = consumer.receive();
+ consumer.pause();
+ consumer.negativeAcknowledge(receive1);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(consumer.numMessagesInQueue(), 20);
+
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 20);
+ });
+
+ // cleanup.
+ producer.close();
+ consumer.close();
+ admin.topics().delete(topicName);
+ }
+
@Test
public void testBatchMessageMultiNegtiveAck() throws Exception{
final String topicName =
"persistent://prop/ns-abc/batchMessageMultiNegtiveAck-" + UUID.randomUUID();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index d5bf2619b7f..7186bfd3fb1 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2812,27 +2812,18 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
int messagesFromQueue = 0;
- Message<T> peek = incomingMessages.peek();
- if (peek != null) {
- MessageIdAdv messageId =
MessageIdAdvUtils.discardBatch(peek.getMessageId());
- if (!messageIds.contains(messageId)) {
- // first message is not expired, then no message is expired in
queue.
- return 0;
- }
-
- // try not to remove elements that are added while we remove
- Message<T> message = incomingMessages.poll();
- while (message != null) {
- decreaseIncomingMessageSize(message);
- messagesFromQueue++;
- MessageIdAdv id =
MessageIdAdvUtils.discardBatch(message.getMessageId());
- if (!messageIds.contains(id)) {
- messageIds.add(id);
- break;
- }
- message.release();
- message = incomingMessages.poll();
+ Message<T> message;
+ while (true) {
+ message = incomingMessages.pollIf(msg -> {
+ MessageId idPolled =
MessageIdAdvUtils.discardBatch(msg.getMessageId());
+ return messageIds.contains(idPolled);
+ });
+ if (message == null) {
+ break;
}
+ decreaseIncomingMessageSize(message);
+ messagesFromQueue++;
+ message.release();
}
return messagesFromQueue;
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
index 467a455ed8b..94bfad1fbd2 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
@@ -32,6 +32,7 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.StampedLock;
import java.util.function.Consumer;
+import java.util.function.Predicate;
import javax.annotation.Nullable;
/**
@@ -83,10 +84,17 @@ public class GrowableArrayBlockingQueue<T> extends
AbstractQueue<T> implements B
@Override
public T poll() {
+ return pollIf(v -> true);
+ }
+
+ public T pollIf(Predicate<T> predicate) {
headLock.lock();
try {
if (SIZE_UPDATER.get(this) > 0) {
T item = data[headIndex.value];
+ if (!predicate.test(item)) {
+ return null;
+ }
data[headIndex.value] = null;
headIndex.value = (headIndex.value + 1) & (data.length - 1);
SIZE_UPDATER.decrementAndGet(this);