This is an automated email from the ASF dual-hosted git repository.
nkurihar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c30765e789b [improve][broker] Exclude producers for geo-replication
from publishers field of topic stats (#22556)
c30765e789b is described below
commit c30765e789ba1ed28699edb9f159c7b71ca5b907
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Tue May 7 14:02:52 2024 +0900
[improve][broker] Exclude producers for geo-replication from publishers
field of topic stats (#22556)
---
.../service/nonpersistent/NonPersistentTopic.java | 20 +++++++----
.../broker/service/persistent/PersistentTopic.java | 25 +++++++------
.../broker/service/OneWayReplicatorTest.java | 41 ++++++++++++++++++++--
3 files changed, 66 insertions(+), 20 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 8cb8394440f..d19aeaa4b0f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.broker.PulsarServerException;
import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.namespace.NamespaceService;
@@ -746,8 +747,7 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
replicators.forEach((region, replicator) -> replicator.updateRates());
- nsStats.producerCount += producers.size();
- bundleStats.producerCount += producers.size();
+ final MutableInt producerCount = new MutableInt();
topicStatsStream.startObject(topic);
topicStatsStream.startList("publishers");
@@ -760,14 +760,19 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
if (producer.isRemote()) {
topicStats.remotePublishersStats.put(producer.getRemoteCluster(),
publisherStats);
- }
-
- if (hydratePublishers) {
- StreamingStats.writePublisherStats(topicStatsStream,
publisherStats);
+ } else {
+ // Exclude producers for replication from "publishers" and
"producerCount"
+ producerCount.increment();
+ if (hydratePublishers) {
+ StreamingStats.writePublisherStats(topicStatsStream,
publisherStats);
+ }
}
});
topicStatsStream.endList();
+ nsStats.producerCount += producerCount.intValue();
+ bundleStats.producerCount += producerCount.intValue();
+
// Start replicator stats
topicStatsStream.startObject("replication");
nsStats.replicatorCount += topicStats.remotePublishersStats.size();
@@ -856,7 +861,7 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
// Remaining dest stats.
topicStats.averageMsgSize = topicStats.aggMsgRateIn == 0.0 ? 0.0
: (topicStats.aggMsgThroughputIn / topicStats.aggMsgRateIn);
- topicStatsStream.writePair("producerCount", producers.size());
+ topicStatsStream.writePair("producerCount", producerCount.intValue());
topicStatsStream.writePair("averageMsgSize",
topicStats.averageMsgSize);
topicStatsStream.writePair("msgRateIn", topicStats.aggMsgRateIn);
topicStatsStream.writePair("msgRateOut", topicStats.aggMsgRateOut);
@@ -930,6 +935,7 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
if (producer.isRemote()) {
remotePublishersStats.put(producer.getRemoteCluster(),
publisherStats);
} else if (!getStatsOptions.isExcludePublishers()) {
+ // Exclude producers for replication from "publishers"
stats.addPublisher(publisherStats);
}
});
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 22041326ba2..e9ed8aa6edf 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -88,6 +88,7 @@ import
org.apache.bookkeeper.mledger.util.ManagedLedgerImplUtils;
import org.apache.bookkeeper.net.BookieId;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
@@ -2257,8 +2258,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
replicators.forEach((region, replicator) -> replicator.updateRates());
- nsStats.producerCount += producers.size();
- bundleStats.producerCount += producers.size();
+ final MutableInt producerCount = new MutableInt();
topicStatsStream.startObject(topic);
// start publisher stats
@@ -2272,14 +2272,19 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
if (producer.isRemote()) {
topicStatsHelper.remotePublishersStats.put(producer.getRemoteCluster(),
publisherStats);
- }
-
- // Populate consumer specific stats here
- if (hydratePublishers) {
- StreamingStats.writePublisherStats(topicStatsStream,
publisherStats);
+ } else {
+ // Exclude producers for replication from "publishers" and
"producerCount"
+ producerCount.increment();
+ if (hydratePublishers) {
+ StreamingStats.writePublisherStats(topicStatsStream,
publisherStats);
+ }
}
});
topicStatsStream.endList();
+
+ nsStats.producerCount += producerCount.intValue();
+ bundleStats.producerCount += producerCount.intValue();
+
// if publish-rate increases (eg: 0 to 1K) then pick max publish-rate
and if publish-rate decreases then keep
// average rate.
lastUpdatedAvgPublishRateInMsg = topicStatsHelper.aggMsgRateIn >
lastUpdatedAvgPublishRateInMsg
@@ -2447,7 +2452,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
// Remaining dest stats.
topicStatsHelper.averageMsgSize = topicStatsHelper.aggMsgRateIn == 0.0
? 0.0
: (topicStatsHelper.aggMsgThroughputIn /
topicStatsHelper.aggMsgRateIn);
- topicStatsStream.writePair("producerCount", producers.size());
+ topicStatsStream.writePair("producerCount", producerCount.intValue());
topicStatsStream.writePair("averageMsgSize",
topicStatsHelper.averageMsgSize);
topicStatsStream.writePair("msgRateIn", topicStatsHelper.aggMsgRateIn);
topicStatsStream.writePair("msgRateOut",
topicStatsHelper.aggMsgRateOut);
@@ -2535,8 +2540,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
if (producer.isRemote()) {
remotePublishersStats.put(producer.getRemoteCluster(),
publisherStats);
- }
- if (!getStatsOptions.isExcludePublishers()){
+ } else if (!getStatsOptions.isExcludePublishers()) {
+ // Exclude producers for replication from "publishers"
stats.addPublisher(publisherStats);
}
});
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index eb31c13b0d5..99fd4d877c1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -28,12 +28,15 @@ import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import io.netty.util.concurrent.FastThreadLocalThread;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -138,17 +141,49 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
// Verify replicator works.
Producer<byte[]> producer1 =
client1.newProducer().topic(topicName).create();
+ Producer<byte[]> producer2 =
client2.newProducer().topic(topicName).create(); // Do not publish messages
Consumer<byte[]> consumer2 =
client2.newConsumer().topic(topicName).subscriptionName(subscribeName).subscribe();
producer1.newMessage().value(msgValue).send();
pulsar1.getBrokerService().checkReplicationPolicies();
assertEquals(consumer2.receive(10, TimeUnit.SECONDS).getValue(),
msgValue);
- // Verify there has one item in the attribute "publishers" or
"replications"
+ // Verify that the "publishers" field does not include the producer
for replication
TopicStats topicStats2 = admin2.topics().getStats(topicName);
- assertTrue(topicStats2.getPublishers().size() +
topicStats2.getReplication().size() > 0);
+ assertEquals(topicStats2.getPublishers().size(), 1);
+
assertFalse(topicStats2.getPublishers().get(0).getProducerName().startsWith(config1.getReplicatorPrefix()));
+
+ // Update broker stats immediately (usually updated every minute)
+ pulsar2.getBrokerService().updateRates();
+ String brokerStats2 = admin2.brokerStats().getTopics();
+
+ boolean found = false;
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode rootNode = mapper.readTree(brokerStats2);
+ if (rootNode.hasNonNull(replicatedNamespace)) {
+ Iterator<JsonNode> bundleNodes =
rootNode.get(replicatedNamespace).elements();
+ while (bundleNodes.hasNext()) {
+ JsonNode bundleNode = bundleNodes.next();
+ if (bundleNode.hasNonNull("persistent") &&
bundleNode.get("persistent").hasNonNull(topicName)) {
+ found = true;
+ JsonNode topicNode =
bundleNode.get("persistent").get(topicName);
+ // Verify that the "publishers" field does not include the
producer for replication
+ assertEquals(topicNode.get("publishers").size(), 1);
+ assertEquals(topicNode.get("producerCount").intValue(), 1);
+ Iterator<JsonNode> publisherNodes =
topicNode.get("publishers").elements();
+ while (publisherNodes.hasNext()) {
+ JsonNode publisherNode = publisherNodes.next();
+
assertFalse(publisherNode.get("producerName").textValue()
+ .startsWith(config1.getReplicatorPrefix()));
+ }
+ break;
+ }
+ }
+ }
+ assertTrue(found);
// cleanup.
- consumer2.close();
+ consumer2.unsubscribe();
+ producer2.close();
producer1.close();
cleanupTopics(() -> {
admin1.topics().delete(topicName);