This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8ebec884a0cfaea8da1dd776c3ef1c9b32560cbd Author: lipenghui <[email protected]> AuthorDate: Wed Jan 13 09:54:49 2021 +0800 Fix incoming message size issue that was introduced in #9113 (#9182) ### Motivation Fix the incoming message size issue that was introduced in #9113. We should decrease the incoming message size when taking messages from the queue and increase the incoming message size when adding messages to the queue. With #9113, the consumer always increases the incoming message size, even when messages are taken from the queue. ### Modifications Add methods `increaseIncomingMessageSize` and `decreaseIncomingMessageSize` ### Verifying this change Add a new test to verify that the incoming message size is zero when the incoming queue size is zero. (cherry picked from commit 2cee0a8395c5deecf5584e30ac55c370161f2b45) --- .../client/api/SimpleProducerConsumerTest.java | 51 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ConsumerBase.java | 19 +++++--- .../apache/pulsar/client/impl/ConsumerImpl.java | 4 +- .../client/impl/MultiTopicsConsumerImpl.java | 12 ++--- 4 files changed, 73 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 55dab8b..8eda0d0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -77,6 +77,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.ClientCnx; +import org.apache.pulsar.client.impl.ConsumerBase; import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; @@ -3695,4 +3696,54 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase { consumer.close(); producer.close(); } + + @DataProvider(name = "partitioned") + public static Object[] isPartitioned() { + return new Object[] {false, true}; + } + + @Test(dataProvider = "partitioned") + public void testIncomingMessageSize(boolean isPartitioned) throws Exception { + final String topicName = "persistent://my-property/my-ns/testIncomingMessageSize-" + + UUID.randomUUID().toString(); + final String subName = "my-sub"; + + if (isPartitioned) { + admin.topics().createPartitionedTopic(topicName, 3); + } + + @Cleanup + Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName(subName) + .subscribe(); + + @Cleanup + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .create(); + + final int messages = 100; + List<CompletableFuture<MessageId>> messageIds = new ArrayList<>(messages); + for (int i = 0; i < messages; i++) { + messageIds.add(producer.newMessage().key(i + "").value(("Message-" + i).getBytes()).sendAsync()); + } + FutureUtil.waitForAll(messageIds).get(); + + Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> { + long size = ((ConsumerBase<byte[]>) consumer).getIncomingMessageSize(); + log.info("Check the incoming message size should greater that 0, current size is {}", size); + Assert.assertTrue(size > 0); + }); + + for (int i = 0; i < messages; i++) { + consumer.acknowledge(consumer.receive()); + } + + Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> { + long size = ((ConsumerBase<byte[]>) consumer).getIncomingMessageSize(); + log.info("Check the incoming message size should be 0, current size is {}", size); + Assert.assertEquals(size, 0); + }); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index c184f9a..dc4d2c1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl; import static com.google.common.base.Preconditions.checkArgument; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Queues; import java.util.Collections; import java.util.List; @@ -668,8 +669,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) { if (canEnqueueMessage(message) && incomingMessages.offer(message)) { - INCOMING_MESSAGES_SIZE_UPDATER.addAndGet( - this, message.getData() == null ? 0 : message.getData().length); + increaseIncomingMessageSize(message); } return hasEnoughMessagesForBatchReceive(); } @@ -679,7 +679,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T return false; } return (batchReceivePolicy.getMaxNumMessages() > 0 && incomingMessages.size() >= batchReceivePolicy.getMaxNumMessages()) - || (batchReceivePolicy.getMaxNumBytes() > 0 && INCOMING_MESSAGES_SIZE_UPDATER.get(this) >= batchReceivePolicy.getMaxNumBytes()); + || (batchReceivePolicy.getMaxNumBytes() > 0 && getIncomingMessageSize() >= batchReceivePolicy.getMaxNumBytes()); } private void verifyConsumerState() throws PulsarClientException { @@ -851,13 +851,22 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T return pendingBatchReceives != null && peekNextBatchReceive() != null; } + protected void increaseIncomingMessageSize(final Message<?> message) { + INCOMING_MESSAGES_SIZE_UPDATER.addAndGet( + this, message.getData() == null ? 
0 : message.getData().length); + } + protected void resetIncomingMessageSize() { INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); } - protected void updateIncomingMessageSize(final Message<?> message) { + protected void decreaseIncomingMessageSize(final Message<?> message) { INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, - (message.getData() != null) ? message.getData().length : 0); + (message.getData() != null) ? -message.getData().length : 0); + } + + public long getIncomingMessageSize() { + return INCOMING_MESSAGES_SIZE_UPDATER.get(this); } protected abstract void completeOpBatchReceive(OpBatchReceive<T> op); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index b5c3c01..0473e23 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1528,7 +1528,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle stats.updateNumMsgsReceived(msg); trackMessage(msg); - updateIncomingMessageSize(msg); + decreaseIncomingMessageSize(msg); } protected void trackMessage(Message<?> msg) { @@ -2222,7 +2222,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle // try not to remove elements that are added while we remove Message<T> message = incomingMessages.poll(); while (message != null) { - updateIncomingMessageSize(message); + decreaseIncomingMessageSize(message); messagesFromQueue++; MessageIdImpl id = getMessageIdImpl(message); if (!messageIds.contains(id)) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index f59c2c6..4149b39 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -311,7 +311,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { @Override protected synchronized void messageProcessed(Message<?> msg) { unAckedMessageTracker.add(msg.getMessageId()); - updateIncomingMessageSize(msg); + decreaseIncomingMessageSize(msg); } private void resumeReceivingFromPausedConsumersIfNeeded() { @@ -334,7 +334,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { Message<T> message; try { message = incomingMessages.take(); - updateIncomingMessageSize(message); + decreaseIncomingMessageSize(message); checkState(message instanceof TopicMessageImpl); unAckedMessageTracker.add(message.getMessageId()); resumeReceivingFromPausedConsumersIfNeeded(); @@ -350,7 +350,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { try { message = incomingMessages.poll(timeout, unit); if (message != null) { - updateIncomingMessageSize(message); + decreaseIncomingMessageSize(message); checkArgument(message instanceof TopicMessageImpl); unAckedMessageTracker.add(message.getMessageId()); } @@ -391,7 +391,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { while (msgPeeked != null && messages.canAdd(msgPeeked)) { Message<T> msg = incomingMessages.poll(); if (msg != null) { - updateIncomingMessageSize(msg); + decreaseIncomingMessageSize(msg); Message<T> interceptMsg = beforeConsume(msg); messages.add(interceptMsg); } @@ -419,7 +419,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { pendingReceives.add(result); cancellationHandler.setCancelAction(() -> pendingReceives.remove(result)); } else { - updateIncomingMessageSize(message); + decreaseIncomingMessageSize(message); checkState(message instanceof TopicMessageImpl); unAckedMessageTracker.add(message.getMessageId()); resumeReceivingFromPausedConsumersIfNeeded(); @@ -784,7 +784,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> { Message<T> message = incomingMessages.poll(); checkState(message instanceof TopicMessageImpl); while (message != null) { - updateIncomingMessageSize(message); + decreaseIncomingMessageSize(message); MessageId messageId = message.getMessageId(); if (!messageIds.contains(messageId)) { messageIds.add(messageId);
