This is an automated email from the ASF dual-hosted git repository.

poorbarcode pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f43398e8b1d [fix][broker][fix][broker]Replication stats is empty when 
the cluster is the target cluster of a one-way replication (#25583)
f43398e8b1d is described below

commit f43398e8b1da50dfce0306a50d3de9df05019f2b
Author: fengyubiao <[email protected]>
AuthorDate: Wed May 6 00:43:09 2026 +0800

    [fix][broker][fix][broker]Replication stats is empty when the cluster is 
the target cluster of a one-way replication (#25583)
---
 .../broker/service/persistent/PersistentTopic.java | 28 +++++++++----
 .../broker/service/OneWayReplicatorTest.java       | 49 ++++++++++++++++++++++
 ...OneWayReplicatorUsingGlobalPartitionedTest.java |  6 +++
 .../service/OneWayReplicatorUsingGlobalZKTest.java |  6 +++
 4 files changed, 82 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 2e76254dc97..f98da6a5a89 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -27,6 +27,7 @@ import static 
org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopi
 import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
 import com.carrotsearch.hppc.ObjectObjectHashMap;
+import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 import io.github.merlimat.slog.Logger;
@@ -57,6 +58,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import lombok.Getter;
 import lombok.Value;
@@ -2978,24 +2980,36 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         stats.abortedTxnCount = txnBuffer.getAbortedTxnCount();
         stats.committedTxnCount = txnBuffer.getCommittedTxnCount();
 
-        replicators.forEach((cluster, replicator) -> {
-            ReplicatorStatsImpl replicatorStats = replicator.computeStats();
-
-            // Add incoming msg rates
-            PublisherStatsImpl pubStats = 
remotePublishersStats.get(replicator.getRemoteCluster());
-            if (pubStats != null) {
+        BiConsumer<ReplicatorStatsImpl, PublisherStatsImpl> 
replicationInboundSetter =
+            (ReplicatorStatsImpl replicatorStats, PublisherStatsImpl pubStats) 
-> {
+                if (replicatorStats == null ||  pubStats == null) {
+                    return;
+                }
                 replicatorStats.msgRateIn = pubStats.msgRateIn;
                 replicatorStats.msgThroughputIn = pubStats.msgThroughputIn;
                 replicatorStats.inboundConnection = pubStats.getAddress();
                 replicatorStats.inboundConnectedSince = 
pubStats.getConnectedSince();
-            }
+            };
+
+        replicators.forEach((cluster, replicator) -> {
+            ReplicatorStatsImpl replicatorStats = replicator.computeStats();
 
+            // Add incoming msg rates
+            PublisherStatsImpl pubStats = 
remotePublishersStats.remove(replicator.getRemoteCluster());
+            replicationInboundSetter.accept(replicatorStats, pubStats);
             stats.msgRateOut += replicatorStats.msgRateOut;
             stats.msgThroughputOut += replicatorStats.msgThroughputOut;
 
             stats.replication.put(replicator.getRemoteCluster(), 
replicatorStats);
         });
 
+        for (ObjectObjectCursor<String, PublisherStatsImpl> inboundReplication 
: remotePublishersStats) {
+            PublisherStatsImpl pubStats = inboundReplication.value;
+            ReplicatorStatsImpl replicatorStats = new ReplicatorStatsImpl();
+            replicationInboundSetter.accept(replicatorStats, pubStats);
+            stats.replication.put(inboundReplication.key, replicatorStats);
+        }
+
         stats.storageSize = ledger.getTotalSize();
         stats.backlogSize = ledger.getEstimatedBacklogSize();
         stats.deduplicationStatus = 
messageDeduplication.getStatus().toString();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 0645877c144..8665a8cd53a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -117,6 +117,7 @@ import 
org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
 import org.apache.pulsar.common.policies.data.PublishRate;
+import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -155,6 +156,54 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         super.cleanup();
     }
 
+    @Test(timeOut = 45 * 1000)
+    public void testReceiverSideReplicationStats() throws Exception {
+        final String topic = BrokerTestUtil.newUniqueName("persistent://" + 
replicatedNamespace + "/tp_");
+        admin1.topics().createNonPartitionedTopic(topic);
+        Producer<String> producer1 = 
client1.newProducer(Schema.STRING).topic(topic)
+                .batchingMaxPublishDelay(1, TimeUnit.SECONDS).create();
+        waitReplicatorStarted(topic);
+
+        // Keep publishing to cluster-1.
+        AtomicBoolean keepPublishing = new AtomicBoolean(true);
+        Thread publisherThread = new Thread(() -> {
+            while (keepPublishing.get()) {
+                try {
+                    producer1.send("msg");
+                    Thread.sleep(100);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+        publisherThread.start();
+
+        // Verify: in-bound replication stats.
+        PersistentTopic persistentTopic2 = (PersistentTopic) 
broker2.getTopic(topic, false).join().get();
+        Awaitility.await().untilAsserted(() -> {
+            
persistentTopic2.getProducers().values().forEach(org.apache.pulsar.broker.service.Producer::updateRates);
+            TopicStats topicStats = admin2.topics().getStats(topic);
+            assertNotNull(topicStats);
+            assertNotNull(topicStats.getReplication());
+            ReplicatorStats replicatorStats = 
topicStats.getReplication().get(cluster1);
+            assertNotNull(replicatorStats);
+            assertTrue(replicatorStats.getMsgRateIn() > 0);
+            assertTrue(replicatorStats.getMsgThroughputIn() > 0);
+            assertNotNull(replicatorStats.getInboundConnection());
+            assertNotNull(replicatorStats.getInboundConnectedSince());
+            // The connected attribute means out-bound connection so far.
+            assertFalse(replicatorStats.isConnected());
+        });
+
+        // cleanup.
+        keepPublishing.set(false);
+        producer1.close();
+        cleanupTopics(() -> {
+            admin1.topics().delete(topic);
+            admin2.topics().delete(topic);
+        });
+    }
+
     @Test(timeOut = 45 * 1000)
     public void testDeleteTopicWhenReplicating() throws Exception {
         final String topicName1 = BrokerTestUtil.newUniqueName("persistent://" 
+ replicatedNamespace + "/tp_");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
index f828f92f7db..89fc126e49c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
@@ -65,6 +65,12 @@ public class OneWayReplicatorUsingGlobalPartitionedTest 
extends OneWayReplicator
         super.cleanup();
     }
 
+    @Override
+    @Test(enabled = false)
+    public void testReceiverSideReplicationStats() throws Exception {
+        super.testReceiverSideReplicationStats();
+    }
+
     @Override
     protected void setConfigDefaults(ServiceConfiguration config, String 
clusterName,
                                      LocalBookkeeperEnsemble 
bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index 4db26758beb..6bbc09d62ba 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -84,6 +84,12 @@ public class OneWayReplicatorUsingGlobalZKTest extends 
OneWayReplicatorTest {
         super.cleanup();
     }
 
+    @Override
+    @Test(enabled = false)
+    public void testReceiverSideReplicationStats() throws Exception {
+        super.testReceiverSideReplicationStats();
+    }
+
     protected void setConfigDefaults(ServiceConfiguration config, String 
clusterName,
                                      LocalBookkeeperEnsemble 
bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) {
         super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, 
brokerConfigZk);

Reply via email to