This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 6619679 Fix average size of messages included in partitioned stats (#2637) 6619679 is described below commit 6619679124731ceaf0e5fc7ad011d5c8342ff156 Author: massakam <massa...@yahoo-corp.jp> AuthorDate: Tue Sep 25 11:03:21 2018 +0900 Fix average size of messages included in partitioned stats (#2637) The stats of a partitioned topic are the simple sum of the stats of all its partitions. https://github.com/apache/pulsar/blob/24605d328357e89ae107b549a6f0da2ce1683857/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java#L607-L612 As a result, the `averageMsgSize` included in the stats is the sum of the `averageMsgSize` values of all the partitions rather than their average. This commit fixes it so that `add` reports the mean of the `averageMsgSize` values it has aggregated. --- .../common/policies/data/PublisherStats.java | 6 +++- .../pulsar/common/policies/data/TopicStats.java | 7 +++- .../policies/data/PersistentTopicStatsTest.java | 41 +++++++++++++++++++++- .../common/policies/data/PublisherStatsTest.java | 22 ++++++++++++ 4 files changed, 73 insertions(+), 3 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PublisherStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PublisherStats.java index 977ac9b..7514b2a 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PublisherStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PublisherStats.java @@ -25,6 +25,8 @@ import static com.google.common.base.Preconditions.checkNotNull; /** */ public class PublisherStats { + private int count; + /** Total rate of messages published by this publisher. 
msg/s */ public double msgRateIn; @@ -64,9 +66,11 @@ public class PublisherStats { public PublisherStats add(PublisherStats stats) { checkNotNull(stats); + this.count++; this.msgRateIn += stats.msgRateIn; this.msgThroughputIn += stats.msgThroughputIn; - this.averageMsgSize += stats.averageMsgSize; + double newAverageMsgSize = (this.averageMsgSize * (this.count - 1) + stats.averageMsgSize) / this.count; + this.averageMsgSize = newAverageMsgSize; return this; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java index 4487e6e..4d0b1f5 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java @@ -29,6 +29,8 @@ import com.google.common.collect.Maps; /** */ public class TopicStats { + private int count; + /** Total rate of messages published on the topic. 
msg/s */ public double msgRateIn; @@ -65,6 +67,7 @@ public class TopicStats { } public void reset() { + this.count = 0; this.msgRateIn = 0; this.msgThroughputIn = 0; this.msgRateOut = 0; @@ -81,11 +84,13 @@ public class TopicStats { // stats public TopicStats add(TopicStats stats) { checkNotNull(stats); + this.count++; this.msgRateIn += stats.msgRateIn; this.msgThroughputIn += stats.msgThroughputIn; this.msgRateOut += stats.msgRateOut; this.msgThroughputOut += stats.msgThroughputOut; - this.averageMsgSize += stats.averageMsgSize; + double newAverageMsgSize = (this.averageMsgSize * (this.count - 1) + stats.averageMsgSize) / this.count; + this.averageMsgSize = newAverageMsgSize; this.storageSize += stats.storageSize; if (this.publishers.size() != stats.publishers.size()) { for (int i = 0; i < stats.publishers.size(); i++) { diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java index ec3be1c..cdbfcd5 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java @@ -63,4 +63,43 @@ public class PersistentTopicStatsTest { assertEquals(topicStats.replication.size(), 0); } -} \ No newline at end of file + @Test + public void testPersistentTopicStatsAggregation() { + TopicStats topicStats1 = new TopicStats(); + topicStats1.msgRateIn = 1; + topicStats1.msgThroughputIn = 1; + topicStats1.msgRateOut = 1; + topicStats1.msgThroughputOut = 1; + topicStats1.averageMsgSize = 1; + topicStats1.storageSize = 1; + topicStats1.publishers.add(new PublisherStats()); + topicStats1.subscriptions.put("test_ns", new SubscriptionStats()); + topicStats1.replication.put("test_ns", new ReplicatorStats()); + + TopicStats topicStats2 = new TopicStats(); + topicStats2.msgRateIn = 1; + 
topicStats2.msgThroughputIn = 2; + topicStats2.msgRateOut = 3; + topicStats2.msgThroughputOut = 4; + topicStats2.averageMsgSize = 5; + topicStats2.storageSize = 6; + topicStats2.publishers.add(new PublisherStats()); + topicStats2.subscriptions.put("test_ns", new SubscriptionStats()); + topicStats2.replication.put("test_ns", new ReplicatorStats()); + + TopicStats target = new TopicStats(); + target.add(topicStats1); + target.add(topicStats2); + + assertEquals(target.msgRateIn, 2.0); + assertEquals(target.msgThroughputIn, 3.0); + assertEquals(target.msgRateOut, 4.0); + assertEquals(target.msgThroughputOut, 5.0); + assertEquals(target.averageMsgSize, 3.0); + assertEquals(target.storageSize, 7); + assertEquals(target.publishers.size(), 1); + assertEquals(target.subscriptions.size(), 1); + assertEquals(target.replication.size(), 1); + } + +} diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PublisherStatsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PublisherStatsTest.java index 7b30591..2ab08d4 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PublisherStatsTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PublisherStatsTest.java @@ -72,4 +72,26 @@ public class PublisherStatsTest { Assert.assertNull(stats.getConnectedSince()); Assert.assertNull(stats.getClientVersion()); } + + @Test + public void testPublisherStatsAggregation() { + PublisherStats stats1 = new PublisherStats(); + stats1.msgRateIn = 1; + stats1.msgThroughputIn = 1; + stats1.averageMsgSize = 1; + + PublisherStats stats2 = new PublisherStats(); + stats2.msgRateIn = 1; + stats2.msgThroughputIn = 2; + stats2.averageMsgSize = 3; + + PublisherStats target = new PublisherStats(); + target.add(stats1); + target.add(stats2); + + Assert.assertEquals(target.msgRateIn, 2.0); + Assert.assertEquals(target.msgThroughputIn, 3.0); + Assert.assertEquals(target.averageMsgSize, 2.0); + } + }