This is an automated email from the ASF dual-hosted git repository.

nkurihar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c30765e789b [improve][broker] Exclude producers for geo-replication 
from publishers field of topic stats (#22556)
c30765e789b is described below

commit c30765e789ba1ed28699edb9f159c7b71ca5b907
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Tue May 7 14:02:52 2024 +0900

    [improve][broker] Exclude producers for geo-replication from publishers 
field of topic stats (#22556)
---
 .../service/nonpersistent/NonPersistentTopic.java  | 20 +++++++----
 .../broker/service/persistent/PersistentTopic.java | 25 +++++++------
 .../broker/service/OneWayReplicatorTest.java       | 41 ++++++++++++++++++++--
 3 files changed, 66 insertions(+), 20 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 8cb8394440f..d19aeaa4b0f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.pulsar.broker.PulsarServerException;
 import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
 import org.apache.pulsar.broker.namespace.NamespaceService;
@@ -746,8 +747,7 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
 
         replicators.forEach((region, replicator) -> replicator.updateRates());
 
-        nsStats.producerCount += producers.size();
-        bundleStats.producerCount += producers.size();
+        final MutableInt producerCount = new MutableInt();
         topicStatsStream.startObject(topic);
 
         topicStatsStream.startList("publishers");
@@ -760,14 +760,19 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
 
             if (producer.isRemote()) {
                 
topicStats.remotePublishersStats.put(producer.getRemoteCluster(), 
publisherStats);
-            }
-
-            if (hydratePublishers) {
-                StreamingStats.writePublisherStats(topicStatsStream, 
publisherStats);
+            } else {
+                // Exclude producers for replication from "publishers" and 
"producerCount"
+                producerCount.increment();
+                if (hydratePublishers) {
+                    StreamingStats.writePublisherStats(topicStatsStream, 
publisherStats);
+                }
             }
         });
         topicStatsStream.endList();
 
+        nsStats.producerCount += producerCount.intValue();
+        bundleStats.producerCount += producerCount.intValue();
+
         // Start replicator stats
         topicStatsStream.startObject("replication");
         nsStats.replicatorCount += topicStats.remotePublishersStats.size();
@@ -856,7 +861,7 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
         // Remaining dest stats.
         topicStats.averageMsgSize = topicStats.aggMsgRateIn == 0.0 ? 0.0
                 : (topicStats.aggMsgThroughputIn / topicStats.aggMsgRateIn);
-        topicStatsStream.writePair("producerCount", producers.size());
+        topicStatsStream.writePair("producerCount", producerCount.intValue());
         topicStatsStream.writePair("averageMsgSize", 
topicStats.averageMsgSize);
         topicStatsStream.writePair("msgRateIn", topicStats.aggMsgRateIn);
         topicStatsStream.writePair("msgRateOut", topicStats.aggMsgRateOut);
@@ -930,6 +935,7 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
             if (producer.isRemote()) {
                 remotePublishersStats.put(producer.getRemoteCluster(), 
publisherStats);
             } else if (!getStatsOptions.isExcludePublishers()) {
+                // Exclude producers for replication from "publishers"
                 stats.addPublisher(publisherStats);
             }
         });
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 22041326ba2..e9ed8aa6edf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -88,6 +88,7 @@ import 
org.apache.bookkeeper.mledger.util.ManagedLedgerImplUtils;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
 import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
@@ -2257,8 +2258,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
         replicators.forEach((region, replicator) -> replicator.updateRates());
 
-        nsStats.producerCount += producers.size();
-        bundleStats.producerCount += producers.size();
+        final MutableInt producerCount = new MutableInt();
         topicStatsStream.startObject(topic);
 
         // start publisher stats
@@ -2272,14 +2272,19 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
             if (producer.isRemote()) {
                 
topicStatsHelper.remotePublishersStats.put(producer.getRemoteCluster(), 
publisherStats);
-            }
-
-            // Populate consumer specific stats here
-            if (hydratePublishers) {
-                StreamingStats.writePublisherStats(topicStatsStream, 
publisherStats);
+            } else {
+                // Exclude producers for replication from "publishers" and 
"producerCount"
+                producerCount.increment();
+                if (hydratePublishers) {
+                    StreamingStats.writePublisherStats(topicStatsStream, 
publisherStats);
+                }
             }
         });
         topicStatsStream.endList();
