This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new aa4df1b3af1 [improve][broker] Tidy up the system topic. (#15252)
aa4df1b3af1 is described below

commit aa4df1b3af1298b087661a28b1d5e0f601ac6925
Author: Jiwei Guo <[email protected]>
AuthorDate: Mon Apr 25 09:39:42 2022 +0800

    [improve][broker] Tidy up the system topic. (#15252)
    
    ### Motivation
    
    For #13520, #14643, #14949, we fix some issues related to system topic but 
result in checking the system topic in different method. So it's better to tidy 
up the system topic.
    
    So put these system topic names into a new class called SystemTopicNames.
---
 .../apache/pulsar/PulsarClusterMetadataSetup.java  |  3 +-
 .../java/org/apache/pulsar/PulsarStandalone.java   | 17 ++++++---
 .../PulsarTransactionCoordinatorMetadataSetup.java |  5 +--
 .../org/apache/pulsar/broker/PulsarService.java    | 23 ++-----------
 .../broker/TransactionMetadataStoreService.java    |  7 ++--
 .../broker/admin/impl/PersistentTopicsBase.java    |  6 ++--
 .../pulsar/broker/admin/impl/TransactionsBase.java | 13 +++----
 .../pulsar/broker/namespace/NamespaceService.java  | 12 +++++--
 .../ResourceUsageTopicTransportManager.java        |  8 ++---
 .../pulsar/broker/service/BrokerService.java       | 30 ++++------------
 .../SystemTopicBasedTopicPoliciesService.java      |  3 +-
 .../service/nonpersistent/NonPersistentTopic.java  |  8 ++---
 .../service/persistent/PersistentSubscription.java |  4 +--
 .../broker/service/persistent/PersistentTopic.java | 16 ++++-----
 .../broker/service/persistent/SystemTopic.java     | 12 +++----
 .../stats/prometheus/TransactionAggregator.java    |  4 +--
 .../NamespaceEventsSystemTopicFactory.java         | 10 +++---
 .../pendingack/impl/MLPendingAckStore.java         |  9 ++---
 .../pulsar/broker/web/PulsarWebResource.java       |  6 +---
 .../pulsar/broker/admin/TopicPoliciesTest.java     |  4 +--
 .../v3/AdminApiTransactionMultiBrokerTest.java     | 18 +++++++---
 .../broker/admin/v3/AdminApiTransactionTest.java   | 17 +++++----
 .../ResourceUsageTransportManagerTest.java         |  3 +-
 .../BrokerServiceAutoTopicCreationTest.java        |  6 ++--
 .../pulsar/broker/service/BrokerServiceTest.java   |  4 +--
 .../pulsar/broker/service/BrokerTestBase.java      | 15 ++++++++
 .../pulsar/broker/service/ReplicatorTest.java      | 25 ++++++++------
 .../TransactionMetadataStoreServiceTest.java       |  9 +++--
 .../broker/stats/ManagedLedgerMetricsTest.java     |  4 +--
 .../broker/stats/TransactionMetricsTest.java       | 14 ++++----
 .../NamespaceEventsSystemTopicServiceTest.java     |  4 +--
 .../systopic/PartitionedSystemTopicTest.java       |  4 +--
 .../TopicTransactionBufferRecoverTest.java         |  5 ++-
 .../broker/transaction/TransactionConsumeTest.java |  4 +--
 .../pulsar/broker/transaction/TransactionTest.java |  9 +++--
 .../broker/transaction/TransactionTestBase.java    | 14 ++++++--
 .../buffer/TransactionBufferCloseTest.java         |  5 ++-
 .../buffer/TransactionLowWaterMarkTest.java        |  8 ++---
 .../TransactionMetaStoreAssignmentTest.java        |  4 +--
 .../TransactionCoordinatorClientImpl.java          |  8 +++--
 .../SystemTopicNames.java}                         | 40 ++++++++++++++++++----
 .../org/apache/pulsar/common/naming/TopicName.java |  6 ----
 .../Oauth2PerformanceTransactionTest.java          |  9 +++--
 .../testclient/PerformanceTransactionTest.java     |  8 +++--
 44 files changed, 234 insertions(+), 209 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
index 5146b513fb5..1dbb480f29d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.resources.TenantResources;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -311,7 +312,7 @@ public class PulsarClusterMetadataSetup {
         createNamespaceIfAbsent(resources, NamespaceName.SYSTEM_NAMESPACE, 
arguments.cluster);
 
         // Create transaction coordinator assign partitioned topic
-        createPartitionedTopic(configStore, 
TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+        createPartitionedTopic(configStore, 
SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
                 arguments.numTransactionCoordinators);
 
         localStore.close();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
index f94491235e6..f8d1548c842 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar;
 
 import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
-import static 
org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_ASSIGN;
+import static 
org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN;
 import com.beust.jcommander.Parameter;
 import com.google.common.collect.Sets;
 import java.io.File;
@@ -30,9 +30,11 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.logging.log4j.LogManager;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.resources.NamespaceResources;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -311,10 +313,15 @@ public class PulsarStandalone implements AutoCloseable {
         createNameSpace(cluster, TopicName.PUBLIC_TENANT, 
TopicName.PUBLIC_TENANT + "/" + TopicName.DEFAULT_NAMESPACE);
         //create pulsar system namespace
         createNameSpace(cluster, SYSTEM_NAMESPACE.getTenant(), 
SYSTEM_NAMESPACE.toString());
-        if (config.isTransactionCoordinatorEnabled() && !admin.namespaces()
-                .getTopics(SYSTEM_NAMESPACE.toString())
-                
.contains(TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString())) {
-            
admin.topics().createPartitionedTopic(TRANSACTION_COORDINATOR_ASSIGN.toString(),
 1);
+        if (config.isTransactionCoordinatorEnabled()) {
+            NamespaceResources.PartitionedTopicResources 
partitionedTopicResources =
+                    
broker.getPulsarResources().getNamespaceResources().getPartitionedTopicResources();
+            Optional<PartitionedTopicMetadata> getResult =
+                    
partitionedTopicResources.getPartitionedTopicMetadataAsync(TRANSACTION_COORDINATOR_ASSIGN).get();
+            if (!getResult.isPresent()) {
+                
partitionedTopicResources.createPartitionedTopic(TRANSACTION_COORDINATOR_ASSIGN,
+                        new PartitionedTopicMetadata(1));
+            }
         }
 
         log.debug("--- setup completed ---");
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java
index 0f66bd01ee0..66607dd4c0a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java
@@ -22,7 +22,7 @@ import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.util.CmdGenerateDocs;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 
@@ -103,7 +103,8 @@ public class PulsarTransactionCoordinatorMetadataSetup {
                     arguments.cluster);
 
             // Create transaction coordinator assign partitioned topic
-            PulsarClusterMetadataSetup.createPartitionedTopic(configStore, 
TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+            PulsarClusterMetadataSetup.createPartitionedTopic(configStore,
+                    SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
                     arguments.numTransactionCoordinators);
         }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 35cd22d4bbe..fc5feedc5e1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -22,7 +22,7 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static 
org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER;
-import static 
org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_LOG;
+import static 
org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -113,7 +113,6 @@ import 
org.apache.pulsar.broker.storage.ManagedLedgerStorage;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
 import 
org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
-import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
 import org.apache.pulsar.broker.validator.MultipleListenerValidator;
 import org.apache.pulsar.broker.web.WebService;
 import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlet;
@@ -273,8 +272,6 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
     private volatile CompletableFuture<Void> closeFuture;
     // key is listener name, value is pulsar address and pulsar ssl address
     private Map<String, AdvertisedListener> advertisedListeners;
-    private NamespaceName heartbeatNamespaceV1;
-    private NamespaceName heartbeatNamespaceV2;
 
     public PulsarService(ServiceConfiguration config) {
         this(config, Optional.empty(), (exitCode) -> {
@@ -723,8 +720,6 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
             createMetricsServlet();
             this.addWebServerHandlers(webService, metricsServlet, this.config);
             this.webService.start();
-            heartbeatNamespaceV1 = 
NamespaceService.getHeartbeatNamespace(this.advertisedAddress, this.config);
-            heartbeatNamespaceV2 = 
NamespaceService.getHeartbeatNamespaceV2(this.advertisedAddress, this.config);
 
             // Refresh addresses and update configuration, since the port 
might have been dynamically assigned
             if (config.getBrokerServicePort().equals(Optional.of(0))) {
@@ -1129,7 +1124,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                     .get(config.getMetadataStoreOperationTimeoutSeconds(), 
TimeUnit.SECONDS)) {
                 try {
                     TopicName topicName = TopicName.get(topic);
-                    if (bundle.includes(topicName) && 
!isTransactionSystemTopic(topicName)) {
+                    if (bundle.includes(topicName) && 
!isTransactionInternalName(topicName)) {
                         CompletableFuture<Optional<Topic>> future = 
brokerService.getTopicIfExists(topic);
                         if (future != null) {
                             persistentTopics.add(future);
@@ -1709,20 +1704,6 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
         processTerminator.accept(-1);
     }
 
-
-    public static boolean isTransactionSystemTopic(TopicName topicName) {
-        String topic = topicName.toString();
-        return 
topic.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString())
-                || topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString())
-                || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
-    }
-
-    public static boolean isTransactionInternalName(TopicName topicName) {
-        String topic = topicName.toString();
-        return topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString())
-                || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
-    }
-
     @VisibleForTesting
     protected BrokerService newBrokerService(PulsarService pulsar) throws 
Exception {
         return new BrokerService(pulsar, ioEventLoopGroup);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 902546958c5..569f5900342 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -56,6 +56,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
@@ -124,7 +125,7 @@ public class TransactionMetadataStoreService {
                             if (ex == null) {
                                 for (String topic : topics) {
                                     TopicName name = TopicName.get(topic);
-                                    if 
(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
+                                    if 
(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
                                             
.equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
                                             && name.isPartitioned()) {
                                         
handleTcClientConnect(TransactionCoordinatorID.get(name.getPartitionIndex()));
@@ -144,7 +145,7 @@ public class TransactionMetadataStoreService {
                             if (ex == null) {
                                 for (String topic : topics) {
                                     TopicName name = TopicName.get(topic);
-                                    if 
(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
+                                    if 
(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
                                             
.equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
                                             && name.isPartitioned()) {
                                         removeTransactionMetadataStore(
@@ -170,7 +171,7 @@ public class TransactionMetadataStoreService {
             if (stores.get(tcId) != null) {
                 completableFuture.complete(null);
             } else {
-                
pulsarService.getBrokerService().checkTopicNsOwnership(TopicName
+                
pulsarService.getBrokerService().checkTopicNsOwnership(SystemTopicNames
                         .TRANSACTION_COORDINATOR_ASSIGN.getPartition((int) 
tcId.getId()).toString())
                         .thenRun(() -> internalPinnedExecutor.execute(() -> {
                     final Semaphore tcLoadSemaphore = this.tcLoadSemaphores
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 909c7de8d74..5ed19f5ed96 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -19,9 +19,9 @@
 package org.apache.pulsar.broker.admin.impl;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.pulsar.broker.PulsarService.isTransactionInternalName;
 import static 
org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC;
-import static 
org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsTransactionCoordinatorAssign;
+import static 
org.apache.pulsar.common.naming.SystemTopicNames.isTransactionCoordinatorAssign;
+import static 
org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.github.zafarkhaja.semver.Version;
 import com.google.common.collect.Lists;
@@ -723,7 +723,7 @@ public class PersistentTopicsBase extends AdminResource {
        future.thenAccept(__ -> {
            // If the topic name is a partition name, no need to get partition 
topic metadata again
            if (topicName.isPartitioned()) {
-               if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
+               if (isTransactionCoordinatorAssign(topicName)) {
                    internalUnloadTransactionCoordinatorAsync(asyncResponse, 
authoritative);
                } else {
                    internalUnloadNonPartitionedTopicAsync(asyncResponse, 
authoritative);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
index 8eff6815404..bbd2e0be9c3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.Transactions;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TransactionBufferStats;
@@ -68,7 +69,7 @@ public abstract class TransactionsBase extends AdminResource {
     protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, 
boolean authoritative,
                                                Integer coordinatorId) {
         if (coordinatorId != null) {
-            
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+            
validateTopicOwnership(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
                     authoritative);
             TransactionMetadataStore transactionMetadataStore =
                     pulsar().getTransactionMetadataStoreService().getStores()
@@ -80,7 +81,7 @@ public abstract class TransactionsBase extends AdminResource {
             }
             
asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
         } else {
-            
getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+            
getPartitionedTopicMetadataAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
                     false, false).thenAccept(partitionMetadata -> {
                 if (partitionMetadata.partitions == 0) {
                     asyncResponse.resume(new 
RestException(Response.Status.NOT_FOUND,
@@ -151,7 +152,7 @@ public abstract class TransactionsBase extends 
AdminResource {
     protected void internalGetTransactionMetadata(AsyncResponse asyncResponse,
                                                   boolean authoritative, int 
mostSigBits, long leastSigBits) {
         try {
-            
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits),
+            
validateTopicOwnership(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits),
                     authoritative);
             CompletableFuture<TransactionMetadata> transactionMetadataFuture = 
new CompletableFuture<>();
             TxnMeta txnMeta = pulsar().getTransactionMetadataStoreService()
@@ -260,7 +261,7 @@ public abstract class TransactionsBase extends 
AdminResource {
                                                boolean authoritative, long 
timeout, Integer coordinatorId) {
         try {
             if (coordinatorId != null) {
-                
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+                
validateTopicOwnership(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
                         authoritative);
                 TransactionMetadataStore transactionMetadataStore =
                         
pulsar().getTransactionMetadataStoreService().getStores()
@@ -296,7 +297,7 @@ public abstract class TransactionsBase extends 
AdminResource {
                     asyncResponse.resume(transactionMetadata);
                 });
             } else {
-                
getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+                
getPartitionedTopicMetadataAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
                         false, false).thenAccept(partitionMetadata -> {
                     if (partitionMetadata.partitions == 0) {
                         asyncResponse.resume(new 
RestException(Response.Status.NOT_FOUND,
@@ -349,7 +350,7 @@ public abstract class TransactionsBase extends 
AdminResource {
     protected void internalGetCoordinatorInternalStats(AsyncResponse 
asyncResponse, boolean authoritative,
                                                        boolean metadata, int 
coordinatorId) {
         try {
-            TopicName topicName = 
TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId);
+            TopicName topicName = 
SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId);
             validateTopicOwnership(topicName, authoritative);
             TransactionMetadataStore metadataStore = 
pulsar().getTransactionMetadataStoreService()
                     
.getStores().get(TransactionCoordinatorID.get(coordinatorId));
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index a321c035c78..e1e1dc10676 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -23,6 +23,7 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 import static java.lang.String.format;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
 import com.google.common.collect.Lists;
 import com.google.common.hash.Hashing;
 import io.prometheus.client.Counter;
@@ -1372,9 +1373,16 @@ public class NamespaceService implements AutoCloseable {
     }
 
     public static boolean isSystemServiceNamespace(String namespace) {
+        return SYSTEM_NAMESPACE.toString().equals(namespace)
+                || SLA_NAMESPACE_PATTERN.matcher(namespace).matches()
+                || HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches()
+                || HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches();
+    }
+
+    public static boolean isHeartbeatNamespace(ServiceUnitId ns) {
+        String namespace = ns.getNamespaceObject().toString();
         return HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches()
-                || HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches()
-                || SLA_NAMESPACE_PATTERN.matcher(namespace).matches();
+                || HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches();
     }
 
     public boolean registerSLANamespace() throws PulsarServerException {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTopicTransportManager.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTopicTransportManager.java
index 38d1bb3c602..6ce0781781a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTopicTransportManager.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTopicTransportManager.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderListener;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.slf4j.Logger;
@@ -69,7 +70,7 @@ public class ResourceUsageTopicTransportManager implements 
ResourceUsageTranspor
             final int sendTimeoutSecs = 10;
 
             return pulsarClient.newProducer(Schema.BYTEBUFFER)
-                    .topic(RESOURCE_USAGE_TOPIC_NAME)
+                    .topic(SystemTopicNames.RESOURCE_USAGE_TOPIC.toString())
                     .batchingMaxPublishDelay(publishDelayMilliSecs, 
TimeUnit.MILLISECONDS)
                     .sendTimeout(sendTimeoutSecs, TimeUnit.SECONDS)
                     .blockIfQueueFull(false)
@@ -123,7 +124,7 @@ public class ResourceUsageTopicTransportManager implements 
ResourceUsageTranspor
 
         public ResourceUsageReader() throws PulsarClientException {
             consumer =  pulsarClient.newReader()
-                    .topic(RESOURCE_USAGE_TOPIC_NAME)
+                    .topic(SystemTopicNames.RESOURCE_USAGE_TOPIC.toString())
                     .startMessageId(MessageId.latest)
                     .readerListener(this)
                     .create();
@@ -165,7 +166,6 @@ public class ResourceUsageTopicTransportManager implements 
ResourceUsageTranspor
     }
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ResourceUsageTopicTransportManager.class);
-    public  static final String RESOURCE_USAGE_TOPIC_NAME = 
"non-persistent://pulsar/system/resource-usage";
     private final PulsarService pulsarService;
     private final PulsarClient pulsarClient;
     private final ResourceUsageWriterTask pTask;
@@ -177,7 +177,7 @@ public class ResourceUsageTopicTransportManager implements 
ResourceUsageTranspor
 
     private void createTenantAndNamespace() throws PulsarServerException, 
PulsarAdminException {
         // Create a public tenant and default namespace
-        TopicName topicName = TopicName.get(RESOURCE_USAGE_TOPIC_NAME);
+        TopicName topicName = SystemTopicNames.RESOURCE_USAGE_TOPIC;
 
         PulsarAdmin admin = pulsarService.getAdminClient();
         ServiceConfiguration config = pulsarService.getConfig();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 27784332fdc..c3f3dc64e5a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -23,7 +23,7 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import static org.apache.commons.collections.CollectionUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.pulsar.broker.PulsarService.isTransactionSystemTopic;
+import static 
org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -121,7 +121,6 @@ import 
org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
-import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
 import org.apache.pulsar.broker.validator.BindAddressValidator;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
@@ -132,13 +131,13 @@ import 
org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.configuration.BindAddress;
 import org.apache.pulsar.common.configuration.FieldContext;
-import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor;
 import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
 import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
 import org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -1376,7 +1375,7 @@ public class BrokerService implements Closeable {
             return;
         }
 
-        if (isTransactionSystemTopic(topicName)) {
+        if (isTransactionInternalName(topicName)) {
             String msg = String.format("Can not create transaction system 
topic %s", topic);
             log.warn(msg);
             pulsar.getExecutor().execute(() -> topics.remove(topic, 
topicFuture));
@@ -1904,7 +1903,7 @@ public class BrokerService implements Closeable {
                     this.getPulsar().getTransactionMetadataStoreService();
             // if the store belongs to this bundle, remove and close the store
             
this.getPulsar().getTransactionMetadataStoreService().getStores().values().stream().filter(store
 ->
-                    
serviceUnit.includes(TopicName.TRANSACTION_COORDINATOR_ASSIGN
+                    
serviceUnit.includes(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN
                             .getPartition((int) 
(store.getTransactionCoordinatorID().getId()))))
                     .map(TransactionMetadataStore::getTransactionCoordinatorID)
                     .forEach(tcId -> 
closeFutures.add(metadataStoreService.removeTransactionMetadataStore(tcId)));
@@ -2936,25 +2935,8 @@ public class BrokerService implements Closeable {
     }
 
     public boolean isSystemTopic(TopicName topicName) {
-        if 
(topicName.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE)
-                || 
topicName.getNamespaceObject().equals(pulsar.getHeartbeatNamespaceV1())
-                || 
topicName.getNamespaceObject().equals(pulsar.getHeartbeatNamespaceV2())) {
-            return true;
-        }
-
-        TopicName nonePartitionedTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
-
-        // event topic
-        if 
(EventsTopicNames.checkTopicIsEventsNames(nonePartitionedTopicName)) {
-            return true;
-        }
-
-        String localName = nonePartitionedTopicName.getLocalName();
-        // transaction pending ack topic
-        if (StringUtils.endsWith(localName, 
MLPendingAckStore.PENDING_ACK_STORE_SUFFIX)) {
-            return true;
-        }
-        return false;
+        return 
NamespaceService.isSystemServiceNamespace(topicName.getNamespace())
+                || SystemTopicNames.isSystemTopic(topicName);
     }
 
     /**
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index fc0700eaf77..f2db5b6be51 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -236,8 +236,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     public CompletableFuture<Void> 
addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
         CompletableFuture<Void> result = new CompletableFuture<>();
         NamespaceName namespace = namespaceBundle.getNamespaceObject();
-        if (NamespaceService.checkHeartbeatNamespace(namespace) != null
-                || NamespaceService.checkHeartbeatNamespaceV2(namespace) != 
null) {
+        if (NamespaceService.isHeartbeatNamespace(namespace)) {
             result.complete(null);
             return result;
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 337b4c7fc1b..1edc7ae12e8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.resources.NamespaceResources;
 import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.service.AbstractTopic;
@@ -71,7 +72,6 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
-import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import 
org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
@@ -536,11 +536,7 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
     @Override
     public CompletableFuture<Void> checkReplication() {
         TopicName name = TopicName.get(topic);
-        if (!name.isGlobal()) {
-            return CompletableFuture.completedFuture(null);
-        }
-        NamespaceName heartbeatNamespace = 
brokerService.pulsar().getHeartbeatNamespaceV2();
-        if (name.getNamespaceObject().equals(heartbeatNamespace)) {
+        if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) {
             return CompletableFuture.completedFuture(null);
         }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index ec77fe69eed..fd0922f8f71 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
-import static 
org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
+import static 
org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopic;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import java.util.Collections;
@@ -148,7 +148,7 @@ public class PersistentSubscription implements Subscription 
{
         this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
                 ? new HashMap<>() : 
Collections.unmodifiableMap(subscriptionProperties);
         if 
(topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
-                && !checkTopicIsEventsNames(TopicName.get(topicName))) {
+                && !isEventSystemTopic(TopicName.get(topicName))) {
             this.pendingAckHandle = new PendingAckHandleImpl(this);
         } else {
             this.pendingAckHandle = new PendingAckHandleDisabled();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 85ee429c026..eed8ba0d3b8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.broker.service.persistent;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static 
org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter.isSubscribeRateEnabled;
-import static 
org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
+import static 
org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopic;
 import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
 import com.carrotsearch.hppc.ObjectObjectHashMap;
@@ -78,6 +78,7 @@ import org.apache.bookkeeper.net.BookieId;
 import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.namespace.NamespaceService;
 import 
org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources;
 import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.service.AbstractTopic;
@@ -127,8 +128,7 @@ import 
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.TxnAction;
-import org.apache.pulsar.common.events.EventsTopicNames;
-import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
@@ -292,7 +292,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         checkReplicatedSubscriptionControllerState();
         TopicName topicName = TopicName.get(topic);
         if 
(brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled()
-                && !checkTopicIsEventsNames(topicName)) {
+                && !isEventSystemTopic(topicName)) {
             this.transactionBuffer = brokerService.getPulsar()
                     .getTransactionBufferProvider().newTransactionBuffer(this);
         } else {
@@ -719,7 +719,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             }
 
             try {
-                if 
(!topic.endsWith(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)
+                if 
(!topic.endsWith(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)
                         && !checkSubscriptionTypesEnable(subType)) {
                     return FutureUtil.failedFuture(
                             new NotAllowedException("Topic[{" + topic + "}] 
doesn't support "
@@ -1377,11 +1377,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     @Override
     public CompletableFuture<Void> checkReplication() {
         TopicName name = TopicName.get(topic);
-        if (!name.isGlobal()) {
-            return CompletableFuture.completedFuture(null);
-        }
-        NamespaceName heartbeatNamespace = 
brokerService.pulsar().getHeartbeatNamespaceV2();
-        if (name.getNamespaceObject().equals(heartbeatNamespace)) {
+        if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) {
             return CompletableFuture.completedFuture(null);
         }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
index 988310858cd..715a684902f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
@@ -22,9 +22,9 @@ package org.apache.pulsar.broker.service.persistent;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.common.events.EventsTopicNames;
-import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 
 public class SystemTopic extends PersistentTopic {
@@ -65,7 +65,7 @@ public class SystemTopic extends PersistentTopic {
 
     @Override
     public CompletableFuture<Void> checkReplication() {
-        if (EventsTopicNames.isTopicPoliciesSystemTopic(topic)) {
+        if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
             return super.checkReplication();
         }
         return CompletableFuture.completedFuture(null);
@@ -75,10 +75,6 @@ public class SystemTopic extends PersistentTopic {
     public boolean isCompactionEnabled() {
         // All system topics are using compaction except `HealthCheck`,
         // even though is not explicitly set in the policies.
-        TopicName name = TopicName.get(topic);
-        NamespaceName heartbeatNamespaceV1 = 
brokerService.pulsar().getHeartbeatNamespaceV1();
-        NamespaceName heartbeatNamespaceV2 = 
brokerService.pulsar().getHeartbeatNamespaceV2();
-        return !name.getNamespaceObject().equals(heartbeatNamespaceV1)
-                && !name.getNamespaceObject().equals(heartbeatNamespaceV2);
+        return !NamespaceService.isHeartbeatNamespace(TopicName.get(topic));
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
index e6ac1535f43..3dbda26b0a8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.broker.stats.prometheus;
 
-import static 
org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
+import static 
org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopic;
 import io.netty.util.concurrent.FastThreadLocal;
 import java.util.HashMap;
 import java.util.Map;
@@ -79,7 +79,7 @@ public class TransactionAggregator {
                             
topic.getSubscriptions().values().forEach(subscription -> {
                                 try {
                                     localManageLedgerStats.get().reset();
-                                    if 
(!checkTopicIsEventsNames(TopicName.get(subscription.getTopic().getName()))
+                                    if 
(!isEventSystemTopic(TopicName.get(subscription.getTopic().getName()))
                                             && subscription instanceof  
PersistentSubscription
                                             && ((PersistentSubscription) 
subscription).checkIfPendingAckStoreInit()) {
                                         ManagedLedger managedLedger =
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
index 59fb8b032e9..9e162f741b2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
@@ -21,8 +21,8 @@ package org.apache.pulsar.broker.systopic;
 import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.events.EventType;
-import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.slf4j.Logger;
@@ -38,7 +38,7 @@ public class NamespaceEventsSystemTopicFactory {
 
     public TopicPoliciesSystemTopicClient 
createTopicPoliciesSystemTopicClient(NamespaceName namespaceName) {
         TopicName topicName = TopicName.get(TopicDomain.persistent.value(), 
namespaceName,
-                EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+                SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
         log.info("Create topic policies system topic client {}", 
topicName.toString());
         return new TopicPoliciesSystemTopicClient(client, topicName);
     }
@@ -46,7 +46,7 @@ public class NamespaceEventsSystemTopicFactory {
     public TransactionBufferSystemTopicClient 
createTransactionBufferSystemTopicClient(NamespaceName namespaceName,
                                                    
TransactionBufferSnapshotService transactionBufferSnapshotService) {
         TopicName topicName = TopicName.get(TopicDomain.persistent.value(), 
namespaceName,
-                EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+                SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
         log.info("Create transaction buffer snapshot client, topicName : {}", 
topicName.toString());
         return new TransactionBufferSystemTopicClient(client, topicName, 
transactionBufferSnapshotService);
     }
@@ -55,10 +55,10 @@ public class NamespaceEventsSystemTopicFactory {
         switch (eventType) {
             case TOPIC_POLICY:
                 return TopicName.get(TopicDomain.persistent.value(), 
namespaceName,
-                        EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+                        SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
             case TRANSACTION_BUFFER_SNAPSHOT:
                 return TopicName.get(TopicDomain.persistent.value(), 
namespaceName,
-                        EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+                        SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
             default:
                 return null;
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index ccc1628154c..289cd14a716 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -45,6 +45,7 @@ import 
org.apache.pulsar.broker.transaction.pendingack.proto.PendingAckOp;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.jctools.queues.MessagePassingQueue;
 import org.jctools.queues.SpscArrayQueue;
@@ -61,10 +62,6 @@ public class MLPendingAckStore implements PendingAckStore {
 
     private final ManagedCursor cursor;
 
-    public static final String PENDING_ACK_STORE_SUFFIX = 
"__transaction_pending_ack";
-
-    public static final String PENDING_ACK_STORE_CURSOR_NAME = 
"__pending_ack_state";
-
     private final SpscArrayQueue<Entry> entryQueue;
 
     //this is for replay
@@ -412,11 +409,11 @@ public class MLPendingAckStore implements PendingAckStore 
{
     }
 
     public static String getTransactionPendingAckStoreSuffix(String 
originTopicName, String subName) {
-        return TopicName.get(originTopicName) + "-" + subName + 
PENDING_ACK_STORE_SUFFIX;
+        return TopicName.get(originTopicName) + "-" + subName + 
SystemTopicNames.PENDING_ACK_STORE_SUFFIX;
     }
 
     public static String getTransactionPendingAckStoreCursorName() {
-        return PENDING_ACK_STORE_CURSOR_NAME;
+        return SystemTopicNames.PENDING_ACK_STORE_CURSOR_NAME;
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(MLPendingAckStore.class);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index baa5d3fe332..1690210a80a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -775,11 +775,7 @@ public abstract class PulsarWebResource {
 
     public static CompletableFuture<ClusterDataImpl> 
checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService,
                                                                                
            NamespaceName namespace) {
-        if (!namespace.isGlobal()) {
-            return CompletableFuture.completedFuture(null);
-        }
-        NamespaceName heartbeatNamespace = 
pulsarService.getHeartbeatNamespaceV2();
-        if (namespace.equals(heartbeatNamespace)) {
+        if (!namespace.isGlobal() || 
NamespaceService.isHeartbeatNamespace(namespace)) {
             return CompletableFuture.completedFuture(null);
         }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 60e591446d1..320a375145a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -65,8 +65,8 @@ import 
org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
-import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -2794,7 +2794,7 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
             
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNotNull();
             
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic2))).isNotNull();
         });
-        String topicPoliciesTopic = "persistent://" + myNamespace + "/" + 
EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
+        String topicPoliciesTopic = "persistent://" + myNamespace + "/" + 
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
         PersistentTopic persistentTopic =
                 (PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(topicPoliciesTopic).get().get();
         // Trigger compaction and make sure it is finished.
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionMultiBrokerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionMultiBrokerTest.java
index add277fc524..c9d554a2ca8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionMultiBrokerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionMultiBrokerTest.java
@@ -23,7 +23,8 @@ import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -48,11 +49,18 @@ public class AdminApiTransactionMultiBrokerTest extends 
TransactionTestBase {
     @Test
     public void testRedirectOfGetCoordinatorInternalStats() throws Exception {
         Map<String, String> map = admin.lookups()
-                
.lookupPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+                
.lookupPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
         while 
(map.containsValue(getPulsarServiceList().get(0).getBrokerServiceUrl())) {
-            
admin.topics().deletePartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
-            
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 NUM_PARTITIONS);
-            map = 
admin.lookups().lookupPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+            pulsarServiceList.get(0).getPulsarResources()
+                    .getNamespaceResources()
+                    .getPartitionedTopicResources()
+                            
.deletePartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN);
+            pulsarServiceList.get(0).getPulsarResources()
+                    .getNamespaceResources()
+                    .getPartitionedTopicResources()
+                    
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+                            new PartitionedTopicMetadata(NUM_PARTITIONS));
+            map = 
admin.lookups().lookupPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
         }
         //init tc stores
         pulsarClient = PulsarClient.builder()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index aff09f2d7f1..d46b640aac3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -22,7 +22,6 @@ import com.google.common.collect.Sets;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.http.HttpStatus;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -36,8 +35,10 @@ import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -486,14 +487,14 @@ public class AdminApiTransactionTest extends 
MockedPulsarServiceBaseTest {
         ManagedLedgerInternalStats managedLedgerInternalStats = 
stats.pendingAckLogStats.managedLedgerInternalStats;
         assertEquals(TopicName.get(TopicDomain.persistent.toString(), 
"public", "default",
                 "testGetPendingAckInternalStats" + "-"
-                        + subName + 
MLPendingAckStore.PENDING_ACK_STORE_SUFFIX).getPersistenceNamingEncoding(),
+                        + subName + 
SystemTopicNames.PENDING_ACK_STORE_SUFFIX).getPersistenceNamingEncoding(),
                 stats.pendingAckLogStats.managedLedgerName);
 
         verifyManagedLegerInternalStats(managedLedgerInternalStats, 16);
 
         ManagedLedgerInternalStats finalManagedLedgerInternalStats = 
managedLedgerInternalStats;
         managedLedgerInternalStats.cursors.forEach((s, cursorStats) -> {
-            assertEquals(s, MLPendingAckStore.PENDING_ACK_STORE_CURSOR_NAME);
+            assertEquals(s, SystemTopicNames.PENDING_ACK_STORE_CURSOR_NAME);
             assertEquals(cursorStats.readPosition, 
finalManagedLedgerInternalStats.lastConfirmedEntry);
         });
 
@@ -503,7 +504,7 @@ public class AdminApiTransactionTest extends 
MockedPulsarServiceBaseTest {
 
         assertEquals(TopicName.get(TopicDomain.persistent.toString(), 
"public", "default",
                 "testGetPendingAckInternalStats" + "-"
-                        + subName + 
MLPendingAckStore.PENDING_ACK_STORE_SUFFIX).getPersistenceNamingEncoding(),
+                        + subName + 
SystemTopicNames.PENDING_ACK_STORE_SUFFIX).getPersistenceNamingEncoding(),
                 stats.pendingAckLogStats.managedLedgerName);
         assertNull(managedLedgerInternalStats.ledgers.get(0).metadata);
     }
@@ -528,8 +529,12 @@ public class AdminApiTransactionTest extends 
MockedPulsarServiceBaseTest {
     }
 
     private void initTransaction(int coordinatorSize) throws Exception {
-        
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 coordinatorSize);
-        
admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+        pulsar.getPulsarResources()
+                .getNamespaceResources()
+                .getPartitionedTopicResources()
+                
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+                        new PartitionedTopicMetadata(coordinatorSize));
+        
admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
         pulsarClient = 
PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
         pulsarClient.close();
         Awaitility.await().until(() ->
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java
index 4e54d1b3326..7b96aed47a1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java
@@ -24,6 +24,7 @@ import 
org.apache.pulsar.broker.service.resource.usage.NetworkUsage;
 import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.testng.annotations.AfterClass;
@@ -55,7 +56,7 @@ public class ResourceUsageTransportManagerTest extends 
MockedPulsarServiceBaseTe
 
     @Test
     public void testNamespaceCreation() throws Exception {
-        TopicName topicName = 
TopicName.get(ResourceUsageTopicTransportManager.RESOURCE_USAGE_TOPIC_NAME);
+        TopicName topicName = SystemTopicNames.RESOURCE_USAGE_TOPIC;
 
         
assertTrue(admin.tenants().getTenants().contains(topicName.getTenant()));
         
assertTrue(admin.namespaces().getNamespaces(topicName.getTenant()).contains(topicName.getNamespace()));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
index 1b3ca1616fa..936e3e0d263 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -25,7 +25,7 @@ import static org.testng.Assert.fail;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.common.events.EventsTopicNames;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.TopicType;
@@ -395,7 +395,7 @@ public class BrokerServiceAutoTopicCreationTest extends 
BrokerTestBase{
         pulsar.getConfiguration().setAllowAutoTopicCreation(false);
         pulsar.getConfiguration().setSystemTopicEnabled(true);
 
-        final String topicString = "persistent://prop/ns-abc/" + 
EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT;
+        final String topicString = "persistent://prop/ns-abc/" + 
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT;
 
         pulsarClient.newProducer().topic(topicString).create();
 
@@ -408,7 +408,7 @@ public class BrokerServiceAutoTopicCreationTest extends 
BrokerTestBase{
         pulsar.getConfiguration().setAllowAutoTopicCreation(false);
         pulsar.getConfiguration().setSystemTopicEnabled(true);
 
-        final String topicString = "persistent://prop/ns-abc/" + 
EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
+        final String topicString = "persistent://prop/ns-abc/" + 
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
 
         pulsarClient.newProducer().topic(topicString).create();
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index b4702fe0100..8731e782348 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pulsar.broker.service;
 
-import static 
org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_ASSIGN;
-import static 
org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_LOG;
+import static 
org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN;
+import static 
org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_LOG;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
index 69db17f4693..6412d7f32a9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
@@ -20,8 +20,11 @@ package org.apache.pulsar.broker.service;
 
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,6 +53,18 @@ public abstract class BrokerTestBase extends 
MockedPulsarServiceBaseTest {
         admin.namespaces().setNamespaceReplicationClusters("prop/ns-abc", 
Sets.newHashSet("test"));
     }
 
+    protected void createTransactionCoordinatorAssign() throws 
MetadataStoreException {
+        createTransactionCoordinatorAssign(1);
+    }
+
+    protected void createTransactionCoordinatorAssign(int partitions) throws 
MetadataStoreException {
+        pulsar.getPulsarResources()
+                .getNamespaceResources()
+                .getPartitionedTopicResources()
+                
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+                        new PartitionedTopicMetadata(partitions));
+    }
+
     void rolloverPerIntervalStats() {
         try {
             pulsar.getExecutor().submit(() -> 
pulsar.getBrokerService().updateRates()).get();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index c31373c05fc..c8c257992cf 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -82,9 +82,10 @@ import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
-import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -948,23 +949,23 @@ public class ReplicatorTest extends ReplicatorTestBase {
         admin2.topics().updatePartitionedTopic(persistentTopicName, 5);
         
assertEquals(admin2.topics().getPartitionedTopicMetadata(persistentTopicName).partitions,
 5);
         assertEquals((int) 
admin2.topics().getList(namespace).stream().filter(topic ->
-                
!topic.contains(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 5);
+                
!topic.contains(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 5);
         // Update partitioned topic from R3
         admin3.topics().updatePartitionedTopic(persistentTopicName, 6);
         
assertEquals(admin3.topics().getPartitionedTopicMetadata(persistentTopicName).partitions,
 6);
         assertEquals(admin3.topics().getList(namespace).stream().filter(topic 
->
-                
!topic.contains(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 6);
+                
!topic.contains(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 6);
         // Update partitioned topic from R1
         admin1.topics().updatePartitionedTopic(persistentTopicName, 7);
         
assertEquals(admin1.topics().getPartitionedTopicMetadata(persistentTopicName).partitions,
 7);
         
assertEquals(admin2.topics().getPartitionedTopicMetadata(persistentTopicName).partitions,
 7);
         
assertEquals(admin3.topics().getPartitionedTopicMetadata(persistentTopicName).partitions,
 7);
         assertEquals(admin1.topics().getList(namespace).stream().filter(topic 
->
-                
!topic.contains(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 7);
+                
!topic.contains(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 7);
         assertEquals(admin2.topics().getList(namespace).stream().filter(topic 
->
-                
!topic.contains(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 7);
+                
!topic.contains(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 7);
         assertEquals(admin3.topics().getList(namespace).stream().filter(topic 
->
-                
!topic.contains(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 7);
+                
!topic.contains(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 7);
     }
 
     /**
@@ -1299,7 +1300,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
         String topic = TopicName.get("persistent", 
NamespaceName.get(namespace),
                 "testDoesNotReplicateSystemTopic").toString();
         String systemTopic = TopicName.get("persistent", 
NamespaceName.get(namespace),
-                EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT).toString();
+                SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT).toString();
         admin1.topics().createNonPartitionedTopic(topic);
         // Replicator will not replicate System Topic other than topic policies
         initTransaction(2, admin1, pulsar1.getBrokerServiceUrl(), pulsar1);
@@ -1326,8 +1327,12 @@ public class ReplicatorTest extends ReplicatorTestBase {
     private void initTransaction(int coordinatorSize, PulsarAdmin admin, 
String ServiceUrl,
                                  PulsarService pulsarService) throws Exception 
{
         
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString(), 
coordinatorSize);
-        
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 coordinatorSize);
-        
admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+        pulsarService.getPulsarResources()
+                .getNamespaceResources()
+                .getPartitionedTopicResources()
+                
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+                        new PartitionedTopicMetadata(coordinatorSize));
+        
admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
         PulsarClient pulsarClient = 
PulsarClient.builder().serviceUrl(ServiceUrl).enableTransaction(true).build();
         pulsarClient.close();
         Awaitility.await().until(() ->
@@ -1361,7 +1366,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
         Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> {
             list.clear();
             list.addAll(admin.topics().getList(namespace).stream()
-                    .filter(topic -> 
!topic.contains(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME))
+                    .filter(topic -> 
!topic.contains(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME))
                     .collect(Collectors.toList()));
             return list.size() == expectedTopicList.size();
         });
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
index e50a39c5dfd..cd073737cec 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
@@ -29,7 +29,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-
 import com.google.common.collect.Sets;
 import java.util.concurrent.TimeoutException;
 import org.apache.bookkeeper.mledger.Position;
@@ -39,7 +38,7 @@ import 
org.apache.pulsar.broker.TransactionMetadataStoreService;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
@@ -66,8 +65,8 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
         super.baseSetup(configuration);
         admin.tenants().createTenant("pulsar", new 
TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
         
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 16);
-        
admin.lookups().lookupPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+        createTransactionCoordinatorAssign(16);
+        
admin.lookups().lookupPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
     }
 
     @AfterMethod(alwaysRun = true)
@@ -81,7 +80,7 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
         TransactionMetadataStoreService transactionMetadataStoreService = 
pulsar.getTransactionMetadataStoreService();
         Assert.assertNotNull(transactionMetadataStoreService);
 
-        
admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString());
+        
admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString());
         
transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(0));
         Awaitility.await().until(() ->
                 transactionMetadataStoreService.getStores().size() == 1);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
index 98ac2618eea..6da6026b34f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.stats;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
-
 import com.google.common.collect.Sets;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
@@ -31,7 +30,6 @@ import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
@@ -99,7 +97,7 @@ public class ManagedLedgerMetricsTest extends BrokerTestBase {
         
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
                 new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet("test")));
         
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 1);
+        createTransactionCoordinatorAssign();
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
         managedLedgerConfig.setMaxEntriesPerLedger(2);
         new MLTransactionLogImpl(TransactionCoordinatorID.get(0),
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
index 0c9f877150a..668c9254b49 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
@@ -40,7 +40,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
@@ -78,7 +78,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
                         .allowedClusters(Sets.newHashSet("test"))
                         .build());
         
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 1);
+        createTransactionCoordinatorAssign();
     }
 
     @AfterMethod(alwaysRun = true)
@@ -90,7 +90,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
     @Test
     public void testTransactionCoordinatorMetrics() throws Exception{
         long timeout = 10000;
-        
admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+        
admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
         TransactionCoordinatorID transactionCoordinatorIDOne = 
TransactionCoordinatorID.get(0);
         TransactionCoordinatorID transactionCoordinatorIDTwo = 
TransactionCoordinatorID.get(1);
         
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne);
@@ -127,7 +127,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
         String topic = "persistent://" + ns1 + "/test_coordinator_metrics";
         String subName = "test_coordinator_metrics";
         TransactionCoordinatorID transactionCoordinatorIDOne = 
TransactionCoordinatorID.get(0);
-        
admin.lookups().lookupPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+        
admin.lookups().lookupPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
         
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne);
         admin.topics().createNonPartitionedTopic(topic);
         admin.topics().createSubscription(topic, subName, MessageId.earliest);
@@ -229,7 +229,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
         String topic = "persistent://" + ns1 + "/test_managed_ledger_metrics";
         String subName = "test_managed_ledger_metrics";
         admin.topics().createNonPartitionedTopic(topic);
-        
admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+        
admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
         TransactionCoordinatorID transactionCoordinatorIDOne = 
TransactionCoordinatorID.get(0);
         
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne).get();
         admin.topics().createSubscription(topic, subName, MessageId.earliest);
@@ -291,7 +291,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
         String subName = "test_managed_ledger_metrics";
         String subName2 = "test_pending_ack_no_init";
         admin.topics().createNonPartitionedTopic(topic);
-        
admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+        
admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
         TransactionCoordinatorID transactionCoordinatorIDOne = 
TransactionCoordinatorID.get(0);
         
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne).get();
         admin.topics().createSubscription(topic, subName, MessageId.earliest);
@@ -352,7 +352,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
 
     @Test
     public void testDuplicateMetricTypeDefinitions() throws Exception{
-        
admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+        
admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
         TransactionCoordinatorID transactionCoordinatorIDOne = 
TransactionCoordinatorID.get(0);
         TransactionCoordinatorID transactionCoordinatorIDTwo = 
TransactionCoordinatorID.get(1);
         
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
index b680d74b86b..244d7587a42 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
@@ -30,10 +30,10 @@ import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.events.ActionType;
 import org.apache.pulsar.common.events.EventType;
-import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.events.PulsarEvent;
 import org.apache.pulsar.common.events.TopicPoliciesEvent;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
@@ -138,7 +138,7 @@ public class NamespaceEventsSystemTopicServiceTest extends 
MockedPulsarServiceBa
 
     @Test(timeOut = 30000)
     public void checkSystemTopic() throws PulsarAdminException {
-        final String systemTopic = "persistent://" + NAMESPACE1 + "/" + 
EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
+        final String systemTopic = "persistent://" + NAMESPACE1 + "/" + 
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
         final String normalTopic = "persistent://" + NAMESPACE1 + 
"/normal_topic";
         admin.topics().createPartitionedTopic(normalTopic, 3);
         TopicName systemTopicName = TopicName.get(systemTopic);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index d506f712767..8923d01d92b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -33,8 +33,8 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.naming.TopicName;
@@ -88,7 +88,7 @@ public class PartitionedSystemTopicTest extends 
BrokerTestBase {
         SystemTopicClient.Reader reader = 
systemTopicClientForNamespace.newReader();
 
         int partitions = admin.topics().getPartitionedTopicMetadata(
-                String.format("persistent://%s/%s", ns, 
EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).partitions;
+                String.format("persistent://%s/%s", ns, 
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).partitions;
         Assert.assertEquals(admin.topics().getPartitionedTopicList(ns).size(), 
1);
         Assert.assertEquals(partitions, PARTITIONS);
         Assert.assertEquals(admin.topics().getList(ns).size(), PARTITIONS);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index 78259c23b8a..3bc31bbc4fc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.broker.transaction;
 
-import static 
org.apache.pulsar.common.events.EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT;
+import static 
org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
@@ -67,7 +67,6 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.events.EventType;
-import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -252,7 +251,7 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
         ReaderBuilder<TransactionBufferSnapshot> readerBuilder = pulsarClient
                 .newReader(Schema.AVRO(TransactionBufferSnapshot.class))
                 .startMessageId(MessageId.earliest)
-                .topic(NAMESPACE1 + "/" + 
EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+                .topic(NAMESPACE1 + "/" + TRANSACTION_BUFFER_SNAPSHOT);
         Reader<TransactionBufferSnapshot> reader = readerBuilder.create();
 
         MessageId messageId1 = producer.newMessage(tnx1).value("test").send();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
index 24e5c5aaf1e..32efdb51659 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.broker.transaction;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-
 import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -45,7 +44,6 @@ import org.apache.pulsar.common.api.proto.MessageIdData;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.protocol.Commands;
@@ -82,7 +80,7 @@ public class TransactionConsumeTest extends 
TransactionTestBase {
         
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
                 new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
         
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 1);
+        createTransactionCoordinatorAssign(1);
     }
 
     @AfterMethod(alwaysRun = true)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 61d69661d62..60ecf04e602 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.broker.transaction;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static 
org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore.PENDING_ACK_STORE_SUFFIX;
+import static 
org.apache.pulsar.common.naming.SystemTopicNames.PENDING_ACK_STORE_SUFFIX;
 import static 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
@@ -32,7 +32,6 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
 import io.netty.buffer.Unpooled;
 import io.netty.util.Timeout;
 import java.lang.reflect.Field;
@@ -93,8 +92,8 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.events.EventType;
-import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
@@ -230,7 +229,7 @@ public class TransactionTest extends TransactionTestBase {
 
         
Assert.assertNull(topics.get(TopicName.get(TopicDomain.persistent.value(),
                 NamespaceName.SYSTEM_NAMESPACE, 
TRANSACTION_LOG_PREFIX).toString() + 0));
-        
Assert.assertNull(topics.get(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString()));
+        
Assert.assertNull(topics.get(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString()));
         
Assert.assertNull(topics.get(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topicName,
 subName)));
     }
 
@@ -347,7 +346,7 @@ public class TransactionTest extends TransactionTestBase {
         ReaderBuilder<TransactionBufferSnapshot> readerBuilder = pulsarClient
                 .newReader(Schema.AVRO(TransactionBufferSnapshot.class))
                 .startMessageId(MessageId.earliest)
-                .topic(NAMESPACE1 + "/" + 
EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+                .topic(NAMESPACE1 + "/" + 
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
         Reader<TransactionBufferSnapshot> reader = readerBuilder.create();
 
         long waitSnapShotTime = 
getPulsarServiceList().get(0).getConfiguration()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index bdcde1be814..dc034daa061 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -50,9 +50,11 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.apache.pulsar.tests.TestRetrySupport;
@@ -123,7 +125,7 @@ public abstract class TransactionTestBase extends 
TestRetrySupport {
         
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
                 new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
         
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 numPartitionsOfTC);
+        createTransactionCoordinatorAssign(numPartitionsOfTC);
         if (topic != null) {
             admin.tenants().createTenant(TENANT,
                     new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
@@ -144,6 +146,14 @@ public abstract class TransactionTestBase extends 
TestRetrySupport {
                 .build();
     }
 
+    protected void createTransactionCoordinatorAssign(int numPartitionsOfTC) 
throws MetadataStoreException {
+        pulsarServiceList.get(0).getPulsarResources()
+                .getNamespaceResources()
+                .getPartitionedTopicResources()
+                
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+                        new PartitionedTopicMetadata(numPartitionsOfTC));
+    }
+
     protected void startBroker() throws Exception {
         for (int i = 0; i < brokerCount; i++) {
             conf.setClusterName(CLUSTER_NAME);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java
index 43d31e7f9ff..77ac464c1d4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java
@@ -26,8 +26,8 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.PublisherStats;
@@ -38,7 +38,6 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
-
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -110,7 +109,7 @@ public class TransactionBufferCloseTest extends 
TransactionTestBase {
 
     private void checkSnapshotPublisherCount(String namespace, int 
expectCount) throws PulsarAdminException {
         TopicName snTopicName = TopicName.get(TopicDomain.persistent.value(), 
NamespaceName.get(namespace),
-                EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+                SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
         List<PublisherStats> publisherStatsList =
                 (List<PublisherStats>) admin.topics()
                         
.getStats(snTopicName.getPartitionedTopicName()).getPublishers();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
index 873509ff6bf..192353a2da0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
@@ -31,7 +31,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.collections4.map.LinkedMap;
 import org.apache.pulsar.broker.service.BrokerService;
@@ -49,9 +48,8 @@ import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientExce
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
-import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
-
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
@@ -144,7 +142,7 @@ public class TransactionLowWaterMarkTest extends 
TransactionTestBase {
 
         PartitionedTopicMetadata partitionedTopicMetadata =
                 ((PulsarClientImpl) pulsarClient).getLookup()
-                        
.getPartitionedTopicMetadata(TopicName.TRANSACTION_COORDINATOR_ASSIGN).get();
+                        
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN).get();
         Transaction lowWaterMarkTxn = null;
         for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
             lowWaterMarkTxn = pulsarClient.newTransaction()
@@ -249,7 +247,7 @@ public class TransactionLowWaterMarkTest extends 
TransactionTestBase {
 
         PartitionedTopicMetadata partitionedTopicMetadata =
                 ((PulsarClientImpl) pulsarClient).getLookup()
-                        
.getPartitionedTopicMetadata(TopicName.TRANSACTION_COORDINATOR_ASSIGN).get();
+                        
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN).get();
         Transaction lowWaterMarkTxn = null;
         for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
             lowWaterMarkTxn = pulsarClient.newTransaction()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
index 01027868e27..c95becc2b3d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
@@ -24,7 +24,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.ServiceUrlProvider;
-import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
@@ -81,7 +81,7 @@ public class TransactionMetaStoreAssignmentTest extends 
TransactionTestBase {
         // close pulsar client will not init tc again
         pulsarClient.close();
 
-        
admin.topics().unload(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+        
admin.topics().unload(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
 
         for (int i = 0; i < 16; i++) {
             final int f = i;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
index 432fc671071..54a5e07405e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
@@ -36,6 +36,7 @@ import 
org.apache.pulsar.client.impl.TransactionMetaStoreHandler;
 import org.apache.pulsar.client.util.MathUtils;
 import org.apache.pulsar.common.api.proto.Subscription;
 import org.apache.pulsar.common.api.proto.TxnAction;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
@@ -78,7 +79,7 @@ public class TransactionCoordinatorClientImpl implements 
TransactionCoordinatorC
     @Override
     public CompletableFuture<Void> startAsync() {
         if (STATE_UPDATER.compareAndSet(this, State.NONE, State.STARTING)) {
-            return 
pulsarClient.getLookup().getPartitionedTopicMetadata(TopicName.TRANSACTION_COORDINATOR_ASSIGN)
+            return 
pulsarClient.getLookup().getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN)
                 .thenCompose(partitionMeta -> {
                     List<CompletableFuture<Void>> connectFutureList = new 
ArrayList<>();
                     if (LOG.isDebugEnabled()) {
@@ -116,9 +117,10 @@ public class TransactionCoordinatorClientImpl implements 
TransactionCoordinatorC
 
     private String getTCAssignTopicName(int partition) {
         if (partition >= 0) {
-            return TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString() + 
TopicName.PARTITIONED_TOPIC_SUFFIX + partition;
+            return SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString()
+                    + TopicName.PARTITIONED_TOPIC_SUFFIX + partition;
         } else {
-            return TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString();
+            return SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString();
         }
     }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
similarity index 52%
rename from 
pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java
rename to 
pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
index 7ed6308ec04..eaab8261460 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
@@ -16,17 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.common.events;
+package org.apache.pulsar.common.naming;
 
 import com.google.common.collect.Sets;
 import java.util.Collections;
 import java.util.Set;
-import org.apache.pulsar.common.naming.TopicName;
 
 /**
- * System topic names for each {@link EventType}.
+ * Encapsulate the parsing of the completeTopicName name.
  */
-public class EventsTopicNames {
+public class SystemTopicNames {
 
     /**
      * Local topic name for the namespace events.
@@ -38,19 +37,34 @@ public class EventsTopicNames {
      */
     public static final String TRANSACTION_BUFFER_SNAPSHOT = 
"__transaction_buffer_snapshot";
 
+
+    public static final String PENDING_ACK_STORE_SUFFIX = 
"__transaction_pending_ack";
+
+    public static final String PENDING_ACK_STORE_CURSOR_NAME = 
"__pending_ack_state";
+
     /**
      * The set of all local topic names declared above.
      */
     public static final Set<String> EVENTS_TOPIC_NAMES =
             
Collections.unmodifiableSet(Sets.newHashSet(NAMESPACE_EVENTS_LOCAL_NAME, 
TRANSACTION_BUFFER_SNAPSHOT));
 
-    public static boolean checkTopicIsEventsNames(TopicName topicName) {
+
+    public static final TopicName TRANSACTION_COORDINATOR_ASSIGN = 
TopicName.get(TopicDomain.persistent.value(),
+            NamespaceName.SYSTEM_NAMESPACE, "transaction_coordinator_assign");
+
+    public static final TopicName TRANSACTION_COORDINATOR_LOG = 
TopicName.get(TopicDomain.persistent.value(),
+            NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_");
+
+    public static final TopicName RESOURCE_USAGE_TOPIC = 
TopicName.get(TopicDomain.non_persistent.value(),
+            NamespaceName.SYSTEM_NAMESPACE, "resource-usage");
+
+    public static boolean isEventSystemTopic(TopicName topicName) {
         return 
EVENTS_TOPIC_NAMES.contains(TopicName.get(topicName.getPartitionedTopicName()).getLocalName());
     }
 
-    public static boolean checkTopicIsTransactionCoordinatorAssign(TopicName 
topicName) {
+    public static boolean isTransactionCoordinatorAssign(TopicName topicName) {
         return topicName != null && topicName.toString()
-                
.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+                .startsWith(TRANSACTION_COORDINATOR_ASSIGN.toString());
     }
 
     public static boolean isTopicPoliciesSystemTopic(String topic) {
@@ -59,4 +73,16 @@ public class EventsTopicNames {
         }
         return 
TopicName.get(topic).getLocalName().equals(NAMESPACE_EVENTS_LOCAL_NAME);
     }
+
+    public static boolean isTransactionInternalName(TopicName topicName) {
+        String topic = topicName.toString();
+        return topic.startsWith(TRANSACTION_COORDINATOR_ASSIGN.toString())
+                || topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString())
+                || topic.endsWith(PENDING_ACK_STORE_SUFFIX);
+    }
+
+    public static boolean isSystemTopic(TopicName topicName) {
+        TopicName nonePartitionedTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
+        return isEventSystemTopic(nonePartitionedTopicName) || 
isTransactionInternalName(nonePartitionedTopicName);
+    }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index 97f99b19f19..5f03c60ece9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -60,12 +60,6 @@ public class TopicName implements ServiceUnitId {
                 }
             });
 
-    public static final TopicName TRANSACTION_COORDINATOR_ASSIGN = 
TopicName.get(TopicDomain.persistent.value(),
-            NamespaceName.SYSTEM_NAMESPACE, "transaction_coordinator_assign");
-
-    public static final TopicName TRANSACTION_COORDINATOR_LOG = 
TopicName.get(TopicDomain.persistent.value(),
-            NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_");
-
     public static TopicName get(String domain, NamespaceName namespaceName, 
String topic) {
         String name = domain + "://" + namespaceName.toString() + '/' + topic;
         return TopicName.get(name);
diff --git 
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/Oauth2PerformanceTransactionTest.java
 
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/Oauth2PerformanceTransactionTest.java
index e4cc0bf23e6..1cf0d7da354 100644
--- 
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/Oauth2PerformanceTransactionTest.java
+++ 
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/Oauth2PerformanceTransactionTest.java
@@ -44,7 +44,8 @@ import 
org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import 
org.apache.pulsar.client.api.TokenOauth2AuthenticatedProducerConsumerTest;
 import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -146,7 +147,11 @@ public class Oauth2PerformanceTransactionTest extends 
ProducerConsumerBase {
                 new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet("test")));
         admin.namespaces().createNamespace(myNamespace, 
Sets.newHashSet("test"));
         
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 1);
+        pulsar.getPulsarResources()
+                .getNamespaceResources()
+                .getPartitionedTopicResources()
+                
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+                        new PartitionedTopicMetadata(1));
 
         replacePulsarClient(PulsarClient.builder().serviceUrl(new 
URI(pulsar.getBrokerServiceUrl()).toString())
                 .statsInterval(0, TimeUnit.SECONDS)
diff --git 
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
 
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
index 936f70bedf3..4ef3bba46c8 100644
--- 
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
+++ 
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
@@ -33,14 +33,14 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -74,7 +74,9 @@ public class PerformanceTransactionTest extends 
MockedPulsarServiceBaseTest {
                 new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet("test")));
         admin.namespaces().createNamespace(myNamespace, 
Sets.newHashSet("test"));
         
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 1);
+        
pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+                        new PartitionedTopicMetadata(1));
     }
 
     @AfterMethod(alwaysRun = true)

Reply via email to