This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 071e26da7b3 [fix] [client] fix huge permits if acked a half batched
message (#22091)
071e26da7b3 is described below
commit 071e26da7b3a5b9e07d118098959bb38a81cf0af
Author: fengyubiao <[email protected]>
AuthorDate: Mon Feb 26 22:45:55 2024 +0800
[fix] [client] fix huge permits if acked a half batched message (#22091)
(cherry picked from commit 0c49cac105ee391f327b5d85a02e69ab0a6310a6)
---
.../BatchMessageWithBatchIndexLevelTest.java | 85 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerBase.java | 5 ++
.../apache/pulsar/client/impl/ConsumerImpl.java | 11 ++-
3 files changed, 99 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
index 3a4cee7f2be..8e902d5d1e7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
@@ -583,4 +583,89 @@ public class BatchMessageWithBatchIndexLevelTest extends
BatchMessageTest {
consumerSet.add(spyServiceConsumer);
return originalConsumer;
}
+
+ /***
+ * 1. Send a batch message that contains 100 single messages.
+ * 2. Ack 2 messages.
+ * 3. Redeliver the batch message and ack the remaining messages.
+ * 4. Verify: the permits are correct.
+ */
+ @Test
+ public void testPermitsIfHalfAckBatchMessage() throws Exception {
+ final String topicName =
BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp");
+ final String subName = "s1";
+ final int receiverQueueSize = 1000;
+ final int ackedMessagesCountInTheFistStep = 2;
+ admin.topics().createNonPartitionedTopic(topicName);
+ admin.topics(). createSubscription(topicName, subName,
MessageId.earliest);
+ ConsumerBuilder<String> consumerBuilder =
pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .receiverQueueSize(receiverQueueSize)
+ .subscriptionName(subName)
+ .enableBatchIndexAcknowledgment(true)
+ .subscriptionType(SubscriptionType.Shared)
+ .isAckReceiptEnabled(true);
+
+ // Send 100 messages.
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .enableBatching(true)
+ .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+ .create();
+ CompletableFuture<MessageId> lastSent = null;
+ for (int i = 1; i <= 100; i++) {
+ lastSent = producer. sendAsync(i + "");
+ }
+ producer.flush();
+ lastSent.join();
+
+ // Ack 2 messages, and trigger a redelivery.
+ Consumer<String> consumer1 = consumerBuilder.subscribe();
+ for (int i = 0; i < ackedMessagesCountInTheFistStep; i++) {
+ Message msg = consumer1. receive(2, TimeUnit.SECONDS);
+ assertNotNull(msg);
+ consumer1.acknowledge(msg);
+ }
+ consumer1.close();
+
+ // Receive the remaining 98 messages, and ack them.
+ // Verify the permits are correct.
+ ConsumerImpl<String> consumer2 = (ConsumerImpl<String>)
consumerBuilder.subscribe();
+ Awaitility.await().until(() -> consumer2.isConnected());
+ List<MessageId> messages = new ArrayList<>();
+ int nextMessageValue = ackedMessagesCountInTheFistStep + 1;
+ while (true) {
+ Message<String> msg = consumer2.receive(2, TimeUnit.SECONDS);
+ if (msg == null) {
+ break;
+ }
+ assertEquals(msg.getValue(), nextMessageValue + "");
+ messages.add(msg.getMessageId());
+ nextMessageValue++;
+ }
+ assertEquals(messages.size(), 98);
+ consumer2.acknowledge(messages);
+
+ org.apache.pulsar.broker.service.Consumer serviceConsumer2 =
+ getTheUniqueServiceConsumer(topicName, subName);
+ Awaitility.await().untilAsserted(() -> {
+ // After the messages were popped out, the permits in the client
memory went to 98.
+ int permitsInClientMemory = consumer2.getAvailablePermits();
+ int permitsInBroker = serviceConsumer2.getAvailablePermits();
+ assertEquals(permitsInClientMemory + permitsInBroker,
receiverQueueSize);
+ });
+
+ // cleanup.
+ producer.close();
+ consumer2.close();
+ admin.topics().delete(topicName, false);
+ }
+
+ private org.apache.pulsar.broker.service.Consumer
getTheUniqueServiceConsumer(String topic, String sub) {
+ PersistentTopic persistentTopic =
+ (PersistentTopic) pulsar.getBrokerService(). getTopic(topic,
false).join().get();
+ PersistentDispatcherMultipleConsumers dispatcher =
+ (PersistentDispatcherMultipleConsumers)
persistentTopic.getSubscription(sub).getDispatcher();
+ return dispatcher.getConsumers().iterator().next();
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index fec428824c2..67bddf525c6 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -64,6 +64,7 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
@@ -1266,6 +1267,10 @@ public abstract class ConsumerBase<T> extends
HandlerState implements Consumer<T
return true;
}
+ protected boolean isSingleMessageAcked(BitSetRecyclable ackBitSet, int
batchIndex) {
+ return ackBitSet != null && !ackBitSet.get(batchIndex);
+ }
+
public boolean hasBatchReceiveTimeout() {
return batchReceiveTimeout != null;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index dba54c8d3a3..bfe8bd54849 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1181,7 +1181,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return null;
}
- if (ackBitSet != null && !ackBitSet.get(index)) {
+ if (isSingleMessageAcked(ackBitSet, index)) {
return null;
}
@@ -1631,7 +1631,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
singleMessageMetadata, uncompressedPayload,
batchMessage, schema, true,
ackBitSet, ackSetInMessageId, redeliveryCount,
consumerEpoch);
if (message == null) {
- skippedMessages++;
+ // If it is not in ackBitSet, it means the Broker does not
want to deliver it to the client, and
+ // did not decrease the permits on the broker side.
+ // So do not acquire more permits for this message.
+ // Why not skip this single message in the first line of
the for-loop block? We need to call
+ // "newSingleMessage" to move "payload.readerIndex" to a
correct value to get the correct data.
+ if (!isSingleMessageAcked(ackBitSet, i)) {
+ skippedMessages++;
+ }
continue;
}
if (possibleToDeadLetter != null) {