This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c9e281  Add non-persistent topic stats separately in brokers-stat 
(#1235)
3c9e281 is described below

commit 3c9e28172d486bfb4c39db5248e1033d5079c48f
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Wed Feb 14 12:03:38 2018 -0800

    Add non-persistent topic stats separately in brokers-stat (#1235)
---
 .../apache/pulsar/broker/service/PulsarStats.java  | 45 +++++++++++++++++-----
 1 file changed, 36 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
index cdc5ada..835cabc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
@@ -25,6 +25,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.stats.BrokerOperabilityMetrics;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
@@ -55,6 +56,7 @@ public class PulsarStats implements Closeable {
     private Map<String, NamespaceBundleStats> bundleStats;
     private List<Metrics> tempMetricsCollection;
     private List<Metrics> metricsCollection;
+    private List<NonPersistentTopic> tempNonPersistentTopics;
     private final BrokerOperabilityMetrics brokerOperabilityMetrics;
 
     private final ReentrantReadWriteLock bufferLock = new 
ReentrantReadWriteLock();
@@ -71,6 +73,7 @@ public class PulsarStats implements Closeable {
         this.metricsCollection = Lists.newArrayList();
         this.brokerOperabilityMetrics = new 
BrokerOperabilityMetrics(pulsar.getConfiguration().getClusterName(),
                 pulsar.getAdvertisedAddress());
+        this.tempNonPersistentTopics = Lists.newArrayList();
     }
 
     @Override
@@ -118,22 +121,46 @@ public class PulsarStats implements Closeable {
                         currentBundleStats.topics = topics.size();
 
                         
topicStatsStream.startObject(NamespaceBundle.getBundleRange(bundle));
+
+                        tempNonPersistentTopics.clear();
+                        // start persistent topic
                         topicStatsStream.startObject("persistent");
                         topics.forEach((name, topic) -> {
-                            try {
-                                topic.updateRates(nsStats, currentBundleStats, 
topicStatsStream,
-                                        clusterReplicationMetrics, 
namespaceName);
-                            } catch (Exception e) {
-                                log.error("Failed to generate topic stats for 
topic {}: {}", name, e.getMessage(), e);
-                            }
-                            // this task: helps to activate 
inactive-backlog-cursors which have caught up and
-                            // connected, also deactivate 
active-backlog-cursors which has backlog
                             if (topic instanceof PersistentTopic) {
+                                try {
+                                    topic.updateRates(nsStats, 
currentBundleStats, topicStatsStream,
+                                            clusterReplicationMetrics, 
namespaceName);
+                                } catch (Exception e) {
+                                    log.error("Failed to generate topic stats 
for topic {}: {}", name, e.getMessage(), e);
+                                }
+                                // this task: helps to activate 
inactive-backlog-cursors which have caught up and
+                                // connected, also deactivate 
active-backlog-cursors which has backlog
                                 ((PersistentTopic) 
topic).getManagedLedger().checkBackloggedCursors();
+                            }else if (topic instanceof NonPersistentTopic) {
+                                
tempNonPersistentTopics.add((NonPersistentTopic) topic);
+                            } else {
+                                log.warn("Unsupported type of topic {}", 
topic.getClass().getName());
                             }
                         });
-
+                        // end persistent topics section
                         topicStatsStream.endObject();
+                        
+                        if(!tempNonPersistentTopics.isEmpty()) {
+                         // start non-persistent topic
+                            topicStatsStream.startObject("non-persistent");
+                            tempNonPersistentTopics.forEach(topic -> {
+                                try {
+                                    topic.updateRates(nsStats, 
currentBundleStats, topicStatsStream,
+                                            clusterReplicationMetrics, 
namespaceName);
+                                } catch (Exception e) {
+                                    log.error("Failed to generate topic stats 
for topic {}: {}", topic.getName(), e.getMessage(), e);
+                                }
+                            });
+                            // end non-persistent topics section
+                            topicStatsStream.endObject();    
+                        }
+                        
+                        // end namespace-bundle section
                         topicStatsStream.endObject();
                     });
 

-- 
To stop receiving notification emails like this one, please contact
rdhaba...@apache.org.

Reply via email to