This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 70fec148ab1379a4e6692641685d592bb1d9674c
Author: dockerzhang <[email protected]>
AuthorDate: Wed Dec 16 01:39:55 2020 +0800

    consumer support update stats with specified stats (#8951)
    
    
    
    
    Fixes #8949
    
    ### Motivation
    reuse pulsar metric for KoP
    
    ### Modifications
    
    expose updateStats with ConsumerStats param, which could update stats 
specified stats.
    
    ### Verifying this change
    
    - [√] Make sure that the change passes the CI checks.
    
    (cherry picked from commit fcbc7ebe81da64c0e434254f421039c27dbc6058)
---
 .../org/apache/pulsar/broker/service/Consumer.java | 12 ++++++++++
 .../pulsar/broker/stats/ConsumerStatsTest.java     | 27 ++++++++++++++++++++++
 2 files changed, 39 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 49b0201..263d722 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -570,6 +570,18 @@ public class Consumer {
         stats.chuckedMessageRate = chuckedMessageRate.getRate();
     }
 
+    public void updateStats(ConsumerStats consumerStats) {
+        msgOutCounter.add(consumerStats.msgOutCounter);
+        bytesOutCounter.add(consumerStats.bytesOutCounter);
+        msgOut.recordMultipleEvents(consumerStats.msgOutCounter, 
consumerStats.bytesOutCounter);
+        lastAckedTimestamp = consumerStats.lastAckedTimestamp;
+        lastConsumedTimestamp = consumerStats.lastConsumedTimestamp;
+        MESSAGE_PERMITS_UPDATER.set(this, consumerStats.availablePermits);
+        unackedMessages = consumerStats.unackedMessages;
+        blockedConsumerOnUnackedMsgs = 
consumerStats.blockedConsumerOnUnackedMsgs;
+        AVG_MESSAGES_PER_ENTRY.set(this, consumerStats.avgMessagesPerEntry);
+    }
+
     public ConsumerStats getStats() {
         stats.msgOutCounter = msgOutCounter.longValue();
         stats.bytesOutCounter = bytesOutCounter.longValue();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index 8418b2e..acf8e65 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -26,11 +26,14 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 @Slf4j
@@ -140,4 +143,28 @@ public class ConsumerStatsTest extends 
ProducerConsumerBase {
         }
     }
 
+    @Test
+    public void testUpdateStatsForActiveConsumerAndSubscription() throws 
Exception {
+        final String topicName = 
"persistent://prop/use/ns-abc/testUpdateStatsForActiveConsumerAndSubscription";
+        pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("my-subscription")
+                .subscribe();
+
+        PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
+        Assert.assertNotNull(topicRef);
+        Assert.assertEquals(topicRef.getSubscriptions().size(), 1);
+        List<org.apache.pulsar.broker.service.Consumer> consumers = 
topicRef.getSubscriptions()
+                .get("my-subscription").getConsumers();
+        Assert.assertEquals(consumers.size(), 1);
+        ConsumerStats consumerStats = new ConsumerStats();
+        consumerStats.msgOutCounter = 10;
+        consumerStats.bytesOutCounter = 1280;
+        consumers.get(0).updateStats(consumerStats);
+        ConsumerStats updatedStats = consumers.get(0).getStats();
+
+        Assert.assertEquals(updatedStats.msgOutCounter, 10);
+        Assert.assertEquals(updatedStats.bytesOutCounter, 1280);
+    }
 }

Reply via email to