+
+        nsStats.producerCount += producerCount.intValue();
+        bundleStats.producerCount += producerCount.intValue();
+
         // if publish-rate increases (eg: 0 to 1K) then pick max publish-rate 
and if publish-rate decreases then keep
         // average rate.
         lastUpdatedAvgPublishRateInMsg = topicStatsHelper.aggMsgRateIn > 
lastUpdatedAvgPublishRateInMsg
@@ -2447,7 +2452,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         // Remaining dest stats.
         topicStatsHelper.averageMsgSize = topicStatsHelper.aggMsgRateIn == 0.0 
? 0.0
                 : (topicStatsHelper.aggMsgThroughputIn / 
topicStatsHelper.aggMsgRateIn);
-        topicStatsStream.writePair("producerCount", producers.size());
+        topicStatsStream.writePair("producerCount", producerCount.intValue());
         topicStatsStream.writePair("averageMsgSize", 
topicStatsHelper.averageMsgSize);
         topicStatsStream.writePair("msgRateIn", topicStatsHelper.aggMsgRateIn);
         topicStatsStream.writePair("msgRateOut", 
topicStatsHelper.aggMsgRateOut);
@@ -2535,8 +2540,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
             if (producer.isRemote()) {
                 remotePublishersStats.put(producer.getRemoteCluster(), 
publisherStats);
-            }
-            if (!getStatsOptions.isExcludePublishers()){
+            } else if (!getStatsOptions.isExcludePublishers()) {
+                // Exclude producers for replication from "publishers"
                 stats.addPublisher(publisherStats);
             }
         });
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index eb31c13b0d5..99fd4d877c1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -28,12 +28,15 @@ import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Sets;
 import io.netty.util.concurrent.FastThreadLocalThread;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -138,17 +141,49 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
 
         // Verify replicator works.
         Producer<byte[]> producer1 = 
client1.newProducer().topic(topicName).create();
+        Producer<byte[]> producer2 = 
client2.newProducer().topic(topicName).create(); // Do not publish messages
         Consumer<byte[]> consumer2 = 
client2.newConsumer().topic(topicName).subscriptionName(subscribeName).subscribe();
         producer1.newMessage().value(msgValue).send();
         pulsar1.getBrokerService().checkReplicationPolicies();
         assertEquals(consumer2.receive(10, TimeUnit.SECONDS).getValue(), 
msgValue);
 
-        // Verify there has one item in the attribute "publishers" or 
"replications"
+        // Verify that the "publishers" field does not include the producer 
for replication
         TopicStats topicStats2 = admin2.topics().getStats(topicName);
-        assertTrue(topicStats2.getPublishers().size() + 
topicStats2.getReplication().size() > 0);
+        assertEquals(topicStats2.getPublishers().size(), 1);
+        
assertFalse(topicStats2.getPublishers().get(0).getProducerName().startsWith(config1.getReplicatorPrefix()));
+
+        // Update broker stats immediately (usually updated every minute)
+        pulsar2.getBrokerService().updateRates();
+        String brokerStats2 = admin2.brokerStats().getTopics();
+
+        boolean found = false;
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode rootNode = mapper.readTree(brokerStats2);
+        if (rootNode.hasNonNull(replicatedNamespace)) {
+            Iterator<JsonNode> bundleNodes = 
rootNode.get(replicatedNamespace).elements();
+            while (bundleNodes.hasNext()) {
+                JsonNode bundleNode = bundleNodes.next();
+                if (bundleNode.hasNonNull("persistent") && 
bundleNode.get("persistent").hasNonNull(topicName)) {
+                    found = true;
+                    JsonNode topicNode = 
bundleNode.get("persistent").get(topicName);
+                    // Verify that the "publishers" field does not include the 
producer for replication
+                    assertEquals(topicNode.get("publishers").size(), 1);
+                    assertEquals(topicNode.get("producerCount").intValue(), 1);
+                    Iterator<JsonNode> publisherNodes = 
topicNode.get("publishers").elements();
+                    while (publisherNodes.hasNext()) {
+                        JsonNode publisherNode = publisherNodes.next();
+                        
assertFalse(publisherNode.get("producerName").textValue()
+                                .startsWith(config1.getReplicatorPrefix()));
+                    }
+                    break;
+                }
+            }
+        }
+        assertTrue(found);
 
         // cleanup.
-        consumer2.close();
+        consumer2.unsubscribe();
+        producer2.close();
         producer1.close();
         cleanupTopics(() -> {
             admin1.topics().delete(topicName);

Reply via email to