This is an automated email from the ASF dual-hosted git repository.
poorbarcode pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f43398e8b1d [fix][broker] Replication stats are empty when
the cluster is the target cluster of a one-way replication (#25583)
f43398e8b1d is described below
commit f43398e8b1da50dfce0306a50d3de9df05019f2b
Author: fengyubiao <[email protected]>
AuthorDate: Wed May 6 00:43:09 2026 +0800
[fix][broker] Replication stats are empty when the cluster is
the target cluster of a one-way replication (#25583)
---
.../broker/service/persistent/PersistentTopic.java | 28 +++++++++----
.../broker/service/OneWayReplicatorTest.java | 49 ++++++++++++++++++++++
...OneWayReplicatorUsingGlobalPartitionedTest.java | 6 +++
.../service/OneWayReplicatorUsingGlobalZKTest.java | 6 +++
4 files changed, 82 insertions(+), 7 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 2e76254dc97..f98da6a5a89 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -27,6 +27,7 @@ import static
org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopi
import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
import com.carrotsearch.hppc.ObjectObjectHashMap;
+import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import io.github.merlimat.slog.Logger;
@@ -57,6 +58,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import lombok.Getter;
import lombok.Value;
@@ -2978,24 +2980,36 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
stats.abortedTxnCount = txnBuffer.getAbortedTxnCount();
stats.committedTxnCount = txnBuffer.getCommittedTxnCount();
- replicators.forEach((cluster, replicator) -> {
- ReplicatorStatsImpl replicatorStats = replicator.computeStats();
-
- // Add incoming msg rates
- PublisherStatsImpl pubStats =
remotePublishersStats.get(replicator.getRemoteCluster());
- if (pubStats != null) {
+ BiConsumer<ReplicatorStatsImpl, PublisherStatsImpl>
replicationInboundSetter =
+ (ReplicatorStatsImpl replicatorStats, PublisherStatsImpl pubStats)
-> {
+ if (replicatorStats == null || pubStats == null) {
+ return;
+ }
replicatorStats.msgRateIn = pubStats.msgRateIn;
replicatorStats.msgThroughputIn = pubStats.msgThroughputIn;
replicatorStats.inboundConnection = pubStats.getAddress();
replicatorStats.inboundConnectedSince =
pubStats.getConnectedSince();
- }
+ };
+
+ replicators.forEach((cluster, replicator) -> {
+ ReplicatorStatsImpl replicatorStats = replicator.computeStats();
+ // Add incoming msg rates
+ PublisherStatsImpl pubStats =
remotePublishersStats.remove(replicator.getRemoteCluster());
+ replicationInboundSetter.accept(replicatorStats, pubStats);
stats.msgRateOut += replicatorStats.msgRateOut;
stats.msgThroughputOut += replicatorStats.msgThroughputOut;
stats.replication.put(replicator.getRemoteCluster(),
replicatorStats);
});
+ for (ObjectObjectCursor<String, PublisherStatsImpl> inboundReplication
: remotePublishersStats) {
+ PublisherStatsImpl pubStats = inboundReplication.value;
+ ReplicatorStatsImpl replicatorStats = new ReplicatorStatsImpl();
+ replicationInboundSetter.accept(replicatorStats, pubStats);
+ stats.replication.put(inboundReplication.key, replicatorStats);
+ }
+
stats.storageSize = ledger.getTotalSize();
stats.backlogSize = ledger.getEstimatedBacklogSize();
stats.deduplicationStatus =
messageDeduplication.getStatus().toString();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 0645877c144..8665a8cd53a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -117,6 +117,7 @@ import
org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.policies.data.PublishRate;
+import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -155,6 +156,54 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
super.cleanup();
}
+ @Test(timeOut = 45 * 1000)
+ public void testReceiverSideReplicationStats() throws Exception {
+ final String topic = BrokerTestUtil.newUniqueName("persistent://" +
replicatedNamespace + "/tp_");
+ admin1.topics().createNonPartitionedTopic(topic);
+ Producer<String> producer1 =
client1.newProducer(Schema.STRING).topic(topic)
+ .batchingMaxPublishDelay(1, TimeUnit.SECONDS).create();
+ waitReplicatorStarted(topic);
+
+ // Keep publishing to cluster-1.
+ AtomicBoolean keepPublishing = new AtomicBoolean(true);
+ Thread publisherThread = new Thread(() -> {
+ while (keepPublishing.get()) {
+ try {
+ producer1.send("msg");
+ Thread.sleep(100);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ publisherThread.start();
+
+ // Verify: in-bound replication stats.
+ PersistentTopic persistentTopic2 = (PersistentTopic)
broker2.getTopic(topic, false).join().get();
+ Awaitility.await().untilAsserted(() -> {
+
persistentTopic2.getProducers().values().forEach(org.apache.pulsar.broker.service.Producer::updateRates);
+ TopicStats topicStats = admin2.topics().getStats(topic);
+ assertNotNull(topicStats);
+ assertNotNull(topicStats.getReplication());
+ ReplicatorStats replicatorStats =
topicStats.getReplication().get(cluster1);
+ assertNotNull(replicatorStats);
+ assertTrue(replicatorStats.getMsgRateIn() > 0);
+ assertTrue(replicatorStats.getMsgThroughputIn() > 0);
+ assertNotNull(replicatorStats.getInboundConnection());
+ assertNotNull(replicatorStats.getInboundConnectedSince());
+ // The connected attribute means out-bound connection so far.
+ assertFalse(replicatorStats.isConnected());
+ });
+
+ // cleanup.
+ keepPublishing.set(false);
+ producer1.close();
+ cleanupTopics(() -> {
+ admin1.topics().delete(topic);
+ admin2.topics().delete(topic);
+ });
+ }
+
@Test(timeOut = 45 * 1000)
public void testDeleteTopicWhenReplicating() throws Exception {
final String topicName1 = BrokerTestUtil.newUniqueName("persistent://"
+ replicatedNamespace + "/tp_");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
index f828f92f7db..89fc126e49c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
@@ -65,6 +65,12 @@ public class OneWayReplicatorUsingGlobalPartitionedTest
extends OneWayReplicator
super.cleanup();
}
+ @Override
+ @Test(enabled = false)
+ public void testReceiverSideReplicationStats() throws Exception {
+ super.testReceiverSideReplicationStats();
+ }
+
@Override
protected void setConfigDefaults(ServiceConfiguration config, String
clusterName,
LocalBookkeeperEnsemble
bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index 4db26758beb..6bbc09d62ba 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -84,6 +84,12 @@ public class OneWayReplicatorUsingGlobalZKTest extends
OneWayReplicatorTest {
super.cleanup();
}
+ @Override
+ @Test(enabled = false)
+ public void testReceiverSideReplicationStats() throws Exception {
+ super.testReceiverSideReplicationStats();
+ }
+
protected void setConfigDefaults(ServiceConfiguration config, String
clusterName,
LocalBookkeeperEnsemble
bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) {
super.setConfigDefaults(config, clusterName, bookkeeperEnsemble,
brokerConfigZk);