This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 7d86d183b0403ffecba870ede8c321f7881c8a46 Author: fengyubiao <[email protected]> AuthorDate: Mon Feb 10 09:44:07 2025 +0800 [fix] [client] call redeliver 1 msg but did 2 msgs (#23943) (cherry picked from commit 7a79c78f8e6f4b52f13be1c6441f4b007d9a00fe) --- .../BatchMessageWithBatchIndexLevelTest.java | 62 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ConsumerImpl.java | 31 ++++------- .../collections/GrowableArrayBlockingQueue.java | 8 +++ 3 files changed, 81 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java index 21a843a3efc..7fa7bf078e0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java @@ -59,6 +59,7 @@ import org.apache.pulsar.common.util.collections.BitSetRecyclable; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Slf4j @@ -137,6 +138,67 @@ public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest { }); } + @DataProvider + public Object[][] enabledBatchSend() { + return new Object[][] { + {false}, + {true} + }; + } + + @Test(dataProvider = "enabledBatchSend") + @SneakyThrows + public void testBatchMessageNAck(boolean enabledBatchSend) { + final String topicName = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp"); + final String subscriptionName = "s1"; + ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) + .subscriptionName(subscriptionName) + .receiverQueueSize(21) + .subscriptionType(SubscriptionType.Shared) + .enableBatchIndexAcknowledgment(true) + .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS) + .subscribe(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .batchingMaxMessages(20) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .enableBatching(enabledBatchSend) + .create(); + final PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); + final AbstractPersistentDispatcherMultipleConsumers dispatcher = + (AbstractPersistentDispatcherMultipleConsumers) topic.getSubscription(subscriptionName).getDispatcher(); + + // Send messages: 20 * 2. + for (int i = 0; i < 40; i++) { + byte[] message = ("batch-message-" + i).getBytes(); + if (i == 19 || i == 39) { + producer.newMessage().value(message).send(); + } else { + producer.newMessage().value(message).sendAsync(); + } + } + Awaitility.await().untilAsserted(() -> { + if (enabledBatchSend) { + assertEquals(consumer.numMessagesInQueue(), 40); + } else { + assertEquals(consumer.numMessagesInQueue(), 21); + } + }); + + // Negative ack and verify result/ + Message<byte[]> receive1 = consumer.receive(); + consumer.pause(); + consumer.negativeAcknowledge(receive1); + Awaitility.await().untilAsserted(() -> { + assertEquals(consumer.numMessagesInQueue(), 20); + assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 20); + }); + + // cleanup. + producer.close(); + consumer.close(); + admin.topics().delete(topicName); + } + @Test public void testBatchMessageMultiNegtiveAck() throws Exception{ final String topicName = "persistent://prop/ns-abc/batchMessageMultiNegtiveAck-" + UUID.randomUUID(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 77a91a944ee..6f2ad9152d3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2831,27 +2831,18 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) { int messagesFromQueue = 0; - Message<T> peek = incomingMessages.peek(); - if (peek != null) { - MessageId messageId = NegativeAcksTracker.discardBatchAndPartitionIndex(peek.getMessageId()); - if (!messageIds.contains(messageId)) { - // first message is not expired, then no message is expired in queue. - return 0; - } - - // try not to remove elements that are added while we remove - Message<T> message = incomingMessages.poll(); - while (message != null) { - decreaseIncomingMessageSize(message); - messagesFromQueue++; - MessageId id = NegativeAcksTracker.discardBatchAndPartitionIndex(message.getMessageId()); - if (!messageIds.contains(id)) { - messageIds.add(id); - break; - } - message.release(); - message = incomingMessages.poll(); + Message<T> message; + while (true) { + message = incomingMessages.pollIf(msg -> { + MessageId idPolled = NegativeAcksTracker.discardBatchAndPartitionIndex(msg.getMessageId()); + return messageIds.contains(idPolled); + }); + if (message == null) { + break; } + decreaseIncomingMessageSize(message); + messagesFromQueue++; + message.release(); } return messagesFromQueue; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java index 467a455ed8b..94bfad1fbd2 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java @@ -32,6 +32,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.StampedLock; import java.util.function.Consumer; +import java.util.function.Predicate; import javax.annotation.Nullable; /** @@ -83,10 +84,17 @@ public class GrowableArrayBlockingQueue<T> extends AbstractQueue<T> implements B @Override public T poll() { + return pollIf(v -> true); + } + + public T pollIf(Predicate<T> predicate) { headLock.lock(); try { if (SIZE_UPDATER.get(this) > 0) { T item = data[headIndex.value]; + if (!predicate.test(item)) { + return null; + } data[headIndex.value] = null; headIndex.value = (headIndex.value + 1) & (data.length - 1); SIZE_UPDATER.decrementAndGet(this);
