This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e4f769  Clear-out old inbound cnx stats when repl producer 
disconnects (#3250)
1e4f769 is described below

commit 1e4f7695972c2ccd2f2b9566e5b9eefeec650b5f
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Tue Dec 25 21:34:21 2018 -0800

    Clear-out old inbound cnx stats when repl producer disconnects (#3250)
    
    ### Motivation
    
    Right now, when remote replication-producer gets disconnected due to below 
`ProducerBlockedQuotaExceededError`, broker doesn't cleanup inbound 
repl-producer stats for the topic and it shows invalid stale inbound cnx stats 
until new producer gets created successfully.
    
    ```
    20:10:28.628 [pulsar-io-21-2] WARN  
org.apache.pulsar.broker.service.AbstractReplicator - 
[persistent://localsearch/global/ns/t1][east1 -> west1] Failed to create remote 
producer 
(org.apache.pulsar.client.api.PulsarClientException$ProducerBlockedQuotaExceededError:
 Cannot create producer on topic with backlog quota exceeded), retrying in 5
    ```
    
    ### Modifications
    
    Clear-out old stale inbound-repl stats.
    
    ### Result
    
    Broker will not show invalid stale repl-stats.
---
 .../pulsar/broker/service/persistent/PersistentTopic.java      | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index cc73d0c..77bd3ec 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1211,12 +1211,10 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
 
             // Add incoming msg rates
             PublisherStats pubStats = 
topicStatsHelper.remotePublishersStats.get(replicator.getRemoteCluster());
-            if (pubStats != null) {
-                rStat.msgRateIn = pubStats.msgRateIn;
-                rStat.msgThroughputIn = pubStats.msgThroughputIn;
-                rStat.inboundConnection = pubStats.getAddress();
-                rStat.inboundConnectedSince = pubStats.getConnectedSince();
-            }
+            rStat.msgRateIn = pubStats != null ? pubStats.msgRateIn : 0;
+            rStat.msgThroughputIn = pubStats != null ? 
pubStats.msgThroughputIn : 0;
+            rStat.inboundConnection = pubStats != null ? pubStats.getAddress() 
: null;
+            rStat.inboundConnectedSince = pubStats != null ? 
pubStats.getConnectedSince() : null;
 
             topicStatsHelper.aggMsgRateOut += rStat.msgRateOut;
             topicStatsHelper.aggMsgThroughputOut += rStat.msgThroughputOut;

Reply via email to