This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6619679  Fix average size of messages included in partitioned stats 
(#2637)
6619679 is described below

commit 6619679124731ceaf0e5fc7ad011d5c8342ff156
Author: massakam <massa...@yahoo-corp.jp>
AuthorDate: Tue Sep 25 11:03:21 2018 +0900

    Fix average size of messages included in partitioned stats (#2637)
    
    Stats of a partitioned topic are the simple sum of the stats of all the 
partitions.
    
https://github.com/apache/pulsar/blob/24605d328357e89ae107b549a6f0da2ce1683857/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java#L607-L612
    
    As a result, the `averageMsgSize` included in the partitioned stats is the sum of the 
`averageMsgSize` values of all the partitions rather than their average. This change makes it the mean of the per-partition values.
---
 .../common/policies/data/PublisherStats.java       |  6 +++-
 .../pulsar/common/policies/data/TopicStats.java    |  7 +++-
 .../policies/data/PersistentTopicStatsTest.java    | 41 +++++++++++++++++++++-
 .../common/policies/data/PublisherStatsTest.java   | 22 ++++++++++++
 4 files changed, 73 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PublisherStats.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PublisherStats.java
index 977ac9b..7514b2a 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PublisherStats.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PublisherStats.java
@@ -25,6 +25,8 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 /**
  */
 public class PublisherStats {
+    private int count;
+
     /** Total rate of messages published by this publisher. msg/s */
     public double msgRateIn;
 
@@ -64,9 +66,11 @@ public class PublisherStats {
 
     public PublisherStats add(PublisherStats stats) {
         checkNotNull(stats);
+        this.count++;
         this.msgRateIn += stats.msgRateIn;
         this.msgThroughputIn += stats.msgThroughputIn;
-        this.averageMsgSize += stats.averageMsgSize;
+        double newAverageMsgSize = (this.averageMsgSize * (this.count - 1) + 
stats.averageMsgSize) / this.count;
+        this.averageMsgSize = newAverageMsgSize;
         return this;
     }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
index 4487e6e..4d0b1f5 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
@@ -29,6 +29,8 @@ import com.google.common.collect.Maps;
 /**
  */
 public class TopicStats {
+    private int count;
+
     /** Total rate of messages published on the topic. msg/s */
     public double msgRateIn;
 
@@ -65,6 +67,7 @@ public class TopicStats {
     }
 
     public void reset() {
+        this.count = 0;
         this.msgRateIn = 0;
         this.msgThroughputIn = 0;
         this.msgRateOut = 0;
@@ -81,11 +84,13 @@ public class TopicStats {
     // stats
     public TopicStats add(TopicStats stats) {
         checkNotNull(stats);
+        this.count++;
         this.msgRateIn += stats.msgRateIn;
         this.msgThroughputIn += stats.msgThroughputIn;
         this.msgRateOut += stats.msgRateOut;
         this.msgThroughputOut += stats.msgThroughputOut;
-        this.averageMsgSize += stats.averageMsgSize;
+        double newAverageMsgSize = (this.averageMsgSize * (this.count - 1) + 
stats.averageMsgSize) / this.count;
+        this.averageMsgSize = newAverageMsgSize;
         this.storageSize += stats.storageSize;
         if (this.publishers.size() != stats.publishers.size()) {
             for (int i = 0; i < stats.publishers.size(); i++) {
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
index ec3be1c..cdbfcd5 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
@@ -63,4 +63,43 @@ public class PersistentTopicStatsTest {
         assertEquals(topicStats.replication.size(), 0);
     }
 
-}
\ No newline at end of file
+    @Test
+    public void testPersistentTopicStatsAggregation() {
+        TopicStats topicStats1 = new TopicStats();
+        topicStats1.msgRateIn = 1;
+        topicStats1.msgThroughputIn = 1;
+        topicStats1.msgRateOut = 1;
+        topicStats1.msgThroughputOut = 1;
+        topicStats1.averageMsgSize = 1;
+        topicStats1.storageSize = 1;
+        topicStats1.publishers.add(new PublisherStats());
+        topicStats1.subscriptions.put("test_ns", new SubscriptionStats());
+        topicStats1.replication.put("test_ns", new ReplicatorStats());
+
+        TopicStats topicStats2 = new TopicStats();
+        topicStats2.msgRateIn = 1;
+        topicStats2.msgThroughputIn = 2;
+        topicStats2.msgRateOut = 3;
+        topicStats2.msgThroughputOut = 4;
+        topicStats2.averageMsgSize = 5;
+        topicStats2.storageSize = 6;
+        topicStats2.publishers.add(new PublisherStats());
+        topicStats2.subscriptions.put("test_ns", new SubscriptionStats());
+        topicStats2.replication.put("test_ns", new ReplicatorStats());
+
+        TopicStats target = new TopicStats();
+        target.add(topicStats1);
+        target.add(topicStats2);
+
+        assertEquals(target.msgRateIn, 2.0);
+        assertEquals(target.msgThroughputIn, 3.0);
+        assertEquals(target.msgRateOut, 4.0);
+        assertEquals(target.msgThroughputOut, 5.0);
+        assertEquals(target.averageMsgSize, 3.0);
+        assertEquals(target.storageSize, 7);
+        assertEquals(target.publishers.size(), 1);
+        assertEquals(target.subscriptions.size(), 1);
+        assertEquals(target.replication.size(), 1);
+    }
+
+}
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PublisherStatsTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PublisherStatsTest.java
index 7b30591..2ab08d4 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PublisherStatsTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PublisherStatsTest.java
@@ -72,4 +72,26 @@ public class PublisherStatsTest {
         Assert.assertNull(stats.getConnectedSince());
         Assert.assertNull(stats.getClientVersion());
     }
+
+    @Test
+    public void testPublisherStatsAggregation() {
+        PublisherStats stats1 = new PublisherStats();
+        stats1.msgRateIn = 1;
+        stats1.msgThroughputIn = 1;
+        stats1.averageMsgSize = 1;
+
+        PublisherStats stats2 = new PublisherStats();
+        stats2.msgRateIn = 1;
+        stats2.msgThroughputIn = 2;
+        stats2.averageMsgSize = 3;
+
+        PublisherStats target = new PublisherStats();
+        target.add(stats1);
+        target.add(stats2);
+
+        Assert.assertEquals(target.msgRateIn, 2.0);
+        Assert.assertEquals(target.msgThroughputIn, 3.0);
+        Assert.assertEquals(target.averageMsgSize, 2.0);
+    }
+
 }

Reply via email to