This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new c513482420c [fix] [admin] fix incorrect state replication.connected on
API partitioned-topic stat (#19942)
c513482420c is described below
commit c513482420cb66880e2e5b28641008252987d240
Author: fengyubiao <[email protected]>
AuthorDate: Wed Mar 29 09:24:12 2023 +0800
[fix] [admin] fix incorrect state replication.connected on API
partitioned-topic stat (#19942)
### Motivation
Pulsar will merge the variable
`PartitionedTopicStatsImpl.replication[x].connected` by the way below when we
call `pulsar-admin topics partitioned-stats`
``` java
this.connected = this.connected & other.connected
```
But the variable `connected` of `PartitionedTopicStatsImpl.replication` is
initialized `false`, so the expression `this.connected & other.connected` will
always be `false`.
Then we will always get the value `false` if we call `pulsar-admin topics
partitioned-stats`.
### Modifications
make the variable `` of `PartitionedTopicStatsImpl` is initialized `true`
(cherry picked from commit 9fc0b5e197aa36ff64087cd13d156bb65a097fad)
---
.../pulsar/broker/service/ReplicatorTest.java | 25 ++++++++++++++++++++++
.../policies/data/stats/ReplicatorStatsImpl.java | 4 +++-
.../common/policies/data/stats/TopicStatsImpl.java | 2 ++
3 files changed, 30 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 77d2ea9f680..23491972f4a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -97,8 +97,10 @@ import
org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
+import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.schema.Schemas;
@@ -871,6 +873,29 @@ public class ReplicatorTest extends ReplicatorTestBase {
assertNull(producer);
}
+ @Test(priority = 5, timeOut = 30000)
+ public void testReplicatorConnected() throws Exception {
+ final String topicName =
BrokerTestUtil.newUniqueName("persistent://pulsar/ns/tp_" + UUID.randomUUID());
+ final TopicName dest = TopicName.get(topicName);
+ admin1.topics().createPartitionedTopic(topicName, 1);
+
+ @Cleanup
+ MessageProducer producer1 = new MessageProducer(url1, dest);
+
+ Awaitility.await().until(() -> {
+ TopicStats topicStats = admin1.topics().getStats(topicName +
"-partition-0");
+ return topicStats.getReplication().values().stream()
+ .map(ReplicatorStats::isConnected).reduce((a, b) -> a &
b).get();
+ });
+
+ PartitionedTopicStats
+ partitionedTopicStats =
admin1.topics().getPartitionedStats(topicName, true);
+
+ for (ReplicatorStats replicatorStats :
partitionedTopicStats.getReplication().values()){
+ assertTrue(replicatorStats.isConnected());
+ }
+ }
+
@Test
public void testDeleteTopicFailure() throws Exception {
final String topicName =
BrokerTestUtil.newUniqueName("persistent://pulsar/ns/tp_" + UUID.randomUUID());
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ReplicatorStatsImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ReplicatorStatsImpl.java
index 90fb0b4eec8..5dc168e2fb8 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ReplicatorStatsImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ReplicatorStatsImpl.java
@@ -72,7 +72,9 @@ public class ReplicatorStatsImpl implements ReplicatorStats {
this.msgThroughputOut += stats.msgThroughputOut;
this.msgRateExpired += stats.msgRateExpired;
this.replicationBacklog += stats.replicationBacklog;
- this.connected &= stats.connected;
+ if (this.connected) {
+ this.connected &= stats.connected;
+ }
this.replicationDelayInSeconds =
Math.max(this.replicationDelayInSeconds, stats.replicationDelayInSeconds);
return this;
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
index 3c5ce5f14d6..c90b625f486 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
@@ -280,6 +280,7 @@ public class TopicStatsImpl implements TopicStats {
if (this.replication.size() != stats.replication.size()) {
for (String repl : stats.replication.keySet()) {
ReplicatorStatsImpl replStats = new ReplicatorStatsImpl();
+ replStats.setConnected(true);
this.replication.put(repl,
replStats.add(stats.replication.get(repl)));
}
} else {
@@ -288,6 +289,7 @@ public class TopicStatsImpl implements TopicStats {
this.replication.get(repl).add(stats.replication.get(repl));
} else {
ReplicatorStatsImpl replStats = new ReplicatorStatsImpl();
+ replStats.setConnected(true);
this.replication.put(repl,
replStats.add(stats.replication.get(repl)));
}
}