sijie closed pull request #2224: Fixed ack message tracker cleanup for single 
message batches
URL: https://github.com/apache/incubator-pulsar/pull/2224
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
index 0b5126b00f..da53760d2b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.client.impl;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
 
 import java.util.HashSet;
 import java.util.concurrent.TimeUnit;
@@ -30,6 +32,7 @@
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -386,4 +389,33 @@ public void testCheckUnAcknowledgedMessageTimer() throws 
PulsarClientException,
         assertEquals(consumer.getUnAckedMessageTracker().size(), 0);
     }
 
+
+    @Test
+    public void testSingleMessageBatch() throws Exception {
+        String topicName = "prop/ns-abc/topic-estSingleMessageBatch";
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .enableBatching(true)
+                .batchingMaxPublishDelay(10, TimeUnit.SECONDS)
+                .create();
+
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName("subscription")
+                .ackTimeout(1, TimeUnit.HOURS)
+                .subscribe();
+
+        // Force the creation of a batch with a single message
+        producer.sendAsync("hello");
+        producer.flush();
+
+        Message<String> message = consumer.receive();
+
+        assertFalse(((ConsumerImpl<?>) 
consumer).getUnAckedMessageTracker().isEmpty());
+
+        consumer.acknowledge(message);
+
+        assertTrue(((ConsumerImpl<?>) 
consumer).getUnAckedMessageTracker().isEmpty());
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 6f781196d5..1c69c754a8 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -385,11 +385,6 @@ boolean markAckForBatchMessage(BatchMessageIdImpl 
batchMessageId, AckType ackTyp
                 log.debug("[{}] [{}] can ack message to broker {}, acktype {}, 
cardinality {}, length {}", subscription,
                         consumerName, batchMessageId, ackType, 
outstandingAcks, batchSize);
             }
-            // increment Acknowledge-msg counter with number of messages in 
batch only if AckType is Individual.
-            // CumulativeAckType is handled while sending ack to broker
-            if (ackType == AckType.Individual) {
-                stats.incrementNumAcksSent(batchSize);
-            }
             return true;
         } else {
             if (AckType.Cumulative == ackType
@@ -433,11 +428,16 @@ boolean markAckForBatchMessage(BatchMessageIdImpl 
batchMessageId, AckType ackTyp
                                                     Map<String,Long> 
properties) {
         MessageIdImpl msgId = (MessageIdImpl) messageId;
 
-
         if (ackType == AckType.Individual) {
-            unAckedMessageTracker.remove(msgId);
-            // increment counter by 1 for non-batch msg
-            if (!(messageId instanceof BatchMessageIdImpl)) {
+            if (messageId instanceof BatchMessageIdImpl) {
+                BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) 
messageId;
+
+                stats.incrementNumAcksSent(batchMessageId.getBatchSize());
+                unAckedMessageTracker.remove(new 
MessageIdImpl(batchMessageId.getLedgerId(),
+                        batchMessageId.getEntryId(), 
batchMessageId.getPartitionIndex()));
+            } else {
+                // increment counter by 1 for non-batch msg
+                unAckedMessageTracker.remove(msgId);
                 stats.incrementNumAcksSent(1);
             }
         } else if (ackType == AckType.Cumulative) {
@@ -717,16 +717,16 @@ void messageReceived(MessageIdData messageId, ByteBuf 
headersAndPayload, ClientC
         }
 
         ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId, 
msgMetadata, payload, cnx);
-        
+
         boolean isMessageUndecryptable = isMessageUndecryptable(msgMetadata);
-        
+
         if (decryptedPayload == null) {
             // Message was discarded or CryptoKeyReader isn't implemented
             return;
         }
-        
+
         // uncompress decryptedPayload and release decryptedPayload-ByteBuf
-        ByteBuf uncompressedPayload = isMessageUndecryptable ? 
decryptedPayload.retain() 
+        ByteBuf uncompressedPayload = isMessageUndecryptable ? 
decryptedPayload.retain()
                 : uncompressPayloadIfNeeded(messageId, msgMetadata, 
decryptedPayload, cnx);
         decryptedPayload.release();
         if (uncompressedPayload == null) {
@@ -1348,7 +1348,7 @@ private boolean isMessageUndecryptable(MessageMetadata 
msgMetadata) {
 
     /**
      * Create EncryptionContext if message payload is encrypted
-     * 
+     *
      * @param msgMetadata
      * @return {@link Optional}<{@link EncryptionContext}>
      */


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to