This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b961c4906f6 [fix] [broker] Enabling batch causes negative 
unackedMessages due to ack and delivery concurrency (#22090)
b961c4906f6 is described below

commit b961c4906f6992f298e73f7200f85bab5f148a7e
Author: fengyubiao <[email protected]>
AuthorDate: Mon Feb 26 11:06:01 2024 +0800

    [fix] [broker] Enabling batch causes negative unackedMessages due to ack 
and delivery concurrency (#22090)
    
    (cherry picked from commit 1b1cfb58f4e64c91a311950d002b0c755bd9604f)
---
 .../org/apache/pulsar/broker/service/Consumer.java |   2 +-
 .../BatchMessageWithBatchIndexLevelTest.java       | 182 +++++++++++++++++++++
 2 files changed, 183 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 5c84ccf64fd..d8ed99bb874 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -327,7 +327,7 @@ public class Consumer {
                 if (pendingAcks != null) {
                     int batchSize = batchSizes.getBatchSize(i);
                     int stickyKeyHash = getStickyKeyHash(entry);
-                    long[] ackSet = 
getCursorAckSet(PositionImpl.get(entry.getLedgerId(), entry.getEntryId()));
+                    long[] ackSet = batchIndexesAcks == null ? null : 
batchIndexesAcks.getAckSet(i);
                     if (ackSet != null) {
                         unackedMessages -= (batchSize - 
BitSet.valueOf(ackSet).cardinality());
                     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
index b2fbe824b33..3a4cee7f2be 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
@@ -18,8 +18,17 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
+import com.carrotsearch.hppc.ObjectSet;
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
@@ -28,10 +37,14 @@ import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageIdAdv;
@@ -39,8 +52,10 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
@@ -401,4 +416,171 @@ public class BatchMessageWithBatchIndexLevelTest extends 
BatchMessageTest {
         assertEquals(admin.topics().getStats(topicName).getSubscriptions()
                 .get("sub").getUnackedMessages(), 0);
     }
+
+    @Test
+    public void testUnAckMessagesWhenConcurrentDeliveryAndAck() throws 
Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp");
+        final String subName = "s1";
+        final int receiverQueueSize = 500;
+        admin.topics().createNonPartitionedTopic(topicName);
+        admin.topics().createSubscription(topicName, subName, 
MessageId.earliest);
+        ConsumerBuilder<String> consumerBuilder = 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .receiverQueueSize(receiverQueueSize)
+                .subscriptionName(subName)
+                .enableBatchIndexAcknowledgment(true)
+                .subscriptionType(SubscriptionType.Shared)
+                .isAckReceiptEnabled(true);
+
+        // Send 100 messages.
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .enableBatching(true)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                .create();
+        CompletableFuture<MessageId> lastSent = null;
+        for (int i = 0; i < 100; i++) {
+            lastSent = producer.sendAsync(i + "");
+        }
+        producer.flush();
+        lastSent.join();
+
+        // When consumer1 is closed, some messages may still be in the client 
memory (if they are being acked now).
+        Consumer<String> consumer1 = 
consumerBuilder.consumerName("c1").subscribe();
+        Message[] messagesInClientMemory = new Message[2];
+        for (int i = 0; i < 2; i++) {
+            Message msg = consumer1.receive(2, TimeUnit.SECONDS);
+            assertNotNull(msg);
+            messagesInClientMemory[i] = msg;
+        }
+        ConsumerImpl<String> consumer2 = (ConsumerImpl<String>) 
consumerBuilder.consumerName("c2").subscribe();
+        Awaitility.await().until(() -> consumer2.isConnected());
+
+        // Consumer2 will receive messages after consumer1 is closed.
+        // Insert a delay mechanism to make the flow as below:
+        //  1. Close consumer1, then the 100 messages will be redelivered.
+        //  2. Read the redelivered messages. No messages were acked at this time.
+        //  3. The in-flight ack of two messages is finished.
+        //  4. Send the messages to consumer2, consumer2 will get all the 100 
messages.
+        CompletableFuture<Void> receiveMessageSignal2 = new 
CompletableFuture<>();
+        org.apache.pulsar.broker.service.Consumer serviceConsumer2 =
+                makeConsumerReceiveMessagesDelay(topicName, subName, "c2", 
receiveMessageSignal2);
+        // step 1: close consumer.
+        consumer1.close();
+        // step 2: wait for read messages from replay queue.
+        Thread.sleep(2 * 1000);
+        // step 3: wait for the in-flight ack.
+        BitSetRecyclable bitSetRecyclable = createBitSetRecyclable(100);
+        long ledgerId = 0, entryId = 0;
+        for (Message message : messagesInClientMemory) {
+            BatchMessageIdImpl msgId = (BatchMessageIdImpl) 
message.getMessageId();
+            bitSetRecyclable.clear(msgId.getBatchIndex());
+            ledgerId = msgId.getLedgerId();
+            entryId = msgId.getEntryId();
+        }
+        getCursor(topicName, subName).delete(PositionImpl.get(ledgerId, 
entryId, bitSetRecyclable.toLongArray()));
+        // step 4: send messages to consumer2.
+        receiveMessageSignal2.complete(null);
+        // Verify: Consumer2 will get all the 100 messages, and 
"unAckMessages" is 100.
+        List<Message> messages2 = new ArrayList<>();
+        while (true) {
+            Message msg = consumer2.receive(2, TimeUnit.SECONDS);
+            if (msg == null) {
+                break;
+            }
+            messages2.add(msg);
+        }
+        assertEquals(messages2.size(), 100);
+        assertEquals(serviceConsumer2.getUnackedMessages(), 100);
+        // After the messages were popped out, the permits in the client memory 
went to 100.
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(serviceConsumer2.getAvailablePermits() + 
consumer2.getAvailablePermits(),
+                    receiverQueueSize);
+        });
+
+        // cleanup.
+        producer.close();
+        consumer2.close();
+        admin.topics().delete(topicName, false);
+    }
+
+    private BitSetRecyclable createBitSetRecyclable(int batchSize) {
+        BitSetRecyclable bitSetRecyclable = new BitSetRecyclable(batchSize);
+        for (int i = 0; i < batchSize; i++) {
+            bitSetRecyclable.set(i);
+        }
+        return bitSetRecyclable;
+    }
+
+    private ManagedCursorImpl getCursor(String topic, String sub) {
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topic, 
false).join().get();
+        PersistentDispatcherMultipleConsumers dispatcher =
+                (PersistentDispatcherMultipleConsumers) 
persistentTopic.getSubscription(sub).getDispatcher();
+        return (ManagedCursorImpl) dispatcher.getCursor();
+    }
+
+    /**
+     * After {@code signal} completes, the consumer ({@code consumerName}) 
starts to receive messages.
+     */
+    private org.apache.pulsar.broker.service.Consumer 
makeConsumerReceiveMessagesDelay(String topic, String sub,
+                                                            String 
consumerName,
+                                                            
CompletableFuture<Void> signal) throws Exception {
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topic, 
false).join().get();
+        PersistentDispatcherMultipleConsumers dispatcher =
+                (PersistentDispatcherMultipleConsumers) 
persistentTopic.getSubscription(sub).getDispatcher();
+        org.apache.pulsar.broker.service.Consumer serviceConsumer = null;
+        for (org.apache.pulsar.broker.service.Consumer c : 
dispatcher.getConsumers()){
+            if (c.consumerName().equals(consumerName)) {
+                serviceConsumer = c;
+                break;
+            }
+        }
+        final org.apache.pulsar.broker.service.Consumer originalConsumer = 
serviceConsumer;
+
+        // Insert a delay signal.
+        org.apache.pulsar.broker.service.Consumer spyServiceConsumer = 
spy(originalConsumer);
+        doAnswer(invocation -> {
+            List<? extends Entry> entries = (List<? extends Entry>) 
invocation.getArguments()[0];
+            EntryBatchSizes batchSizes = (EntryBatchSizes) 
invocation.getArguments()[1];
+            EntryBatchIndexesAcks batchIndexesAcks = (EntryBatchIndexesAcks) 
invocation.getArguments()[2];
+            int totalMessages = (int) invocation.getArguments()[3];
+            long totalBytes = (long) invocation.getArguments()[4];
+            long totalChunkedMessages = (long) invocation.getArguments()[5];
+            RedeliveryTracker redeliveryTracker = (RedeliveryTracker) 
invocation.getArguments()[6];
+            return signal.thenApply(__ -> 
originalConsumer.sendMessages(entries, batchSizes, batchIndexesAcks, 
totalMessages, totalBytes,
+                    totalChunkedMessages, redeliveryTracker)).join();
+        }).when(spyServiceConsumer)
+                .sendMessages(anyList(), any(), any(), anyInt(), anyLong(), 
anyLong(), any());
+        doAnswer(invocation -> {
+            List<? extends Entry> entries = (List<? extends Entry>) 
invocation.getArguments()[0];
+            EntryBatchSizes batchSizes = (EntryBatchSizes) 
invocation.getArguments()[1];
+            EntryBatchIndexesAcks batchIndexesAcks = (EntryBatchIndexesAcks) 
invocation.getArguments()[2];
+            int totalMessages = (int) invocation.getArguments()[3];
+            long totalBytes = (long) invocation.getArguments()[4];
+            long totalChunkedMessages = (long) invocation.getArguments()[5];
+            RedeliveryTracker redeliveryTracker = (RedeliveryTracker) 
invocation.getArguments()[6];
+            long epoch = (long) invocation.getArguments()[7];
+            return signal.thenApply(__ -> 
originalConsumer.sendMessages(entries, batchSizes, batchIndexesAcks, 
totalMessages, totalBytes,
+                    totalChunkedMessages, redeliveryTracker, epoch)).join();
+        }).when(spyServiceConsumer)
+                .sendMessages(anyList(), any(), any(), anyInt(), anyLong(), 
anyLong(), any(), anyLong());
+
+        // Replace the consumer.
+        Field fConsumerList = 
AbstractDispatcherMultipleConsumers.class.getDeclaredField("consumerList");
+        Field fConsumerSet = 
AbstractDispatcherMultipleConsumers.class.getDeclaredField("consumerSet");
+        fConsumerList.setAccessible(true);
+        fConsumerSet.setAccessible(true);
+        List<org.apache.pulsar.broker.service.Consumer> consumerList =
+                (List<org.apache.pulsar.broker.service.Consumer>) 
fConsumerList.get(dispatcher);
+        ObjectSet<org.apache.pulsar.broker.service.Consumer> consumerSet =
+                (ObjectSet<org.apache.pulsar.broker.service.Consumer>) 
fConsumerSet.get(dispatcher);
+
+        consumerList.remove(originalConsumer);
+        consumerSet.removeAll(originalConsumer);
+        consumerList.add(spyServiceConsumer);
+        consumerSet.add(spyServiceConsumer);
+        return originalConsumer;
+    }
 }

Reply via email to