This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f6bc233dc3 [cleanup] Convert 13 test classes to SharedPulsarBaseTest 
(#25331)
7f6bc233dc3 is described below

commit 7f6bc233dc37d0bd981f9bfe3ccc104069ac39a4
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Mar 17 11:15:53 2026 -0700

    [cleanup] Convert 13 test classes to SharedPulsarBaseTest (#25331)
---
 .../impl/ConsumerUnsubscribeIntegrationTest.java   |  24 +--
 .../api/ExposeMessageRedeliveryCountTest.java      |  24 +--
 .../apache/pulsar/client/api/MemoryLimitTest.java  |  22 +--
 .../api/SimpleTypedProducerConsumerTest.java       | 179 ++++++++++++---------
 .../client/impl/CompactedOutBatchMessageTest.java  |  21 +--
 .../client/impl/ConsumeBaseExceptionTest.java      |  23 +--
 .../pulsar/client/impl/ConsumerCloseTest.java      |  26 +--
 .../impl/ConsumerDedupPermitsUpdateTest.java       |  22 +--
 .../client/impl/ConsumerMemoryLimitTest.java       |  21 +--
 .../client/impl/DispatchAccordingPermitsTest.java  |  21 +--
 .../impl/HierarchyTopicAutoCreationTest.java       |  33 ++--
 .../client/impl/ProduceWithMessageIdTest.java      |  16 +-
 .../pulsar/client/impl/TopicFromMessageTest.java   |  42 +++--
 13 files changed, 178 insertions(+), 296 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/ConsumerUnsubscribeIntegrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/ConsumerUnsubscribeIntegrationTest.java
index 547958de845..ef33a61139d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/ConsumerUnsubscribeIntegrationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/ConsumerUnsubscribeIntegrationTest.java
@@ -21,38 +21,22 @@ package org.apache.bookkeeper.mledger.impl;
 import static org.testng.Assert.assertEquals;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.Schema;
 import org.awaitility.Awaitility;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Slf4j
 @Test(groups = "broker-impl")
-public class ConsumerUnsubscribeIntegrationTest extends ProducerConsumerBase {
-
-    @BeforeClass(alwaysRun = true)
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class ConsumerUnsubscribeIntegrationTest extends SharedPulsarBaseTest {
 
     @Test
     public void testUnSubscribeWhenCursorNotExists() throws Exception {
-        final String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String topic = newTopicName();
         final String subscription = "s1";
         admin.topics().createNonPartitionedTopic(topic);
         admin.topics().createSubscription(topic, subscription, 
MessageId.earliest);
@@ -65,7 +49,7 @@ public class ConsumerUnsubscribeIntegrationTest extends 
ProducerConsumerBase {
         consumer.acknowledge(consumer.receive(2, TimeUnit.SECONDS));
 
         PersistentTopic persistentTopic =
-                (PersistentTopic) pulsar.getBrokerService().getTopic(topic, 
false).join().get();
+                (PersistentTopic) getTopic(topic, false).join().get();
         ManagedLedgerImpl ml = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
         ManagedCursorImpl cursor = (ManagedCursorImpl) 
ml.getCursors().get(subscription);
         Awaitility.await().untilAsserted(() -> {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ExposeMessageRedeliveryCountTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ExposeMessageRedeliveryCountTest.java
index a9e21f77aa1..0ddaad34e0d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ExposeMessageRedeliveryCountTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ExposeMessageRedeliveryCountTest.java
@@ -21,32 +21,18 @@ package org.apache.pulsar.client.api;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-api")
-public class ExposeMessageRedeliveryCountTest extends ProducerConsumerBase {
-
-    @BeforeMethod
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class ExposeMessageRedeliveryCountTest extends SharedPulsarBaseTest {
 
     @Test(timeOut = 30000)
     public void testRedeliveryCount() throws PulsarClientException {
 
-        final String topic = "persistent://my-property/my-ns/redeliveryCount";
+        final String topic = newTopicName();
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                 .topic(topic)
@@ -81,7 +67,7 @@ public class ExposeMessageRedeliveryCountTest extends 
ProducerConsumerBase {
     @Test(timeOut = 30000)
     public void testRedeliveryCountWithPartitionedTopic() throws 
PulsarClientException, PulsarAdminException {
 
-        final String topic = 
"persistent://my-property/my-ns/redeliveryCount.partitioned";
+        final String topic = newTopicName();
 
         admin.topics().createPartitionedTopic(topic, 3);
 
@@ -119,7 +105,7 @@ public class ExposeMessageRedeliveryCountTest extends 
ProducerConsumerBase {
     @Test(timeOut = 30000)
     public void testRedeliveryCountWhenConsumerDisconnected() throws 
PulsarClientException {
 
-        String topic = 
"persistent://my-property/my-ns/testRedeliveryCountWhenConsumerDisconnected";
+        String topic = newTopicName();
 
         Consumer<String> consumer0 = pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MemoryLimitTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MemoryLimitTest.java
index 60eb79e77bc..6ed5a1118b1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MemoryLimitTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MemoryLimitTest.java
@@ -23,17 +23,16 @@ import static org.testng.Assert.fail;
 import java.time.Duration;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import 
org.apache.pulsar.client.api.PulsarClientException.MemoryBufferIsFullError;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarTestClient;
 import org.awaitility.Awaitility;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-api")
-public class MemoryLimitTest extends ProducerConsumerBase {
+public class MemoryLimitTest extends SharedPulsarBaseTest {
 
     @DataProvider(name = "batchingAndMemoryLimit")
     public Object[][] provider() {
@@ -44,26 +43,13 @@ public class MemoryLimitTest extends ProducerConsumerBase {
         };
     }
 
-    @BeforeMethod
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
-
     @Test(dataProvider = "batchingAndMemoryLimit")
     public void testRejectMessages(boolean batching, int memoryLimit)
             throws Exception {
         String topic = newTopicName();
 
         ClientBuilder clientBuilder = PulsarClient.builder()
-                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .serviceUrl(getBrokerServiceUrl())
                 .memoryLimit(memoryLimit, SizeUnit.KILO_BYTES);
 
         @Cleanup
@@ -120,7 +106,7 @@ public class MemoryLimitTest extends ProducerConsumerBase {
         String t2 = newTopicName();
 
         ClientBuilder clientBuilder = PulsarClient.builder()
-                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .serviceUrl(getBrokerServiceUrl())
                 .memoryLimit(memoryLimit, SizeUnit.KILO_BYTES);
 
         @Cleanup
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
index 293d3adcb87..d03f6746d1e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
@@ -32,7 +32,8 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
-import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
+import org.apache.pulsar.broker.service.SharedPulsarCluster;
 import org.apache.pulsar.broker.service.schema.SchemaRegistry;
 import 
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
 import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -46,43 +47,37 @@ import org.apache.pulsar.common.schema.SchemaType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-api")
-public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
+public class SimpleTypedProducerConsumerTest extends SharedPulsarBaseTest {
     private static final Logger log = 
LoggerFactory.getLogger(SimpleTypedProducerConsumerTest.class);
 
-    @BeforeMethod
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
+    private <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T 
receivedMessage, T expectedMessage) {
+        Assert.assertEquals(receivedMessage, expectedMessage,
+                "Received message " + receivedMessage + " did not match the 
expected message " + expectedMessage);
+        Assert.assertTrue(messagesReceived.add(receivedMessage), "Received 
duplicate message " + receivedMessage);
     }
 
     @Test
     public void testJsonProducerAndConsumer() throws Exception {
-        log.info("-- Starting {} test --", methodName);
+        log.info("-- Starting {} test --", "testJsonProducerAndConsumer");
+
+        final String topic = newTopicName();
+        final String schemaKey = topic.replace("persistent://", "");
 
         JSONSchema<JsonEncodedPojo> jsonSchema =
             
JSONSchema.of(SchemaDefinition.<JsonEncodedPojo>builder().withPojo(JsonEncodedPojo.class).build());
 
         Consumer<JsonEncodedPojo> consumer = pulsarClient
             .newConsumer(jsonSchema)
-            .topic("persistent://my-property/my-ns/my-topic1")
+            .topic(topic)
             .subscriptionName("my-subscriber-name")
             .subscribe();
 
         Producer<JsonEncodedPojo> producer = pulsarClient
             .newProducer(jsonSchema)
-            .topic("persistent://my-property/my-ns/my-topic1")
+            .topic(topic)
             .create();
 
         for (int i = 0; i < 10; i++) {
@@ -103,24 +98,28 @@ public class SimpleTypedProducerConsumerTest extends 
ProducerConsumerBase {
         consumer.acknowledgeCumulative(msg);
         consumer.close();
 
-        SchemaRegistry.SchemaAndMetadata storedSchema = 
pulsar.getSchemaRegistryService()
-            .getSchema("my-property/my-ns/my-topic1")
+        SchemaRegistry.SchemaAndMetadata storedSchema = 
SharedPulsarCluster.get().getPulsarService()
+            .getSchemaRegistryService()
+            .getSchema(schemaKey)
             .get();
 
         Assert.assertEquals(storedSchema.schema.getData(), 
jsonSchema.getSchemaInfo().getSchema());
 
-        log.info("-- Exiting {} test --", methodName);
+        log.info("-- Exiting {} test --", "testJsonProducerAndConsumer");
     }
 
     @Test
     public void testJsonProducerAndConsumerWithPrestoredSchema() throws 
Exception {
-        log.info("-- Starting {} test --", methodName);
+        log.info("-- Starting {} test --", 
"testJsonProducerAndConsumerWithPrestoredSchema");
+
+        final String topic = newTopicName();
+        final String schemaKey = topic.replace("persistent://", "");
 
         JSONSchema<JsonEncodedPojo> jsonSchema =
             
JSONSchema.of(SchemaDefinition.<JsonEncodedPojo>builder().withPojo(JsonEncodedPojo.class).build());
 
-        pulsar.getSchemaRegistryService()
-            .putSchemaIfAbsent("my-property/my-ns/my-topic1",
+        SharedPulsarCluster.get().getPulsarService().getSchemaRegistryService()
+            .putSchemaIfAbsent(schemaKey,
                 SchemaData.builder()
                     .type(SchemaType.JSON)
                     .isDeleted(false)
@@ -134,36 +133,40 @@ public class SimpleTypedProducerConsumerTest extends 
ProducerConsumerBase {
 
         Consumer<JsonEncodedPojo> consumer = pulsarClient
             .newConsumer(jsonSchema)
-            .topic("persistent://my-property/my-ns/my-topic1")
+            .topic(topic)
             .subscriptionName("my-subscriber-name")
             .subscribe();
 
         Producer<JsonEncodedPojo> producer = pulsarClient
             .newProducer(jsonSchema)
-            .topic("persistent://my-property/my-ns/my-topic1")
+            .topic(topic)
             .create();
 
         consumer.close();
         producer.close();
 
-        SchemaRegistry.SchemaAndMetadata storedSchema = 
pulsar.getSchemaRegistryService()
-            .getSchema("my-property/my-ns/my-topic1")
+        SchemaRegistry.SchemaAndMetadata storedSchema = 
SharedPulsarCluster.get().getPulsarService()
+            .getSchemaRegistryService()
+            .getSchema(schemaKey)
             .get();
 
         Assert.assertEquals(storedSchema.schema.getData(), 
jsonSchema.getSchemaInfo().getSchema());
 
-        log.info("-- Exiting {} test --", methodName);
+        log.info("-- Exiting {} test --", 
"testJsonProducerAndConsumerWithPrestoredSchema");
     }
 
     @Test
     public void testWrongCorruptedSchema() throws Exception {
-        log.info("-- Starting {} test --", methodName);
+        log.info("-- Starting {} test --", "testWrongCorruptedSchema");
+
+        final String topic = newTopicName();
+        final String schemaKey = topic.replace("persistent://", "");
 
         byte[] randomSchemaBytes = "hello".getBytes();
 
         try {
-            pulsar.getSchemaRegistryService()
-                .putSchemaIfAbsent("my-property/my-ns/my-topic1",
+            
SharedPulsarCluster.get().getPulsarService().getSchemaRegistryService()
+                .putSchemaIfAbsent(schemaKey,
                     SchemaData.builder()
                         .type(SchemaType.JSON)
                         .isDeleted(false)
@@ -179,25 +182,28 @@ public class SimpleTypedProducerConsumerTest extends 
ProducerConsumerBase {
             assertTrue(e.getCause() instanceof InvalidSchemaDataException);
         }
 
-        log.info("-- Exiting {} test --", methodName);
+        log.info("-- Exiting {} test --", "testWrongCorruptedSchema");
     }
 
     @Test
     public void testProtobufProducerAndConsumer() throws Exception {
-        log.info("-- Starting {} test --", methodName);
+        log.info("-- Starting {} test --", "testProtobufProducerAndConsumer");
+
+        final String topic = newTopicName();
+        final String schemaKey = topic.replace("persistent://", "");
 
         
ProtobufSchema<org.apache.pulsar.client.api.schema.proto.Test.TestMessage> 
protobufSchema =
                 
ProtobufSchema.of(org.apache.pulsar.client.api.schema.proto.Test.TestMessage.class);
 
         Consumer<org.apache.pulsar.client.api.schema.proto.Test.TestMessage> 
consumer = pulsarClient
                 .newConsumer(protobufSchema)
-                .topic("persistent://my-property/my-ns/my-topic1")
+                .topic(topic)
                 .subscriptionName("my-subscriber-name")
                 .subscribe();
 
         Producer<org.apache.pulsar.client.api.schema.proto.Test.TestMessage> 
producer = pulsarClient
                 .newProducer(protobufSchema)
-                .topic("persistent://my-property/my-ns/my-topic1")
+                .topic(topic)
                 .create();
 
         for (int i = 0; i < 10; i++) {
@@ -222,24 +228,28 @@ public class SimpleTypedProducerConsumerTest extends 
ProducerConsumerBase {
         consumer.acknowledgeCumulative(msg);
         consumer.close();
 
-        SchemaRegistry.SchemaAndMetadata storedSchema = 
pulsar.getSchemaRegistryService()
-                .getSchema("my-property/my-ns/my-topic1")
+        SchemaRegistry.SchemaAndMetadata storedSchema = 
SharedPulsarCluster.get().getPulsarService()
+                .getSchemaRegistryService()
+                .getSchema(schemaKey)
                 .get();
 
         Assert.assertEquals(storedSchema.schema.getData(), 
protobufSchema.getSchemaInfo().getSchema());
 
-        log.info("-- Exiting {} test --", methodName);
+        log.info("-- Exiting {} test --", "testProtobufProducerAndConsumer");
     }
 
     @Test(expectedExceptions = {PulsarClientException.class})
     public void testProtobufConsumerWithWrongPrestoredSchema() throws 
Exception {
-        log.info("-- Starting {} test --", methodName);
+        log.info("-- Starting {} test --", 
"testProtobufConsumerWithWrongPrestoredSchema");
+
+        final String topic = newTopicName();
+        final String schemaKey = topic.replace("persistent://", "");
 
         
ProtobufSchema<org.apache.pulsar.client.api.schema.proto.Test.TestMessage> 
schema =
                 
ProtobufSchema.of(org.apache.pulsar.client.api.schema.proto.Test.TestMessage.class);
 
-        pulsar.getSchemaRegistryService()
-                .putSchemaIfAbsent("my-property/my-ns/my-topic1",
+        SharedPulsarCluster.get().getPulsarService().getSchemaRegistryService()
+                .putSchemaIfAbsent(schemaKey,
                         SchemaData.builder()
                                 .type(SchemaType.PROTOBUF)
                                 .isDeleted(false)
@@ -255,16 +265,19 @@ public class SimpleTypedProducerConsumerTest extends 
ProducerConsumerBase {
                 .newConsumer(AvroSchema.of
                         
(SchemaDefinition.<org.apache.pulsar.client.api.schema.proto.Test.TestMessageWrong>builder().
                         
withPojo(org.apache.pulsar.client.api.schema.proto.Test.TestMessageWrong.class).build()))
-                .topic("persistent://my-property/my-ns/my-topic1")
+                .topic(topic)
                 .subscriptionName("my-subscriber-name")
                 .subscribe();
 
-        log.info("-- Exiting {} test --", methodName);
+        log.info("-- Exiting {} test --", 
"testProtobufConsumerWithWrongPrestoredSchema");
     }
 
    @Test
    public void testAvroProducerAndConsumer() throws Exception {
-       log.info("-- Starting {} test --", methodName);
+       log.info("-- Starting {} test --", "testAvroProducerAndConsumer");
+
+       final String topic = newTopicName();
+       final String schemaKey = topic.replace("persistent://", "");
 
        AvroSchema<AvroEncodedPojo> avroSchema =
            AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
@@ -272,13 +285,13 @@ public class SimpleTypedProducerConsumerTest extends 
ProducerConsumerBase {
 
        Consumer<AvroEncodedPojo> consumer = pulsarClient
            .newConsumer(avroSchema)
-           .topic("persistent://my-property/my-ns/my-topic1")
+           .topic(topic)
            .subscriptionName("my-subscriber-name")
            .subscribe();
 
        Producer<AvroEncodedPojo> producer = pulsarClient
            .newProducer(avroSchema)
-           .topic("persistent://my-property/my-ns/my-topic1")
+           .topic(topic)
            .create();
 
        for (int i = 0; i < 10; i++) {
@@ -299,19 +312,23 @@ public class SimpleTypedProducerConsumerTest extends 
ProducerConsumerBase {
        consumer.acknowledgeCumulative(msg);
        consumer.close();
 
-       SchemaRegistry.SchemaAndMetadata storedSchema = 
pulsar.getSchemaRegistryService()
-           .getSchema("my-property/my-ns/my-topic1")
+       SchemaRegistry.SchemaAndMetadata storedSchema = 
SharedPulsarCluster.get().getPulsarService()
+           .getSchemaRegistryService()
+           .getSchema(schemaKey)
            .get();
 
        Assert.assertEquals(storedSchema.schema.getData(), 
avroSchema.getSchemaInfo().getSchema());
 
-       log.info("-- Exiting {} test --", methodName);
+       log.info("-- Exiting {} test --", "testAvroProducerAndConsumer");
 
    }
 
     @Test(expectedExceptions = {PulsarClientException.class})
     public void testAvroConsumerWithWrongRestoredSchema() throws Exception {
-        log.info("-- Starting {} test --", methodName);
+        log.info("-- Starting {} test --", 
"testAvroConsumerWithWrongRestoredSchema");
+
+        final String topic = newTopicName();
+        final String schemaKey = topic.replace("persistent://", "");
 
         byte[] randomSchemaBytes = ("{\n"
             + "     \"type\": \"record\",\n"
@@ -323,8 +340,8 @@ public class SimpleTypedProducerConsumerTest extends 
ProducerConsumerBase {
             + "     ]\n"
             + "} ").getBytes();
 
-        pulsar.getSchemaRegistryService()
-            .putSchemaIfAbsent("my-property/my-ns/my-topic1",
+        SharedPulsarCluster.get().getPulsarService().getSchemaRegistryService()
+            .putSchemaIfAbsent(schemaKey,
                 SchemaData.builder()
                     .type(SchemaType.AVRO)
                     .isDeleted(false)
@@ -338,11 +355,11 @@ public class SimpleTypedProducerConsumerTest extends 
ProducerConsumerBase {
         Consumer<AvroEncodedPojo> consumer = pulsarClient
             
.newConsumer(AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
                     
withPojo(AvroEncodedPojo.class).withAlwaysAllowNull(false).build()))
-            .topic("persistent://my-property/my-ns/my-topic1")
+            .topic(topic)
             .subscriptionName("my-subscriber-name")
             .subscribe();
 
-        log.info("-- Exiting {} test --", methodName);
+        log.info("-- Exiting {} test --", 
"testAvroConsumerWithWrongRestoredSchema");
     }
 
     public static class AvroEncodedPojo {
@@ -433,7 +450,10 @@ public class SimpleTypedProducerConsumerTest extends 
ProducerConsumerBase {
 
     @Test
     public void testAvroProducerAndAutoSchemaConsumer() throws Exception {
-       log.info("-- Starting {} test --", methodName);
+       log.info("-- Starting {} test --", 
"testAvroProducerAndAutoSchemaConsumer");
+
+       final String topic = newTopicName();
+       final String schemaKey = topic.replace("persistent://", "");
 
        AvroSchema<AvroEncodedPojo> avroSchema =
            AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
@@ -441,7 +461,7 @@ public class SimpleTypedProducerConsumerTest extends 
ProducerConsumerBase {
 
        Producer<AvroEncodedPojo> producer = pulsarClient
            .newProducer(avroSchema)
-           .topic("persistent://my-property/my-ns/my-topic1")
+           .topic(topic)
            .create();
 
        for (int i = 0; i < 10; i++) {
@@ -451,7 +471,7 @@ public class SimpleTypedProducerConsumerTest extends 
ProducerConsumerBase {
 
        Consumer<GenericRecord> consumer = pulsarClient
            .newConsumer(Schema.AUTO_CONSUME())
-           .topic("persistent://my-property/my-ns/my-topic1")
+           .topic(topic)
            .subscriptionName("my-subscriber-name")
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscribe();
@@ -470,19 +490,23 @@ public class SimpleTypedProducerConsumerTest extends 
ProducerConsumerBase {
        consumer.acknowledgeCumulative(msg);
        consumer.close();
 
-       SchemaRegistry.SchemaAndMetadata storedSchema = 
pulsar.getSchemaRegistryService()
-           .getSchema("my-property/my-ns/my-topic1")
+       SchemaRegistry.SchemaAndMetadata storedSchema = 
SharedPulsarCluster.get().getPulsarService()
+           .getSchemaRegistryService()
+           .getSchema(schemaKey)
            .get();
 
        Assert.assertEquals(storedSchema.schema.getData(), 
avroSchema.getSchemaInfo().getSchema());
 
-       log.info("-- Exiting {} test --", methodName);
+       log.info("-- Exiting {} test --", 
"testAvroProducerAndAutoSchemaConsumer");
 
    }
 
    @Test
     public void testAvroProducerAndAutoSchemaReader() throws Exception {
-       log.info("-- Starting {} test --", methodName);
+       log.info("-- Starting {} test --", 
"testAvroProducerAndAutoSchemaReader");
+
+       final String topic = newTopicName();
+       final String schemaKey = topic.replace("persistent://", "");
 
        AvroSchema<AvroEncodedPojo> avroSchema =
            AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
@@ -490,7 +514,7 @@ public class SimpleTypedProducerConsumerTest extends 
ProducerConsumerBase {
 
        Producer<AvroEncodedPojo> producer = pulsarClient
            .newProducer(avroSchema)
-           .topic("persistent://my-property/my-ns/my-topic1")
+           .topic(topic)
            .create();
 
        for (int i = 0; i < 10; i++) {
@@ -500,7 +524,7 @@ public class SimpleTypedProducerConsumerTest extends 
ProducerConsumerBase {
 
        Reader<GenericRecord> reader = pulsarClient
                .newReader(Schema.AUTO_CONSUME())
-               .topic("persistent://my-property/my-ns/my-topic1")
+               .topic(topic)
                .startMessageId(MessageId.earliest)
            .create();
 
@@ -517,19 +541,23 @@ public class SimpleTypedProducerConsumerTest extends 
ProducerConsumerBase {
        // Acknowledge the consumption of all messages at once
        reader.close();
 
-       SchemaRegistry.SchemaAndMetadata storedSchema = 
pulsar.getSchemaRegistryService()
-           .getSchema("my-property/my-ns/my-topic1")
+       SchemaRegistry.SchemaAndMetadata storedSchema = 
SharedPulsarCluster.get().getPulsarService()
+           .getSchemaRegistryService()
+           .getSchema(schemaKey)
            .get();
 
        Assert.assertEquals(storedSchema.schema.getData(), 
avroSchema.getSchemaInfo().getSchema());
 
-       log.info("-- Exiting {} test --", methodName);
+       log.info("-- Exiting {} test --", 
"testAvroProducerAndAutoSchemaReader");
 
    }
 
     @Test
     public void testAutoBytesProducer() throws Exception {
-        log.info("-- Starting {} test --", methodName);
+        log.info("-- Starting {} test --", "testAutoBytesProducer");
+
+        final String topic = newTopicName();
+        final String schemaKey = topic.replace("persistent://", "");
 
         AvroSchema<AvroEncodedPojo> avroSchema =
             AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
@@ -537,7 +565,7 @@ public class SimpleTypedProducerConsumerTest extends 
ProducerConsumerBase {
 
         try (Producer<AvroEncodedPojo> producer = pulsarClient
             .newProducer(avroSchema)
-            .topic("persistent://my-property/my-ns/my-topic1")
+            .topic(topic)
             .create()) {
             for (int i = 0; i < 10; i++) {
                 String message = "my-message-" + i;
@@ -547,7 +575,7 @@ public class SimpleTypedProducerConsumerTest extends 
ProducerConsumerBase {
 
         try (Producer<byte[]> producer = pulsarClient
             .newProducer(Schema.AUTO_PRODUCE_BYTES())
-            .topic("persistent://my-property/my-ns/my-topic1")
+            .topic(topic)
             .create()) {
             // try to produce junk data
             for (int i = 10; i < 20; i++) {
@@ -572,7 +600,7 @@ public class SimpleTypedProducerConsumerTest extends 
ProducerConsumerBase {
 
         Consumer<GenericRecord> consumer = pulsarClient
             .newConsumer(Schema.AUTO_CONSUME())
-            .topic("persistent://my-property/my-ns/my-topic1")
+            .topic(topic)
             .subscriptionName("my-subscriber-name")
             .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
             .subscribe();
@@ -591,19 +619,20 @@ public class SimpleTypedProducerConsumerTest extends 
ProducerConsumerBase {
         consumer.acknowledgeCumulative(msg);
         consumer.close();
 
-        SchemaRegistry.SchemaAndMetadata storedSchema = 
pulsar.getSchemaRegistryService()
-            .getSchema("my-property/my-ns/my-topic1")
+        SchemaRegistry.SchemaAndMetadata storedSchema = 
SharedPulsarCluster.get().getPulsarService()
+            .getSchemaRegistryService()
+            .getSchema(schemaKey)
             .get();
 
         Assert.assertEquals(storedSchema.schema.getData(), 
avroSchema.getSchemaInfo().getSchema());
 
-        log.info("-- Exiting {} test --", methodName);
+        log.info("-- Exiting {} test --", "testAutoBytesProducer");
 
     }
 
     @Test
     public void testMessageBuilderLoadConf() throws Exception {
-        String topic = BrokerTestUtil.newUniqueName("my-topic");
+        String topic = newTopicName();
 
         @Cleanup
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
index 0eff19d31a3..56bf88c6429 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
@@ -22,36 +22,21 @@ import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import static org.testng.Assert.assertEquals;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
 import org.apache.pulsar.common.api.proto.MessageIdData;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
 import org.apache.pulsar.common.protocol.Commands;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-impl")
-public class CompactedOutBatchMessageTest extends ProducerConsumerBase {
-
-    @BeforeMethod
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        producerBaseSetup();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class CompactedOutBatchMessageTest extends SharedPulsarBaseTest {
 
     @Test
     public void testCompactedOutMessages() throws Exception {
-        final String topic1 = "persistent://my-property/my-ns/my-topic";
+        final String topic1 = newTopicName();
 
         BrokerEntryMetadata brokerEntryMetadata = new 
BrokerEntryMetadata().setBrokerTimestamp(1).setBrokerTimestamp(1);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java
index ef1c993642b..e64871f9d75 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java
@@ -18,33 +18,18 @@
  */
 package org.apache.pulsar.client.impl;
 
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-impl")
-public class ConsumeBaseExceptionTest extends ProducerConsumerBase {
-
-    @BeforeMethod
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        producerBaseSetup();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class ConsumeBaseExceptionTest extends SharedPulsarBaseTest {
 
     @Test
     public void testClosedConsumer() throws PulsarClientException {
-        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/topicName")
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(newTopicName())
                 .subscriptionName("my-subscription").subscribe();
         consumer.close();
         Assert.assertTrue(consumer.receiveAsync().isCompletedExceptionally());
@@ -62,7 +47,7 @@ public class ConsumeBaseExceptionTest extends 
ProducerConsumerBase {
     @Test
     public void testListener() throws PulsarClientException {
 
-        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/topicName")
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(newTopicName())
                 
.subscriptionName("my-subscription").messageListener((consumer1, msg) -> {
 
                 }).subscribe();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java
index b9355d19c27..6d29e30f46b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java
@@ -21,36 +21,20 @@ package org.apache.pulsar.client.impl;
 import static org.testng.Assert.assertTrue;
 import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.awaitility.Awaitility;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Slf4j
 @Test(groups = "broker-api")
-public class ConsumerCloseTest extends ProducerConsumerBase {
-
-    @BeforeClass
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class ConsumerCloseTest extends SharedPulsarBaseTest {
 
     @Test
     public void testReceiveWillDoneAfterClosedConsumer() throws Exception {
-        String tpName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        String tpName = newTopicName();
         String subName = "test-sub";
         admin.topics().createNonPartitionedTopic(tpName);
         admin.topics().createSubscription(tpName, subName, MessageId.earliest);
@@ -65,10 +49,10 @@ public class ConsumerCloseTest extends ProducerConsumerBase 
{
 
     @Test
     public void testReceiveWillDoneAfterTopicDeleted() throws Exception {
-        String namespace = "public/default";
+        String namespace = getNamespace();
         admin.namespaces().setAutoTopicCreation(namespace, 
AutoTopicCreationOverride.builder()
                 .allowAutoTopicCreation(false).build());
-        String tpName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        String tpName = newTopicName();
         String subName = "test-sub";
         admin.topics().createNonPartitionedTopic(tpName);
         admin.topics().createSubscription(tpName, subName, MessageId.earliest);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java
index 612af814d3a..d5fd0bf1a18 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java
@@ -22,32 +22,16 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
-import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.Schema;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-impl")
-public class ConsumerDedupPermitsUpdateTest extends ProducerConsumerBase {
-
-    @BeforeClass
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        producerBaseSetup();
-    }
-
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class ConsumerDedupPermitsUpdateTest extends SharedPulsarBaseTest {
 
     @DataProvider(name = "combinations")
     public Object[][] combinations() {
@@ -65,7 +49,7 @@ public class ConsumerDedupPermitsUpdateTest extends 
ProducerConsumerBase {
 
     @Test(timeOut = 30000, dataProvider = "combinations")
     public void testConsumerDedup(boolean batchingEnabled, int 
receiverQueueSize) throws Exception {
-        String topic = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/my-topic");
+        String topic = newTopicName();
 
         @Cleanup
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerMemoryLimitTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerMemoryLimitTest.java
index cf647eeda2e..f3bf80a646c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerMemoryLimitTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerMemoryLimitTest.java
@@ -20,40 +20,25 @@ package org.apache.pulsar.client.impl;
 
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SizeUnit;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-impl")
 @Slf4j
-public class ConsumerMemoryLimitTest extends ProducerConsumerBase {
-
-    @BeforeMethod
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class ConsumerMemoryLimitTest extends SharedPulsarBaseTest {
 
     @Test
     public void testConsumerMemoryLimit() throws Exception {
         String topic = newTopicName();
 
         ClientBuilder clientBuilder = PulsarClient.builder()
-                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .serviceUrl(getBrokerServiceUrl())
                 .memoryLimit(10, SizeUnit.KILO_BYTES);
 
         @Cleanup
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/DispatchAccordingPermitsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/DispatchAccordingPermitsTest.java
index 878a368e473..f989a60d36c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/DispatchAccordingPermitsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/DispatchAccordingPermitsTest.java
@@ -19,36 +19,21 @@
 package org.apache.pulsar.client.impl;
 
 import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-impl")
-public class DispatchAccordingPermitsTest extends ProducerConsumerBase {
-
-    @Override
-    @BeforeMethod
-    public void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @Override
-    @AfterMethod(alwaysRun = true)
-    public void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class DispatchAccordingPermitsTest extends SharedPulsarBaseTest {
 
     /**
      * The test case is to simulate dispatch batches with different batch size 
to the consumer.
@@ -59,7 +44,7 @@ public class DispatchAccordingPermitsTest extends 
ProducerConsumerBase {
      */
     @Test
     public void testFlowPermitsWithMultiBatchesDispatch() throws 
PulsarAdminException, PulsarClientException {
-        final String topic = 
"persistent://public/default/testFlowPermitsWithMultiBatchesDispatch";
+        final String topic = newTopicName();
         final String subName = "test";
         admin.topics().createSubscription(topic, "test", MessageId.earliest);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java
index f7e6f1c60d2..133cf1ad28f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java
@@ -19,44 +19,27 @@
 package org.apache.pulsar.client.impl;
 
 import java.util.List;
-import java.util.UUID;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
+import org.apache.pulsar.broker.service.SharedPulsarCluster;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-impl")
 @Slf4j
-public class HierarchyTopicAutoCreationTest extends ProducerConsumerBase {
-
-    @Override
-    @BeforeMethod
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @Override
-    @AfterMethod(alwaysRun = true)
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class HierarchyTopicAutoCreationTest extends SharedPulsarBaseTest {
 
     @Test(invocationCount = 3)
     @SneakyThrows
     public void testPartitionedTopicAutoCreation() {
-        // Create namespace
-        final String namespace = "public/testPartitionedTopicAutoCreation";
-        admin.namespaces().createNamespace(namespace);
+        final String namespace = getNamespace();
         // Set policies
         final AutoTopicCreationOverride expectedPolicies = 
AutoTopicCreationOverride.builder()
                 .allowAutoTopicCreation(true)
@@ -69,7 +52,8 @@ public class HierarchyTopicAutoCreationTest extends 
ProducerConsumerBase {
                 .getAutoTopicCreation(namespace);
         Assert.assertEquals(nsAutoTopicCreationOverride, expectedPolicies);
         // Background invalidate cache
-        final MetadataCache<Policies> nsCache = 
pulsar.getPulsarResources().getNamespaceResources().getCache();
+        final MetadataCache<Policies> nsCache = 
SharedPulsarCluster.get().getPulsarService()
+                .getPulsarResources().getNamespaceResources().getCache();
         @Cleanup("interrupt")
         final Thread t1 = new Thread(() -> {
             while (!Thread.currentThread().isInterrupted()) {
@@ -79,7 +63,7 @@ public class HierarchyTopicAutoCreationTest extends 
ProducerConsumerBase {
         t1.start();
 
         // trigger auto-creation
-        final String topicName = "persistent://" + namespace + "/test-" + 
UUID.randomUUID();
+        final String topicName = newTopicName();
         @Cleanup final Producer<byte[]> producer = pulsarClient.newProducer()
                 .topic(topicName)
                 .create();
@@ -89,7 +73,8 @@ public class HierarchyTopicAutoCreationTest extends 
ProducerConsumerBase {
                 TopicName.get(topicName).getPartition(0).toString()); // 
expect partitioned topic
 
         // double-check policies
-        final AutoTopicCreationOverride actualPolicies2 = 
admin.namespaces().getAutoTopicCreation(namespace);
+        final AutoTopicCreationOverride actualPolicies2 = admin.namespaces()
+                .getAutoTopicCreation(namespace);
         Assert.assertEquals(actualPolicies2, expectedPolicies);
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java
index bb69c0daefc..ff72f23b0a2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java
@@ -27,9 +27,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MockBrokerService;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -42,25 +42,21 @@ import org.testng.annotations.Test;
 
 @Test(groups = "broker-impl")
 @Slf4j
-public class ProduceWithMessageIdTest extends ProducerConsumerBase {
+public class ProduceWithMessageIdTest extends SharedPulsarBaseTest {
     MockBrokerService mockBrokerService;
 
     @BeforeClass(alwaysRun = true)
-    public void setup() throws Exception {
+    public void setupMockBroker() throws Exception {
         mockBrokerService = new MockBrokerService();
         mockBrokerService.start();
-        super.internalSetup();
-        super.producerBaseSetup();
     }
 
-    @Override
     @AfterClass(alwaysRun = true)
-    public void cleanup() throws Exception {
+    public void cleanupMockBroker() throws Exception {
         if (mockBrokerService != null) {
             mockBrokerService.stop();
             mockBrokerService = null;
         }
-        super.internalCleanup();
     }
 
     @Test
@@ -81,7 +77,7 @@ public class ProduceWithMessageIdTest extends 
ProducerConsumerBase {
                 .serviceUrl(mockBrokerService.getBrokerAddress())
                 .build();
 
-        String topic = "persistent://public/default/t1";
+        String topic = newTopicName();
         ProducerImpl<byte[]> producer =
                 (ProducerImpl<byte[]>) 
client.newProducer().topic(topic).enableBatching(false).create();
 
@@ -129,7 +125,7 @@ public class ProduceWithMessageIdTest extends 
ProducerConsumerBase {
 
         int batchSize = 10;
 
-        String topic = "persistent://public/default/testSendWithCallBack";
+        String topic = newTopicName();
         ProducerImpl<byte[]> producer =
                 (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topic)
                         .enableBatching(true)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicFromMessageTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicFromMessageTest.java
index 2856de00bf6..68f0f6efedb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicFromMessageTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicFromMessageTest.java
@@ -21,31 +21,38 @@ package org.apache.pulsar.client.impl;
 import com.google.common.collect.Lists;
 import java.util.HashSet;
 import java.util.Set;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
+import org.apache.pulsar.broker.service.SharedPulsarCluster;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-impl")
-public class TopicFromMessageTest extends ProducerConsumerBase {
+public class TopicFromMessageTest extends SharedPulsarBaseTest {
 
     private static final long TEST_TIMEOUT = 90000; // 1.5 min
     private static final int BATCHING_MAX_MESSAGES_THRESHOLD = 2;
 
     @Override
-    @BeforeMethod
-    public void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @Override
-    @AfterMethod(alwaysRun = true)
-    public void cleanup() throws Exception {
-        super.internalCleanup();
+    @BeforeClass(alwaysRun = true)
+    public void setupSharedCluster() throws Exception {
+        super.setupSharedCluster();
+        // These tests use short topic names (e.g. "topic1") which resolve to 
public/default
+        try {
+            admin.tenants().createTenant("public",
+                    new TenantInfoImpl(Set.of(), 
Set.of(SharedPulsarCluster.CLUSTER_NAME)));
+        } catch (Exception e) {
+            // tenant may already exist
+        }
+        try {
+            admin.namespaces().createNamespace("public/default",
+                    Set.of(SharedPulsarCluster.CLUSTER_NAME));
+        } catch (Exception e) {
+            // namespace may already exist
+        }
     }
 
     @Test(timeOut = TEST_TIMEOUT)
@@ -61,12 +68,13 @@ public class TopicFromMessageTest extends 
ProducerConsumerBase {
 
     @Test(timeOut = TEST_TIMEOUT)
     public void testSingleTopicConsumerNoBatchFullName() throws Exception {
+        final String topic = newTopicName();
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
-                
.topic("my-property/my-ns/topic1").subscriptionName("sub1").subscribe();
+                .topic(topic).subscriptionName("sub1").subscribe();
              Producer<byte[]> producer = pulsarClient.newProducer()
-                
.topic("my-property/my-ns/topic1").enableBatching(false).create()) {
+                .topic(topic).enableBatching(false).create()) {
             producer.send("foobar".getBytes());
-            Assert.assertEquals(consumer.receive().getTopicName(), 
"persistent://my-property/my-ns/topic1");
+            Assert.assertEquals(consumer.receive().getTopicName(), topic);
         }
     }
 

Reply via email to