This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9f7a539593d [fix] [broker] Producer created by replicator is not
displayed in topic stats (#20229)
9f7a539593d is described below
commit 9f7a539593de57ad8c4d224fde81a9e04ac38494
Author: fengyubiao <[email protected]>
AuthorDate: Sat May 6 13:46:05 2023 +0800
[fix] [broker] Producer created by replicator is not displayed in topic
stats (#20229)
### Motivation
A producer of the remote cluster is automatically created when replication
is turned on. But we can't see anything about it from the response of `(remote
cluster) pulsar-admin topic stats`
### Modifications
Make this producer displayed in the topic stats
---
.../broker/service/persistent/PersistentTopic.java | 3 +-
.../broker/service/OneWayReplicatorTest.java | 78 ++++++++
.../broker/service/OneWayReplicatorTestBase.java | 219 +++++++++++++++++++++
3 files changed, 298 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index fe181bb1c01..15854f55c5c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2226,9 +2226,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
if (producer.isRemote()) {
remotePublishersStats.put(producer.getRemoteCluster(),
publisherStats);
- } else {
- stats.addPublisher(publisherStats);
}
+ stats.addPublisher(publisherStats);
});
stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 :
(stats.msgThroughputIn / stats.msgRateIn);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
new file mode 100644
index 00000000000..e8a21502fb1
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.junit.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class OneWayReplicatorTest extends OneWayReplicatorTestBase {
+
+ @Override
+ @BeforeClass(alwaysRun = true, timeOut = 300000)
+ public void setup() throws Exception {
+ super.setup();
+ }
+
+ @Override
+ @AfterClass(alwaysRun = true, timeOut = 300000)
+ public void cleanup() throws Exception {
+ super.cleanup();
+ }
+
+ @Test
+ public void testReplicatorProducerStatInTopic() throws Exception {
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ defaultNamespace + "/tp_");
+ final String subscribeName = "subscribe_1";
+ final byte[] msgValue = "test".getBytes();
+
+ admin1.topics().createNonPartitionedTopic(topicName);
+ admin2.topics().createNonPartitionedTopic(topicName);
+ admin1.topics().createSubscription(topicName, subscribeName,
MessageId.earliest);
+ admin2.topics().createSubscription(topicName, subscribeName,
MessageId.earliest);
+
+ // Verify replicator works.
+ Producer<byte[]> producer1 =
client1.newProducer().topic(topicName).create();
+ Consumer<byte[]> consumer2 =
client2.newConsumer().topic(topicName).subscriptionName(subscribeName).subscribe();
+ producer1.newMessage().value(msgValue).send();
+ pulsar1.getBrokerService().checkReplicationPolicies();
+ assertEquals(consumer2.receive(10, TimeUnit.SECONDS).getValue(),
msgValue);
+
+ // Verify there has one item in the attribute "publishers" or
"replications"
+ TopicStats topicStats2 = admin2.topics().getStats(topicName);
+ Assert.assertTrue(topicStats2.getPublishers().size() +
topicStats2.getReplication().size() > 0);
+
+ // cleanup.
+ consumer2.close();
+ producer1.close();
+ cleanupTopics(() -> {
+ admin1.topics().delete(topicName);
+ admin2.topics().delete(topicName);
+ });
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
new file mode 100644
index 00000000000..33620716288
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.google.common.collect.Sets;
+import java.net.URL;
+import java.util.Collections;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.tests.TestRetrySupport;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.apache.pulsar.zookeeper.ZookeeperServerTest;
+
+@Slf4j
+public abstract class OneWayReplicatorTestBase extends TestRetrySupport {
+
+ protected final String defaultTenant = "public";
+ protected final String defaultNamespace = defaultTenant + "/default";
+
+ protected final String cluster1 = "r1";
+ protected URL url1;
+ protected URL urlTls1;
+ protected ServiceConfiguration config1 = new ServiceConfiguration();
+ protected ZookeeperServerTest brokerConfigZk1;
+ protected LocalBookkeeperEnsemble bkEnsemble1;
+ protected PulsarService pulsar1;
+ protected BrokerService ns1;
+ protected PulsarAdmin admin1;
+ protected PulsarClient client1;
+
+ protected URL url2;
+ protected URL urlTls2;
+ protected final String cluster2 = "r2";
+ protected ServiceConfiguration config2 = new ServiceConfiguration();
+ protected ZookeeperServerTest brokerConfigZk2;
+ protected LocalBookkeeperEnsemble bkEnsemble2;
+ protected PulsarService pulsar2;
+ protected BrokerService ns2;
+ protected PulsarAdmin admin2;
+ protected PulsarClient client2;
+
+ protected void startZKAndBK() throws Exception {
+ // Start ZK.
+ brokerConfigZk1 = new ZookeeperServerTest(0);
+ brokerConfigZk1.start();
+ brokerConfigZk2 = new ZookeeperServerTest(0);
+ brokerConfigZk2.start();
+
+ // Start BK.
+ bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble1.start();
+ bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble2.start();
+ }
+
+ protected void startBrokers() throws Exception {
+ // Start brokers.
+ setConfigDefaults(config1, cluster1, bkEnsemble1, brokerConfigZk1);
+ pulsar1 = new PulsarService(config1);
+ pulsar1.start();
+ ns1 = pulsar1.getBrokerService();
+
+ url1 = new URL(pulsar1.getWebServiceAddress());
+ urlTls1 = new URL(pulsar1.getWebServiceAddressTls());
+ admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build();
+ client1 = PulsarClient.builder().serviceUrl(url1.toString()).build();
+
+ // Start region 2
+ setConfigDefaults(config2, cluster2, bkEnsemble2, brokerConfigZk2);
+ pulsar2 = new PulsarService(config2);
+ pulsar2.start();
+ ns2 = pulsar2.getBrokerService();
+
+ url2 = new URL(pulsar2.getWebServiceAddress());
+ urlTls2 = new URL(pulsar2.getWebServiceAddressTls());
+ admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build();
+ client2 = PulsarClient.builder().serviceUrl(url2.toString()).build();
+ }
+
+ protected void createDefaultTenantsAndClustersAndNamespace() throws
Exception {
+ admin1.clusters().createCluster(cluster1, ClusterData.builder()
+ .serviceUrl(url1.toString())
+ .serviceUrlTls(urlTls1.toString())
+ .brokerServiceUrl(pulsar1.getBrokerServiceUrl())
+ .brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls())
+ .brokerClientTlsEnabled(false)
+ .build());
+ admin1.clusters().createCluster(cluster2, ClusterData.builder()
+ .serviceUrl(url2.toString())
+ .serviceUrlTls(urlTls2.toString())
+ .brokerServiceUrl(pulsar2.getBrokerServiceUrl())
+ .brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls())
+ .brokerClientTlsEnabled(false)
+ .build());
+ admin2.clusters().createCluster(cluster1, ClusterData.builder()
+ .serviceUrl(url1.toString())
+ .serviceUrlTls(urlTls1.toString())
+ .brokerServiceUrl(pulsar1.getBrokerServiceUrl())
+ .brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls())
+ .brokerClientTlsEnabled(false)
+ .build());
+ admin2.clusters().createCluster(cluster2, ClusterData.builder()
+ .serviceUrl(url2.toString())
+ .serviceUrlTls(urlTls2.toString())
+ .brokerServiceUrl(pulsar2.getBrokerServiceUrl())
+ .brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls())
+ .brokerClientTlsEnabled(false)
+ .build());
+
+ admin1.tenants().createTenant(defaultTenant, new
TenantInfoImpl(Collections.emptySet(),
+ Sets.newHashSet(cluster1, cluster2)));
+ admin2.tenants().createTenant(defaultTenant, new
TenantInfoImpl(Collections.emptySet(),
+ Sets.newHashSet(cluster1, cluster2)));
+
+ admin1.namespaces().createNamespace(defaultNamespace,
Sets.newHashSet(cluster1, cluster2));
+ admin2.namespaces().createNamespace(defaultNamespace);
+ }
+
+ protected void cleanupTopics(CleanupTopicAction cleanupTopicAction) throws
Exception {
+ admin1.namespaces().setNamespaceReplicationClusters(defaultNamespace,
Collections.singleton(cluster1));
+ admin1.namespaces().unload(defaultNamespace);
+ cleanupTopicAction.run();
+ admin1.namespaces().setNamespaceReplicationClusters(defaultNamespace,
Sets.newHashSet(cluster1, cluster2));
+ }
+
+ protected interface CleanupTopicAction {
+ void run() throws Exception;
+ }
+
+ @Override
+ protected void setup() throws Exception {
+ incrementSetupNumber();
+
+ log.info("--- Starting OneWayReplicatorTestBase::setup ---");
+
+ startZKAndBK();
+
+ startBrokers();
+
+ createDefaultTenantsAndClustersAndNamespace();
+
+ Thread.sleep(100);
+ log.info("--- OneWayReplicatorTestBase::setup completed ---");
+ }
+
+ private void setConfigDefaults(ServiceConfiguration config, String
clusterName,
+ LocalBookkeeperEnsemble bookkeeperEnsemble,
ZookeeperServerTest brokerConfigZk) {
+ config.setClusterName(clusterName);
+ config.setAdvertisedAddress("localhost");
+ config.setWebServicePort(Optional.of(0));
+ config.setWebServicePortTls(Optional.of(0));
+ config.setMetadataStoreUrl("zk:127.0.0.1:" +
bookkeeperEnsemble.getZookeeperPort());
+ config.setConfigurationMetadataStoreUrl("zk:127.0.0.1:" +
brokerConfigZk.getZookeeperPort() + "/foo");
+ config.setBrokerDeleteInactiveTopicsEnabled(false);
+ config.setBrokerDeleteInactiveTopicsFrequencySeconds(60);
+ config.setBrokerShutdownTimeoutMs(0L);
+ config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
+ config.setBrokerServicePort(Optional.of(0));
+ config.setBrokerServicePortTls(Optional.of(0));
+ config.setBacklogQuotaCheckIntervalInSeconds(5);
+ config.setDefaultNumberOfNamespaceBundles(1);
+ config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
+ config.setEnableReplicatedSubscriptions(true);
+ config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
+ }
+
+ @Override
+ protected void cleanup() throws Exception {
+ markCurrentSetupNumberCleaned();
+ log.info("--- Shutting down ---");
+
+ // Stop brokers.
+ client1.close();
+ client2.close();
+ admin1.close();
+ admin2.close();
+ if (pulsar2 != null) {
+ pulsar2.close();
+ }
+ if (pulsar1 != null) {
+ pulsar1.close();
+ }
+
+ // Stop ZK and BK.
+ bkEnsemble1.stop();
+ bkEnsemble2.stop();
+ brokerConfigZk1.stop();
+ brokerConfigZk2.stop();
+
+ // Reset configs.
+ config1 = new ServiceConfiguration();
+ setConfigDefaults(config1, cluster1, bkEnsemble1, brokerConfigZk1);
+ config2 = new ServiceConfiguration();
+ setConfigDefaults(config2, cluster2, bkEnsemble2, brokerConfigZk2);
+ }
+}