This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6fdc0e31bff [fix] [test] Fix flaky test ReplicatorTest (#22594)
6fdc0e31bff is described below
commit 6fdc0e31bff906446e70965531671389d57e6cda
Author: fengyubiao <[email protected]>
AuthorDate: Mon Apr 29 17:24:41 2024 +0800
[fix] [test] Fix flaky test ReplicatorTest (#22594)
---
.../broker/service/ReplicatorGlobalNSTest.java | 129 ++++++++++++++++-----
.../pulsar/broker/service/ReplicatorTest.java | 121 +++++--------------
2 files changed, 130 insertions(+), 120 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
index eed849ef1a0..514e0207fbf 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
@@ -18,18 +18,24 @@
*/
package org.apache.pulsar.broker.service;
+import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
import lombok.Cleanup;
-import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -41,6 +47,11 @@ import org.testng.annotations.Test;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;
+/**
+ * The tests in this class must never be run against a production Pulsar cluster.
They are very dangerous: they lead to
+ * a lot of topic deletions and leave namespace policies incorrect.
+ */
+@Slf4j
@Test(groups = "broker-impl")
public class ReplicatorGlobalNSTest extends ReplicatorTestBase {
@@ -81,7 +92,7 @@ public class ReplicatorGlobalNSTest extends
ReplicatorTestBase {
*
* @throws Exception
*/
- @Test
+ @Test(priority = Integer.MAX_VALUE)
public void testRemoveLocalClusterOnGlobalNamespace() throws Exception {
log.info("--- Starting
ReplicatorTest::testRemoveLocalClusterOnGlobalNamespace ---");
@@ -115,32 +126,88 @@ public class ReplicatorGlobalNSTest extends
ReplicatorTestBase {
});
}
- @Test
- public void testForcefullyTopicDeletion() throws Exception {
- log.info("--- Starting ReplicatorTest::testForcefullyTopicDeletion
---");
-
- final String namespace = "pulsar/removeClusterTest";
- admin1.namespaces().createNamespace(namespace);
- admin1.namespaces().setNamespaceReplicationClusters(namespace,
Sets.newHashSet("r1"));
-
- final String topicName = "persistent://" + namespace + "/topic";
-
- @Cleanup
- PulsarClient client1 =
PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0,
TimeUnit.SECONDS)
- .build();
-
- ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>)
client1.newProducer().topic(topicName)
-
.enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
- producer1.close();
-
- admin1.topics().delete(topicName, true);
-
- MockedPulsarServiceBaseTest
- .retryStrategically((test) ->
!pulsar1.getBrokerService().getTopics().containsKey(topicName), 50, 150);
-
-
Assert.assertFalse(pulsar1.getBrokerService().getTopics().containsKey(topicName));
+ /**
+ * This is not a formal operation and can cause serious problems if it is
called in a production environment.
+ */
+ @Test(priority = Integer.MAX_VALUE - 1)
+ public void testConfigChange() throws Exception {
+ log.info("--- Starting ReplicatorTest::testConfigChange ---");
+ // This test is to verify that the config change on global namespace
is successfully applied in broker during
+ // runtime.
+ // Run a set of producer tasks to create the topics
+ List<Future<Void>> results = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ final TopicName dest =
TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/topic-" +
i));
+
+ results.add(executor.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+
+ @Cleanup
+ MessageProducer producer = new MessageProducer(url1, dest);
+ log.info("--- Starting producer --- " + url1);
+
+ @Cleanup
+ MessageConsumer consumer = new MessageConsumer(url1, dest);
+ log.info("--- Starting Consumer --- " + url1);
+
+ producer.produce(2);
+ consumer.receive(2);
+ return null;
+ }
+ }));
+ }
+
+ for (Future<Void> result : results) {
+ try {
+ result.get();
+ } catch (Exception e) {
+ log.error("exception in getting future result ", e);
+ fail(String.format("replication test failed with %s
exception", e.getMessage()));
+ }
+ }
+
+ Thread.sleep(1000L);
+ // Make sure that the internal replicators map contains remote cluster
info
+ ConcurrentOpenHashMap<String, PulsarClient> replicationClients1 =
ns1.getReplicationClients();
+ ConcurrentOpenHashMap<String, PulsarClient> replicationClients2 =
ns2.getReplicationClients();
+ ConcurrentOpenHashMap<String, PulsarClient> replicationClients3 =
ns3.getReplicationClients();
+
+ Assert.assertNotNull(replicationClients1.get("r2"));
+ Assert.assertNotNull(replicationClients1.get("r3"));
+ Assert.assertNotNull(replicationClients2.get("r1"));
+ Assert.assertNotNull(replicationClients2.get("r3"));
+ Assert.assertNotNull(replicationClients3.get("r1"));
+ Assert.assertNotNull(replicationClients3.get("r2"));
+
+ // Case 1: Update the global namespace replication configuration to
only contain the local cluster itself
+ admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns",
Sets.newHashSet("r1"));
+
+ // Wait for config changes to be updated.
+ Thread.sleep(1000L);
+
+ // Make sure that the internal replicators map still contains remote
cluster info
+ Assert.assertNotNull(replicationClients1.get("r2"));
+ Assert.assertNotNull(replicationClients1.get("r3"));
+ Assert.assertNotNull(replicationClients2.get("r1"));
+ Assert.assertNotNull(replicationClients2.get("r3"));
+ Assert.assertNotNull(replicationClients3.get("r1"));
+ Assert.assertNotNull(replicationClients3.get("r2"));
+
+ // Case 2: Update the configuration back
+ admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns",
Sets.newHashSet("r1", "r2", "r3"));
+
+ // Wait for config changes to be updated.
+ Thread.sleep(1000L);
+
+ // Make sure that the internal replicators map still contains remote
cluster info
+ Assert.assertNotNull(replicationClients1.get("r2"));
+ Assert.assertNotNull(replicationClients1.get("r3"));
+ Assert.assertNotNull(replicationClients2.get("r1"));
+ Assert.assertNotNull(replicationClients2.get("r3"));
+ Assert.assertNotNull(replicationClients3.get("r1"));
+ Assert.assertNotNull(replicationClients3.get("r2"));
+
+ // Case 3: TODO: Once automatic cleanup is implemented, add test cases
to verify auto removal of clusters
}
-
- private static final Logger log =
LoggerFactory.getLogger(ReplicatorGlobalNSTest.class);
-
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index fa12eba1c66..765727aeac3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -44,13 +44,11 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
-import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@@ -68,6 +66,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -154,88 +153,6 @@ public class ReplicatorTest extends ReplicatorTestBase {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}
- @Test(priority = Integer.MAX_VALUE)
- public void testConfigChange() throws Exception {
- log.info("--- Starting ReplicatorTest::testConfigChange ---");
- // This test is to verify that the config change on global namespace
is successfully applied in broker during
- // runtime.
- // Run a set of producer tasks to create the topics
- List<Future<Void>> results = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- final TopicName dest =
TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/topic-" +
i));
-
- results.add(executor.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
-
- @Cleanup
- MessageProducer producer = new MessageProducer(url1, dest);
- log.info("--- Starting producer --- " + url1);
-
- @Cleanup
- MessageConsumer consumer = new MessageConsumer(url1, dest);
- log.info("--- Starting Consumer --- " + url1);
-
- producer.produce(2);
- consumer.receive(2);
- return null;
- }
- }));
- }
-
- for (Future<Void> result : results) {
- try {
- result.get();
- } catch (Exception e) {
- log.error("exception in getting future result ", e);
- fail(String.format("replication test failed with %s
exception", e.getMessage()));
- }
- }
-
- Thread.sleep(1000L);
- // Make sure that the internal replicators map contains remote cluster
info
- ConcurrentOpenHashMap<String, PulsarClient> replicationClients1 =
ns1.getReplicationClients();
- ConcurrentOpenHashMap<String, PulsarClient> replicationClients2 =
ns2.getReplicationClients();
- ConcurrentOpenHashMap<String, PulsarClient> replicationClients3 =
ns3.getReplicationClients();
-
- Assert.assertNotNull(replicationClients1.get("r2"));
- Assert.assertNotNull(replicationClients1.get("r3"));
- Assert.assertNotNull(replicationClients2.get("r1"));
- Assert.assertNotNull(replicationClients2.get("r3"));
- Assert.assertNotNull(replicationClients3.get("r1"));
- Assert.assertNotNull(replicationClients3.get("r2"));
-
- // Case 1: Update the global namespace replication configuration to
only contains the local cluster itself
- admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns",
Sets.newHashSet("r1"));
-
- // Wait for config changes to be updated.
- Thread.sleep(1000L);
-
- // Make sure that the internal replicators map still contains remote
cluster info
- Assert.assertNotNull(replicationClients1.get("r2"));
- Assert.assertNotNull(replicationClients1.get("r3"));
- Assert.assertNotNull(replicationClients2.get("r1"));
- Assert.assertNotNull(replicationClients2.get("r3"));
- Assert.assertNotNull(replicationClients3.get("r1"));
- Assert.assertNotNull(replicationClients3.get("r2"));
-
- // Case 2: Update the configuration back
- admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns",
Sets.newHashSet("r1", "r2", "r3"));
-
- // Wait for config changes to be updated.
- Thread.sleep(1000L);
-
- // Make sure that the internal replicators map still contains remote
cluster info
- Assert.assertNotNull(replicationClients1.get("r2"));
- Assert.assertNotNull(replicationClients1.get("r3"));
- Assert.assertNotNull(replicationClients2.get("r1"));
- Assert.assertNotNull(replicationClients2.get("r3"));
- Assert.assertNotNull(replicationClients3.get("r1"));
- Assert.assertNotNull(replicationClients3.get("r2"));
-
- // Case 3: TODO: Once automatic cleanup is implemented, add tests case
to verify auto removal of clusters
- }
-
@Test(timeOut = 10000)
public void activeBrokerParse() throws Exception {
pulsar1.getConfiguration().setAuthorizationEnabled(true);
@@ -253,6 +170,32 @@ public class ReplicatorTest extends ReplicatorTestBase {
pulsar1.getConfiguration().setAuthorizationEnabled(false);
}
+ @Test
+ public void testForcefullyTopicDeletion() throws Exception {
+ log.info("--- Starting ReplicatorTest::testForcefullyTopicDeletion
---");
+
+ final String namespace =
BrokerTestUtil.newUniqueName("pulsar/removeClusterTest");
+ admin1.namespaces().createNamespace(namespace);
+ admin1.namespaces().setNamespaceReplicationClusters(namespace,
Sets.newHashSet("r1"));
+
+ final String topicName = "persistent://" + namespace + "/topic";
+
+ @Cleanup
+ PulsarClient client1 =
PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0,
TimeUnit.SECONDS)
+ .build();
+
+ ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>)
client1.newProducer().topic(topicName)
+
.enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
+ producer1.close();
+
+ admin1.topics().delete(topicName, true);
+
+ MockedPulsarServiceBaseTest
+ .retryStrategically((test) ->
!pulsar1.getBrokerService().getTopics().containsKey(topicName), 50, 150);
+
+
Assert.assertFalse(pulsar1.getBrokerService().getTopics().containsKey(topicName));
+ }
+
@SuppressWarnings("unchecked")
@Test(timeOut = 30000)
public void testConcurrentReplicator() throws Exception {
@@ -1270,7 +1213,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
log.info("--- Starting ReplicatorTest::testReplicatedCluster ---");
- final String namespace = "pulsar/global/repl";
+ final String namespace =
BrokerTestUtil.newUniqueName("pulsar/global/repl");
final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ namespace + "/topic1");
admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace,
Sets.newHashSet("r1", "r2", "r3"));
@@ -1677,7 +1620,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
log.info("--- Starting ReplicatorTest::testReplication ---");
- String namespace = "pulsar/global/ns2";
+ String namespace = BrokerTestUtil.newUniqueName("pulsar/global/ns");
admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1"));
final TopicName dest = TopicName
.get(BrokerTestUtil.newUniqueName("persistent://" + namespace
+ "/ackFailedTopic"));
@@ -1749,7 +1692,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
@Test
public void testWhenUpdateReplicationCluster() throws Exception {
log.info("--- testWhenUpdateReplicationCluster ---");
- String namespace = "pulsar/ns2";
+ String namespace = BrokerTestUtil.newUniqueName("pulsar/ns");;
admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace,
Sets.newHashSet("r1", "r2"));
final TopicName dest = TopicName.get(
@@ -1778,12 +1721,12 @@ public class ReplicatorTest extends ReplicatorTestBase {
@Test
public void testReplicatorProducerNotExceed() throws Exception {
log.info("--- testReplicatorProducerNotExceed ---");
- String namespace1 = "pulsar/ns11";
+ String namespace1 = BrokerTestUtil.newUniqueName("pulsar/ns1");
admin1.namespaces().createNamespace(namespace1);
admin1.namespaces().setNamespaceReplicationClusters(namespace1,
Sets.newHashSet("r1", "r2"));
final TopicName dest1 = TopicName.get(
BrokerTestUtil.newUniqueName("persistent://" + namespace1 +
"/testReplicatorProducerNotExceed1"));
- String namespace2 = "pulsar/ns22";
+ String namespace2 = BrokerTestUtil.newUniqueName("pulsar/ns2");
admin2.namespaces().createNamespace(namespace2);
admin2.namespaces().setNamespaceReplicationClusters(namespace2,
Sets.newHashSet("r1", "r2"));
final TopicName dest2 = TopicName.get(