This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3c2e3e2 Fixed ack message tracker cleanup for single message batches
(#2224)
3c2e3e2 is described below
commit 3c2e3e2e3fc3d2cce75dd3e42a77c2d7b3bc0499
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Jul 25 02:00:16 2018 -0700
Fixed ack message tracker cleanup for single message batches (#2224)
### Motivation
Fixes #2221
The issue is that we were removing a batch message id impl from the
tracking set which contains the batch-wide ids.
---
.../impl/UnAcknowledgedMessagesTimeoutTest.java | 32 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 28 +++++++++----------
2 files changed, 46 insertions(+), 14 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
index 0b5126b..da53760 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.client.impl;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
@@ -30,6 +32,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -386,4 +389,33 @@ public class UnAcknowledgedMessagesTimeoutTest extends
BrokerTestBase {
assertEquals(consumer.getUnAckedMessageTracker().size(), 0);
}
+
+ @Test
+ public void testSingleMessageBatch() throws Exception {
+ String topicName = "prop/ns-abc/topic-estSingleMessageBatch";
+
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .enableBatching(true)
+ .batchingMaxPublishDelay(10, TimeUnit.SECONDS)
+ .create();
+
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("subscription")
+ .ackTimeout(1, TimeUnit.HOURS)
+ .subscribe();
+
+ // Force the creation of a batch with a single message
+ producer.sendAsync("hello");
+ producer.flush();
+
+ Message<String> message = consumer.receive();
+
+ assertFalse(((ConsumerImpl<?>)
consumer).getUnAckedMessageTracker().isEmpty());
+
+ consumer.acknowledge(message);
+
+ assertTrue(((ConsumerImpl<?>)
consumer).getUnAckedMessageTracker().isEmpty());
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 6f78119..1c69c75 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -385,11 +385,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
log.debug("[{}] [{}] can ack message to broker {}, acktype {},
cardinality {}, length {}", subscription,
consumerName, batchMessageId, ackType,
outstandingAcks, batchSize);
}
- // increment Acknowledge-msg counter with number of messages in
batch only if AckType is Individual.
- // CumulativeAckType is handled while sending ack to broker
- if (ackType == AckType.Individual) {
- stats.incrementNumAcksSent(batchSize);
- }
return true;
} else {
if (AckType.Cumulative == ackType
@@ -433,11 +428,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
Map<String,Long>
properties) {
MessageIdImpl msgId = (MessageIdImpl) messageId;
-
if (ackType == AckType.Individual) {
- unAckedMessageTracker.remove(msgId);
- // increment counter by 1 for non-batch msg
- if (!(messageId instanceof BatchMessageIdImpl)) {
+ if (messageId instanceof BatchMessageIdImpl) {
+ BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl)
messageId;
+
+ stats.incrementNumAcksSent(batchMessageId.getBatchSize());
+ unAckedMessageTracker.remove(new
MessageIdImpl(batchMessageId.getLedgerId(),
+ batchMessageId.getEntryId(),
batchMessageId.getPartitionIndex()));
+ } else {
+ // increment counter by 1 for non-batch msg
+ unAckedMessageTracker.remove(msgId);
stats.incrementNumAcksSent(1);
}
} else if (ackType == AckType.Cumulative) {
@@ -717,16 +717,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId,
msgMetadata, payload, cnx);
-
+
boolean isMessageUndecryptable = isMessageUndecryptable(msgMetadata);
-
+
if (decryptedPayload == null) {
// Message was discarded or CryptoKeyReader isn't implemented
return;
}
-
+
// uncompress decryptedPayload and release decryptedPayload-ByteBuf
- ByteBuf uncompressedPayload = isMessageUndecryptable ?
decryptedPayload.retain()
+ ByteBuf uncompressedPayload = isMessageUndecryptable ?
decryptedPayload.retain()
: uncompressPayloadIfNeeded(messageId, msgMetadata,
decryptedPayload, cnx);
decryptedPayload.release();
if (uncompressedPayload == null) {
@@ -1348,7 +1348,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
/**
* Create EncryptionContext if message payload is encrypted
- *
+ *
* @param msgMetadata
* @return {@link Optional}<{@link EncryptionContext}>
*/