This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new c513482420c [fix] [admin] fix incorrect state replication.connected on 
API partitioned-topic stat (#19942)
c513482420c is described below

commit c513482420cb66880e2e5b28641008252987d240
Author: fengyubiao <[email protected]>
AuthorDate: Wed Mar 29 09:24:12 2023 +0800

    [fix] [admin] fix incorrect state replication.connected on API 
partitioned-topic stat (#19942)
    
    ### Motivation
    
    Pulsar will merge the variable 
`PartitionedTopicStatsImpl.replication[x].connected` by the way below when we 
call `pulsar-admin topics partitioned-stats`
    
    ``` java
    this.connected = this.connected & other.connected
    ```
    
    But the variable `connected` of `PartitionedTopicStatsImpl.replication` is 
initialized `false`, so the expression `this.connected & other.connected` will 
always be `false`.
    
    Then we will always get the value `false` if we call `pulsar-admin topics 
partitioned-stats`.
    
    ### Modifications
    
    make the variable `` of `PartitionedTopicStatsImpl` is initialized `true`
    
    (cherry picked from commit 9fc0b5e197aa36ff64087cd13d156bb65a097fad)
---
 .../pulsar/broker/service/ReplicatorTest.java      | 25 ++++++++++++++++++++++
 .../policies/data/stats/ReplicatorStatsImpl.java   |  4 +++-
 .../common/policies/data/stats/TopicStatsImpl.java |  2 ++
 3 files changed, 30 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 77d2ea9f680..23491972f4a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -97,8 +97,10 @@ import 
org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.schema.Schemas;
@@ -871,6 +873,29 @@ public class ReplicatorTest extends ReplicatorTestBase {
         assertNull(producer);
     }
 
+    @Test(priority = 5, timeOut = 30000)
+    public void testReplicatorConnected() throws Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://pulsar/ns/tp_" + UUID.randomUUID());
+        final TopicName dest = TopicName.get(topicName);
+        admin1.topics().createPartitionedTopic(topicName, 1);
+
+        @Cleanup
+        MessageProducer producer1 = new MessageProducer(url1, dest);
+
+        Awaitility.await().until(() -> {
+            TopicStats topicStats = admin1.topics().getStats(topicName + 
"-partition-0");
+            return topicStats.getReplication().values().stream()
+                    .map(ReplicatorStats::isConnected).reduce((a, b) -> a & 
b).get();
+        });
+
+        PartitionedTopicStats
+                partitionedTopicStats = 
admin1.topics().getPartitionedStats(topicName, true);
+
+        for (ReplicatorStats replicatorStats : 
partitionedTopicStats.getReplication().values()){
+            assertTrue(replicatorStats.isConnected());
+        }
+    }
+
     @Test
     public void testDeleteTopicFailure() throws Exception {
         final String topicName = 
BrokerTestUtil.newUniqueName("persistent://pulsar/ns/tp_" + UUID.randomUUID());
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ReplicatorStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ReplicatorStatsImpl.java
index 90fb0b4eec8..5dc168e2fb8 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ReplicatorStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ReplicatorStatsImpl.java
@@ -72,7 +72,9 @@ public class ReplicatorStatsImpl implements ReplicatorStats {
         this.msgThroughputOut += stats.msgThroughputOut;
         this.msgRateExpired += stats.msgRateExpired;
         this.replicationBacklog += stats.replicationBacklog;
-        this.connected &= stats.connected;
+        if (this.connected) {
+            this.connected &= stats.connected;
+        }
         this.replicationDelayInSeconds = 
Math.max(this.replicationDelayInSeconds, stats.replicationDelayInSeconds);
         return this;
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
index 3c5ce5f14d6..c90b625f486 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
@@ -280,6 +280,7 @@ public class TopicStatsImpl implements TopicStats {
         if (this.replication.size() != stats.replication.size()) {
             for (String repl : stats.replication.keySet()) {
                 ReplicatorStatsImpl replStats = new ReplicatorStatsImpl();
+                replStats.setConnected(true);
                 this.replication.put(repl, 
replStats.add(stats.replication.get(repl)));
             }
         } else {
@@ -288,6 +289,7 @@ public class TopicStatsImpl implements TopicStats {
                     
this.replication.get(repl).add(stats.replication.get(repl));
                 } else {
                     ReplicatorStatsImpl replStats = new ReplicatorStatsImpl();
+                    replStats.setConnected(true);
                     this.replication.put(repl, 
replStats.add(stats.replication.get(repl)));
                 }
             }

Reply via email to