This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 70fec148ab1379a4e6692641685d592bb1d9674c Author: dockerzhang <[email protected]> AuthorDate: Wed Dec 16 01:39:55 2020 +0800 consumer support update stats with specified stats (#8951) Fixes #8949 ### Motivation Reuse Pulsar metrics for KoP. ### Modifications Expose updateStats with a ConsumerStats parameter, which updates the consumer's stats from the specified stats. ### Verifying this change - [√] Make sure that the change passes the CI checks. (cherry picked from commit fcbc7ebe81da64c0e434254f421039c27dbc6058) --- .../org/apache/pulsar/broker/service/Consumer.java | 12 ++++++++++ .../pulsar/broker/stats/ConsumerStatsTest.java | 27 ++++++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 49b0201..263d722 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -570,6 +570,18 @@ public class Consumer { stats.chuckedMessageRate = chuckedMessageRate.getRate(); } + public void updateStats(ConsumerStats consumerStats) { + msgOutCounter.add(consumerStats.msgOutCounter); + bytesOutCounter.add(consumerStats.bytesOutCounter); + msgOut.recordMultipleEvents(consumerStats.msgOutCounter, consumerStats.bytesOutCounter); + lastAckedTimestamp = consumerStats.lastAckedTimestamp; + lastConsumedTimestamp = consumerStats.lastConsumedTimestamp; + MESSAGE_PERMITS_UPDATER.set(this, consumerStats.availablePermits); + unackedMessages = consumerStats.unackedMessages; + blockedConsumerOnUnackedMsgs = consumerStats.blockedConsumerOnUnackedMsgs; + AVG_MESSAGES_PER_ENTRY.set(this, consumerStats.avgMessagesPerEntry); + } + public ConsumerStats getStats() { stats.msgOutCounter = msgOutCounter.longValue(); stats.bytesOutCounter = bytesOutCounter.longValue(); diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java index 8418b2e..acf8e65 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java @@ -26,11 +26,14 @@ import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.common.policies.data.ConsumerStats; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.List; import java.util.concurrent.TimeUnit; @Slf4j @@ -140,4 +143,28 @@ public class ConsumerStatsTest extends ProducerConsumerBase { } } + @Test + public void testUpdateStatsForActiveConsumerAndSubscription() throws Exception { + final String topicName = "persistent://prop/use/ns-abc/testUpdateStatsForActiveConsumerAndSubscription"; + pulsarClient.newConsumer() + .topic(topicName) + .subscriptionType(SubscriptionType.Shared) + .subscriptionName("my-subscription") + .subscribe(); + + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); + Assert.assertNotNull(topicRef); + Assert.assertEquals(topicRef.getSubscriptions().size(), 1); + List<org.apache.pulsar.broker.service.Consumer> consumers = topicRef.getSubscriptions() + .get("my-subscription").getConsumers(); + Assert.assertEquals(consumers.size(), 1); + ConsumerStats consumerStats = new ConsumerStats(); + consumerStats.msgOutCounter = 10; + consumerStats.bytesOutCounter = 1280; + consumers.get(0).updateStats(consumerStats); + ConsumerStats updatedStats = 
consumers.get(0).getStats(); + + Assert.assertEquals(updatedStats.msgOutCounter, 10); + Assert.assertEquals(updatedStats.bytesOutCounter, 1280); + } }
