sijie closed pull request #2224: Fixed ack message tracker cleanup for single
message batches
URL: https://github.com/apache/incubator-pulsar/pull/2224
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign (forked) pull request
once it is merged, the diff is reproduced below for the sake of provenance:
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
index 0b5126b00f..da53760d2b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.client.impl;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
@@ -30,6 +32,7 @@
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -386,4 +389,33 @@ public void testCheckUnAcknowledgedMessageTimer() throws
PulsarClientException,
assertEquals(consumer.getUnAckedMessageTracker().size(), 0);
}
+
+ @Test
+ public void testSingleMessageBatch() throws Exception {
+ String topicName = "prop/ns-abc/topic-estSingleMessageBatch";
+
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .enableBatching(true)
+ .batchingMaxPublishDelay(10, TimeUnit.SECONDS)
+ .create();
+
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("subscription")
+ .ackTimeout(1, TimeUnit.HOURS)
+ .subscribe();
+
+ // Force the creation of a batch with a single message
+ producer.sendAsync("hello");
+ producer.flush();
+
+ Message<String> message = consumer.receive();
+
+ assertFalse(((ConsumerImpl<?>)
consumer).getUnAckedMessageTracker().isEmpty());
+
+ consumer.acknowledge(message);
+
+ assertTrue(((ConsumerImpl<?>)
consumer).getUnAckedMessageTracker().isEmpty());
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 6f781196d5..1c69c754a8 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -385,11 +385,6 @@ boolean markAckForBatchMessage(BatchMessageIdImpl
batchMessageId, AckType ackTyp
log.debug("[{}] [{}] can ack message to broker {}, acktype {},
cardinality {}, length {}", subscription,
consumerName, batchMessageId, ackType,
outstandingAcks, batchSize);
}
- // increment Acknowledge-msg counter with number of messages in
batch only if AckType is Individual.
- // CumulativeAckType is handled while sending ack to broker
- if (ackType == AckType.Individual) {
- stats.incrementNumAcksSent(batchSize);
- }
return true;
} else {
if (AckType.Cumulative == ackType
@@ -433,11 +428,16 @@ boolean markAckForBatchMessage(BatchMessageIdImpl
batchMessageId, AckType ackTyp
Map<String,Long>
properties) {
MessageIdImpl msgId = (MessageIdImpl) messageId;
-
if (ackType == AckType.Individual) {
- unAckedMessageTracker.remove(msgId);
- // increment counter by 1 for non-batch msg
- if (!(messageId instanceof BatchMessageIdImpl)) {
+ if (messageId instanceof BatchMessageIdImpl) {
+ BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl)
messageId;
+
+ stats.incrementNumAcksSent(batchMessageId.getBatchSize());
+ unAckedMessageTracker.remove(new
MessageIdImpl(batchMessageId.getLedgerId(),
+ batchMessageId.getEntryId(),
batchMessageId.getPartitionIndex()));
+ } else {
+ // increment counter by 1 for non-batch msg
+ unAckedMessageTracker.remove(msgId);
stats.incrementNumAcksSent(1);
}
} else if (ackType == AckType.Cumulative) {
@@ -717,16 +717,16 @@ void messageReceived(MessageIdData messageId, ByteBuf
headersAndPayload, ClientC
}
ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId,
msgMetadata, payload, cnx);
-
+
boolean isMessageUndecryptable = isMessageUndecryptable(msgMetadata);
-
+
if (decryptedPayload == null) {
// Message was discarded or CryptoKeyReader isn't implemented
return;
}
-
+
// uncompress decryptedPayload and release decryptedPayload-ByteBuf
- ByteBuf uncompressedPayload = isMessageUndecryptable ?
decryptedPayload.retain()
+ ByteBuf uncompressedPayload = isMessageUndecryptable ?
decryptedPayload.retain()
: uncompressPayloadIfNeeded(messageId, msgMetadata,
decryptedPayload, cnx);
decryptedPayload.release();
if (uncompressedPayload == null) {
@@ -1348,7 +1348,7 @@ private boolean isMessageUndecryptable(MessageMetadata
msgMetadata) {
/**
* Create EncryptionContext if message payload is encrypted
- *
+ *
* @param msgMetadata
* @return {@link Optional}<{@link EncryptionContext}>
*/
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services