This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 071e26da7b3 [fix] [client] fix huge permits if acked a half batched 
message (#22091)
071e26da7b3 is described below

commit 071e26da7b3a5b9e07d118098959bb38a81cf0af
Author: fengyubiao <[email protected]>
AuthorDate: Mon Feb 26 22:45:55 2024 +0800

    [fix] [client] fix huge permits if acked a half batched message (#22091)
    
    (cherry picked from commit 0c49cac105ee391f327b5d85a02e69ab0a6310a6)
---
 .../BatchMessageWithBatchIndexLevelTest.java       | 85 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    |  5 ++
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 11 ++-
 3 files changed, 99 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
index 3a4cee7f2be..8e902d5d1e7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
@@ -583,4 +583,89 @@ public class BatchMessageWithBatchIndexLevelTest extends 
BatchMessageTest {
         consumerSet.add(spyServiceConsumer);
         return originalConsumer;
     }
+
+    /***
+     * 1. Send a batch message contains 100 single messages.
+     * 2. Ack 2 messages.
+     * 3. Redeliver the batch message and ack them.
+     * 4. Verify: the permits is correct.
+     */
+    @Test
+    public void testPermitsIfHalfAckBatchMessage() throws Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp");
+        final String subName = "s1";
+        final int receiverQueueSize = 1000;
+        final int ackedMessagesCountInTheFistStep = 2;
+        admin.topics().createNonPartitionedTopic(topicName);
+        admin.topics(). createSubscription(topicName, subName, 
MessageId.earliest);
+        ConsumerBuilder<String> consumerBuilder = 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .receiverQueueSize(receiverQueueSize)
+                .subscriptionName(subName)
+                .enableBatchIndexAcknowledgment(true)
+                .subscriptionType(SubscriptionType.Shared)
+                .isAckReceiptEnabled(true);
+
+        // Send 100 messages.
+        Producer<String>  producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .enableBatching(true)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                .create();
+        CompletableFuture<MessageId>  lastSent = null;
+        for (int i = 1;  i <=  100;  i++) {
+            lastSent = producer. sendAsync(i + "");
+        }
+        producer.flush();
+        lastSent.join();
+
+        // Ack 2 messages, and trigger a redelivery.
+        Consumer<String>  consumer1 = consumerBuilder.subscribe();
+        for (int i = 0;  i <  ackedMessagesCountInTheFistStep;  i++) {
+            Message msg = consumer1. receive(2, TimeUnit.SECONDS);
+            assertNotNull(msg);
+            consumer1.acknowledge(msg);
+        }
+        consumer1.close();
+
+        // Receive the left 98 messages, and ack them.
+        // Verify the permits is correct.
+        ConsumerImpl<String> consumer2 = (ConsumerImpl<String>) 
consumerBuilder.subscribe();
+        Awaitility.await().until(() ->  consumer2.isConnected());
+        List<MessageId>  messages = new ArrayList<>();
+        int nextMessageValue = ackedMessagesCountInTheFistStep + 1;
+        while (true) {
+            Message<String> msg = consumer2.receive(2, TimeUnit.SECONDS);
+            if (msg == null) {
+                break;
+            }
+            assertEquals(msg.getValue(), nextMessageValue + "");
+            messages.add(msg.getMessageId());
+            nextMessageValue++;
+        }
+        assertEquals(messages.size(), 98);
+        consumer2.acknowledge(messages);
+
+        org.apache.pulsar.broker.service.Consumer serviceConsumer2 =
+                getTheUniqueServiceConsumer(topicName, subName);
+        Awaitility.await().untilAsserted(() ->  {
+            // After the messages were pop out, the permits in the client 
memory went to 98.
+            int permitsInClientMemory = consumer2.getAvailablePermits();
+            int permitsInBroker = serviceConsumer2.getAvailablePermits();
+            assertEquals(permitsInClientMemory + permitsInBroker, 
receiverQueueSize);
+        });
+
+        // cleanup.
+        producer.close();
+        consumer2.close();
+        admin.topics().delete(topicName, false);
+    }
+
+    private org.apache.pulsar.broker.service.Consumer 
getTheUniqueServiceConsumer(String topic, String sub) {
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService(). getTopic(topic, 
false).join().get();
+        PersistentDispatcherMultipleConsumers dispatcher =
+                (PersistentDispatcherMultipleConsumers) 
persistentTopic.getSubscription(sub).getDispatcher();
+        return dispatcher.getConsumers().iterator().next();
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index fec428824c2..67bddf525c6 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -64,6 +64,7 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.slf4j.Logger;
@@ -1266,6 +1267,10 @@ public abstract class ConsumerBase<T> extends 
HandlerState implements Consumer<T
         return true;
     }
 
+    protected boolean isSingleMessageAcked(BitSetRecyclable ackBitSet, int 
batchIndex) {
+        return ackBitSet != null && !ackBitSet.get(batchIndex);
+    }
+
     public boolean hasBatchReceiveTimeout() {
         return batchReceiveTimeout != null;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index dba54c8d3a3..bfe8bd54849 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1181,7 +1181,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 return null;
             }
 
-            if (ackBitSet != null && !ackBitSet.get(index)) {
+            if (isSingleMessageAcked(ackBitSet, index)) {
                 return null;
             }
 
@@ -1631,7 +1631,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                         singleMessageMetadata, uncompressedPayload, 
batchMessage, schema, true,
                         ackBitSet, ackSetInMessageId, redeliveryCount, 
consumerEpoch);
                 if (message == null) {
-                    skippedMessages++;
+                    // If it is not in ackBitSet, it means Broker does not 
want to deliver it to the client, and
+                    // did not decrease the permits in the broker-side.
+                    // So do not acquire more permits for this message.
+                    // Why not skip this single message in the first line of 
for-loop block? We need call
+                    // "newSingleMessage" to move "payload.readerIndex" to a 
correct value to get the correct data.
+                    if (!isSingleMessageAcked(ackBitSet, i)) {
+                        skippedMessages++;
+                    }
                     continue;
                 }
                 if (possibleToDeadLetter != null) {

Reply via email to