This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f9e211  [pulsar-broker] topic resources use metadata-store api (#9485)
0f9e211 is described below

commit 0f9e211a85d0e5db4b7f96a9f8e402ad814c552d
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Thu Feb 18 10:35:41 2021 -0800

    [pulsar-broker] topic resources use metadata-store api (#9485)
    
    * [pulsar-broker] topics resources use metadata-store api
    
    [pulsar-broker] MockZK: Handle zk-children watch notification
    
    fix test
    
    fix sync function initialization
    
    fix tests
    
    fix zk-create
    
    add timeout
    
    * address comments
---
 .../org/apache/pulsar/broker/PulsarService.java    |   3 +-
 .../apache/pulsar/broker/admin/AdminResource.java  | 126 ++++++--------
 .../pulsar/broker/admin/impl/BaseResources.java    |  22 ++-
 .../pulsar/broker/admin/impl/ClusterResources.java |  11 +-
 .../admin/impl/DynamicConfigurationResources.java  |   5 +-
 .../broker/admin/impl/NamespaceResources.java      |  30 +++-
 .../broker/admin/impl/PersistentTopicsBase.java    | 189 ++++++++-------------
 .../pulsar/broker/admin/impl/PulsarResources.java  |  12 +-
 .../pulsar/broker/admin/impl/TenantResources.java  |   4 +-
 .../pulsar/broker/service/BrokerService.java       |  13 +-
 .../org/apache/pulsar/broker/admin/AdminTest.java  |   4 +
 .../pulsar/broker/admin/v1/V1_AdminApiTest.java    |   2 +-
 .../client/impl/BrokerClientIntegrationTest.java   |  14 +-
 .../pulsar/client/impl/TopicsConsumerImplTest.java |   6 +-
 .../pulsar/client/cli/PulsarClientToolTest.java    |  22 +--
 .../runtime/thread/ThreadRuntimeFactory.java       |   1 +
 .../cache/impl/JSONMetadataSerdeSimpleType.java    |   2 -
 .../metadata/impl/AbstractMetadataStore.java       |   2 +
 .../metadata/impl/LocalMemoryMetadataStore.java    |   1 +
 .../pulsar/metadata/impl/ZKMetadataStore.java      |   3 +
 .../apache/pulsar/metadata/MetadataStoreTest.java  |   1 +
 .../pulsar/zookeeper/LocalBookkeeperEnsemble.java  |   6 +-
 22 files changed, 221 insertions(+), 258 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index aedd965..2f0e712 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -485,7 +485,8 @@ public class PulsarService implements AutoCloseable {
             coordinationService = new 
CoordinationServiceImpl(localMetadataStore);
 
             configurationMetadataStore = createConfigurationMetadataStore();
-            pulsarResources = new PulsarResources(localMetadataStore, 
configurationMetadataStore);
+            pulsarResources = new PulsarResources(localMetadataStore, 
configurationMetadataStore,
+                    config.getZooKeeperOperationTimeoutSeconds());
 
             orderedExecutor = OrderedExecutor.newBuilder()
                     .numThreads(config.getNumOrderedExecutorThreads())
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 1228351..59af6ca 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -71,6 +71,9 @@ import 
org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import 
org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
+import 
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
+import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
@@ -81,7 +84,6 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooKeeper.States;
-import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -133,14 +135,6 @@ public abstract class AdminResource extends 
PulsarWebResource {
                 CreateMode.PERSISTENT, callback, null);
     }
 
-    protected boolean zkPathExists(String path) throws KeeperException, 
InterruptedException {
-        Stat stat = globalZk().exists(path, false);
-        if (null != stat) {
-            return true;
-        }
-        return false;
-    }
-
     protected void zkSync(String path) throws Exception {
         CountDownLatch latch = new CountDownLatch(1);
         AtomicInteger rc = new 
AtomicInteger(KeeperException.Code.OK.intValue());
@@ -247,29 +241,31 @@ public abstract class AdminResource extends 
PulsarWebResource {
 
     private CompletableFuture<Void> tryCreatePartitionAsync(final int 
partition, CompletableFuture<Void> reuseFuture) {
         CompletableFuture<Void> result = reuseFuture == null ? new 
CompletableFuture<>() : reuseFuture;
-        zkCreateOptimisticAsync(localZk(),
-                
ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0],
-                (rc, s, o, s1) -> {
-                    if (KeeperException.Code.OK.intValue() == rc) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] Topic partition {} created.", 
clientAppId(),
-                                    topicName.getPartition(partition));
-                        }
-                        result.complete(null);
-                    } else if (KeeperException.Code.NODEEXISTS.intValue() == 
rc) {
+        namespaceResources().getLocalStore()
+                
.put(ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new 
byte[0], Optional.of(-1L))
+                .thenAccept(r -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Topic partition {} created.", 
clientAppId(), topicName.getPartition(partition));
+                    }
+                    result.complete(null);
+                }).exceptionally(ex -> {
+                    if (ex.getCause() instanceof AlreadyExistsException) {
                         log.info("[{}] Topic partition {} is exists, doing 
nothing.", clientAppId(),
                                 topicName.getPartition(partition));
                         result.complete(null);
-                } else if (KeeperException.Code.BADVERSION.intValue() == rc) {
-                    log.warn("[{}] Fail to create topic partition {} with 
concurrent modification, retry now.",
-                            clientAppId(), topicName.getPartition(partition));
-                    tryCreatePartitionAsync(partition, result);
-                } else {
-                    log.error("[{}] Fail to create topic partition {}", 
clientAppId(),
-                        topicName.getPartition(partition), 
KeeperException.create(KeeperException.Code.get(rc)));
-                    
result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
-                }
-        });
+                    } else if (ex.getCause() instanceof BadVersionException) {
+                        log.warn("[{}] Partitioned topic {} is already 
created.", clientAppId(),
+                                topicName.getPartition(partition));
+                        // metadata-store api returns BadVersionException if 
node already exists while creating the
+                        // resource
+                        result.complete(null);
+                    } else {
+                        log.error("[{}] Fail to create topic partition {}", 
clientAppId(),
+                                topicName.getPartition(partition), 
ex.getCause());
+                        result.completeExceptionally(ex.getCause());
+                    }
+                    return null;
+                });
         return result;
     }
 
