This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push: new a764bbd2a7b [fix][monitor] Fix the partitioned publisher topic stat aggregation bug (#18807) a764bbd2a7b is described below commit a764bbd2a7b5c7053ba0f50ed1aa3197cba6e7de Author: Heesung Sohn <103456639+heesung...@users.noreply.github.com> AuthorDate: Wed Jan 4 06:04:06 2023 -0800 [fix][monitor] Fix the partitioned publisher topic stat aggregation bug (#18807) (cherry picked from commit 8790ed18fc037988b044fed18202a1f3b50f7c65) --- .../NonPersistentPartitionedTopicStatsImpl.java | 1 + .../data/stats/NonPersistentTopicStatsImpl.java | 31 +++++++++++----------- .../data/stats/PartitionedTopicStatsImpl.java | 1 + .../common/policies/data/stats/TopicStatsImpl.java | 29 +++++++++++--------- .../NonPersistentPartitionedTopicStatsTest.java | 16 +++++++++-- .../policies/data/PersistentTopicStatsTest.java | 30 +++++++++++++++++---- 6 files changed, 73 insertions(+), 35 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPartitionedTopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPartitionedTopicStatsImpl.java index 2a9fe423284..2ba9682383f 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPartitionedTopicStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPartitionedTopicStatsImpl.java @@ -27,6 +27,7 @@ import org.apache.pulsar.common.policies.data.NonPersistentPartitionedTopicStats /** * Statistics for a non-persistent partitioned topic. + * This class is not thread-safe. */ @SuppressFBWarnings("EQ_DOESNT_OVERRIDE_EQUALS") public class NonPersistentPartitionedTopicStatsImpl extends NonPersistentTopicStatsImpl diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java index 23e603ea028..c798c00acec 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java @@ -42,6 +42,7 @@ import org.apache.pulsar.common.policies.data.PublisherStats; /** * Statistics for a non-persistent topic. + * This class is not thread-safe. */ @SuppressFBWarnings("EQ_DOESNT_OVERRIDE_EQUALS") public class NonPersistentTopicStatsImpl extends TopicStatsImpl implements NonPersistentTopicStats { @@ -148,14 +149,14 @@ public class NonPersistentTopicStatsImpl extends TopicStatsImpl implements NonPe } // if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current - // stats. + // stats. This stat addition is not thread-safe. public NonPersistentTopicStatsImpl add(NonPersistentTopicStats ts) { NonPersistentTopicStatsImpl stats = (NonPersistentTopicStatsImpl) ts; Objects.requireNonNull(stats); super.add(stats); this.msgDropRate += stats.msgDropRate; - - stats.getNonPersistentPublishers().forEach(s -> { + for (int index = 0; index < stats.getNonPersistentPublishers().size(); index++) { + NonPersistentPublisherStats s = stats.getNonPersistentPublishers().get(index); if (s.isSupportsPartialProducer() && s.getProducerName() != null) { ((NonPersistentPublisherStatsImpl) this.nonPersistentPublishersMap .computeIfAbsent(s.getProducerName(), key -> { @@ -165,20 +166,20 @@ public class NonPersistentTopicStatsImpl extends TopicStatsImpl implements NonPe return newStats; })).add((NonPersistentPublisherStatsImpl) s); } else { - if (this.nonPersistentPublishers.size() != stats.getNonPersistentPublishers().size()) { - for (int i = 0; i < stats.getNonPersistentPublishers().size(); i++) { - NonPersistentPublisherStatsImpl newStats = new NonPersistentPublisherStatsImpl(); - newStats.setSupportsPartialProducer(false); - this.nonPersistentPublishers.add(newStats.add((NonPersistentPublisherStatsImpl) s)); - } - } else { - for (int i = 0; i < stats.getNonPersistentPublishers().size(); i++) { - ((NonPersistentPublisherStatsImpl) this.nonPersistentPublishers.get(i)) - .add((NonPersistentPublisherStatsImpl) s); - } + // Add a non-persistent publisher stat entry to this.nonPersistentPublishers + // if this.nonPersistentPublishers.size() is smaller than + // the input stats.nonPersistentPublishers.size(). + // Here, index == this.nonPersistentPublishers.size() means + // this.nonPersistentPublishers.size() is smaller than the input stats.nonPersistentPublishers.size() + if (index == this.nonPersistentPublishers.size()) { + NonPersistentPublisherStatsImpl newStats = new NonPersistentPublisherStatsImpl(); + newStats.setSupportsPartialProducer(false); + this.nonPersistentPublishers.add(newStats); } + ((NonPersistentPublisherStatsImpl) this.nonPersistentPublishers.get(index)) + .add((NonPersistentPublisherStatsImpl) s); } - }); + } if (this.getNonPersistentSubscriptions().size() != stats.getNonPersistentSubscriptions().size()) { for (String subscription : stats.getNonPersistentSubscriptions().keySet()) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/PartitionedTopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/PartitionedTopicStatsImpl.java index 0b07dac85c3..8330a662898 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/PartitionedTopicStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/PartitionedTopicStatsImpl.java @@ -28,6 +28,7 @@ import org.apache.pulsar.common.policies.data.TopicStats; /** * Statistics for a partitioned topic. + * This class is not thread-safe. */ @SuppressFBWarnings("EQ_DOESNT_OVERRIDE_EQUALS") public class PartitionedTopicStatsImpl extends TopicStatsImpl implements PartitionedTopicStats { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java index c90b625f486..49f88392417 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java @@ -41,6 +41,7 @@ import org.apache.pulsar.common.policies.data.TopicStats; /** * Statistics for a Pulsar topic. + * This class is not thread-safe. */ @Data public class TopicStatsImpl implements TopicStats { @@ -212,7 +213,7 @@ public class TopicStatsImpl implements TopicStats { } // if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current - // stats. + // stats. This stat addition is not thread-safe. public TopicStatsImpl add(TopicStats ts) { TopicStatsImpl stats = (TopicStatsImpl) ts; @@ -239,7 +240,8 @@ public class TopicStatsImpl implements TopicStats { this.abortedTxnCount = stats.abortedTxnCount; this.committedTxnCount = stats.committedTxnCount; - stats.getPublishers().forEach(s -> { + for (int index = 0; index < stats.getPublishers().size(); index++) { + PublisherStats s = stats.getPublishers().get(index); if (s.isSupportsPartialProducer() && s.getProducerName() != null) { this.publishersMap.computeIfAbsent(s.getProducerName(), key -> { final PublisherStatsImpl newStats = new PublisherStatsImpl(); @@ -248,19 +250,20 @@ public class TopicStatsImpl implements TopicStats { return newStats; }).add((PublisherStatsImpl) s); } else { - if (this.publishers.size() != stats.publishers.size()) { - for (int i = 0; i < stats.publishers.size(); i++) { - PublisherStatsImpl newStats = new PublisherStatsImpl(); - newStats.setSupportsPartialProducer(false); - this.publishers.add(newStats.add(stats.publishers.get(i))); - } - } else { - for (int i = 0; i < stats.publishers.size(); i++) { - this.publishers.get(i).add(stats.publishers.get(i)); - } + // Add a publisher stat entry to this.publishers + // if this.publishers.size() is smaller than + // the input stats.publishers.size(). + // Here, index == this.publishers.size() means + // this.publishers.size() is smaller than the input stats.publishers.size() + if (index == this.publishers.size()) { + PublisherStatsImpl newStats = new PublisherStatsImpl(); + newStats.setSupportsPartialProducer(false); + this.publishers.add(newStats); } + this.publishers.get(index) + .add((PublisherStatsImpl) s); } - }); + } if (this.subscriptions.size() != stats.subscriptions.size()) { for (String subscription : stats.subscriptions.keySet()) { diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStatsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStatsTest.java index 001eaec901f..eb52af4a54f 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStatsTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStatsTest.java @@ -62,9 +62,11 @@ public class NonPersistentPartitionedTopicStatsTest { public void testPartitionedTopicStatsByNullProducerName() { final NonPersistentTopicStatsImpl topicStats1 = new NonPersistentTopicStatsImpl(); final NonPersistentPublisherStatsImpl publisherStats1 = new NonPersistentPublisherStatsImpl(); + publisherStats1.setMsgRateIn(1); publisherStats1.setSupportsPartialProducer(false); publisherStats1.setProducerName(null); final NonPersistentPublisherStatsImpl publisherStats2 = new NonPersistentPublisherStatsImpl(); + publisherStats2.setMsgRateIn(2); publisherStats2.setSupportsPartialProducer(false); publisherStats2.setProducerName(null); topicStats1.addPublisher(publisherStats1); @@ -76,15 +78,22 @@ public class NonPersistentPartitionedTopicStatsTest { final NonPersistentTopicStatsImpl topicStats2 = new NonPersistentTopicStatsImpl(); final NonPersistentPublisherStatsImpl publisherStats3 = new NonPersistentPublisherStatsImpl(); + publisherStats3.setMsgRateIn(3); publisherStats3.setSupportsPartialProducer(true); publisherStats3.setProducerName(null); final NonPersistentPublisherStatsImpl publisherStats4 = new NonPersistentPublisherStatsImpl(); + publisherStats4.setMsgRateIn(4); publisherStats4.setSupportsPartialProducer(true); publisherStats4.setProducerName(null); + final NonPersistentPublisherStatsImpl publisherStats5 = new NonPersistentPublisherStatsImpl(); + publisherStats5.setMsgRateIn(5); + publisherStats5.setSupportsPartialProducer(true); + publisherStats5.setProducerName(null); topicStats2.addPublisher(publisherStats3); topicStats2.addPublisher(publisherStats4); + topicStats2.addPublisher(publisherStats5); - assertEquals(topicStats2.getPublishers().size(), 2); + assertEquals(topicStats2.getPublishers().size(), 3); // when the producerName is null, fall back to false assertFalse(topicStats2.getPublishers().get(0).isSupportsPartialProducer()); assertFalse(topicStats2.getPublishers().get(1).isSupportsPartialProducer()); @@ -93,6 +102,9 @@ public class NonPersistentPartitionedTopicStatsTest { target.add(topicStats1); target.add(topicStats2); - assertEquals(target.getPublishers().size(), 2); + assertEquals(target.getPublishers().size(), 3); + assertEquals(target.getPublishers().get(0).getMsgRateIn(), 4); + assertEquals(target.getPublishers().get(1).getMsgRateIn(), 6); + assertEquals(target.getPublishers().get(2).getMsgRateIn(), 5); } } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java index 0da2b9f8abb..a23c422da61 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java @@ -87,9 +87,15 @@ public class PersistentTopicStatsTest { topicStats1.averageMsgSize = 1; topicStats1.storageSize = 1; final PublisherStatsImpl publisherStats1 = new PublisherStatsImpl(); + publisherStats1.setMsgRateIn(1); publisherStats1.setSupportsPartialProducer(false); publisherStats1.setProducerName("name1"); + final PublisherStatsImpl publisherStats2 = new PublisherStatsImpl(); + publisherStats2.setMsgRateIn(2); + publisherStats2.setSupportsPartialProducer(false); + publisherStats2.setProducerName("name2"); topicStats1.addPublisher(publisherStats1); + topicStats1.addPublisher(publisherStats2); topicStats1.subscriptions.put("test_ns", new SubscriptionStatsImpl()); topicStats1.replication.put("test_ns", new ReplicatorStatsImpl()); @@ -100,10 +106,21 @@ public class PersistentTopicStatsTest { topicStats2.msgThroughputOut = 4; topicStats2.averageMsgSize = 5; topicStats2.storageSize = 6; - final PublisherStatsImpl publisherStats2 = new PublisherStatsImpl(); - publisherStats2.setSupportsPartialProducer(false); - publisherStats2.setProducerName("name1"); - topicStats2.addPublisher(publisherStats2); + final PublisherStatsImpl publisherStats3 = new PublisherStatsImpl(); + publisherStats3.setMsgRateIn(3); + publisherStats3.setSupportsPartialProducer(false); + publisherStats3.setProducerName("name3"); + final PublisherStatsImpl publisherStats4 = new PublisherStatsImpl(); + publisherStats4.setMsgRateIn(4); + publisherStats4.setSupportsPartialProducer(false); + publisherStats4.setProducerName("name4"); + final PublisherStatsImpl publisherStats5 = new PublisherStatsImpl(); + publisherStats5.setMsgRateIn(5); + publisherStats5.setSupportsPartialProducer(false); + publisherStats5.setProducerName("name5"); + topicStats2.addPublisher(publisherStats3); + topicStats2.addPublisher(publisherStats4); + topicStats2.addPublisher(publisherStats5); topicStats2.subscriptions.put("test_ns", new SubscriptionStatsImpl()); topicStats2.replication.put("test_ns", new ReplicatorStatsImpl()); @@ -117,7 +134,10 @@ public class PersistentTopicStatsTest { assertEquals(target.msgThroughputOut, 5.0); assertEquals(target.averageMsgSize, 3.0); assertEquals(target.storageSize, 7); - assertEquals(target.getPublishers().size(), 1); + assertEquals(target.getPublishers().size(), 3); + assertEquals(target.getPublishers().get(0).getMsgRateIn(), 4); + assertEquals(target.getPublishers().get(1).getMsgRateIn(), 6); + assertEquals(target.getPublishers().get(2).getMsgRateIn(), 5); assertEquals(target.subscriptions.size(), 1); assertEquals(target.replication.size(), 1); }