This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new a764bbd2a7b [fix][monitor] Fix the partitioned publisher topic stat aggregation bug (#18807)
a764bbd2a7b is described below

commit a764bbd2a7b5c7053ba0f50ed1aa3197cba6e7de
Author: Heesung Sohn <103456639+heesung...@users.noreply.github.com>
AuthorDate: Wed Jan 4 06:04:06 2023 -0800

    [fix][monitor] Fix the partitioned publisher topic stat aggregation bug (#18807)
    
    (cherry picked from commit 8790ed18fc037988b044fed18202a1f3b50f7c65)
---
 .../NonPersistentPartitionedTopicStatsImpl.java    |  1 +
 .../data/stats/NonPersistentTopicStatsImpl.java    | 31 +++++++++++-----------
 .../data/stats/PartitionedTopicStatsImpl.java      |  1 +
 .../common/policies/data/stats/TopicStatsImpl.java | 29 +++++++++++---------
 .../NonPersistentPartitionedTopicStatsTest.java    | 16 +++++++++--
 .../policies/data/PersistentTopicStatsTest.java    | 30 +++++++++++++++++----
 6 files changed, 73 insertions(+), 35 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPartitionedTopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPartitionedTopicStatsImpl.java
index 2a9fe423284..2ba9682383f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPartitionedTopicStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPartitionedTopicStatsImpl.java
@@ -27,6 +27,7 @@ import 
org.apache.pulsar.common.policies.data.NonPersistentPartitionedTopicStats
 
 /**
  * Statistics for a non-persistent partitioned topic.
+ * This class is not thread-safe.
  */
 @SuppressFBWarnings("EQ_DOESNT_OVERRIDE_EQUALS")
 public class NonPersistentPartitionedTopicStatsImpl extends 
NonPersistentTopicStatsImpl
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
index 23e603ea028..c798c00acec 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.common.policies.data.PublisherStats;
 
 /**
  * Statistics for a non-persistent topic.
+ * This class is not thread-safe.
  */
 @SuppressFBWarnings("EQ_DOESNT_OVERRIDE_EQUALS")
 public class NonPersistentTopicStatsImpl extends TopicStatsImpl implements 
NonPersistentTopicStats {
@@ -148,14 +149,14 @@ public class NonPersistentTopicStatsImpl extends 
TopicStatsImpl implements NonPe
     }
 
     // if the stats are added for the 1st time, we will need to make a copy of 
these stats and add it to the current
-    // stats.
+    // stats. This stat addition is not thread-safe.
     public NonPersistentTopicStatsImpl add(NonPersistentTopicStats ts) {
         NonPersistentTopicStatsImpl stats = (NonPersistentTopicStatsImpl) ts;
         Objects.requireNonNull(stats);
         super.add(stats);
         this.msgDropRate += stats.msgDropRate;
-
-        stats.getNonPersistentPublishers().forEach(s -> {
+        for (int index = 0; index < stats.getNonPersistentPublishers().size(); 
index++) {
+            NonPersistentPublisherStats s = 
stats.getNonPersistentPublishers().get(index);
             if (s.isSupportsPartialProducer() && s.getProducerName() != null) {
                 ((NonPersistentPublisherStatsImpl) 
this.nonPersistentPublishersMap
                         .computeIfAbsent(s.getProducerName(), key -> {
@@ -165,20 +166,20 @@ public class NonPersistentTopicStatsImpl extends 
TopicStatsImpl implements NonPe
                             return newStats;
                         })).add((NonPersistentPublisherStatsImpl) s);
             } else {
-                if (this.nonPersistentPublishers.size() != 
stats.getNonPersistentPublishers().size()) {
-                    for (int i = 0; i < 
stats.getNonPersistentPublishers().size(); i++) {
-                        NonPersistentPublisherStatsImpl newStats = new 
NonPersistentPublisherStatsImpl();
-                        newStats.setSupportsPartialProducer(false);
-                        
this.nonPersistentPublishers.add(newStats.add((NonPersistentPublisherStatsImpl) 
s));
-                    }
-                } else {
-                    for (int i = 0; i < 
stats.getNonPersistentPublishers().size(); i++) {
-                        ((NonPersistentPublisherStatsImpl) 
this.nonPersistentPublishers.get(i))
-                                .add((NonPersistentPublisherStatsImpl) s);
-                    }
+                // Add a non-persistent publisher stat entry to 
this.nonPersistentPublishers
+                // if this.nonPersistentPublishers.size() is smaller than
+                // the input stats.nonPersistentPublishers.size().
+                // Here, index == this.nonPersistentPublishers.size() means
+                // this.nonPersistentPublishers.size() is smaller than the 
input stats.nonPersistentPublishers.size()
+                if (index == this.nonPersistentPublishers.size()) {
+                    NonPersistentPublisherStatsImpl newStats = new 
NonPersistentPublisherStatsImpl();
+                    newStats.setSupportsPartialProducer(false);
+                    this.nonPersistentPublishers.add(newStats);
                 }
+                ((NonPersistentPublisherStatsImpl) 
this.nonPersistentPublishers.get(index))
+                        .add((NonPersistentPublisherStatsImpl) s);
             }
-        });
+        }
 
         if (this.getNonPersistentSubscriptions().size() != 
stats.getNonPersistentSubscriptions().size()) {
             for (String subscription : 
stats.getNonPersistentSubscriptions().keySet()) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/PartitionedTopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/PartitionedTopicStatsImpl.java
index 0b07dac85c3..8330a662898 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/PartitionedTopicStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/PartitionedTopicStatsImpl.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.common.policies.data.TopicStats;
 
 /**
  * Statistics for a partitioned topic.
+ * This class is not thread-safe.
  */
 @SuppressFBWarnings("EQ_DOESNT_OVERRIDE_EQUALS")
 public class PartitionedTopicStatsImpl extends TopicStatsImpl implements 
PartitionedTopicStats {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
index c90b625f486..49f88392417 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.common.policies.data.TopicStats;
 
 /**
  * Statistics for a Pulsar topic.
+ * This class is not thread-safe.
  */
 @Data
 public class TopicStatsImpl implements TopicStats {
@@ -212,7 +213,7 @@ public class TopicStatsImpl implements TopicStats {
     }
 
     // if the stats are added for the 1st time, we will need to make a copy of 
these stats and add it to the current
-    // stats.
+    // stats. This stat addition is not thread-safe.
     public TopicStatsImpl add(TopicStats ts) {
         TopicStatsImpl stats = (TopicStatsImpl) ts;
 
@@ -239,7 +240,8 @@ public class TopicStatsImpl implements TopicStats {
         this.abortedTxnCount = stats.abortedTxnCount;
         this.committedTxnCount = stats.committedTxnCount;
 
-        stats.getPublishers().forEach(s -> {
+        for (int index = 0; index < stats.getPublishers().size(); index++) {
+           PublisherStats s = stats.getPublishers().get(index);
            if (s.isSupportsPartialProducer() && s.getProducerName() != null) {
                this.publishersMap.computeIfAbsent(s.getProducerName(), key -> {
                    final PublisherStatsImpl newStats = new 
PublisherStatsImpl();
@@ -248,19 +250,20 @@ public class TopicStatsImpl implements TopicStats {
                    return newStats;
                }).add((PublisherStatsImpl) s);
            } else {
-               if (this.publishers.size() != stats.publishers.size()) {
-                   for (int i = 0; i < stats.publishers.size(); i++) {
-                       PublisherStatsImpl newStats = new PublisherStatsImpl();
-                       newStats.setSupportsPartialProducer(false);
-                       
this.publishers.add(newStats.add(stats.publishers.get(i)));
-                   }
-               } else {
-                   for (int i = 0; i < stats.publishers.size(); i++) {
-                       this.publishers.get(i).add(stats.publishers.get(i));
-                   }
+               // Add a publisher stat entry to this.publishers
+               // if this.publishers.size() is smaller than
+               // the input stats.publishers.size().
+               // Here, index == this.publishers.size() means
+               // this.publishers.size() is smaller than the input 
stats.publishers.size()
+               if (index == this.publishers.size()) {
+                   PublisherStatsImpl newStats = new PublisherStatsImpl();
+                   newStats.setSupportsPartialProducer(false);
+                   this.publishers.add(newStats);
                }
+               this.publishers.get(index)
+                       .add((PublisherStatsImpl) s);
            }
-        });
+        }
 
         if (this.subscriptions.size() != stats.subscriptions.size()) {
             for (String subscription : stats.subscriptions.keySet()) {
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStatsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStatsTest.java
index 001eaec901f..eb52af4a54f 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStatsTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStatsTest.java
@@ -62,9 +62,11 @@ public class NonPersistentPartitionedTopicStatsTest {
     public void testPartitionedTopicStatsByNullProducerName() {
         final NonPersistentTopicStatsImpl topicStats1 = new 
NonPersistentTopicStatsImpl();
         final NonPersistentPublisherStatsImpl publisherStats1 = new 
NonPersistentPublisherStatsImpl();
+        publisherStats1.setMsgRateIn(1);
         publisherStats1.setSupportsPartialProducer(false);
         publisherStats1.setProducerName(null);
         final NonPersistentPublisherStatsImpl publisherStats2 = new 
NonPersistentPublisherStatsImpl();
+        publisherStats2.setMsgRateIn(2);
         publisherStats2.setSupportsPartialProducer(false);
         publisherStats2.setProducerName(null);
         topicStats1.addPublisher(publisherStats1);
@@ -76,15 +78,22 @@ public class NonPersistentPartitionedTopicStatsTest {
 
         final NonPersistentTopicStatsImpl topicStats2 = new 
NonPersistentTopicStatsImpl();
         final NonPersistentPublisherStatsImpl publisherStats3 = new 
NonPersistentPublisherStatsImpl();
+        publisherStats3.setMsgRateIn(3);
         publisherStats3.setSupportsPartialProducer(true);
         publisherStats3.setProducerName(null);
         final NonPersistentPublisherStatsImpl publisherStats4 = new 
NonPersistentPublisherStatsImpl();
+        publisherStats4.setMsgRateIn(4);
         publisherStats4.setSupportsPartialProducer(true);
         publisherStats4.setProducerName(null);
+        final NonPersistentPublisherStatsImpl publisherStats5 = new 
NonPersistentPublisherStatsImpl();
+        publisherStats5.setMsgRateIn(5);
+        publisherStats5.setSupportsPartialProducer(true);
+        publisherStats5.setProducerName(null);
         topicStats2.addPublisher(publisherStats3);
         topicStats2.addPublisher(publisherStats4);
+        topicStats2.addPublisher(publisherStats5);
 
-        assertEquals(topicStats2.getPublishers().size(), 2);
+        assertEquals(topicStats2.getPublishers().size(), 3);
         // when the producerName is null, fall back to false
         
assertFalse(topicStats2.getPublishers().get(0).isSupportsPartialProducer());
         
assertFalse(topicStats2.getPublishers().get(1).isSupportsPartialProducer());
@@ -93,6 +102,9 @@ public class NonPersistentPartitionedTopicStatsTest {
         target.add(topicStats1);
         target.add(topicStats2);
 
-        assertEquals(target.getPublishers().size(), 2);
+        assertEquals(target.getPublishers().size(), 3);
+        assertEquals(target.getPublishers().get(0).getMsgRateIn(), 4);
+        assertEquals(target.getPublishers().get(1).getMsgRateIn(), 6);
+        assertEquals(target.getPublishers().get(2).getMsgRateIn(), 5);
     }
 }
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
index 0da2b9f8abb..a23c422da61 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
@@ -87,9 +87,15 @@ public class PersistentTopicStatsTest {
         topicStats1.averageMsgSize = 1;
         topicStats1.storageSize = 1;
         final PublisherStatsImpl publisherStats1 = new PublisherStatsImpl();
+        publisherStats1.setMsgRateIn(1);
         publisherStats1.setSupportsPartialProducer(false);
         publisherStats1.setProducerName("name1");
+        final PublisherStatsImpl publisherStats2 = new PublisherStatsImpl();
+        publisherStats2.setMsgRateIn(2);
+        publisherStats2.setSupportsPartialProducer(false);
+        publisherStats2.setProducerName("name2");
         topicStats1.addPublisher(publisherStats1);
+        topicStats1.addPublisher(publisherStats2);
         topicStats1.subscriptions.put("test_ns", new SubscriptionStatsImpl());
         topicStats1.replication.put("test_ns", new ReplicatorStatsImpl());
 
@@ -100,10 +106,21 @@ public class PersistentTopicStatsTest {
         topicStats2.msgThroughputOut = 4;
         topicStats2.averageMsgSize = 5;
         topicStats2.storageSize = 6;
-        final PublisherStatsImpl publisherStats2 = new PublisherStatsImpl();
-        publisherStats2.setSupportsPartialProducer(false);
-        publisherStats2.setProducerName("name1");
-        topicStats2.addPublisher(publisherStats2);
+        final PublisherStatsImpl publisherStats3 = new PublisherStatsImpl();
+        publisherStats3.setMsgRateIn(3);
+        publisherStats3.setSupportsPartialProducer(false);
+        publisherStats3.setProducerName("name3");
+        final PublisherStatsImpl publisherStats4 = new PublisherStatsImpl();
+        publisherStats4.setMsgRateIn(4);
+        publisherStats4.setSupportsPartialProducer(false);
+        publisherStats4.setProducerName("name4");
+        final PublisherStatsImpl publisherStats5 = new PublisherStatsImpl();
+        publisherStats5.setMsgRateIn(5);
+        publisherStats5.setSupportsPartialProducer(false);
+        publisherStats5.setProducerName("name5");
+        topicStats2.addPublisher(publisherStats3);
+        topicStats2.addPublisher(publisherStats4);
+        topicStats2.addPublisher(publisherStats5);
         topicStats2.subscriptions.put("test_ns", new SubscriptionStatsImpl());
         topicStats2.replication.put("test_ns", new ReplicatorStatsImpl());
 
@@ -117,7 +134,10 @@ public class PersistentTopicStatsTest {
         assertEquals(target.msgThroughputOut, 5.0);
         assertEquals(target.averageMsgSize, 3.0);
         assertEquals(target.storageSize, 7);
-        assertEquals(target.getPublishers().size(), 1);
+        assertEquals(target.getPublishers().size(), 3);
+        assertEquals(target.getPublishers().get(0).getMsgRateIn(), 4);
+        assertEquals(target.getPublishers().get(1).getMsgRateIn(), 6);
+        assertEquals(target.getPublishers().get(2).getMsgRateIn(), 5);
         assertEquals(target.subscriptions.size(), 1);
         assertEquals(target.replication.size(), 1);
     }

Reply via email to