@@ -729,11 +725,11 @@ public abstract class AdminResource extends 
PulsarWebResource {
         try {
             String partitionedTopicPath = path(PARTITIONED_TOPIC_PATH_ZNODE,
                     namespaceName.toString(), topicDomain.value());
-            List<String> topics = globalZk().getChildren(partitionedTopicPath, 
false);
+            List<String> topics = 
namespaceResources().getChildren(partitionedTopicPath);
             partitionedTopics = topics.stream()
                     .map(s -> String.format("%s://%s/%s", topicDomain.value(), 
namespaceName.toString(), decode(s)))
                     .collect(Collectors.toList());
-        } catch (KeeperException.NoNodeException e) {
+        } catch (NotFoundException e) {
             // NoNode means there are no partitioned topics in this domain for 
this namespace
         } catch (Exception e) {
             log.error("[{}] Failed to get partitioned topic list for namespace 
{}", clientAppId(),
@@ -828,49 +824,37 @@ public abstract class AdminResource extends 
PulsarWebResource {
 
                 try {
                     String path = ZkAdminPaths.partitionedTopicPath(topicName);
-                    byte[] data = jsonMapper().writeValueAsBytes(new 
PartitionedTopicMetadata(numPartitions));
-                    zkCreateOptimisticAsync(globalZk(), path, data, (rc, s, o, 
s1) -> {
-                        if (KeeperException.Code.OK.intValue() == rc) {
-                            globalZk().sync(path, (rc2, s2, ctx) -> {
-                                if (KeeperException.Code.OK.intValue() == rc2) 
{
-                                    log.info("[{}] Successfully created 
partitioned topic {}",
+                    namespaceResources().getPartitionedTopicResources()
+                            .createAsync(path, new 
PartitionedTopicMetadata(numPartitions)).thenAccept(r -> {
+                                log.info("[{}] Successfully created 
partitioned topic {}", clientAppId(), topicName);
+                                
tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
+                                    log.info("[{}] Successfully created 
partitions for topic {}", clientAppId(),
+                                            topicName);
+                                    
asyncResponse.resume(Response.noContent().build());
+                                }).exceptionally(e -> {
+                                    log.error("[{}] Failed to create 
partitions for topic {}", clientAppId(),
+                                            topicName);
+                                    // The partitioned topic is created but 
there are some partitions create failed
+                                    asyncResponse.resume(new RestException(e));
+                                    return null;
+                                });
+                            }).exceptionally(ex -> {
+                                if (ex.getCause() instanceof 
AlreadyExistsException) {
+                                    log.warn("[{}] Failed to create already 
existing partitioned topic {}",
                                             clientAppId(), topicName);
-                                    
tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
-                                        log.info("[{}] Successfully created 
partitions for topic {}",
-                                                clientAppId(), topicName);
-                                        
asyncResponse.resume(Response.noContent().build());
-                                    }).exceptionally(e -> {
-                                        log.error("[{}] Failed to create 
partitions for topic {}",
-                                                clientAppId(), topicName);
-                                        // The partitioned topic is created 
but there are some partitions create failed
-                                        asyncResponse.resume(new 
RestException(e));
-                                        return null;
-                                    });
-                                } else {
-                                    log.error("[{}] Failed to create 
partitioned topic {}",
-                                            clientAppId(), topicName,
-                                            
KeeperException.create(KeeperException.Code.get(rc2)));
                                     asyncResponse.resume(
-                                            new 
RestException(KeeperException.create(KeeperException.Code.get(rc2))));
+                                            new RestException(Status.CONFLICT, 
"Partitioned topic already exists"));
+                                } else if (ex.getCause() instanceof 
BadVersionException) {
+                                    log.warn("[{}] Failed to create 
partitioned topic {}: concurrent modification",
+                                            clientAppId(), topicName);
+                                    asyncResponse.resume(new 
RestException(Status.CONFLICT, "Concurrent modification"));
+                                } else {
+                                    log.error("[{}] Failed to create 
partitioned topic {}", clientAppId(), topicName,
+                                            ex.getCause());
+                                    asyncResponse.resume(new 
RestException(ex.getCause()));
                                 }
-                            }, null);
-                        } else if (KeeperException.Code.NODEEXISTS.intValue() 
== rc) {
-                            log.warn("[{}] Failed to create already existing 
partitioned topic {}",
-                                    clientAppId(), topicName);
-                            asyncResponse.resume(new 
RestException(Status.CONFLICT,
-                                    "Partitioned topic already exists"));
-                        } else if (KeeperException.Code.BADVERSION.intValue() 
== rc) {
-                            log.warn("[{}] Failed to create partitioned topic 
{}: concurrent modification",
-                                    clientAppId(),
-                                    topicName);
-                            asyncResponse.resume(new 
RestException(Status.CONFLICT, "Concurrent modification"));
-                        } else {
-                            log.error("[{}] Failed to create partitioned topic 
{}",
-                                    clientAppId(), topicName, 
KeeperException.create(KeeperException.Code.get(rc)));
-                            asyncResponse.resume(
-                                    new 
RestException(KeeperException.create(KeeperException.Code.get(rc))));
-                        }
-                    });
+                                return null;
+                            });
                 } catch (Exception e) {
                     log.error("[{}] Failed to create partitioned topic {}", 
clientAppId(), topicName, e);
                     resumeAsyncResponseExceptionally(asyncResponse, e);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java
index 83d598c..16ec0f8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import lombok.Getter;
 import org.apache.pulsar.metadata.api.MetadataCache;
@@ -41,20 +42,23 @@ public class BaseResources<T> {
     private final MetadataStoreExtended store;
     @Getter
     private final MetadataCache<T> cache;
+    private int operationTimeoutSec;
 
-    public BaseResources(MetadataStoreExtended store, Class<T> clazz) {
+    public BaseResources(MetadataStoreExtended store, Class<T> clazz, int 
operationTimeoutSec) {
         this.store = store;
         this.cache = store.getMetadataCache(clazz);
+        this.operationTimeoutSec = operationTimeoutSec;
     }
 
-    public BaseResources(MetadataStoreExtended store, TypeReference<T> 
typeRef) {
+    public BaseResources(MetadataStoreExtended store, TypeReference<T> 
typeRef, int operationTimeoutSec) {
         this.store = store;
         this.cache = store.getMetadataCache(typeRef);
+        this.operationTimeoutSec = operationTimeoutSec;
     }
 
     public List<String> getChildren(String path) throws MetadataStoreException 
{
         try {
-            return getChildrenAsync(path).get();
+            return getChildrenAsync(path).get(operationTimeoutSec, 
TimeUnit.SECONDS);
         } catch (ExecutionException e) {
             throw (e.getCause() instanceof MetadataStoreException) ? 
(MetadataStoreException) e.getCause()
                     : new MetadataStoreException(e.getCause());
@@ -69,7 +73,7 @@ public class BaseResources<T> {
 
     public Optional<T> get(String path) throws MetadataStoreException {
         try {
-            return getAsync(path).get();
+            return getAsync(path).get(operationTimeoutSec, TimeUnit.SECONDS);
         } catch (ExecutionException e) {
             throw (e.getCause() instanceof MetadataStoreException) ? 
(MetadataStoreException) e.getCause()
                     : new MetadataStoreException(e.getCause());
@@ -84,7 +88,7 @@ public class BaseResources<T> {
 
     public void set(String path, Function<T, T> modifyFunction) throws 
MetadataStoreException {
         try {
-            setAsync(path, modifyFunction).get();
+            setAsync(path, modifyFunction).get(operationTimeoutSec, 
TimeUnit.SECONDS);
         } catch (ExecutionException e) {
             throw (e.getCause() instanceof MetadataStoreException) ? 
(MetadataStoreException) e.getCause()
                     : new MetadataStoreException(e.getCause());
@@ -99,7 +103,7 @@ public class BaseResources<T> {
 
     public void setWithCreate(String path, Function<Optional<T>, T> 
createFunction) throws MetadataStoreException {
         try {
-            setWithCreateAsync(path, createFunction).get();
+            setWithCreateAsync(path, createFunction).get(operationTimeoutSec, 
TimeUnit.SECONDS);
         } catch (ExecutionException e) {
             throw (e.getCause() instanceof MetadataStoreException) ? 
(MetadataStoreException) e.getCause()
                     : new MetadataStoreException(e.getCause());
@@ -114,7 +118,7 @@ public class BaseResources<T> {
 
     public void create(String path, T data) throws MetadataStoreException {
         try {
-            createAsync(path, data).get();
+            createAsync(path, data).get(operationTimeoutSec, TimeUnit.SECONDS);
         } catch (ExecutionException e) {
             throw (e.getCause() instanceof MetadataStoreException) ? 
(MetadataStoreException) e.getCause()
                     : new MetadataStoreException(e.getCause());
@@ -129,7 +133,7 @@ public class BaseResources<T> {
 
     public void delete(String path) throws MetadataStoreException {
         try {
-            deleteAsync(path).get();
+            deleteAsync(path).get(operationTimeoutSec, TimeUnit.SECONDS);
         } catch (ExecutionException e) {
             throw (e.getCause() instanceof MetadataStoreException) ? 
(MetadataStoreException) e.getCause()
                     : new MetadataStoreException(e.getCause());
@@ -144,7 +148,7 @@ public class BaseResources<T> {
 
     public boolean exists(String path) throws MetadataStoreException {
         try {
-            return existsAsync(path).get();
+            return existsAsync(path).get(operationTimeoutSec, 
TimeUnit.SECONDS);
         } catch (ExecutionException e) {
             throw (e.getCause() instanceof MetadataStoreException) ? 
(MetadataStoreException) e.getCause()
                     : new MetadataStoreException(e.getCause());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClusterResources.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClusterResources.java
index d580f2e..296d709 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClusterResources.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClusterResources.java
@@ -32,9 +32,9 @@ public class ClusterResources extends 
BaseResources<ClusterData> {
     @Getter
     private FailureDomainResources failureDomainResources;
 
-    public ClusterResources(MetadataStoreExtended store) {
-        super(store, ClusterData.class);
-        this.failureDomainResources = new FailureDomainResources(store, 
FailureDomain.class);
+    public ClusterResources(MetadataStoreExtended store, int 
operationTimeoutSec) {
+        super(store, ClusterData.class, operationTimeoutSec);
+        this.failureDomainResources = new FailureDomainResources(store, 
FailureDomain.class, operationTimeoutSec);
     }
 
     public Set<String> list() throws MetadataStoreException {
@@ -44,8 +44,9 @@ public class ClusterResources extends 
BaseResources<ClusterData> {
     public static class FailureDomainResources extends 
BaseResources<FailureDomain> {
         public static final String FAILURE_DOMAIN = "failureDomain";
 
-        public FailureDomainResources(MetadataStoreExtended store, 
Class<FailureDomain> clazz) {
-            super(store, clazz);
+        public FailureDomainResources(MetadataStoreExtended store, 
Class<FailureDomain> clazz,
+                int operationTimeoutSec) {
+            super(store, clazz, operationTimeoutSec);
         }
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/DynamicConfigurationResources.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/DynamicConfigurationResources.java
index 99c8d3c..4e64636 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/DynamicConfigurationResources.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/DynamicConfigurationResources.java
@@ -24,8 +24,9 @@ import 
org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 
 public class DynamicConfigurationResources extends BaseResources<Map<String, 
String>> {
 
-    public DynamicConfigurationResources(MetadataStoreExtended store) {
-        super(store, new TypeReference<Map<String, String>>(){});
+    public DynamicConfigurationResources(MetadataStoreExtended store, int 
operationTimeoutSec) {
+        super(store, new TypeReference<Map<String, String>>() {
+        }, operationTimeoutSec);
     }
 
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespaceResources.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespaceResources.java
index 4869867..b123829 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespaceResources.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespaceResources.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import java.util.Map;
 import java.util.Optional;
 import lombok.Getter;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -33,17 +34,24 @@ import 
org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 public class NamespaceResources extends BaseResources<Policies> {
     private IsolationPolicyResources isolationPolicies;
     private LocalPoliciesResources localPolicies;
+    private PartitionedTopicResources partitionedTopicResources;
+    private MetadataStoreExtended localStore;
+    private MetadataStoreExtended configurationStore;
 
-    public NamespaceResources(MetadataStoreExtended localStore, 
MetadataStoreExtended configurationStore) {
-        super(configurationStore, Policies.class);
-        isolationPolicies = new IsolationPolicyResources(configurationStore);
-        localPolicies = new LocalPoliciesResources(localStore);
+    public NamespaceResources(MetadataStoreExtended localStore, 
MetadataStoreExtended configurationStore,
+            int operationTimeoutSec) {
+        super(configurationStore, Policies.class, operationTimeoutSec);
+        this.localStore = localStore;
+        this.configurationStore = configurationStore;
+        isolationPolicies = new IsolationPolicyResources(configurationStore, 
operationTimeoutSec);
+        localPolicies = new LocalPoliciesResources(localStore, 
operationTimeoutSec);
+        partitionedTopicResources = new 
PartitionedTopicResources(configurationStore, operationTimeoutSec);
     }
 
     public static class IsolationPolicyResources extends 
BaseResources<Map<String, NamespaceIsolationData>> {
-        public IsolationPolicyResources(MetadataStoreExtended store) {
+        public IsolationPolicyResources(MetadataStoreExtended store, int 
operationTimeoutSec) {
             super(store, new TypeReference<Map<String, 
NamespaceIsolationData>>() {
-            });
+            }, operationTimeoutSec);
         }
 
         public Optional<NamespaceIsolationPolicies> getPolicies(String path) 
throws MetadataStoreException {
@@ -53,8 +61,14 @@ public class NamespaceResources extends 
BaseResources<Policies> {
     }
 
     public static class LocalPoliciesResources extends 
BaseResources<LocalPolicies> {
-        public LocalPoliciesResources(MetadataStoreExtended 
configurationStore) {
-            super(configurationStore, LocalPolicies.class);
+        public LocalPoliciesResources(MetadataStoreExtended 
configurationStore, int operationTimeoutSec) {
+            super(configurationStore, LocalPolicies.class, 
operationTimeoutSec);
+        }
+    }
+
+    public static class PartitionedTopicResources extends 
BaseResources<PartitionedTopicMetadata> {
+        public PartitionedTopicResources(MetadataStoreExtended 
configurationStore, int operationTimeoutSec) {
+            super(configurationStore, PartitionedTopicMetadata.class, 
operationTimeoutSec);
         }
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index c8b2026..bbf7c87 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -121,8 +121,6 @@ import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -140,10 +138,11 @@ public class PersistentTopicsBase extends AdminResource {
 
         // Validate that namespace exists, throws 404 if it doesn't exist
         try {
-            policiesCache().get(path(POLICIES, namespaceName.toString()));
-        } catch (KeeperException.NoNodeException e) {
-            log.warn("[{}] Failed to get topic list {}: Namespace does not 
exist", clientAppId(), namespaceName);
-            throw new RestException(Status.NOT_FOUND, "Namespace does not 
exist");
+            if (!namespaceResources().exists(path(POLICIES, 
namespaceName.toString()))) {
+                throw new RestException(Status.NOT_FOUND, "Namespace does not 
exist");
+            }
+        } catch (RestException re) {
+            throw re;
         } catch (Exception e) {
             log.error("[{}] Failed to get topic list {}", clientAppId(), 
namespaceName, e);
             throw new RestException(e);
@@ -153,12 +152,12 @@ public class PersistentTopicsBase extends AdminResource {
 
         try {
             String path = String.format("/managed-ledgers/%s/%s", 
namespaceName.toString(), domain());
-            for (String topic : managedLedgerListCache().get(path)) {
+            for (String topic : 
namespaceResources().getLocalPolicies().getChildren(path)) {
                 if (domain().equals(TopicDomain.persistent.toString())) {
                     topics.add(TopicName.get(domain(), namespaceName, 
decode(topic)).toString());
                 }
             }
-        } catch (KeeperException.NoNodeException e) {
+        } catch 
(org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException e) {
             // NoNode means there are no topics in this domain for this 
namespace
         } catch (Exception e) {
             log.error("[{}] Failed to get topics list for namespace {}", 
clientAppId(), namespaceName, e);
@@ -171,14 +170,15 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected List<String> internalGetPartitionedTopicList() {
         validateAdminAccessForTenant(namespaceName.getTenant());
-
         // Validate that namespace exists, throws 404 if it doesn't exist
         try {
-            policiesCache().get(path(POLICIES, namespaceName.toString()));
-        } catch (KeeperException.NoNodeException e) {
-            log.warn("[{}] Failed to get partitioned topic list {}: Namespace 
does not exist", clientAppId(),
-                    namespaceName);
-            throw new RestException(Status.NOT_FOUND, "Namespace does not 
exist");
+            if (!namespaceResources().exists(path(POLICIES, 
namespaceName.toString()))) {
+                log.warn("[{}] Failed to get partitioned topic list {}: 
Namespace does not exist", clientAppId(),
+                        namespaceName);
+                throw new RestException(Status.NOT_FOUND, "Namespace does not 
exist");
+            }
+        } catch (RestException e) {
+            throw e;
         } catch (Exception e) {
             log.error("[{}] Failed to get partitioned topic list for namespace 
{}", clientAppId(), namespaceName, e);
             throw new RestException(e);
@@ -193,7 +193,7 @@ public class PersistentTopicsBase extends AdminResource {
         String topicUri = topicName.toString();
 
         try {
-            Policies policies = policiesCache().get(path(POLICIES, 
namespaceName.toString()))
+            Policies policies = namespaceResources().get(path(POLICIES, 
namespaceName.toString()))
                     .orElseThrow(() -> new RestException(Status.NOT_FOUND, 
"Namespace does not exist"));
 
             Map<String, Set<AuthAction>> permissions = Maps.newTreeMap();
@@ -319,34 +319,19 @@ public class PersistentTopicsBase extends AdminResource {
 
     private void grantPermissions(String topicUri, String role, 
Set<AuthAction> actions) {
         try {
-            Stat nodeStat = new Stat();
-            byte[] content = globalZk().getData(path(POLICIES, 
namespaceName.toString()), null, nodeStat);
-            Policies policies = jsonMapper().readValue(content, 
Policies.class);
-
-            if 
(!policies.auth_policies.destination_auth.containsKey(topicUri)) {
-                policies.auth_policies.destination_auth.put(topicUri, new 
TreeMap<String, Set<AuthAction>>());
-            }
-
-            policies.auth_policies.destination_auth.get(topicUri).put(role, 
actions);
-
-            // Write the new policies to zookeeper
-            globalZk().setData(path(POLICIES, namespaceName.toString()), 
jsonMapper().writeValueAsBytes(policies),
-                    nodeStat.getVersion());
-
-            // invalidate the local cache to force update
-            policiesCache().invalidate(path(POLICIES, 
namespaceName.toString()));
+            namespaceResources().set(path(POLICIES, namespaceName.toString()), 
(policies) -> {
+                if 
(!policies.auth_policies.destination_auth.containsKey(topicUri)) {
+                    policies.auth_policies.destination_auth.put(topicUri, new 
TreeMap<String, Set<AuthAction>>());
+                }
 
+                
policies.auth_policies.destination_auth.get(topicUri).put(role, actions);
+                return policies;
+            });
             log.info("[{}] Successfully granted access for role {}: {} - topic 
{}", clientAppId(), role, actions,
                     topicUri);
-
-        } catch (KeeperException.NoNodeException e) {
-            log.warn("[{}] Failed to grant permissions on topic {}: Namespace 
does not exist", clientAppId(),
-                    topicUri);
+        } catch 
(org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException e) {
+            log.warn("[{}] Failed to grant permissions on topic {}: Namespace 
does not exist", clientAppId(), topicUri);
             throw new RestException(Status.NOT_FOUND, "Namespace does not 
exist");
-        } catch (KeeperException.BadVersionException e) {
-            log.warn("[{}] Failed to grant permissions on topic {}: concurrent 
modification", clientAppId(),
-                    topicUri);
-            throw new RestException(Status.CONFLICT, "Concurrent 
modification");
         } catch (Exception e) {
             log.error("[{}] Failed to grant permissions for topic {}", 
clientAppId(), topicUri, e);
             throw new RestException(e);
@@ -385,45 +370,28 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     private void revokePermissions(String topicUri, String role) {
-        Stat nodeStat = new Stat();
         Policies policies;
-
         try {
-            byte[] content = globalZk().getData(path(POLICIES, 
namespaceName.toString()), null, nodeStat);
-            policies = jsonMapper().readValue(content, Policies.class);
-        } catch (KeeperException.NoNodeException e) {
-            log.warn("[{}] Failed to revoke permissions on topic {}: Namespace 
does not exist", clientAppId(),
-                    topicUri);
-            throw new RestException(Status.NOT_FOUND, "Namespace does not 
exist");
-        } catch (KeeperException.BadVersionException e) {
-            log.warn("[{}] Failed to revoke permissions on topic {}: 
concurrent modification", clientAppId(),
-                    topicUri);
-            throw new RestException(Status.CONFLICT, "Concurrent 
modification");
+            policies = namespaceResources().get(path(POLICIES, 
namespaceName.toString()))
+                    .orElseThrow(() -> new RestException(Status.NOT_FOUND, 
"Namespace does not exist"));
         } catch (Exception e) {
             log.error("[{}] Failed to revoke permissions for topic {}", 
clientAppId(), topicUri, e);
             throw new RestException(e);
         }
-
         if (!policies.auth_policies.destination_auth.containsKey(topicUri)
                 || 
!policies.auth_policies.destination_auth.get(topicUri).containsKey(role)) {
-            log.warn("[{}] Failed to revoke permission from role {} on topic: 
Not set at topic level {}",
-                    clientAppId(), role, topicUri);
+            log.warn("[{}] Failed to revoke permission from role {} on topic: 
Not set at topic level {}", clientAppId(),
+                    role, topicUri);
             throw new RestException(Status.PRECONDITION_FAILED, "Permissions 
are not set at the topic level");
         }
-
-        policies.auth_policies.destination_auth.get(topicUri).remove(role);
-
         try {
             // Write the new policies to zookeeper
             String namespacePath = path(POLICIES, namespaceName.toString());
-            globalZk().setData(namespacePath, 
jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
-
-            // invalidate the local cache to force update
-            policiesCache().invalidate(namespacePath);
-            globalZkCache().invalidate(namespacePath);
-
-            log.info("[{}] Successfully revoke access for role {} - topic {}", 
clientAppId(), role,
-                    topicUri);
+            namespaceResources().set(namespacePath, (p) -> {
+                p.auth_policies.destination_auth.get(topicUri).remove(role);
+                return p;
+            });
+            log.info("[{}] Successfully revoke access for role {} - topic {}", 
clientAppId(), role, topicUri);
         } catch (Exception e) {
             log.error("[{}] Failed to revoke permissions for topic {}", 
clientAppId(), topicUri, e);
             throw new RestException(e);
@@ -528,19 +496,15 @@ public class PersistentTopicsBase extends AdminResource {
                 final String path = 
ZkAdminPaths.partitionedTopicPath(topicName);
                 updatePartitionInOtherCluster(numPartitions, 
clusters).thenAccept((res) -> {
                     try {
-                        byte[] data = jsonMapper().writeValueAsBytes(new 
PartitionedTopicMetadata(numPartitions));
-                        globalZk().setData(path, data, -1, (rc, path1, ctx, 
stat) -> {
-                            if (rc == KeeperException.Code.OK.intValue()) {
-                                updatePartition.complete(null);
-                            } else {
-                                
updatePartition.completeExceptionally(KeeperException
-                                        .create(KeeperException.Code.get(rc), 
"failed to create update partitions"));
-                            }
-                        }, null);
+                        
namespaceResources().getPartitionedTopicResources().setAsync(path, (p) -> {
+                            return new PartitionedTopicMetadata(numPartitions);
+                        }).thenAccept(r -> 
updatePartition.complete(null)).exceptionally(ex -> {
+                            
updatePartition.completeExceptionally(ex.getCause());
+                            return null;
+                        });
                     } catch (Exception e) {
                         updatePartition.completeExceptionally(e);
                     }
-
                 }).exceptionally(ex -> {
                     updatePartition.completeExceptionally(ex);
                     return null;
@@ -737,42 +701,33 @@ public class PersistentTopicsBase extends AdminResource {
                     return;
                 }
             }
-
             // Only tries to delete the znode for partitioned topic when all 
its partitions are successfully deleted
             String path = path(PARTITIONED_TOPIC_PATH_ZNODE, 
namespaceName.toString(), domain(),
                     topicName.getEncodedLocalName());
-
-            globalZk().delete(path, -1, (rc, s, o) -> {
-                if (KeeperException.Code.OK.intValue() == rc) {
-                    try {
-                        globalZkCache().invalidate(path);
-                        globalZk().sync(path, (rc2, s2, ctx) -> {
-                            if (KeeperException.Code.OK.intValue() == rc2) {
-                                log.info("[{}] Deleted partitioned topic {}", 
clientAppId(), topicName);
-                                
asyncResponse.resume(Response.noContent().build());
-                            } else {
-                                log.error("[{}] Failed to delete partitioned 
topic {}", clientAppId(),
-                                        topicName, 
KeeperException.create(KeeperException.Code.get(rc2)));
-                                asyncResponse.resume(new RestException(
-                                        
KeeperException.create(KeeperException.Code.get(rc2))));
-                            }
-                        }, null);
-                    } catch (Exception e) {
-                        log.error("Failed to delete partitioned topic.", e);
-                        asyncResponse.resume(new RestException(e));
+            try {
+                
namespaceResources().getPartitionedTopicResources().deleteAsync(path).thenAccept(r2
 -> {
+                    log.info("[{}] Deleted partitioned topic {}", 
clientAppId(), topicName);
+                    asyncResponse.resume(Response.noContent().build());
+                }).exceptionally(ex1 -> {
+                    log.error("[{}] Failed to delete partitioned topic {}", 
clientAppId(), topicName, ex1.getCause());
+                    if (ex1.getCause()
+                            instanceof 
org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException) {
+                        asyncResponse.resume(new RestException(
+                                new RestException(Status.NOT_FOUND, 
"Partitioned topic does not exist")));
+                    } else if (ex1
+                            .getCause()
+                            instanceof 
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException) {
+                        asyncResponse.resume(
+                                new RestException(new 
RestException(Status.CONFLICT, "Concurrent modification")));
+                    } else {
+                        asyncResponse.resume(new 
RestException((ex1.getCause())));
                     }
-                } else if (KeeperException.Code.NONODE.intValue() == rc) {
-                    asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Partitioned topic does not exist"));
-                } else if (KeeperException.Code.BADVERSION.intValue() == rc) {
-                    log.warn("[{}] Failed to delete partitioned topic {}: 
concurrent modification", clientAppId(),
-                            topicName);
-                    asyncResponse.resume(new RestException(Status.CONFLICT, 
"Concurrent modification"));
-                } else {
-                    log.error("[{}] Failed to delete partitioned topic {}", 
clientAppId(),
-                            topicName, 
KeeperException.create(KeeperException.Code.get(rc)));
-                    asyncResponse.resume(new 
RestException(KeeperException.create(KeeperException.Code.get(rc))));
-                }
-            }, null);
+                    return null;
+                });
+            } catch (Exception e1) {
+                log.error("[{}] Failed to delete partitioned topic {}", 
clientAppId(), topicName, e1);
+                asyncResponse.resume(new RestException(e1));
+            }
         });
     }
 
@@ -1327,7 +1282,7 @@ public class PersistentTopicsBase extends AdminResource {
                 if (perPartition && stats.partitions.isEmpty()) {
                     String path = ZkAdminPaths.partitionedTopicPath(topicName);
                     try {
-                        boolean zkPathExists = zkPathExists(path);
+                        boolean zkPathExists = 
namespaceResources().getPartitionedTopicResources().exists(path);
                         if (zkPathExists) {
                             stats.partitions.put(topicName.toString(), new 
TopicStats());
                         } else {
@@ -1336,7 +1291,7 @@ public class PersistentTopicsBase extends AdminResource {
                                             "Internal topics have not been 
generated yet"));
                             return null;
                         }
-                    } catch (KeeperException | InterruptedException e) {
+                    } catch (Exception e) {
                         asyncResponse.resume(new RestException(e));
                         return null;
                     }
@@ -2513,8 +2468,8 @@ public class PersistentTopicsBase extends AdminResource {
         // Validate that namespace exists, throw 404 if it doesn't exist
         // note that we do not want to load the topic and hence skip 
validateAdminOperationOnTopic()
         try {
-            policiesCache().get(path(POLICIES, namespaceName.toString()));
-        } catch (KeeperException.NoNodeException e) {
+            namespaceResources().get(path(POLICIES, namespaceName.toString()));
+        } catch 
(org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException e) {
             log.warn("[{}] Failed to get topic backlog {}: Namespace does not 
exist", clientAppId(), namespaceName);
             throw new RestException(Status.NOT_FOUND, "Namespace does not 
exist");
         } catch (Exception e) {
@@ -3433,15 +3388,9 @@ public class PersistentTopicsBase extends AdminResource {
         CompletableFuture<Void> updatePartition = new CompletableFuture<>();
         createSubscriptions(topicName, numPartitions).thenAccept(res -> {
             try {
-                byte[] data = jsonMapper().writeValueAsBytes(new 
PartitionedTopicMetadata(numPartitions));
-                globalZk().setData(path, data, -1, (rc, path1, ctx, stat) -> {
-                    if (rc == KeeperException.Code.OK.intValue()) {
-                        updatePartition.complete(null);
-                    } else {
-                        
updatePartition.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc),
-                                "failed to create update partitions"));
-                    }
-                }, null);
+                namespaceResources().getPartitionedTopicResources().set(path,
+                        p -> new PartitionedTopicMetadata(numPartitions));
+                updatePartition.complete(null);
             } catch (Exception e) {
                 updatePartition.completeExceptionally(e);
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java
index daaa313..2aff632 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java
@@ -30,10 +30,12 @@ public class PulsarResources {
     private NamespaceResources namespaceResources;
     private DynamicConfigurationResources dynamicConfigResources;
 
-    public PulsarResources(MetadataStoreExtended localMetadataStore, 
MetadataStoreExtended configurationMetadataStore) {
-        tenatResources = new TenantResources(configurationMetadataStore);
-        clusterResources = new ClusterResources(configurationMetadataStore);
-        dynamicConfigResources = new 
DynamicConfigurationResources(localMetadataStore);
-        namespaceResources = new NamespaceResources(localMetadataStore, 
configurationMetadataStore);
+    public PulsarResources(MetadataStoreExtended localMetadataStore, 
MetadataStoreExtended configurationMetadataStore,
+            int operationTimeoutSec) {
+        tenatResources = new TenantResources(configurationMetadataStore, 
operationTimeoutSec);
+        clusterResources = new ClusterResources(configurationMetadataStore, 
operationTimeoutSec);
+        dynamicConfigResources = new 
DynamicConfigurationResources(localMetadataStore, operationTimeoutSec);
+        namespaceResources = new NamespaceResources(localMetadataStore, 
configurationMetadataStore,
+                operationTimeoutSec);
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantResources.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantResources.java
index 1a4fc38..3b3e21a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantResources.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantResources.java
@@ -22,7 +22,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 
 public class TenantResources extends BaseResources<TenantInfo> {
-    public TenantResources(MetadataStoreExtended store) {
-        super(store, TenantInfo.class);
+    public TenantResources(MetadataStoreExtended store, int 
operationTimeoutSec) {
+        super(store, TenantInfo.class, operationTimeoutSec);
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index cd0a56b..b016274 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2229,13 +2229,12 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
     }
 
     public CompletableFuture<PartitionedTopicMetadata> 
fetchPartitionedTopicMetadataAsync(TopicName topicName) {
-        // gets the number of partitions from the zk cache
-        return 
pulsar.getGlobalZkCache().getDataAsync(partitionedTopicPath(topicName), (key, 
content) -> {
-            return ObjectMapperFactory.getThreadLocal().readValue(content, 
PartitionedTopicMetadata.class);
-        }).thenApply(metadata -> {
-            // if the partitioned topic is not found in zk, then the topic is 
not partitioned
-            return metadata.orElseGet(() -> new PartitionedTopicMetadata());
-        });
+        // gets the number of partitions from the configuration cache
+        return 
pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                .getAsync(partitionedTopicPath(topicName)).thenApply(metadata 
-> {
+                    // if the partitioned topic is not found in zk, then the 
topic is not partitioned
+                    return metadata.orElseGet(() -> new 
PartitionedTopicMetadata());
+                });
     }
 
     private static String partitionedTopicPath(TopicName topicName) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index ee157e7..d21d43c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -74,6 +74,7 @@ import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
 import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
@@ -755,6 +756,9 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         assertEquals(persistentTopics.getPartitionedTopicList(property, 
cluster, namespace), Lists
                 .newArrayList(String.format("persistent://%s/%s/%s/%s", 
property, cluster, namespace, topic)));
 
+        TopicName topicName = TopicName.get("persistent", property, cluster, 
namespace, topic);
+        assertEquals(persistentTopics.getPartitionedTopicMetadata(topicName, 
true, false).partitions, 5);
+
         CountDownLatch notificationLatch = new CountDownLatch(2);
         configurationCache.policiesCache().registerListener((path, data, stat) 
-> {
             notificationLatch.countDown();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index 3764a6a..7a5f5a1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -599,7 +599,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
         }
     }
 
-    @Test(invocationCount = 1)
+    @Test
     public void namespaces() throws PulsarAdminException, 
PulsarServerException, Exception {
         admin.clusters().createCluster("usw", new ClusterData());
         TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", 
"role2"),
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index 2838293..0d8f7ae 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -65,7 +65,7 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.admin.impl.BaseResources;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.namespace.OwnershipCache;
 import org.apache.pulsar.broker.service.Topic;
@@ -97,7 +97,6 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.zookeeper.GlobalZooKeeperCache;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
@@ -679,13 +678,9 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
         ClientCnx cnx = producer.cnx();
         assertTrue(cnx.channel().isActive());
 
-        // Need broker to throw InternalServerError. so, make global-zk 
unavailable
-        Field globalZkCacheField = 
PulsarService.class.getDeclaredField("globalZkCache");
-        globalZkCacheField.setAccessible(true);
-        GlobalZooKeeperCache oldZkCache = (GlobalZooKeeperCache) 
globalZkCacheField.get(pulsar);
-        globalZkCacheField.set(pulsar, null);
-
-        oldZkCache.close();
+        Field cacheField = BaseResources.class.getDeclaredField("cache");
+        cacheField.setAccessible(true);
+        
cacheField.set(pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources(),
 null);
 
         try {
             pulsarClient.newProducer().topic(topicName).create();
@@ -935,5 +930,4 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
     private static final class TestMessageObject{
         private String value;
     }
-
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index e108535..ecba1ce 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -832,8 +832,8 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
         final String messagePredicate = "my-message-" + key + "-";
         final int totalMessages = 6;
 
-        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + 
key;
-        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + 
key;
+        final String topicName1 = "persistent://my-property/my-ns/topic-1-" + 
key;
+        final String topicName2 = "persistent://my-property/my-ns/topic-2-" + 
key;
         List<String> topicNames = Lists.newArrayList(topicName1, topicName2);
 
         TenantInfo tenantInfo = createDefaultTenantInfo();
@@ -892,7 +892,7 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
 
     @Test(timeOut = testTimeout)
     public void 
testConsumerDistributionInFailoverSubscriptionWhenUpdatePartitions() throws 
Exception {
-        final String topicName = 
"persistent://prop/use/ns-abc/testConsumerDistributionInFailoverSubscriptionWhenUpdatePartitions";
+        final String topicName = 
"persistent://my-property/my-ns/testConsumerDistributionInFailoverSubscriptionWhenUpdatePartitions";
         final String subName = "failover-test";
         TenantInfo tenantInfo = createDefaultTenantInfo();
         admin.tenants().createTenant("prop", tenantInfo);
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
index 5290031..dde7875 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.cli;
 
+import static org.testng.Assert.assertEquals;
+
 import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
@@ -30,19 +32,19 @@ import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 public class PulsarClientToolTest extends BrokerTestBase {
 
-    @BeforeClass
+    @BeforeMethod
     @Override
     public void setup() throws Exception {
         super.internalSetup();
     }
 
-    @AfterClass
+    @AfterMethod(alwaysRun = true)
     @Override
     public void cleanup() throws Exception {
         super.internalCleanup();
@@ -127,17 +129,15 @@ public class PulsarClientToolTest extends BrokerTestBase {
         });
 
         // Make sure subscription has been created
-        while (true) {
+        retryStrategically((test) -> {
             try {
-                List<String> subscriptions = 
admin.topics().getSubscriptions(topicName);
-                if (subscriptions.size() == 1) {
-                    break;
-                }
+                return admin.topics().getSubscriptions(topicName).size() == 1;
             } catch (Exception e) {
+                return false;
             }
-            Thread.sleep(200);
-        }
+        }, 10, 500);
 
+        assertEquals(admin.topics().getSubscriptions(topicName).size(), 1);
         PulsarClientTool pulsarClientToolProducer = new 
PulsarClientTool(properties);
 
         String[] args = {"produce", "--messages", "Have a nice day", "-n", 
Integer.toString(numberOfMessages), "-r",
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index d44cf1b..35e4392 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -193,6 +193,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
         } catch (PulsarClientException e) {
             log.warn("Failed to close pulsar client when closing function 
container factory", e);
         }
+
         pulsarAdmin.close();
 
         // Shutdown instance cache
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/JSONMetadataSerdeSimpleType.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/JSONMetadataSerdeSimpleType.java
index d3f071b..ba1d969 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/JSONMetadataSerdeSimpleType.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/JSONMetadataSerdeSimpleType.java
@@ -19,9 +19,7 @@
 package org.apache.pulsar.metadata.cache.impl;
 
 import com.fasterxml.jackson.databind.JavaType;
-
 import java.io.IOException;
-
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 
 public class JSONMetadataSerdeSimpleType<T> implements MetadataSerde<T> {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index f11ec69..76cf580 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 import io.netty.util.concurrent.DefaultThreadFactory;
 
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Optional;
@@ -44,6 +45,7 @@ import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.api.extended.SessionEvent;
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index 848d170..916aa1a 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -33,6 +33,7 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
+import 
org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
 import 
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
 import org.apache.pulsar.metadata.api.Notification;
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 8c6fcd9..d91856f 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -34,6 +34,7 @@ import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
 import org.apache.pulsar.metadata.api.Notification;
@@ -296,6 +297,8 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
             return new BadVersionException(ex);
         case NONODE:
             return new NotFoundException(ex);
+        case NODEEXISTS:
+            return new AlreadyExistsException(ex);
         default:
             return new MetadataStoreException(ex);
         }
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
index e4a2e9e..4ab45e3 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
 import org.apache.pulsar.metadata.api.MetadataStoreFactory;
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
index 48618b8..9689552 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
@@ -494,7 +494,11 @@ public class LocalBookkeeperEnsemble {
 
         LOG.debug("Local ZK/BK stopping ...");
         for (BookieServer bookie : bs) {
-            bookie.shutdown();
+            try {
+                bookie.shutdown();
+            } catch (Exception e) {
+                LOG.warn("failed to shutdown bookie", e);
+            }
         }
 
         zkc.close();

Reply via email to