This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new a764bbd2a7b [fix][monitor] Fix the partitioned publisher topic stat
aggregation bug (#18807)
a764bbd2a7b is described below
commit a764bbd2a7b5c7053ba0f50ed1aa3197cba6e7de
Author: Heesung Sohn <[email protected]>
AuthorDate: Wed Jan 4 06:04:06 2023 -0800
[fix][monitor] Fix the partitioned publisher topic stat aggregation bug
(#18807)
(cherry picked from commit 8790ed18fc037988b044fed18202a1f3b50f7c65)
---
.../NonPersistentPartitionedTopicStatsImpl.java | 1 +
.../data/stats/NonPersistentTopicStatsImpl.java | 31 +++++++++++-----------
.../data/stats/PartitionedTopicStatsImpl.java | 1 +
.../common/policies/data/stats/TopicStatsImpl.java | 29 +++++++++++---------
.../NonPersistentPartitionedTopicStatsTest.java | 16 +++++++++--
.../policies/data/PersistentTopicStatsTest.java | 30 +++++++++++++++++----
6 files changed, 73 insertions(+), 35 deletions(-)
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPartitionedTopicStatsImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPartitionedTopicStatsImpl.java
index 2a9fe423284..2ba9682383f 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPartitionedTopicStatsImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPartitionedTopicStatsImpl.java
@@ -27,6 +27,7 @@ import
org.apache.pulsar.common.policies.data.NonPersistentPartitionedTopicStats
/**
* Statistics for a non-persistent partitioned topic.
+ * This class is not thread-safe.
*/
@SuppressFBWarnings("EQ_DOESNT_OVERRIDE_EQUALS")
public class NonPersistentPartitionedTopicStatsImpl extends
NonPersistentTopicStatsImpl
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
index 23e603ea028..c798c00acec 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.common.policies.data.PublisherStats;
/**
* Statistics for a non-persistent topic.
+ * This class is not thread-safe.
*/
@SuppressFBWarnings("EQ_DOESNT_OVERRIDE_EQUALS")
public class NonPersistentTopicStatsImpl extends TopicStatsImpl implements
NonPersistentTopicStats {
@@ -148,14 +149,14 @@ public class NonPersistentTopicStatsImpl extends
TopicStatsImpl implements NonPe
}
// if the stats are added for the 1st time, we will need to make a copy of
these stats and add it to the current
- // stats.
+ // stats. This stat addition is not thread-safe.
public NonPersistentTopicStatsImpl add(NonPersistentTopicStats ts) {
NonPersistentTopicStatsImpl stats = (NonPersistentTopicStatsImpl) ts;
Objects.requireNonNull(stats);
super.add(stats);
this.msgDropRate += stats.msgDropRate;
-
- stats.getNonPersistentPublishers().forEach(s -> {
+ for (int index = 0; index < stats.getNonPersistentPublishers().size();
index++) {
+ NonPersistentPublisherStats s =
stats.getNonPersistentPublishers().get(index);
if (s.isSupportsPartialProducer() && s.getProducerName() != null) {
((NonPersistentPublisherStatsImpl)
this.nonPersistentPublishersMap
.computeIfAbsent(s.getProducerName(), key -> {
@@ -165,20 +166,20 @@ public class NonPersistentTopicStatsImpl extends
TopicStatsImpl implements NonPe
return newStats;
})).add((NonPersistentPublisherStatsImpl) s);
} else {
- if (this.nonPersistentPublishers.size() !=
stats.getNonPersistentPublishers().size()) {
- for (int i = 0; i <
stats.getNonPersistentPublishers().size(); i++) {
- NonPersistentPublisherStatsImpl newStats = new
NonPersistentPublisherStatsImpl();
- newStats.setSupportsPartialProducer(false);
-
this.nonPersistentPublishers.add(newStats.add((NonPersistentPublisherStatsImpl)
s));
- }
- } else {
- for (int i = 0; i <
stats.getNonPersistentPublishers().size(); i++) {
- ((NonPersistentPublisherStatsImpl)
this.nonPersistentPublishers.get(i))
- .add((NonPersistentPublisherStatsImpl) s);
- }
+ // Add a non-persistent publisher stat entry to
this.nonPersistentPublishers
+ // if this.nonPersistentPublishers.size() is smaller than
+ // the input stats.nonPersistentPublishers.size().
+ // Here, index == this.nonPersistentPublishers.size() means
+ // this.nonPersistentPublishers.size() is smaller than the
input stats.nonPersistentPublishers.size()
+ if (index == this.nonPersistentPublishers.size()) {
+ NonPersistentPublisherStatsImpl newStats = new
NonPersistentPublisherStatsImpl();
+ newStats.setSupportsPartialProducer(false);
+ this.nonPersistentPublishers.add(newStats);
}
+ ((NonPersistentPublisherStatsImpl)
this.nonPersistentPublishers.get(index))
+ .add((NonPersistentPublisherStatsImpl) s);
}
- });
+ }
if (this.getNonPersistentSubscriptions().size() !=
stats.getNonPersistentSubscriptions().size()) {
for (String subscription :
stats.getNonPersistentSubscriptions().keySet()) {
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/PartitionedTopicStatsImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/PartitionedTopicStatsImpl.java
index 0b07dac85c3..8330a662898 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/PartitionedTopicStatsImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/PartitionedTopicStatsImpl.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.common.policies.data.TopicStats;
/**
* Statistics for a partitioned topic.
+ * This class is not thread-safe.
*/
@SuppressFBWarnings("EQ_DOESNT_OVERRIDE_EQUALS")
public class PartitionedTopicStatsImpl extends TopicStatsImpl implements
PartitionedTopicStats {
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
index c90b625f486..49f88392417 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.common.policies.data.TopicStats;
/**
* Statistics for a Pulsar topic.
+ * This class is not thread-safe.
*/
@Data
public class TopicStatsImpl implements TopicStats {
@@ -212,7 +213,7 @@ public class TopicStatsImpl implements TopicStats {
}
// if the stats are added for the 1st time, we will need to make a copy of
these stats and add it to the current
- // stats.
+ // stats. This stat addition is not thread-safe.
public TopicStatsImpl add(TopicStats ts) {
TopicStatsImpl stats = (TopicStatsImpl) ts;
@@ -239,7 +240,8 @@ public class TopicStatsImpl implements TopicStats {
this.abortedTxnCount = stats.abortedTxnCount;
this.committedTxnCount = stats.committedTxnCount;
- stats.getPublishers().forEach(s -> {
+ for (int index = 0; index < stats.getPublishers().size(); index++) {
+ PublisherStats s = stats.getPublishers().get(index);
if (s.isSupportsPartialProducer() && s.getProducerName() != null) {
this.publishersMap.computeIfAbsent(s.getProducerName(), key -> {
final PublisherStatsImpl newStats = new
PublisherStatsImpl();
@@ -248,19 +250,20 @@ public class TopicStatsImpl implements TopicStats {
return newStats;
}).add((PublisherStatsImpl) s);
} else {
- if (this.publishers.size() != stats.publishers.size()) {
- for (int i = 0; i < stats.publishers.size(); i++) {
- PublisherStatsImpl newStats = new PublisherStatsImpl();
- newStats.setSupportsPartialProducer(false);
-
this.publishers.add(newStats.add(stats.publishers.get(i)));
- }
- } else {
- for (int i = 0; i < stats.publishers.size(); i++) {
- this.publishers.get(i).add(stats.publishers.get(i));
- }
+ // Add a publisher stat entry to this.publishers
+ // if this.publishers.size() is smaller than
+ // the input stats.publishers.size().
+ // Here, index == this.publishers.size() means
+ // this.publishers.size() is smaller than the input
stats.publishers.size()
+ if (index == this.publishers.size()) {
+ PublisherStatsImpl newStats = new PublisherStatsImpl();
+ newStats.setSupportsPartialProducer(false);
+ this.publishers.add(newStats);
}
+ this.publishers.get(index)
+ .add((PublisherStatsImpl) s);
}
- });
+ }
if (this.subscriptions.size() != stats.subscriptions.size()) {
for (String subscription : stats.subscriptions.keySet()) {
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStatsTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStatsTest.java
index 001eaec901f..eb52af4a54f 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStatsTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStatsTest.java
@@ -62,9 +62,11 @@ public class NonPersistentPartitionedTopicStatsTest {
public void testPartitionedTopicStatsByNullProducerName() {
final NonPersistentTopicStatsImpl topicStats1 = new
NonPersistentTopicStatsImpl();
final NonPersistentPublisherStatsImpl publisherStats1 = new
NonPersistentPublisherStatsImpl();
+ publisherStats1.setMsgRateIn(1);
publisherStats1.setSupportsPartialProducer(false);
publisherStats1.setProducerName(null);
final NonPersistentPublisherStatsImpl publisherStats2 = new
NonPersistentPublisherStatsImpl();
+ publisherStats2.setMsgRateIn(2);
publisherStats2.setSupportsPartialProducer(false);
publisherStats2.setProducerName(null);
topicStats1.addPublisher(publisherStats1);
@@ -76,15 +78,22 @@ public class NonPersistentPartitionedTopicStatsTest {
final NonPersistentTopicStatsImpl topicStats2 = new
NonPersistentTopicStatsImpl();
final NonPersistentPublisherStatsImpl publisherStats3 = new
NonPersistentPublisherStatsImpl();
+ publisherStats3.setMsgRateIn(3);
publisherStats3.setSupportsPartialProducer(true);
publisherStats3.setProducerName(null);
final NonPersistentPublisherStatsImpl publisherStats4 = new
NonPersistentPublisherStatsImpl();
+ publisherStats4.setMsgRateIn(4);
publisherStats4.setSupportsPartialProducer(true);
publisherStats4.setProducerName(null);
+ final NonPersistentPublisherStatsImpl publisherStats5 = new
NonPersistentPublisherStatsImpl();
+ publisherStats5.setMsgRateIn(5);
+ publisherStats5.setSupportsPartialProducer(true);
+ publisherStats5.setProducerName(null);
topicStats2.addPublisher(publisherStats3);
topicStats2.addPublisher(publisherStats4);
+ topicStats2.addPublisher(publisherStats5);
- assertEquals(topicStats2.getPublishers().size(), 2);
+ assertEquals(topicStats2.getPublishers().size(), 3);
// when the producerName is null, fall back to false
assertFalse(topicStats2.getPublishers().get(0).isSupportsPartialProducer());
assertFalse(topicStats2.getPublishers().get(1).isSupportsPartialProducer());
@@ -93,6 +102,9 @@ public class NonPersistentPartitionedTopicStatsTest {
target.add(topicStats1);
target.add(topicStats2);
- assertEquals(target.getPublishers().size(), 2);
+ assertEquals(target.getPublishers().size(), 3);
+ assertEquals(target.getPublishers().get(0).getMsgRateIn(), 4);
+ assertEquals(target.getPublishers().get(1).getMsgRateIn(), 6);
+ assertEquals(target.getPublishers().get(2).getMsgRateIn(), 5);
}
}
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
index 0da2b9f8abb..a23c422da61 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
@@ -87,9 +87,15 @@ public class PersistentTopicStatsTest {
topicStats1.averageMsgSize = 1;
topicStats1.storageSize = 1;
final PublisherStatsImpl publisherStats1 = new PublisherStatsImpl();
+ publisherStats1.setMsgRateIn(1);
publisherStats1.setSupportsPartialProducer(false);
publisherStats1.setProducerName("name1");
+ final PublisherStatsImpl publisherStats2 = new PublisherStatsImpl();
+ publisherStats2.setMsgRateIn(2);
+ publisherStats2.setSupportsPartialProducer(false);
+ publisherStats2.setProducerName("name2");
topicStats1.addPublisher(publisherStats1);
+ topicStats1.addPublisher(publisherStats2);
topicStats1.subscriptions.put("test_ns", new SubscriptionStatsImpl());
topicStats1.replication.put("test_ns", new ReplicatorStatsImpl());
@@ -100,10 +106,21 @@ public class PersistentTopicStatsTest {
topicStats2.msgThroughputOut = 4;
topicStats2.averageMsgSize = 5;
topicStats2.storageSize = 6;
- final PublisherStatsImpl publisherStats2 = new PublisherStatsImpl();
- publisherStats2.setSupportsPartialProducer(false);
- publisherStats2.setProducerName("name1");
- topicStats2.addPublisher(publisherStats2);
+ final PublisherStatsImpl publisherStats3 = new PublisherStatsImpl();
+ publisherStats3.setMsgRateIn(3);
+ publisherStats3.setSupportsPartialProducer(false);
+ publisherStats3.setProducerName("name3");
+ final PublisherStatsImpl publisherStats4 = new PublisherStatsImpl();
+ publisherStats4.setMsgRateIn(4);
+ publisherStats4.setSupportsPartialProducer(false);
+ publisherStats4.setProducerName("name4");
+ final PublisherStatsImpl publisherStats5 = new PublisherStatsImpl();
+ publisherStats5.setMsgRateIn(5);
+ publisherStats5.setSupportsPartialProducer(false);
+ publisherStats5.setProducerName("name5");
+ topicStats2.addPublisher(publisherStats3);
+ topicStats2.addPublisher(publisherStats4);
+ topicStats2.addPublisher(publisherStats5);
topicStats2.subscriptions.put("test_ns", new SubscriptionStatsImpl());
topicStats2.replication.put("test_ns", new ReplicatorStatsImpl());
@@ -117,7 +134,10 @@ public class PersistentTopicStatsTest {
assertEquals(target.msgThroughputOut, 5.0);
assertEquals(target.averageMsgSize, 3.0);
assertEquals(target.storageSize, 7);
- assertEquals(target.getPublishers().size(), 1);
+ assertEquals(target.getPublishers().size(), 3);
+ assertEquals(target.getPublishers().get(0).getMsgRateIn(), 4);
+ assertEquals(target.getPublishers().get(1).getMsgRateIn(), 6);
+ assertEquals(target.getPublishers().get(2).getMsgRateIn(), 5);
assertEquals(target.subscriptions.size(), 1);
assertEquals(target.replication.size(), 1);
}