This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 738ec6a  [FlakyTest]Try to fix flaky test 
ReplicatorTest.testReplicatorOnPartitionedTopic. (#9659)
738ec6a is described below

commit 738ec6a7d0a1b46155ca3b6436d294e38528230f
Author: Marvin Cai <[email protected]>
AuthorDate: Thu Feb 25 05:54:55 2021 -0800

    [FlakyTest]Try to fix flaky test 
ReplicatorTest.testReplicatorOnPartitionedTopic. (#9659)
    
    Fixes #9457
    
    ### Motivation
    It is unclear why state is not cleaned up, but the exception indicates that
the metadata of the namespace the test is trying to create already exists, so
using a different namespace name for each test run should fix it.
    
    ### Modifications
    Append a unique suffix to the namespace used, via the new
BrokerTestUtil.newUniqueName helper (a random UUID instead of System.nanoTime),
to avoid being affected by previous state. This seems to work fine for other
test cases in the same test.
---
 .../org/apache/pulsar/broker/BrokerTestUtil.java   | 32 +++++++++++++
 .../apache/pulsar/broker/admin/AdminApiTest.java   |  3 +-
 .../broker/admin/IncrementPartitionsTest.java      |  7 ++-
 .../apache/pulsar/broker/admin/NamespacesTest.java | 15 +++---
 .../broker/service/BrokerBkEnsemblesTests.java     |  5 +-
 .../pulsar/broker/service/PeerReplicatorTest.java  |  3 +-
 .../broker/service/PersistentFailoverE2ETest.java  |  3 +-
 .../pulsar/broker/service/ReplicatorTest.java      | 53 +++++++++++-----------
 .../service/persistent/DelayedDeliveryTest.java    | 11 +++--
 .../ReplicatedSubscriptionConfigTest.java          |  9 ++--
 .../api/SimpleTypedProducerConsumerTest.java       |  3 +-
 .../SubscriptionMessageDispatchThrottlingTest.java |  3 +-
 .../client/impl/ConsumerDedupPermitsUpdate.java    |  3 +-
 .../pulsar/client/impl/NegativeAcksTest.java       |  3 +-
 .../websocket/proxy/ProxyPublishConsumeTest.java   |  3 +-
 15 files changed, 101 insertions(+), 55 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
new file mode 100644
index 0000000..ff03e42
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import java.util.UUID;
+
+/**
+ * Utility methods shared by broker tests.
+ */
+public class BrokerTestUtil {
+    // Returns prefix + "-" + a random UUID, so names differ between test runs.
+    public static String newUniqueName(String prefix) {
+        return prefix + "-" + UUID.randomUUID();
+    }
+
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 826116c..8c8ddff 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -60,6 +60,7 @@ import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.ConfigHelper;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
@@ -2621,7 +2622,7 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
                 Sets.newHashSet("test", "usw"));
         admin.tenants().updateTenant("prop-xyz", tenantInfo);
 
-        String ns = "prop-xyz/ns-" + System.nanoTime();
+        String ns = BrokerTestUtil.newUniqueName("prop-xyz/ns");
 
         admin.namespaces().createNamespace(ns, 24);
         admin.namespaces().deleteNamespace(ns);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
index d3f5d7b..5c08fd5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
 import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.admin.AdminApiTest.MockedPulsarService;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.api.Consumer;
@@ -107,7 +108,8 @@ public class IncrementPartitionsTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testIncrementPartitionsWithNoSubscriptions() throws Exception {
-        final String partitionedTopicName = 
"persistent://prop-xyz/use/ns1/test-topic-" + System.nanoTime();
+        final String partitionedTopicName =
+                
BrokerTestUtil.newUniqueName("persistent://prop-xyz/use/ns1/test-topic");
 
         admin.topics().createPartitionedTopic(partitionedTopicName, 1);
         
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
 1);
@@ -133,7 +135,8 @@ public class IncrementPartitionsTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testIncrementPartitionsWithReaders() throws Exception {
-        TopicName partitionedTopicName = 
TopicName.get("persistent://prop-xyz/use/ns1/test-topic-" + System.nanoTime());
+        TopicName partitionedTopicName = TopicName.get(
+                
BrokerTestUtil.newUniqueName("persistent://prop-xyz/use/ns1/test-topic"));
 
         admin.topics().createPartitionedTopic(partitionedTopicName.toString(), 
1);
         
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName.toString()).partitions,
 1);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index b7e3e52..ce31fd1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.admin;
 
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.PulsarClientException;
 import static org.mockito.ArgumentMatchers.any;
@@ -1140,7 +1141,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
      */
     @Test
     public void testForceDeleteNamespace() throws Exception {
-        String namespace = this.testTenant + "/namespace-" + System.nanoTime();
+        String namespace = BrokerTestUtil.newUniqueName(this.testTenant + 
"/namespace");
         String topic = namespace + "/topic";
 
         admin.namespaces().createNamespace(namespace, 100);
@@ -1360,7 +1361,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testDeleteNonPartitionedTopicMultipleTimes() throws Exception {
-        String namespace = this.testTenant + "/namespace-" + System.nanoTime();
+        String namespace = BrokerTestUtil.newUniqueName(this.testTenant + 
"/namespace");
         String topic = namespace + "/topic";
 
         admin.namespaces().createNamespace(namespace, 
Sets.newHashSet(testLocalCluster));
@@ -1388,7 +1389,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testDeletePartitionedTopicMultipleTimes() throws Exception {
-        String namespace = this.testTenant + "/namespace-" + System.nanoTime();
+        String namespace = BrokerTestUtil.newUniqueName(this.testTenant + 
"/namespace");
         String topic = namespace + "/topic";
 
         admin.namespaces().createNamespace(namespace, 
Sets.newHashSet(testLocalCluster));
@@ -1417,7 +1418,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testRetentionPolicyValidation() throws Exception {
-        String namespace = this.testTenant + "/namespace-" + System.nanoTime();
+        String namespace = BrokerTestUtil.newUniqueName(this.testTenant + 
"/namespace");
 
         admin.namespaces().createNamespace(namespace, 
Sets.newHashSet(testLocalCluster));
 
@@ -1596,7 +1597,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
     public void testSubscriptionTypesEnabled() throws PulsarAdminException, 
PulsarClientException {
         pulsar.getConfiguration().setAuthorizationEnabled(false);
         pulsar.getConfiguration().setTopicLevelPoliciesEnabled(false);
-        String namespace = this.testTenant + "/namespace-" + System.nanoTime();
+        String namespace = BrokerTestUtil.newUniqueName(this.testTenant + 
"/namespace");
         String topic = namespace + "/test-subscription-enabled";
         admin.namespaces().createNamespace(namespace);
         Set<SubscriptionType> subscriptionTypes = new HashSet<>();
@@ -1666,7 +1667,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
 
     private void assertValidRetentionPolicyAsPartOfAllPolicies(Policies 
policies, int retentionTimeInMinutes,
                                                                int 
retentionSizeInMB) throws PulsarAdminException {
-        String namespace = this.testTenant + "/namespace-" + System.nanoTime();
+        String namespace = BrokerTestUtil.newUniqueName(this.testTenant + 
"/namespace");
         RetentionPolicies retention = new 
RetentionPolicies(retentionTimeInMinutes, retentionSizeInMB);
         policies.retention_policies = retention;
         admin.namespaces().createNamespace(namespace, policies);
@@ -1675,7 +1676,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
 
     private void assertInvalidRetentionPolicyAsPartOfAllPolicies(Policies 
policies, int retentionTimeInMinutes,
                                                                  int 
retentionSizeInMB) {
-        String namespace = this.testTenant + "/namespace-" + System.nanoTime();
+        String namespace = BrokerTestUtil.newUniqueName(this.testTenant + 
"/namespace");
         try {
             RetentionPolicies retention = new 
RetentionPolicies(retentionTimeInMinutes, retentionSizeInMB);
             policies.retention_policies = retention;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index ba9ccb1..3894f7c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -40,6 +40,7 @@ import 
org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.util.StringUtils;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -309,10 +310,10 @@ public class BrokerBkEnsemblesTests extends 
BkEnsemblesTestBase {
 
     @Test
     public void testDeleteTopicWithMissingData() throws Exception {
-        String namespace = "prop/usc-" + System.nanoTime();
+        String namespace = BrokerTestUtil.newUniqueName("prop/usc");
         admin.namespaces().createNamespace(namespace);
 
-        String topic = namespace + "/my-topic-" + System.nanoTime();
+        String topic = BrokerTestUtil.newUniqueName(namespace + "/my-topic");
 
         @Cleanup
         PulsarClient client = PulsarClient.builder()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java
index f1a3017..f230c71 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java
@@ -28,6 +28,7 @@ import static org.testng.Assert.fail;
 import java.util.LinkedHashSet;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Producer;
@@ -196,7 +197,7 @@ public class PeerReplicatorTest extends ReplicatorTestBase {
         admin1.clusters().updatePeerClusterNames("r3", null);
 
         final String serviceUrl = pulsar3.getBrokerServiceUrl();
-        final String namespace1 = "pulsar/global/peer-change-repl-ns-" + 
System.nanoTime();
+        final String namespace1 = 
BrokerTestUtil.newUniqueName("pulsar/global/peer-change-repl-ns");
         admin1.namespaces().createNamespace(namespace1);
         // add replication cluster
         admin1.namespaces().setNamespaceReplicationClusters(namespace1, 
Sets.newHashSet("r1"));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index dd90017..d1595fa 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.pulsar.broker.BrokerTestUtil;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -277,7 +278,7 @@ public class PersistentFailoverE2ETest extends 
BrokerTestBase {
 
         int numPartitions = 4;
 
-        final String topicName = 
"persistent://prop/use/ns-abc/testSimpleConsumerEventsWithPartition-" + 
System.nanoTime();
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://prop/use/ns-abc/testSimpleConsumerEventsWithPartition");
         final TopicName destName = TopicName.get(topicName);
         final String subName = "sub1";
         final int numMsgs = 100;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 9cccdd5..143625b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -52,6 +52,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -127,7 +128,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
         List<Future<Void>> results = Lists.newArrayList();
         for (int i = 0; i < 10; i++) {
             final TopicName dest = TopicName.get(String
-                    .format("persistent://pulsar/ns/topic-%d-%d", 
System.currentTimeMillis(), i));
+                    
.format(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/topic-" + i)));
 
             results.add(executor.submit(new Callable<Void>() {
                 @Override
@@ -225,11 +226,11 @@ public class ReplicatorTest extends ReplicatorTestBase {
 
         log.info("--- Starting ReplicatorTest::testConcurrentReplicator ---");
 
-        final String namespace = "pulsar/concurrent";
+        final String namespace = 
BrokerTestUtil.newUniqueName("pulsar/concurrent");
         admin1.namespaces().createNamespace(namespace);
         admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Sets.newHashSet("r1", "r2"));
         final TopicName topicName = TopicName
-                .get(String.format("persistent://" + namespace + 
"/topic-%d-%d", System.currentTimeMillis(), 0));
+                .get(BrokerTestUtil.newUniqueName("persistent://" + namespace 
+ "/topic"));
 
         @Cleanup
         PulsarClient client1 = PulsarClient.builder()
@@ -290,7 +291,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
         // runtime.
         // Run a set of producer tasks to create the topics
         final TopicName dest = TopicName
-                .get(String.format("persistent://%s/repltopic-%d", namespace, 
System.nanoTime()));
+                .get(BrokerTestUtil.newUniqueName("persistent://" + namespace 
+ "/repltopic"));
 
         @Cleanup
         MessageProducer producer1 = new MessageProducer(url1, dest);
@@ -369,7 +370,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
         // Run a set of producer tasks to create the topics
         for (int i = 0; i < 10; i++) {
             final TopicName dest = TopicName
-                    .get(String.format("persistent://pulsar/ns/repltopic-%d", 
System.nanoTime()));
+                    
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/repltopic"));
 
             @Cleanup
             MessageProducer producer1 = new MessageProducer(url1, dest);
@@ -430,7 +431,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
             // 1. Create a consumer using the reserved consumer id prefix 
"pulsar.repl."
 
             final TopicName dest = TopicName
-                    
.get(String.format("persistent://pulsar/ns/res-cons-id-%d", 
System.currentTimeMillis()));
+                    
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/res-cons-id-"));
 
             // Create another consumer using replication prefix as sub id
             MessageConsumer consumer = new MessageConsumer(url2, dest, 
"pulsar.repl.");
@@ -446,7 +447,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
     public void testReplicatePeekAndSkip() throws Exception {
 
         final TopicName dest = TopicName.get(
-                String.format("persistent://pulsar/ns/peekAndSeekTopic-%d", 
System.currentTimeMillis()));
+                
BrokerTestUtil.newUniqueName("persistent://pulsar/ns/peekAndSeekTopic"));
 
         @Cleanup
         MessageProducer producer1 = new MessageProducer(url1, dest);
@@ -472,7 +473,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
         SortedSet<String> testDests = new TreeSet<String>();
 
         final TopicName dest = TopicName
-                
.get(String.format("persistent://pulsar/ns/clearBacklogTopic-%d", 
System.currentTimeMillis()));
+                
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/clearBacklogTopic"));
         testDests.add(dest.toString());
 
         @Cleanup
@@ -502,7 +503,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
 
         // This test is to verify that reset cursor fails on global topic
         final TopicName dest = TopicName
-                .get(String.format("persistent://pulsar/ns/resetrepltopic-%d", 
System.nanoTime()));
+                
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/resetrepltopic"));
 
         @Cleanup
         MessageProducer producer1 = new MessageProducer(url1, dest);
@@ -526,7 +527,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
 
         // Run a set of producer tasks to create the topics
         final TopicName dest = TopicName
-                .get(String.format("persistent://pulsar/ns/repltopicbatch-%d", 
System.nanoTime()));
+                
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/repltopicbatch"));
 
         @Cleanup
         MessageProducer producer1 = new MessageProducer(url1, dest, true);
@@ -580,7 +581,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
     @Test(timeOut = 30000)
     public void testDeleteReplicatorFailure() throws Exception {
         log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure 
---");
-        final String topicName = "persistent://pulsar/ns/repltopicbatch-" + 
System.currentTimeMillis() + "-";
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://pulsar/ns/repltopicbatch");
         final TopicName dest = TopicName.get(topicName);
 
         @Cleanup
@@ -621,7 +622,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
     @Test(priority = 5, timeOut = 30000)
     public void testReplicatorProducerClosing() throws Exception {
         log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure 
---");
-        final String topicName = "persistent://pulsar/ns/repltopicbatch-" + 
System.currentTimeMillis() + "-";
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://pulsar/ns/repltopicbatch");
         final TopicName dest = TopicName.get(topicName);
 
         @Cleanup
@@ -662,7 +663,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
             Thread.sleep(200);
 
             TopicName dest = TopicName
-                    .get(String.format("persistent://pulsar/ns1/%s-%d", 
policy, System.currentTimeMillis()));
+                    
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns1/%s-" + policy));
 
             // Producer on r1
             @Cleanup
@@ -721,7 +722,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
      */
     @Test(timeOut = 15000)
     public void testCloseReplicatorStartProducer() throws Exception {
-        TopicName dest = TopicName.get("persistent://pulsar/ns1/closeCursor-" 
+ System.currentTimeMillis() + "-");
+        TopicName dest = 
TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns1/closeCursor"));
         // Producer on r1
         @Cleanup
         MessageProducer producer1 = new MessageProducer(url1, dest);
@@ -767,7 +768,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
 
     @Test(timeOut = 30000)
     public void verifyChecksumAfterReplication() throws Exception {
-        final String topicName = 
"persistent://pulsar/ns/checksumAfterReplication-" + System.currentTimeMillis() 
+ "-";
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://pulsar/ns/checksumAfterReplication");
 
         PulsarClient c1 = 
PulsarClient.builder().serviceUrl(url1.toString()).build();
         Producer<byte[]> p1 = c1.newProducer().topic(topicName)
@@ -805,11 +806,9 @@ public class ReplicatorTest extends ReplicatorTestBase {
 
         log.info("--- Starting ReplicatorTest::{} --- ", methodName);
 
-        final String namespace = "pulsar/partitionedNs-" + isPartitionedTopic;
-        final String persistentTopicName =
-                "persistent://" + namespace + "/partTopic-" + 
System.currentTimeMillis() + "-" + isPartitionedTopic;
-        final String nonPersistentTopicName =
-                "non-persistent://" + namespace + "/partTopic-" + 
System.currentTimeMillis() + "-"+ isPartitionedTopic;
+        final String namespace = 
BrokerTestUtil.newUniqueName("pulsar/partitionedNs-" + isPartitionedTopic);
+        final String persistentTopicName = 
BrokerTestUtil.newUniqueName("persistent://" + namespace + "/partTopic-" + 
isPartitionedTopic);
+        final String nonPersistentTopicName = 
BrokerTestUtil.newUniqueName("non-persistent://" + namespace + "/partTopic-" + 
isPartitionedTopic);
         BrokerService brokerService = pulsar1.getBrokerService();
 
         admin1.namespaces().createNamespace(namespace);
@@ -852,7 +851,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
             }
             assertTrue(e.getCause() instanceof NamingException);
         }
-
+        
     }
 
     @Test
@@ -861,7 +860,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
         log.info("--- Starting ReplicatorTest::testReplicatedCluster ---");
 
         final String namespace = "pulsar/global/repl";
-        final String topicName = String.format("persistent://%s/topic1-%d", 
namespace, System.currentTimeMillis());
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ namespace + "/topic1");
         admin1.namespaces().createNamespace(namespace);
         admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Sets.newHashSet("r1", "r2", "r3"));
         admin1.topics().createPartitionedTopic(topicName, 4);
@@ -908,8 +907,8 @@ public class ReplicatorTest extends ReplicatorTestBase {
 
         final String cluster1 = pulsar1.getConfig().getClusterName();
         final String cluster2 = pulsar2.getConfig().getClusterName();
-        final String namespace = "pulsar/ns-" + System.nanoTime();
-        final String topicName = "persistent://" + namespace + "/topic1";
+        final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns");
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ namespace + "/topic1");
         int startPartitions = 4;
         int newPartitions = 8;
         final String subscriberName = "sub1";
@@ -960,9 +959,9 @@ public class ReplicatorTest extends ReplicatorTestBase {
 
         final String cluster1 = pulsar1.getConfig().getClusterName();
         final String cluster2 = pulsar2.getConfig().getClusterName();
-        final String namespace = "pulsar/ns-" + System.nanoTime();
-        final String partitionedTopicName = topicPrefix + namespace + 
topicName + "-partitioned";
-        final String nonPartitionedTopicName = topicPrefix + namespace + 
topicName + "-non-partitioned";
+        final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns");
+        final String partitionedTopicName = 
BrokerTestUtil.newUniqueName(topicPrefix + namespace + topicName + 
"-partitioned");
+        final String nonPartitionedTopicName = 
BrokerTestUtil.newUniqueName(topicPrefix + namespace + topicName + 
"-non-partitioned");
         final int startPartitions = 4;
         admin1.namespaces().createNamespace(namespace, 
Sets.newHashSet(cluster1, cluster2));
         admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Sets.newHashSet("r1", "r2", "r3"));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
index 799d7f1..804dfbb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
 
 import lombok.Cleanup;
 
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
@@ -71,7 +72,7 @@ public class DelayedDeliveryTest extends ProducerConsumerBase 
{
     @Test
     public void testDelayedDelivery()
             throws Exception {
-        String topic = "testNegativeAcks-" + System.nanoTime();
+        String topic = BrokerTestUtil.newUniqueName("testNegativeAcks");
 
         @Cleanup
         Consumer<String> failoverConsumer = 
pulsarClient.newConsumer(Schema.STRING)
@@ -126,7 +127,7 @@ public class DelayedDeliveryTest extends 
ProducerConsumerBase {
     @Test
     public void testInterleavedMessages()
             throws Exception {
-        String topic = "testInterleavedMessages-" + System.nanoTime();
+        String topic = BrokerTestUtil.newUniqueName("testInterleavedMessages");
 
         @Cleanup
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
@@ -178,7 +179,7 @@ public class DelayedDeliveryTest extends 
ProducerConsumerBase {
     @Test
     public void testEverythingFilteredInMultipleReads()
             throws Exception {
-        String topic = "testEverythingFilteredInMultipleReads-" + 
System.nanoTime();
+        String topic = 
BrokerTestUtil.newUniqueName("testEverythingFilteredInMultipleReads");
 
         @Cleanup
         Consumer<String> sharedConsumer = 
pulsarClient.newConsumer(Schema.STRING)
@@ -227,7 +228,7 @@ public class DelayedDeliveryTest extends 
ProducerConsumerBase {
     @Test
     public void testDelayedDeliveryWithMultipleConcurrentReadEntries()
             throws Exception {
-        String topic = "persistent://public/default/testDelayedDelivery-" + 
System.nanoTime();
+        String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/testDelayedDelivery");
 
         @Cleanup
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
@@ -289,7 +290,7 @@ public class DelayedDeliveryTest extends 
ProducerConsumerBase {
 
     @Test
     public void testOrderingDispatch() throws PulsarClientException {
-        String topic = "persistent://public/default/testOrderingDispatch-" + 
System.nanoTime();
+        String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/testOrderingDispatch");
 
         @Cleanup
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java
index d284254..192f8fe 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java
@@ -23,6 +23,7 @@ import static org.testng.Assert.assertTrue;
 
 import lombok.Cleanup;
 
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -49,7 +50,7 @@ public class ReplicatedSubscriptionConfigTest extends 
ProducerConsumerBase {
     @Test
     public void createReplicatedSubscription() throws Exception {
         this.conf.setEnableReplicatedSubscriptions(true);
-        String topic = "createReplicatedSubscription-" + System.nanoTime();
+        String topic = 
BrokerTestUtil.newUniqueName("createReplicatedSubscription");
 
         @Cleanup
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
@@ -71,7 +72,7 @@ public class ReplicatedSubscriptionConfigTest extends ProducerConsumerBase {
     @Test
     public void upgradeToReplicatedSubscription() throws Exception {
         this.conf.setEnableReplicatedSubscriptions(true);
-        String topic = "upgradeToReplicatedSubscription-" + System.nanoTime();
+        String topic = BrokerTestUtil.newUniqueName("upgradeToReplicatedSubscription");
 
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic)
@@ -97,7 +98,7 @@ public class ReplicatedSubscriptionConfigTest extends ProducerConsumerBase {
     @Test
     public void upgradeToReplicatedSubscriptionAfterRestart() throws Exception {
         this.conf.setEnableReplicatedSubscriptions(true);
-        String topic = "upgradeToReplicatedSubscriptionAfterRestart-" + System.nanoTime();
+        String topic = BrokerTestUtil.newUniqueName("upgradeToReplicatedSubscriptionAfterRestart");
 
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic)
@@ -125,7 +126,7 @@ public class ReplicatedSubscriptionConfigTest extends ProducerConsumerBase {
     @Test
     public void testDisableReplicatedSubscriptions() throws Exception {
         this.conf.setEnableReplicatedSubscriptions(false);
-        String topic = "disableReplicatedSubscriptions-" + System.nanoTime();
+        String topic = BrokerTestUtil.newUniqueName("disableReplicatedSubscriptions");
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic)
                 .subscriptionName("sub")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
index a1c9a00..c3a7324 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
@@ -32,6 +32,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.schema.SchemaRegistry;
 import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
 import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -601,7 +602,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
 
     @Test
     public void testMessageBuilderLoadConf() throws Exception {
-        String topic = "my-topic-" + System.nanoTime();
+        String topic = BrokerTestUtil.newUniqueName("my-topic");
 
         @Cleanup
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
index 2b63411..1bec4d2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
@@ -222,7 +223,7 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
         log.info("-- Starting {} test --", methodName);
 
         final String namespace = "my-property/throttling_ns";
-        final String topicName = "persistent://" + namespace + "/throttlingAll-" + System.nanoTime();
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll");
         final String subName = "my-subscriber-name-" + subscription;
 
         final int byteRate = 100;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdate.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdate.java
index a81f6a1..af26630 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdate.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdate.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 
 import lombok.Cleanup;
 
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
@@ -65,7 +66,7 @@ public class ConsumerDedupPermitsUpdate extends ProducerConsumerBase {
 
     @Test(timeOut = 30000, dataProvider = "combinations")
     public void testConsumerDedup(boolean batchingEnabled, int receiverQueueSize) throws Exception {
-        String topic = "persistent://my-property/my-ns/my-topic-" + System.nanoTime();
+        String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/my-topic");
 
         @Cleanup
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index 6c9de8b..366695a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
@@ -103,7 +104,7 @@ public class NegativeAcksTest extends ProducerConsumerBase {
             throws Exception {
         log.info("Test negative acks batching={} partitions={} subType={} negAckDelayMs={}", batching, usePartitions,
                 subscriptionType, negAcksDelayMillis);
-        String topic = "testNegativeAcks-" + System.nanoTime();
+        String topic = BrokerTestUtil.newUniqueName("testNegativeAcks");
 
         @Cleanup
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 4845c2d..8f433d9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -50,6 +50,7 @@ import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerAccessMode;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
@@ -266,7 +267,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
     public void unsubscribeTest() throws Exception {
         final String namespace = "my-property/my-ns";
         final String topic = namespace + "/" + "my-topic7";
-        final String topicName = "persistent://" + topic + System.nanoTime();
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + topic);
         admin.topics().createPartitionedTopic(topicName, 3);
 
         final String subscription = "my-sub";

Reply via email to