This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new 801ba85 Fix incoming message size issue that introduced in #9113
(#9182)
801ba85 is described below
commit 801ba85350f9a1c5dff9a7288c1f8fc9c22d8868
Author: lipenghui <[email protected]>
AuthorDate: Wed Jan 13 09:54:49 2021 +0800
Fix incoming message size issue that introduced in #9113 (#9182)
Fix incoming message size issue that introduced in #9113. We should
decrease the incoming message size when taking messages from the queue and
increase the incoming message size while adding messages to the queue. With
#9113, will always increase the incoming queue size.
Add method `increaseIncomingSize` and `decreaseIncomingSize`
Add a new test for verifying the incoming message size should be zero while
the incoming queue size is zero.
(cherry picked from commit 2cee0a8395c5deecf5584e30ac55c370161f2b45)
---
.../client/api/SimpleProducerConsumerTest.java | 55 ++++++++++++++++++++--
.../apache/pulsar/client/impl/ConsumerBase.java | 19 ++++++--
.../apache/pulsar/client/impl/ConsumerImpl.java | 4 +-
.../client/impl/MultiTopicsConsumerImpl.java | 12 ++---
4 files changed, 74 insertions(+), 16 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 19d572aa..fa57f2d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -36,7 +36,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.common.reflect.Reflection;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -72,14 +71,13 @@ import java.util.stream.Collectors;
import lombok.Cleanup;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.common.util.ReflectionUtils;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
@@ -98,6 +96,7 @@ import
org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -3557,4 +3556,54 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
Assert.assertTrue(producer.getLastDisconnectedTimestamp() > 0);
Assert.assertTrue(consumer.getLastDisconnectedTimestamp() > 0);
}
+
+ @DataProvider(name = "partitioned")
+ public static Object[] isPartitioned() {
+ return new Object[] {false, true};
+ }
+
+ @Test(dataProvider = "partitioned")
+ public void testIncomingMessageSize(boolean isPartitioned) throws
Exception {
+ final String topicName =
"persistent://my-property/my-ns/testIncomingMessageSize-" +
+ UUID.randomUUID().toString();
+ final String subName = "my-sub";
+
+ if (isPartitioned) {
+ admin.topics().createPartitionedTopic(topicName, 3);
+ }
+
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subName)
+ .subscribe();
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .create();
+
+ final int messages = 100;
+ List<CompletableFuture<MessageId>> messageIds = new
ArrayList<>(messages);
+ for (int i = 0; i < messages; i++) {
+ messageIds.add(producer.newMessage().key(i + "").value(("Message-"
+ i).getBytes()).sendAsync());
+ }
+ FutureUtil.waitForAll(messageIds).get();
+
+ Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
+ long size = ((ConsumerBase<byte[]>)
consumer).getIncomingMessageSize();
+ log.info("Check the incoming message size should greater that 0,
current size is {}", size);
+ Assert.assertTrue(size > 0);
+ });
+
+ for (int i = 0; i < messages; i++) {
+ consumer.acknowledge(consumer.receive());
+ }
+
+ Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
+ long size = ((ConsumerBase<byte[]>)
consumer).getIncomingMessageSize();
+ log.info("Check the incoming message size should be 0, current
size is {}", size);
+ Assert.assertEquals(size, 0);
+ });
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index da5302c..88d7abd 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;
import java.util.Collections;
import java.util.Map;
@@ -630,8 +631,7 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
- INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(
- this, message.getData() == null ? 0 :
message.getData().length);
+ increaseIncomingMessageSize(message);
}
return hasEnoughMessagesForBatchReceive();
}
@@ -641,7 +641,7 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
return false;
}
return (batchReceivePolicy.getMaxNumMessages() > 0 &&
incomingMessages.size() >= batchReceivePolicy.getMaxNumMessages())
- || (batchReceivePolicy.getMaxNumBytes() > 0 &&
INCOMING_MESSAGES_SIZE_UPDATER.get(this) >=
batchReceivePolicy.getMaxNumBytes());
+ || (batchReceivePolicy.getMaxNumBytes() > 0 &&
getIncomingMessageSize() >= batchReceivePolicy.getMaxNumBytes());
}
private void verifyConsumerState() throws PulsarClientException {
@@ -813,13 +813,22 @@ public abstract class ConsumerBase<T> extends
HandlerState implements Consumer<T
return pendingBatchReceives != null && peekNextBatchReceive() != null;
}
+ protected void increaseIncomingMessageSize(final Message<?> message) {
+ INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(
+ this, message.getData() == null ? 0 :
message.getData().length);
+ }
+
protected void resetIncomingMessageSize() {
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
}
- protected void updateIncomingMessageSize(final Message<?> message) {
+ protected void decreaseIncomingMessageSize(final Message<?> message) {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this,
- (message.getData() != null) ? message.getData().length : 0);
+ (message.getData() != null) ? -message.getData().length : 0);
+ }
+
+ public long getIncomingMessageSize() {
+ return INCOMING_MESSAGES_SIZE_UPDATER.get(this);
}
protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 04c0427..7b73c23 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1438,7 +1438,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
stats.updateNumMsgsReceived(msg);
trackMessage(msg);
- updateIncomingMessageSize(msg);
+ decreaseIncomingMessageSize(msg);
}
protected void trackMessage(Message<?> msg) {
@@ -2115,7 +2115,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
// try not to remove elements that are added while we remove
Message<T> message = incomingMessages.poll();
while (message != null) {
- updateIncomingMessageSize(message);
+ decreaseIncomingMessageSize(message);
messagesFromQueue++;
MessageIdImpl id = getMessageIdImpl(message);
if (!messageIds.contains(id)) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index c602772..549eb23 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -285,7 +285,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
protected synchronized void messageProcessed(Message<?> msg) {
unAckedMessageTracker.add(msg.getMessageId());
- updateIncomingMessageSize(msg);
+ decreaseIncomingMessageSize(msg);
}
private void resumeReceivingFromPausedConsumersIfNeeded() {
@@ -308,7 +308,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
Message<T> message;
try {
message = incomingMessages.take();
- updateIncomingMessageSize(message);
+ decreaseIncomingMessageSize(message);
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId());
resumeReceivingFromPausedConsumersIfNeeded();
@@ -324,7 +324,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
try {
message = incomingMessages.poll(timeout, unit);
if (message != null) {
- updateIncomingMessageSize(message);
+ decreaseIncomingMessageSize(message);
checkArgument(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId());
}
@@ -365,7 +365,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
while (msgPeeked != null && messages.canAdd(msgPeeked)) {
Message<T> msg = incomingMessages.poll();
if (msg != null) {
- updateIncomingMessageSize(msg);
+ decreaseIncomingMessageSize(msg);
Message<T> interceptMsg = beforeConsume(msg);
messages.add(interceptMsg);
}
@@ -393,7 +393,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
pendingReceives.add(result);
cancellationHandler.setCancelAction(() ->
pendingReceives.remove(result));
} else {
- updateIncomingMessageSize(message);
+ decreaseIncomingMessageSize(message);
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId());
resumeReceivingFromPausedConsumersIfNeeded();
@@ -679,7 +679,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
Message<T> message = incomingMessages.poll();
checkState(message instanceof TopicMessageImpl);
while (message != null) {
- updateIncomingMessageSize(message);
+ decreaseIncomingMessageSize(message);
MessageId messageId = message.getMessageId();
if (!messageIds.contains(messageId)) {
messageIds.add(messageId);