This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1e4f769 Clear-out old inbound cnx stats when repl producer
disconnects (#3250)
1e4f769 is described below
commit 1e4f7695972c2ccd2f2b9566e5b9eefeec650b5f
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Tue Dec 25 21:34:21 2018 -0800
Clear-out old inbound cnx stats when repl producer disconnects (#3250)
### Motivation
Right now, when remote replication-producer gets disconnected due to below
`ProducerBlockedQuotaExceededError`, broker doesn't cleanup inbound
repl-producer stats for the topic and it shows invalid stale inbound cnx stats
until new producer gets created successfully.
```
20:10:28.628 [pulsar-io-21-2] WARN
org.apache.pulsar.broker.service.AbstractReplicator -
[persistent://localsearch/global/ns/t1][east1 -> west1] Failed to create remote
producer
(org.apache.pulsar.client.api.PulsarClientException$ProducerBlockedQuotaExceededError:
Cannot create producer on topic with backlog quota exceeded), retrying in 5
```
### Modifications
Clear-out old stale inbound-repl stats.
### Result
Broker will not show invalid stale repl-stats.
---
.../pulsar/broker/service/persistent/PersistentTopic.java | 10 ++++------
1 file changed, 4 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index cc73d0c..77bd3ec 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1211,12 +1211,10 @@ public class PersistentTopic implements Topic,
AddEntryCallback {
// Add incoming msg rates
PublisherStats pubStats =
topicStatsHelper.remotePublishersStats.get(replicator.getRemoteCluster());
- if (pubStats != null) {
- rStat.msgRateIn = pubStats.msgRateIn;
- rStat.msgThroughputIn = pubStats.msgThroughputIn;
- rStat.inboundConnection = pubStats.getAddress();
- rStat.inboundConnectedSince = pubStats.getConnectedSince();
- }
+ rStat.msgRateIn = pubStats != null ? pubStats.msgRateIn : 0;
+ rStat.msgThroughputIn = pubStats != null ?
pubStats.msgThroughputIn : 0;
+ rStat.inboundConnection = pubStats != null ? pubStats.getAddress()
: null;
+ rStat.inboundConnectedSince = pubStats != null ?
pubStats.getConnectedSince() : null;
topicStatsHelper.aggMsgRateOut += rStat.msgRateOut;
topicStatsHelper.aggMsgThroughputOut += rStat.msgThroughputOut;