This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 738ec6a [FlakyTest]Try to fix flaky test
ReplicatorTest.testReplicatorOnPartitionedTopic. (#9659)
738ec6a is described below
commit 738ec6a7d0a1b46155ca3b6436d294e38528230f
Author: Marvin Cai <[email protected]>
AuthorDate: Thu Feb 25 05:54:55 2021 -0800
[FlakyTest]Try to fix flaky test
ReplicatorTest.testReplicatorOnPartitionedTopic. (#9659)
Fixes #9457
### Motivation
It is not clear why the state is not cleaned up, but the exception indicates that the
metadata of the namespace the test is trying to create already exists, so using a
different namespace name for each test run should be able to fix it.
### Modifications
Append System.nanoTime to the namespace used to avoid being affected by
previous state. This seems to be working fine for other test cases in the same test.
---
.../org/apache/pulsar/broker/BrokerTestUtil.java | 32 +++++++++++++
.../apache/pulsar/broker/admin/AdminApiTest.java | 3 +-
.../broker/admin/IncrementPartitionsTest.java | 7 ++-
.../apache/pulsar/broker/admin/NamespacesTest.java | 15 +++---
.../broker/service/BrokerBkEnsemblesTests.java | 5 +-
.../pulsar/broker/service/PeerReplicatorTest.java | 3 +-
.../broker/service/PersistentFailoverE2ETest.java | 3 +-
.../pulsar/broker/service/ReplicatorTest.java | 53 +++++++++++-----------
.../service/persistent/DelayedDeliveryTest.java | 11 +++--
.../ReplicatedSubscriptionConfigTest.java | 9 ++--
.../api/SimpleTypedProducerConsumerTest.java | 3 +-
.../SubscriptionMessageDispatchThrottlingTest.java | 3 +-
.../client/impl/ConsumerDedupPermitsUpdate.java | 3 +-
.../pulsar/client/impl/NegativeAcksTest.java | 3 +-
.../websocket/proxy/ProxyPublishConsumeTest.java | 3 +-
15 files changed, 101 insertions(+), 55 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
new file mode 100644
index 0000000..ff03e42
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import java.util.UUID;
+
+/**
+ * Static helper methods shared by the broker tests.
+ */
+public class BrokerTestUtil {
+ // Returns prefix + "-" + a random UUID, so every call (and thus every test run) yields a distinct name.
+ public static String newUniqueName(String prefix) {
+ return prefix + "-" + UUID.randomUUID();
+ }
+
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 826116c..8c8ddff 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -60,6 +60,7 @@ import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ConfigHelper;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
@@ -2621,7 +2622,7 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
Sets.newHashSet("test", "usw"));
admin.tenants().updateTenant("prop-xyz", tenantInfo);
- String ns = "prop-xyz/ns-" + System.nanoTime();
+ String ns = BrokerTestUtil.newUniqueName("prop-xyz/ns");
admin.namespaces().createNamespace(ns, 24);
admin.namespaces().deleteNamespace(ns);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
index d3f5d7b..5c08fd5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.admin.AdminApiTest.MockedPulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Consumer;
@@ -107,7 +108,8 @@ public class IncrementPartitionsTest extends
MockedPulsarServiceBaseTest {
@Test
public void testIncrementPartitionsWithNoSubscriptions() throws Exception {
- final String partitionedTopicName =
"persistent://prop-xyz/use/ns1/test-topic-" + System.nanoTime();
+ final String partitionedTopicName =
+
BrokerTestUtil.newUniqueName("persistent://prop-xyz/use/ns1/test-topic");
admin.topics().createPartitionedTopic(partitionedTopicName, 1);
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
1);
@@ -133,7 +135,8 @@ public class IncrementPartitionsTest extends
MockedPulsarServiceBaseTest {
@Test
public void testIncrementPartitionsWithReaders() throws Exception {
- TopicName partitionedTopicName =
TopicName.get("persistent://prop-xyz/use/ns1/test-topic-" + System.nanoTime());
+ TopicName partitionedTopicName = TopicName.get(
+
BrokerTestUtil.newUniqueName("persistent://prop-xyz/use/ns1/test-topic"));
admin.topics().createPartitionedTopic(partitionedTopicName.toString(),
1);
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName.toString()).partitions,
1);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index b7e3e52..ce31fd1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.admin;
import static
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import static org.mockito.ArgumentMatchers.any;
@@ -1140,7 +1141,7 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
*/
@Test
public void testForceDeleteNamespace() throws Exception {
- String namespace = this.testTenant + "/namespace-" + System.nanoTime();
+ String namespace = BrokerTestUtil.newUniqueName(this.testTenant +
"/namespace");
String topic = namespace + "/topic";
admin.namespaces().createNamespace(namespace, 100);
@@ -1360,7 +1361,7 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
@Test
public void testDeleteNonPartitionedTopicMultipleTimes() throws Exception {
- String namespace = this.testTenant + "/namespace-" + System.nanoTime();
+ String namespace = BrokerTestUtil.newUniqueName(this.testTenant +
"/namespace");
String topic = namespace + "/topic";
admin.namespaces().createNamespace(namespace,
Sets.newHashSet(testLocalCluster));
@@ -1388,7 +1389,7 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
@Test
public void testDeletePartitionedTopicMultipleTimes() throws Exception {
- String namespace = this.testTenant + "/namespace-" + System.nanoTime();
+ String namespace = BrokerTestUtil.newUniqueName(this.testTenant +
"/namespace");
String topic = namespace + "/topic";
admin.namespaces().createNamespace(namespace,
Sets.newHashSet(testLocalCluster));
@@ -1417,7 +1418,7 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
@Test
public void testRetentionPolicyValidation() throws Exception {
- String namespace = this.testTenant + "/namespace-" + System.nanoTime();
+ String namespace = BrokerTestUtil.newUniqueName(this.testTenant +
"/namespace");
admin.namespaces().createNamespace(namespace,
Sets.newHashSet(testLocalCluster));
@@ -1596,7 +1597,7 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
public void testSubscriptionTypesEnabled() throws PulsarAdminException,
PulsarClientException {
pulsar.getConfiguration().setAuthorizationEnabled(false);
pulsar.getConfiguration().setTopicLevelPoliciesEnabled(false);
- String namespace = this.testTenant + "/namespace-" + System.nanoTime();
+ String namespace = BrokerTestUtil.newUniqueName(this.testTenant +
"/namespace");
String topic = namespace + "/test-subscription-enabled";
admin.namespaces().createNamespace(namespace);
Set<SubscriptionType> subscriptionTypes = new HashSet<>();
@@ -1666,7 +1667,7 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
private void assertValidRetentionPolicyAsPartOfAllPolicies(Policies
policies, int retentionTimeInMinutes,
int
retentionSizeInMB) throws PulsarAdminException {
- String namespace = this.testTenant + "/namespace-" + System.nanoTime();
+ String namespace = BrokerTestUtil.newUniqueName(this.testTenant +
"/namespace");
RetentionPolicies retention = new
RetentionPolicies(retentionTimeInMinutes, retentionSizeInMB);
policies.retention_policies = retention;
admin.namespaces().createNamespace(namespace, policies);
@@ -1675,7 +1676,7 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
private void assertInvalidRetentionPolicyAsPartOfAllPolicies(Policies
policies, int retentionTimeInMinutes,
int
retentionSizeInMB) {
- String namespace = this.testTenant + "/namespace-" + System.nanoTime();
+ String namespace = BrokerTestUtil.newUniqueName(this.testTenant +
"/namespace");
try {
RetentionPolicies retention = new
RetentionPolicies(retentionTimeInMinutes, retentionSizeInMB);
policies.retention_policies = retention;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index ba9ccb1..3894f7c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -40,6 +40,7 @@ import
org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.util.StringUtils;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -309,10 +310,10 @@ public class BrokerBkEnsemblesTests extends
BkEnsemblesTestBase {
@Test
public void testDeleteTopicWithMissingData() throws Exception {
- String namespace = "prop/usc-" + System.nanoTime();
+ String namespace = BrokerTestUtil.newUniqueName("prop/usc");
admin.namespaces().createNamespace(namespace);
- String topic = namespace + "/my-topic-" + System.nanoTime();
+ String topic = BrokerTestUtil.newUniqueName(namespace + "/my-topic");
@Cleanup
PulsarClient client = PulsarClient.builder()
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java
index f1a3017..f230c71 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java
@@ -28,6 +28,7 @@ import static org.testng.Assert.fail;
import java.util.LinkedHashSet;
import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
@@ -196,7 +197,7 @@ public class PeerReplicatorTest extends ReplicatorTestBase {
admin1.clusters().updatePeerClusterNames("r3", null);
final String serviceUrl = pulsar3.getBrokerServiceUrl();
- final String namespace1 = "pulsar/global/peer-change-repl-ns-" +
System.nanoTime();
+ final String namespace1 =
BrokerTestUtil.newUniqueName("pulsar/global/peer-change-repl-ns");
admin1.namespaces().createNamespace(namespace1);
// add replication cluster
admin1.namespaces().setNamespaceReplicationClusters(namespace1,
Sets.newHashSet("r1"));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index dd90017..d1595fa 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.BrokerTestUtil;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -277,7 +278,7 @@ public class PersistentFailoverE2ETest extends
BrokerTestBase {
int numPartitions = 4;
- final String topicName =
"persistent://prop/use/ns-abc/testSimpleConsumerEventsWithPartition-" +
System.nanoTime();
+ final String topicName =
BrokerTestUtil.newUniqueName("persistent://prop/use/ns-abc/testSimpleConsumerEventsWithPartition");
final TopicName destName = TopicName.get(topicName);
final String subName = "sub1";
final int numMsgs = 100;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 9cccdd5..143625b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -52,6 +52,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -127,7 +128,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
List<Future<Void>> results = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
final TopicName dest = TopicName.get(String
- .format("persistent://pulsar/ns/topic-%d-%d",
System.currentTimeMillis(), i));
+
.format(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/topic-" + i)));
results.add(executor.submit(new Callable<Void>() {
@Override
@@ -225,11 +226,11 @@ public class ReplicatorTest extends ReplicatorTestBase {
log.info("--- Starting ReplicatorTest::testConcurrentReplicator ---");
- final String namespace = "pulsar/concurrent";
+ final String namespace =
BrokerTestUtil.newUniqueName("pulsar/concurrent");
admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace,
Sets.newHashSet("r1", "r2"));
final TopicName topicName = TopicName
- .get(String.format("persistent://" + namespace +
"/topic-%d-%d", System.currentTimeMillis(), 0));
+ .get(BrokerTestUtil.newUniqueName("persistent://" + namespace
+ "/topic"));
@Cleanup
PulsarClient client1 = PulsarClient.builder()
@@ -290,7 +291,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
// runtime.
// Run a set of producer tasks to create the topics
final TopicName dest = TopicName
- .get(String.format("persistent://%s/repltopic-%d", namespace,
System.nanoTime()));
+ .get(BrokerTestUtil.newUniqueName("persistent://" + namespace
+ "/repltopic"));
@Cleanup
MessageProducer producer1 = new MessageProducer(url1, dest);
@@ -369,7 +370,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
// Run a set of producer tasks to create the topics
for (int i = 0; i < 10; i++) {
final TopicName dest = TopicName
- .get(String.format("persistent://pulsar/ns/repltopic-%d",
System.nanoTime()));
+
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/repltopic"));
@Cleanup
MessageProducer producer1 = new MessageProducer(url1, dest);
@@ -430,7 +431,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
// 1. Create a consumer using the reserved consumer id prefix
"pulsar.repl."
final TopicName dest = TopicName
-
.get(String.format("persistent://pulsar/ns/res-cons-id-%d",
System.currentTimeMillis()));
+
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/res-cons-id-"));
// Create another consumer using replication prefix as sub id
MessageConsumer consumer = new MessageConsumer(url2, dest,
"pulsar.repl.");
@@ -446,7 +447,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
public void testReplicatePeekAndSkip() throws Exception {
final TopicName dest = TopicName.get(
- String.format("persistent://pulsar/ns/peekAndSeekTopic-%d",
System.currentTimeMillis()));
+
BrokerTestUtil.newUniqueName("persistent://pulsar/ns/peekAndSeekTopic"));
@Cleanup
MessageProducer producer1 = new MessageProducer(url1, dest);
@@ -472,7 +473,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
SortedSet<String> testDests = new TreeSet<String>();
final TopicName dest = TopicName
-
.get(String.format("persistent://pulsar/ns/clearBacklogTopic-%d",
System.currentTimeMillis()));
+
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/clearBacklogTopic"));
testDests.add(dest.toString());
@Cleanup
@@ -502,7 +503,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
// This test is to verify that reset cursor fails on global topic
final TopicName dest = TopicName
- .get(String.format("persistent://pulsar/ns/resetrepltopic-%d",
System.nanoTime()));
+
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/resetrepltopic"));
@Cleanup
MessageProducer producer1 = new MessageProducer(url1, dest);
@@ -526,7 +527,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
// Run a set of producer tasks to create the topics
final TopicName dest = TopicName
- .get(String.format("persistent://pulsar/ns/repltopicbatch-%d",
System.nanoTime()));
+
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/repltopicbatch"));
@Cleanup
MessageProducer producer1 = new MessageProducer(url1, dest, true);
@@ -580,7 +581,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
@Test(timeOut = 30000)
public void testDeleteReplicatorFailure() throws Exception {
log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure
---");
- final String topicName = "persistent://pulsar/ns/repltopicbatch-" +
System.currentTimeMillis() + "-";
+ final String topicName =
BrokerTestUtil.newUniqueName("persistent://pulsar/ns/repltopicbatch");
final TopicName dest = TopicName.get(topicName);
@Cleanup
@@ -621,7 +622,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
@Test(priority = 5, timeOut = 30000)
public void testReplicatorProducerClosing() throws Exception {
log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure
---");
- final String topicName = "persistent://pulsar/ns/repltopicbatch-" +
System.currentTimeMillis() + "-";
+ final String topicName =
BrokerTestUtil.newUniqueName("persistent://pulsar/ns/repltopicbatch");
final TopicName dest = TopicName.get(topicName);
@Cleanup
@@ -662,7 +663,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
Thread.sleep(200);
TopicName dest = TopicName
- .get(String.format("persistent://pulsar/ns1/%s-%d",
policy, System.currentTimeMillis()));
+
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns1/%s-" + policy));
// Producer on r1
@Cleanup
@@ -721,7 +722,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
*/
@Test(timeOut = 15000)
public void testCloseReplicatorStartProducer() throws Exception {
- TopicName dest = TopicName.get("persistent://pulsar/ns1/closeCursor-"
+ System.currentTimeMillis() + "-");
+ TopicName dest =
TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns1/closeCursor"));
// Producer on r1
@Cleanup
MessageProducer producer1 = new MessageProducer(url1, dest);
@@ -767,7 +768,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
@Test(timeOut = 30000)
public void verifyChecksumAfterReplication() throws Exception {
- final String topicName =
"persistent://pulsar/ns/checksumAfterReplication-" + System.currentTimeMillis()
+ "-";
+ final String topicName =
BrokerTestUtil.newUniqueName("persistent://pulsar/ns/checksumAfterReplication");
PulsarClient c1 =
PulsarClient.builder().serviceUrl(url1.toString()).build();
Producer<byte[]> p1 = c1.newProducer().topic(topicName)
@@ -805,11 +806,9 @@ public class ReplicatorTest extends ReplicatorTestBase {
log.info("--- Starting ReplicatorTest::{} --- ", methodName);
- final String namespace = "pulsar/partitionedNs-" + isPartitionedTopic;
- final String persistentTopicName =
- "persistent://" + namespace + "/partTopic-" +
System.currentTimeMillis() + "-" + isPartitionedTopic;
- final String nonPersistentTopicName =
- "non-persistent://" + namespace + "/partTopic-" +
System.currentTimeMillis() + "-"+ isPartitionedTopic;
+ final String namespace =
BrokerTestUtil.newUniqueName("pulsar/partitionedNs-" + isPartitionedTopic);
+ final String persistentTopicName =
BrokerTestUtil.newUniqueName("persistent://" + namespace + "/partTopic-" +
isPartitionedTopic);
+ final String nonPersistentTopicName =
BrokerTestUtil.newUniqueName("non-persistent://" + namespace + "/partTopic-" +
isPartitionedTopic);
BrokerService brokerService = pulsar1.getBrokerService();
admin1.namespaces().createNamespace(namespace);
@@ -852,7 +851,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
}
assertTrue(e.getCause() instanceof NamingException);
}
-
+
}
@Test
@@ -861,7 +860,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
log.info("--- Starting ReplicatorTest::testReplicatedCluster ---");
final String namespace = "pulsar/global/repl";
- final String topicName = String.format("persistent://%s/topic1-%d",
namespace, System.currentTimeMillis());
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ namespace + "/topic1");
admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace,
Sets.newHashSet("r1", "r2", "r3"));
admin1.topics().createPartitionedTopic(topicName, 4);
@@ -908,8 +907,8 @@ public class ReplicatorTest extends ReplicatorTestBase {
final String cluster1 = pulsar1.getConfig().getClusterName();
final String cluster2 = pulsar2.getConfig().getClusterName();
- final String namespace = "pulsar/ns-" + System.nanoTime();
- final String topicName = "persistent://" + namespace + "/topic1";
+ final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns");
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ namespace + "/topic1");
int startPartitions = 4;
int newPartitions = 8;
final String subscriberName = "sub1";
@@ -960,9 +959,9 @@ public class ReplicatorTest extends ReplicatorTestBase {
final String cluster1 = pulsar1.getConfig().getClusterName();
final String cluster2 = pulsar2.getConfig().getClusterName();
- final String namespace = "pulsar/ns-" + System.nanoTime();
- final String partitionedTopicName = topicPrefix + namespace +
topicName + "-partitioned";
- final String nonPartitionedTopicName = topicPrefix + namespace +
topicName + "-non-partitioned";
+ final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns");
+ final String partitionedTopicName =
BrokerTestUtil.newUniqueName(topicPrefix + namespace + topicName +
"-partitioned");
+ final String nonPartitionedTopicName =
BrokerTestUtil.newUniqueName(topicPrefix + namespace + topicName +
"-non-partitioned");
final int startPartitions = 4;
admin1.namespaces().createNamespace(namespace,
Sets.newHashSet(cluster1, cluster2));
admin1.namespaces().setNamespaceReplicationClusters(namespace,
Sets.newHashSet("r1", "r2", "r3"));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
index 799d7f1..804dfbb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
@@ -71,7 +72,7 @@ public class DelayedDeliveryTest extends ProducerConsumerBase
{
@Test
public void testDelayedDelivery()
throws Exception {
- String topic = "testNegativeAcks-" + System.nanoTime();
+ String topic = BrokerTestUtil.newUniqueName("testNegativeAcks");
@Cleanup
Consumer<String> failoverConsumer =
pulsarClient.newConsumer(Schema.STRING)
@@ -126,7 +127,7 @@ public class DelayedDeliveryTest extends
ProducerConsumerBase {
@Test
public void testInterleavedMessages()
throws Exception {
- String topic = "testInterleavedMessages-" + System.nanoTime();
+ String topic = BrokerTestUtil.newUniqueName("testInterleavedMessages");
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
@@ -178,7 +179,7 @@ public class DelayedDeliveryTest extends
ProducerConsumerBase {
@Test
public void testEverythingFilteredInMultipleReads()
throws Exception {
- String topic = "testEverythingFilteredInMultipleReads-" +
System.nanoTime();
+ String topic =
BrokerTestUtil.newUniqueName("testEverythingFilteredInMultipleReads");
@Cleanup
Consumer<String> sharedConsumer =
pulsarClient.newConsumer(Schema.STRING)
@@ -227,7 +228,7 @@ public class DelayedDeliveryTest extends
ProducerConsumerBase {
@Test
public void testDelayedDeliveryWithMultipleConcurrentReadEntries()
throws Exception {
- String topic = "persistent://public/default/testDelayedDelivery-" +
System.nanoTime();
+ String topic =
BrokerTestUtil.newUniqueName("persistent://public/default/testDelayedDelivery");
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
@@ -289,7 +290,7 @@ public class DelayedDeliveryTest extends
ProducerConsumerBase {
@Test
public void testOrderingDispatch() throws PulsarClientException {
- String topic = "persistent://public/default/testOrderingDispatch-" +
System.nanoTime();
+ String topic =
BrokerTestUtil.newUniqueName("persistent://public/default/testOrderingDispatch");
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java
index d284254..192f8fe 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java
@@ -23,6 +23,7 @@ import static org.testng.Assert.assertTrue;
import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -49,7 +50,7 @@ public class ReplicatedSubscriptionConfigTest extends
ProducerConsumerBase {
@Test
public void createReplicatedSubscription() throws Exception {
this.conf.setEnableReplicatedSubscriptions(true);
- String topic = "createReplicatedSubscription-" + System.nanoTime();
+ String topic =
BrokerTestUtil.newUniqueName("createReplicatedSubscription");
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
@@ -71,7 +72,7 @@ public class ReplicatedSubscriptionConfigTest extends
ProducerConsumerBase {
@Test
public void upgradeToReplicatedSubscription() throws Exception {
this.conf.setEnableReplicatedSubscriptions(true);
- String topic = "upgradeToReplicatedSubscription-" + System.nanoTime();
+ String topic =
BrokerTestUtil.newUniqueName("upgradeToReplicatedSubscription");
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
@@ -97,7 +98,7 @@ public class ReplicatedSubscriptionConfigTest extends
ProducerConsumerBase {
@Test
public void upgradeToReplicatedSubscriptionAfterRestart() throws Exception
{
this.conf.setEnableReplicatedSubscriptions(true);
- String topic = "upgradeToReplicatedSubscriptionAfterRestart-" +
System.nanoTime();
+ String topic =
BrokerTestUtil.newUniqueName("upgradeToReplicatedSubscriptionAfterRestart");
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
@@ -125,7 +126,7 @@ public class ReplicatedSubscriptionConfigTest extends
ProducerConsumerBase {
@Test
public void testDisableReplicatedSubscriptions() throws Exception {
this.conf.setEnableReplicatedSubscriptions(false);
- String topic = "disableReplicatedSubscriptions-" + System.nanoTime();
+ String topic =
BrokerTestUtil.newUniqueName("disableReplicatedSubscriptions");
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("sub")
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
index a1c9a00..c3a7324 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
@@ -32,6 +32,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -601,7 +602,7 @@ public class SimpleTypedProducerConsumerTest extends
ProducerConsumerBase {
@Test
public void testMessageBuilderLoadConf() throws Exception {
- String topic = "my-topic-" + System.nanoTime();
+ String topic = BrokerTestUtil.newUniqueName("my-topic");
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
index 2b63411..1bec4d2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
@@ -222,7 +223,7 @@ public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchTh
log.info("-- Starting {} test --", methodName);
final String namespace = "my-property/throttling_ns";
- final String topicName = "persistent://" + namespace + "/throttlingAll-" + System.nanoTime();
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll");
final String subName = "my-subscriber-name-" + subscription;
final int byteRate = 100;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdate.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdate.java
index a81f6a1..af26630 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdate.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdate.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
@@ -65,7 +66,7 @@ public class ConsumerDedupPermitsUpdate extends
ProducerConsumerBase {
@Test(timeOut = 30000, dataProvider = "combinations")
public void testConsumerDedup(boolean batchingEnabled, int receiverQueueSize) throws Exception {
- String topic = "persistent://my-property/my-ns/my-topic-" + System.nanoTime();
+ String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/my-topic");
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index 6c9de8b..366695a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
@@ -103,7 +104,7 @@ public class NegativeAcksTest extends ProducerConsumerBase {
throws Exception {
log.info("Test negative acks batching={} partitions={} subType={} negAckDelayMs={}", batching, usePartitions,
subscriptionType, negAcksDelayMillis);
- String topic = "testNegativeAcks-" + System.nanoTime();
+ String topic = BrokerTestUtil.newUniqueName("testNegativeAcks");
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 4845c2d..8f433d9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -50,6 +50,7 @@ import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerConsumerBase;
@@ -266,7 +267,7 @@ public class ProxyPublishConsumeTest extends
ProducerConsumerBase {
public void unsubscribeTest() throws Exception {
final String namespace = "my-property/my-ns";
final String topic = namespace + "/" + "my-topic7";
- final String topicName = "persistent://" + topic + System.nanoTime();
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://" + topic);
admin.topics().createPartitionedTopic(topicName, 3);
final String subscription = "my-sub";