This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0f9e211 [pulsar-broker] topic resources use metadata-store api (#9485)
0f9e211 is described below
commit 0f9e211a85d0e5db4b7f96a9f8e402ad814c552d
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Thu Feb 18 10:35:41 2021 -0800
[pulsar-broker] topic resources use metadata-store api (#9485)
* [pulsar-broker] topics resources use metadata-store api
[pulsar-broker] MockZK: Handle zk-children watch notification
fix test
fix sync function initialization
fix tests
fix zk-create
add timeout
* address comments
---
.../org/apache/pulsar/broker/PulsarService.java | 3 +-
.../apache/pulsar/broker/admin/AdminResource.java | 126 ++++++--------
.../pulsar/broker/admin/impl/BaseResources.java | 22 ++-
.../pulsar/broker/admin/impl/ClusterResources.java | 11 +-
.../admin/impl/DynamicConfigurationResources.java | 5 +-
.../broker/admin/impl/NamespaceResources.java | 30 +++-
.../broker/admin/impl/PersistentTopicsBase.java | 189 ++++++++-------------
.../pulsar/broker/admin/impl/PulsarResources.java | 12 +-
.../pulsar/broker/admin/impl/TenantResources.java | 4 +-
.../pulsar/broker/service/BrokerService.java | 13 +-
.../org/apache/pulsar/broker/admin/AdminTest.java | 4 +
.../pulsar/broker/admin/v1/V1_AdminApiTest.java | 2 +-
.../client/impl/BrokerClientIntegrationTest.java | 14 +-
.../pulsar/client/impl/TopicsConsumerImplTest.java | 6 +-
.../pulsar/client/cli/PulsarClientToolTest.java | 22 +--
.../runtime/thread/ThreadRuntimeFactory.java | 1 +
.../cache/impl/JSONMetadataSerdeSimpleType.java | 2 -
.../metadata/impl/AbstractMetadataStore.java | 2 +
.../metadata/impl/LocalMemoryMetadataStore.java | 1 +
.../pulsar/metadata/impl/ZKMetadataStore.java | 3 +
.../apache/pulsar/metadata/MetadataStoreTest.java | 1 +
.../pulsar/zookeeper/LocalBookkeeperEnsemble.java | 6 +-
22 files changed, 221 insertions(+), 258 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index aedd965..2f0e712 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -485,7 +485,8 @@ public class PulsarService implements AutoCloseable {
coordinationService = new
CoordinationServiceImpl(localMetadataStore);
configurationMetadataStore = createConfigurationMetadataStore();
- pulsarResources = new PulsarResources(localMetadataStore,
configurationMetadataStore);
+ pulsarResources = new PulsarResources(localMetadataStore,
configurationMetadataStore,
+ config.getZooKeeperOperationTimeoutSeconds());
orderedExecutor = OrderedExecutor.newBuilder()
.numThreads(config.getNumOrderedExecutorThreads())
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 1228351..59af6ca 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -71,6 +71,9 @@ import
org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import
org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
+import
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
+import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
@@ -81,7 +84,6 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
-import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -133,14 +135,6 @@ public abstract class AdminResource extends
PulsarWebResource {
CreateMode.PERSISTENT, callback, null);
}
- protected boolean zkPathExists(String path) throws KeeperException,
InterruptedException {
- Stat stat = globalZk().exists(path, false);
- if (null != stat) {
- return true;
- }
- return false;
- }
-
protected void zkSync(String path) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger rc = new
AtomicInteger(KeeperException.Code.OK.intValue());
@@ -247,29 +241,31 @@ public abstract class AdminResource extends
PulsarWebResource {
private CompletableFuture<Void> tryCreatePartitionAsync(final int
partition, CompletableFuture<Void> reuseFuture) {
CompletableFuture<Void> result = reuseFuture == null ? new
CompletableFuture<>() : reuseFuture;
- zkCreateOptimisticAsync(localZk(),
-
ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0],
- (rc, s, o, s1) -> {
- if (KeeperException.Code.OK.intValue() == rc) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Topic partition {} created.",
clientAppId(),
- topicName.getPartition(partition));
- }
- result.complete(null);
- } else if (KeeperException.Code.NODEEXISTS.intValue() ==
rc) {
+ namespaceResources().getLocalStore()
+
.put(ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new
byte[0], Optional.of(-1L))
+ .thenAccept(r -> {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Topic partition {} created.",
clientAppId(), topicName.getPartition(partition));
+ }
+ result.complete(null);
+ }).exceptionally(ex -> {
+ if (ex.getCause() instanceof AlreadyExistsException) {
log.info("[{}] Topic partition {} is exists, doing
nothing.", clientAppId(),
topicName.getPartition(partition));
result.complete(null);
- } else if (KeeperException.Code.BADVERSION.intValue() == rc) {
- log.warn("[{}] Fail to create topic partition {} with
concurrent modification, retry now.",
- clientAppId(), topicName.getPartition(partition));
- tryCreatePartitionAsync(partition, result);
- } else {
- log.error("[{}] Fail to create topic partition {}",
clientAppId(),
- topicName.getPartition(partition),
KeeperException.create(KeeperException.Code.get(rc)));
-
result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
- }
- });
+ } else if (ex.getCause() instanceof BadVersionException) {
+ log.warn("[{}] Partitioned topic {} is already
created.", clientAppId(),
+ topicName.getPartition(partition));
+ // metadata-store api returns BadVersionException if
node already exists while creating the
+ // resource
+ result.complete(null);
+ } else {
+ log.error("[{}] Fail to create topic partition {}",
clientAppId(),
+ topicName.getPartition(partition),
ex.getCause());
+ result.completeExceptionally(ex.getCause());
+ }
+ return null;
+ });
return result;
}
@@ -729,11 +725,11 @@ public abstract class AdminResource extends
PulsarWebResource {
try {
String partitionedTopicPath = path(PARTITIONED_TOPIC_PATH_ZNODE,
namespaceName.toString(), topicDomain.value());
- List<String> topics = globalZk().getChildren(partitionedTopicPath,
false);
+ List<String> topics =
namespaceResources().getChildren(partitionedTopicPath);
partitionedTopics = topics.stream()
.map(s -> String.format("%s://%s/%s", topicDomain.value(),
namespaceName.toString(), decode(s)))
.collect(Collectors.toList());
- } catch (KeeperException.NoNodeException e) {
+ } catch (NotFoundException e) {
// NoNode means there are no partitioned topics in this domain for
this namespace
} catch (Exception e) {
log.error("[{}] Failed to get partitioned topic list for namespace
{}", clientAppId(),
@@ -828,49 +824,37 @@ public abstract class AdminResource extends
PulsarWebResource {
try {
String path = ZkAdminPaths.partitionedTopicPath(topicName);
- byte[] data = jsonMapper().writeValueAsBytes(new
PartitionedTopicMetadata(numPartitions));
- zkCreateOptimisticAsync(globalZk(), path, data, (rc, s, o,
s1) -> {
- if (KeeperException.Code.OK.intValue() == rc) {
- globalZk().sync(path, (rc2, s2, ctx) -> {
- if (KeeperException.Code.OK.intValue() == rc2)
{
- log.info("[{}] Successfully created
partitioned topic {}",
+ namespaceResources().getPartitionedTopicResources()
+ .createAsync(path, new
PartitionedTopicMetadata(numPartitions)).thenAccept(r -> {
+ log.info("[{}] Successfully created
partitioned topic {}", clientAppId(), topicName);
+
tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
+ log.info("[{}] Successfully created
partitions for topic {}", clientAppId(),
+ topicName);
+
asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(e -> {
+ log.error("[{}] Failed to create
partitions for topic {}", clientAppId(),
+ topicName);
+ // The partitioned topic is created but
there are some partitions create failed
+ asyncResponse.resume(new RestException(e));
+ return null;
+ });
+ }).exceptionally(ex -> {
+ if (ex.getCause() instanceof
AlreadyExistsException) {
+ log.warn("[{}] Failed to create already
existing partitioned topic {}",
clientAppId(), topicName);
-
tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
- log.info("[{}] Successfully created
partitions for topic {}",
- clientAppId(), topicName);
-
asyncResponse.resume(Response.noContent().build());
- }).exceptionally(e -> {
- log.error("[{}] Failed to create
partitions for topic {}",
- clientAppId(), topicName);
- // The partitioned topic is created
but there are some partitions create failed
- asyncResponse.resume(new
RestException(e));
- return null;
- });
- } else {
- log.error("[{}] Failed to create
partitioned topic {}",
- clientAppId(), topicName,
-
KeeperException.create(KeeperException.Code.get(rc2)));
asyncResponse.resume(
- new
RestException(KeeperException.create(KeeperException.Code.get(rc2))));
+ new RestException(Status.CONFLICT,
"Partitioned topic already exists"));
+ } else if (ex.getCause() instanceof
BadVersionException) {
+ log.warn("[{}] Failed to create
partitioned topic {}: concurrent modification",
+ clientAppId(), topicName);
+ asyncResponse.resume(new
RestException(Status.CONFLICT, "Concurrent modification"));
+ } else {
+ log.error("[{}] Failed to create
partitioned topic {}", clientAppId(), topicName,
+ ex.getCause());
+ asyncResponse.resume(new
RestException(ex.getCause()));
}
- }, null);
- } else if (KeeperException.Code.NODEEXISTS.intValue()
== rc) {
- log.warn("[{}] Failed to create already existing
partitioned topic {}",
- clientAppId(), topicName);
- asyncResponse.resume(new
RestException(Status.CONFLICT,
- "Partitioned topic already exists"));
- } else if (KeeperException.Code.BADVERSION.intValue()
== rc) {
- log.warn("[{}] Failed to create partitioned topic
{}: concurrent modification",
- clientAppId(),
- topicName);
- asyncResponse.resume(new
RestException(Status.CONFLICT, "Concurrent modification"));
- } else {
- log.error("[{}] Failed to create partitioned topic
{}",
- clientAppId(), topicName,
KeeperException.create(KeeperException.Code.get(rc)));
- asyncResponse.resume(
- new
RestException(KeeperException.create(KeeperException.Code.get(rc))));
- }
- });
+ return null;
+ });
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}",
clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java
index 83d598c..16ec0f8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import lombok.Getter;
import org.apache.pulsar.metadata.api.MetadataCache;
@@ -41,20 +42,23 @@ public class BaseResources<T> {
private final MetadataStoreExtended store;
@Getter
private final MetadataCache<T> cache;
+ private int operationTimeoutSec;
- public BaseResources(MetadataStoreExtended store, Class<T> clazz) {
+ public BaseResources(MetadataStoreExtended store, Class<T> clazz, int
operationTimeoutSec) {
this.store = store;
this.cache = store.getMetadataCache(clazz);
+ this.operationTimeoutSec = operationTimeoutSec;
}
- public BaseResources(MetadataStoreExtended store, TypeReference<T>
typeRef) {
+ public BaseResources(MetadataStoreExtended store, TypeReference<T>
typeRef, int operationTimeoutSec) {
this.store = store;
this.cache = store.getMetadataCache(typeRef);
+ this.operationTimeoutSec = operationTimeoutSec;
}
public List<String> getChildren(String path) throws MetadataStoreException
{
try {
- return getChildrenAsync(path).get();
+ return getChildrenAsync(path).get(operationTimeoutSec,
TimeUnit.SECONDS);
} catch (ExecutionException e) {
throw (e.getCause() instanceof MetadataStoreException) ?
(MetadataStoreException) e.getCause()
: new MetadataStoreException(e.getCause());
@@ -69,7 +73,7 @@ public class BaseResources<T> {
public Optional<T> get(String path) throws MetadataStoreException {
try {
- return getAsync(path).get();
+ return getAsync(path).get(operationTimeoutSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
throw (e.getCause() instanceof MetadataStoreException) ?
(MetadataStoreException) e.getCause()
: new MetadataStoreException(e.getCause());
@@ -84,7 +88,7 @@ public class BaseResources<T> {
public void set(String path, Function<T, T> modifyFunction) throws
MetadataStoreException {
try {
- setAsync(path, modifyFunction).get();
+ setAsync(path, modifyFunction).get(operationTimeoutSec,
TimeUnit.SECONDS);
} catch (ExecutionException e) {
throw (e.getCause() instanceof MetadataStoreException) ?
(MetadataStoreException) e.getCause()
: new MetadataStoreException(e.getCause());
@@ -99,7 +103,7 @@ public class BaseResources<T> {
public void setWithCreate(String path, Function<Optional<T>, T>
createFunction) throws MetadataStoreException {
try {
- setWithCreateAsync(path, createFunction).get();
+ setWithCreateAsync(path, createFunction).get(operationTimeoutSec,
TimeUnit.SECONDS);
} catch (ExecutionException e) {
throw (e.getCause() instanceof MetadataStoreException) ?
(MetadataStoreException) e.getCause()
: new MetadataStoreException(e.getCause());
@@ -114,7 +118,7 @@ public class BaseResources<T> {
public void create(String path, T data) throws MetadataStoreException {
try {
- createAsync(path, data).get();
+ createAsync(path, data).get(operationTimeoutSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
throw (e.getCause() instanceof MetadataStoreException) ?
(MetadataStoreException) e.getCause()
: new MetadataStoreException(e.getCause());
@@ -129,7 +133,7 @@ public class BaseResources<T> {
public void delete(String path) throws MetadataStoreException {
try {
- deleteAsync(path).get();
+ deleteAsync(path).get(operationTimeoutSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
throw (e.getCause() instanceof MetadataStoreException) ?
(MetadataStoreException) e.getCause()
: new MetadataStoreException(e.getCause());
@@ -144,7 +148,7 @@ public class BaseResources<T> {
public boolean exists(String path) throws MetadataStoreException {
try {
- return existsAsync(path).get();
+ return existsAsync(path).get(operationTimeoutSec,
TimeUnit.SECONDS);
} catch (ExecutionException e) {
throw (e.getCause() instanceof MetadataStoreException) ?
(MetadataStoreException) e.getCause()
: new MetadataStoreException(e.getCause());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClusterResources.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClusterResources.java
index d580f2e..296d709 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClusterResources.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClusterResources.java
@@ -32,9 +32,9 @@ public class ClusterResources extends
BaseResources<ClusterData> {
@Getter
private FailureDomainResources failureDomainResources;
- public ClusterResources(MetadataStoreExtended store) {
- super(store, ClusterData.class);
- this.failureDomainResources = new FailureDomainResources(store,
FailureDomain.class);
+ public ClusterResources(MetadataStoreExtended store, int
operationTimeoutSec) {
+ super(store, ClusterData.class, operationTimeoutSec);
+ this.failureDomainResources = new FailureDomainResources(store,
FailureDomain.class, operationTimeoutSec);
}
public Set<String> list() throws MetadataStoreException {
@@ -44,8 +44,9 @@ public class ClusterResources extends
BaseResources<ClusterData> {
public static class FailureDomainResources extends
BaseResources<FailureDomain> {
public static final String FAILURE_DOMAIN = "failureDomain";
- public FailureDomainResources(MetadataStoreExtended store,
Class<FailureDomain> clazz) {
- super(store, clazz);
+ public FailureDomainResources(MetadataStoreExtended store,
Class<FailureDomain> clazz,
+ int operationTimeoutSec) {
+ super(store, clazz, operationTimeoutSec);
}
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/DynamicConfigurationResources.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/DynamicConfigurationResources.java
index 99c8d3c..4e64636 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/DynamicConfigurationResources.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/DynamicConfigurationResources.java
@@ -24,8 +24,9 @@ import
org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
public class DynamicConfigurationResources extends BaseResources<Map<String,
String>> {
- public DynamicConfigurationResources(MetadataStoreExtended store) {
- super(store, new TypeReference<Map<String, String>>(){});
+ public DynamicConfigurationResources(MetadataStoreExtended store, int
operationTimeoutSec) {
+ super(store, new TypeReference<Map<String, String>>() {
+ }, operationTimeoutSec);
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespaceResources.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespaceResources.java
index 4869867..b123829 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespaceResources.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespaceResources.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
import java.util.Map;
import java.util.Optional;
import lombok.Getter;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.Policies;
@@ -33,17 +34,24 @@ import
org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
public class NamespaceResources extends BaseResources<Policies> {
private IsolationPolicyResources isolationPolicies;
private LocalPoliciesResources localPolicies;
+ private PartitionedTopicResources partitionedTopicResources;
+ private MetadataStoreExtended localStore;
+ private MetadataStoreExtended configurationStore;
- public NamespaceResources(MetadataStoreExtended localStore,
MetadataStoreExtended configurationStore) {
- super(configurationStore, Policies.class);
- isolationPolicies = new IsolationPolicyResources(configurationStore);
- localPolicies = new LocalPoliciesResources(localStore);
+ public NamespaceResources(MetadataStoreExtended localStore,
MetadataStoreExtended configurationStore,
+ int operationTimeoutSec) {
+ super(configurationStore, Policies.class, operationTimeoutSec);
+ this.localStore = localStore;
+ this.configurationStore = configurationStore;
+ isolationPolicies = new IsolationPolicyResources(configurationStore,
operationTimeoutSec);
+ localPolicies = new LocalPoliciesResources(localStore,
operationTimeoutSec);
+ partitionedTopicResources = new
PartitionedTopicResources(configurationStore, operationTimeoutSec);
}
public static class IsolationPolicyResources extends
BaseResources<Map<String, NamespaceIsolationData>> {
- public IsolationPolicyResources(MetadataStoreExtended store) {
+ public IsolationPolicyResources(MetadataStoreExtended store, int
operationTimeoutSec) {
super(store, new TypeReference<Map<String,
NamespaceIsolationData>>() {
- });
+ }, operationTimeoutSec);
}
public Optional<NamespaceIsolationPolicies> getPolicies(String path)
throws MetadataStoreException {
@@ -53,8 +61,14 @@ public class NamespaceResources extends
BaseResources<Policies> {
}
public static class LocalPoliciesResources extends
BaseResources<LocalPolicies> {
- public LocalPoliciesResources(MetadataStoreExtended
configurationStore) {
- super(configurationStore, LocalPolicies.class);
+ public LocalPoliciesResources(MetadataStoreExtended
configurationStore, int operationTimeoutSec) {
+ super(configurationStore, LocalPolicies.class,
operationTimeoutSec);
+ }
+ }
+
+ public static class PartitionedTopicResources extends
BaseResources<PartitionedTopicMetadata> {
+ public PartitionedTopicResources(MetadataStoreExtended
configurationStore, int operationTimeoutSec) {
+ super(configurationStore, PartitionedTopicMetadata.class,
operationTimeoutSec);
}
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index c8b2026..bbf7c87 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -121,8 +121,6 @@ import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -140,10 +138,11 @@ public class PersistentTopicsBase extends AdminResource {
// Validate that namespace exists, throws 404 if it doesn't exist
try {
- policiesCache().get(path(POLICIES, namespaceName.toString()));
- } catch (KeeperException.NoNodeException e) {
- log.warn("[{}] Failed to get topic list {}: Namespace does not
exist", clientAppId(), namespaceName);
- throw new RestException(Status.NOT_FOUND, "Namespace does not
exist");
+ if (!namespaceResources().exists(path(POLICIES,
namespaceName.toString()))) {
+ throw new RestException(Status.NOT_FOUND, "Namespace does not
exist");
+ }
+ } catch (RestException re) {
+ throw re;
} catch (Exception e) {
log.error("[{}] Failed to get topic list {}", clientAppId(),
namespaceName, e);
throw new RestException(e);
@@ -153,12 +152,12 @@ public class PersistentTopicsBase extends AdminResource {
try {
String path = String.format("/managed-ledgers/%s/%s",
namespaceName.toString(), domain());
- for (String topic : managedLedgerListCache().get(path)) {
+ for (String topic :
namespaceResources().getLocalPolicies().getChildren(path)) {
if (domain().equals(TopicDomain.persistent.toString())) {
topics.add(TopicName.get(domain(), namespaceName,
decode(topic)).toString());
}
}
- } catch (KeeperException.NoNodeException e) {
+ } catch
(org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException e) {
// NoNode means there are no topics in this domain for this
namespace
} catch (Exception e) {
log.error("[{}] Failed to get topics list for namespace {}",
clientAppId(), namespaceName, e);
@@ -171,14 +170,15 @@ public class PersistentTopicsBase extends AdminResource {
protected List<String> internalGetPartitionedTopicList() {
validateAdminAccessForTenant(namespaceName.getTenant());
-
// Validate that namespace exists, throws 404 if it doesn't exist
try {
- policiesCache().get(path(POLICIES, namespaceName.toString()));
- } catch (KeeperException.NoNodeException e) {
- log.warn("[{}] Failed to get partitioned topic list {}: Namespace
does not exist", clientAppId(),
- namespaceName);
- throw new RestException(Status.NOT_FOUND, "Namespace does not
exist");
+ if (!namespaceResources().exists(path(POLICIES,
namespaceName.toString()))) {
+ log.warn("[{}] Failed to get partitioned topic list {}:
Namespace does not exist", clientAppId(),
+ namespaceName);
+ throw new RestException(Status.NOT_FOUND, "Namespace does not
exist");
+ }
+ } catch (RestException e) {
+ throw e;
} catch (Exception e) {
log.error("[{}] Failed to get partitioned topic list for namespace
{}", clientAppId(), namespaceName, e);
throw new RestException(e);
@@ -193,7 +193,7 @@ public class PersistentTopicsBase extends AdminResource {
String topicUri = topicName.toString();
try {
- Policies policies = policiesCache().get(path(POLICIES,
namespaceName.toString()))
+ Policies policies = namespaceResources().get(path(POLICIES,
namespaceName.toString()))
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Namespace does not exist"));
Map<String, Set<AuthAction>> permissions = Maps.newTreeMap();
@@ -319,34 +319,19 @@ public class PersistentTopicsBase extends AdminResource {
private void grantPermissions(String topicUri, String role,
Set<AuthAction> actions) {
try {
- Stat nodeStat = new Stat();
- byte[] content = globalZk().getData(path(POLICIES,
namespaceName.toString()), null, nodeStat);
- Policies policies = jsonMapper().readValue(content,
Policies.class);
-
- if
(!policies.auth_policies.destination_auth.containsKey(topicUri)) {
- policies.auth_policies.destination_auth.put(topicUri, new
TreeMap<String, Set<AuthAction>>());
- }
-
- policies.auth_policies.destination_auth.get(topicUri).put(role,
actions);
-
- // Write the new policies to zookeeper
- globalZk().setData(path(POLICIES, namespaceName.toString()),
jsonMapper().writeValueAsBytes(policies),
- nodeStat.getVersion());
-
- // invalidate the local cache to force update
- policiesCache().invalidate(path(POLICIES,
namespaceName.toString()));
+ namespaceResources().set(path(POLICIES, namespaceName.toString()),
(policies) -> {
+ if
(!policies.auth_policies.destination_auth.containsKey(topicUri)) {
+ policies.auth_policies.destination_auth.put(topicUri, new
TreeMap<String, Set<AuthAction>>());
+ }
+
policies.auth_policies.destination_auth.get(topicUri).put(role, actions);
+ return policies;
+ });
log.info("[{}] Successfully granted access for role {}: {} - topic
{}", clientAppId(), role, actions,
topicUri);
-
- } catch (KeeperException.NoNodeException e) {
- log.warn("[{}] Failed to grant permissions on topic {}: Namespace
does not exist", clientAppId(),
- topicUri);
+ } catch
(org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException e) {
+ log.warn("[{}] Failed to grant permissions on topic {}: Namespace
does not exist", clientAppId(), topicUri);
throw new RestException(Status.NOT_FOUND, "Namespace does not
exist");
- } catch (KeeperException.BadVersionException e) {
- log.warn("[{}] Failed to grant permissions on topic {}: concurrent
modification", clientAppId(),
- topicUri);
- throw new RestException(Status.CONFLICT, "Concurrent
modification");
} catch (Exception e) {
log.error("[{}] Failed to grant permissions for topic {}",
clientAppId(), topicUri, e);
throw new RestException(e);
@@ -385,45 +370,28 @@ public class PersistentTopicsBase extends AdminResource {
}
private void revokePermissions(String topicUri, String role) {
- Stat nodeStat = new Stat();
Policies policies;
-
try {
- byte[] content = globalZk().getData(path(POLICIES,
namespaceName.toString()), null, nodeStat);
- policies = jsonMapper().readValue(content, Policies.class);
- } catch (KeeperException.NoNodeException e) {
- log.warn("[{}] Failed to revoke permissions on topic {}: Namespace
does not exist", clientAppId(),
- topicUri);
- throw new RestException(Status.NOT_FOUND, "Namespace does not
exist");
- } catch (KeeperException.BadVersionException e) {
- log.warn("[{}] Failed to revoke permissions on topic {}:
concurrent modification", clientAppId(),
- topicUri);
- throw new RestException(Status.CONFLICT, "Concurrent
modification");
+ policies = namespaceResources().get(path(POLICIES,
namespaceName.toString()))
+ .orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Namespace does not exist"));
} catch (Exception e) {
log.error("[{}] Failed to revoke permissions for topic {}",
clientAppId(), topicUri, e);
throw new RestException(e);
}
-
if (!policies.auth_policies.destination_auth.containsKey(topicUri)
||
!policies.auth_policies.destination_auth.get(topicUri).containsKey(role)) {
- log.warn("[{}] Failed to revoke permission from role {} on topic:
Not set at topic level {}",
- clientAppId(), role, topicUri);
+ log.warn("[{}] Failed to revoke permission from role {} on topic:
Not set at topic level {}", clientAppId(),
+ role, topicUri);
throw new RestException(Status.PRECONDITION_FAILED, "Permissions
are not set at the topic level");
}
-
- policies.auth_policies.destination_auth.get(topicUri).remove(role);
-
try {
// Write the new policies to zookeeper
String namespacePath = path(POLICIES, namespaceName.toString());
- globalZk().setData(namespacePath,
jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
-
- // invalidate the local cache to force update
- policiesCache().invalidate(namespacePath);
- globalZkCache().invalidate(namespacePath);
-
- log.info("[{}] Successfully revoke access for role {} - topic {}",
clientAppId(), role,
- topicUri);
+ namespaceResources().set(namespacePath, (p) -> {
+ p.auth_policies.destination_auth.get(topicUri).remove(role);
+ return p;
+ });
+ log.info("[{}] Successfully revoke access for role {} - topic {}",
clientAppId(), role, topicUri);
} catch (Exception e) {
log.error("[{}] Failed to revoke permissions for topic {}",
clientAppId(), topicUri, e);
throw new RestException(e);
@@ -528,19 +496,15 @@ public class PersistentTopicsBase extends AdminResource {
final String path =
ZkAdminPaths.partitionedTopicPath(topicName);
updatePartitionInOtherCluster(numPartitions,
clusters).thenAccept((res) -> {
try {
- byte[] data = jsonMapper().writeValueAsBytes(new
PartitionedTopicMetadata(numPartitions));
- globalZk().setData(path, data, -1, (rc, path1, ctx,
stat) -> {
- if (rc == KeeperException.Code.OK.intValue()) {
- updatePartition.complete(null);
- } else {
-
updatePartition.completeExceptionally(KeeperException
- .create(KeeperException.Code.get(rc),
"failed to create update partitions"));
- }
- }, null);
+
namespaceResources().getPartitionedTopicResources().setAsync(path, (p) -> {
+ return new PartitionedTopicMetadata(numPartitions);
+ }).thenAccept(r ->
updatePartition.complete(null)).exceptionally(ex -> {
+
updatePartition.completeExceptionally(ex.getCause());
+ return null;
+ });
} catch (Exception e) {
updatePartition.completeExceptionally(e);
}
-
}).exceptionally(ex -> {
updatePartition.completeExceptionally(ex);
return null;
@@ -737,42 +701,33 @@ public class PersistentTopicsBase extends AdminResource {
return;
}
}
-
// Only tries to delete the znode for partitioned topic when all
its partitions are successfully deleted
String path = path(PARTITIONED_TOPIC_PATH_ZNODE,
namespaceName.toString(), domain(),
topicName.getEncodedLocalName());
-
- globalZk().delete(path, -1, (rc, s, o) -> {
- if (KeeperException.Code.OK.intValue() == rc) {
- try {
- globalZkCache().invalidate(path);
- globalZk().sync(path, (rc2, s2, ctx) -> {
- if (KeeperException.Code.OK.intValue() == rc2) {
- log.info("[{}] Deleted partitioned topic {}",
clientAppId(), topicName);
-
asyncResponse.resume(Response.noContent().build());
- } else {
- log.error("[{}] Failed to delete partitioned
topic {}", clientAppId(),
- topicName,
KeeperException.create(KeeperException.Code.get(rc2)));
- asyncResponse.resume(new RestException(
-
KeeperException.create(KeeperException.Code.get(rc2))));
- }
- }, null);
- } catch (Exception e) {
- log.error("Failed to delete partitioned topic.", e);
- asyncResponse.resume(new RestException(e));
+ try {
+
namespaceResources().getPartitionedTopicResources().deleteAsync(path).thenAccept(r2
-> {
+ log.info("[{}] Deleted partitioned topic {}",
clientAppId(), topicName);
+ asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(ex1 -> {
+ log.error("[{}] Failed to delete partitioned topic {}",
clientAppId(), topicName, ex1.getCause());
+ if (ex1.getCause()
+ instanceof
org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException) {
+ asyncResponse.resume(new RestException(
+ new RestException(Status.NOT_FOUND,
"Partitioned topic does not exist")));
+ } else if (ex1
+ .getCause()
+ instanceof
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException) {
+ asyncResponse.resume(
+ new RestException(new
RestException(Status.CONFLICT, "Concurrent modification")));
+ } else {
+ asyncResponse.resume(new
RestException((ex1.getCause())));
}
- } else if (KeeperException.Code.NONODE.intValue() == rc) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Partitioned topic does not exist"));
- } else if (KeeperException.Code.BADVERSION.intValue() == rc) {
- log.warn("[{}] Failed to delete partitioned topic {}:
concurrent modification", clientAppId(),
- topicName);
- asyncResponse.resume(new RestException(Status.CONFLICT,
"Concurrent modification"));
- } else {
- log.error("[{}] Failed to delete partitioned topic {}",
clientAppId(),
- topicName,
KeeperException.create(KeeperException.Code.get(rc)));
- asyncResponse.resume(new
RestException(KeeperException.create(KeeperException.Code.get(rc))));
- }
- }, null);
+ return null;
+ });
+ } catch (Exception e1) {
+ log.error("[{}] Failed to delete partitioned topic {}",
clientAppId(), topicName, e1);
+ asyncResponse.resume(new RestException(e1));
+ }
});
}
@@ -1327,7 +1282,7 @@ public class PersistentTopicsBase extends AdminResource {
if (perPartition && stats.partitions.isEmpty()) {
String path = ZkAdminPaths.partitionedTopicPath(topicName);
try {
- boolean zkPathExists = zkPathExists(path);
+ boolean zkPathExists =
namespaceResources().getPartitionedTopicResources().exists(path);
if (zkPathExists) {
stats.partitions.put(topicName.toString(), new
TopicStats());
} else {
@@ -1336,7 +1291,7 @@ public class PersistentTopicsBase extends AdminResource {
"Internal topics have not been
generated yet"));
return null;
}
- } catch (KeeperException | InterruptedException e) {
+ } catch (Exception e) {
asyncResponse.resume(new RestException(e));
return null;
}
@@ -2513,8 +2468,8 @@ public class PersistentTopicsBase extends AdminResource {
// Validate that namespace exists, throw 404 if it doesn't exist
// note that we do not want to load the topic and hence skip
validateAdminOperationOnTopic()
try {
- policiesCache().get(path(POLICIES, namespaceName.toString()));
- } catch (KeeperException.NoNodeException e) {
+ namespaceResources().get(path(POLICIES, namespaceName.toString()));
+ } catch
(org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException e) {
log.warn("[{}] Failed to get topic backlog {}: Namespace does not
exist", clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not
exist");
} catch (Exception e) {
@@ -3433,15 +3388,9 @@ public class PersistentTopicsBase extends AdminResource {
CompletableFuture<Void> updatePartition = new CompletableFuture<>();
createSubscriptions(topicName, numPartitions).thenAccept(res -> {
try {
- byte[] data = jsonMapper().writeValueAsBytes(new
PartitionedTopicMetadata(numPartitions));
- globalZk().setData(path, data, -1, (rc, path1, ctx, stat) -> {
- if (rc == KeeperException.Code.OK.intValue()) {
- updatePartition.complete(null);
- } else {
-
updatePartition.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc),
- "failed to create update partitions"));
- }
- }, null);
+ namespaceResources().getPartitionedTopicResources().set(path,
+ p -> new PartitionedTopicMetadata(numPartitions));
+ updatePartition.complete(null);
} catch (Exception e) {
updatePartition.completeExceptionally(e);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java
index daaa313..2aff632 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java
@@ -30,10 +30,12 @@ public class PulsarResources {
private NamespaceResources namespaceResources;
private DynamicConfigurationResources dynamicConfigResources;
- public PulsarResources(MetadataStoreExtended localMetadataStore,
MetadataStoreExtended configurationMetadataStore) {
- tenatResources = new TenantResources(configurationMetadataStore);
- clusterResources = new ClusterResources(configurationMetadataStore);
- dynamicConfigResources = new
DynamicConfigurationResources(localMetadataStore);
- namespaceResources = new NamespaceResources(localMetadataStore,
configurationMetadataStore);
+ public PulsarResources(MetadataStoreExtended localMetadataStore,
MetadataStoreExtended configurationMetadataStore,
+ int operationTimeoutSec) {
+ tenatResources = new TenantResources(configurationMetadataStore,
operationTimeoutSec);
+ clusterResources = new ClusterResources(configurationMetadataStore,
operationTimeoutSec);
+ dynamicConfigResources = new
DynamicConfigurationResources(localMetadataStore, operationTimeoutSec);
+ namespaceResources = new NamespaceResources(localMetadataStore,
configurationMetadataStore,
+ operationTimeoutSec);
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantResources.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantResources.java
index 1a4fc38..3b3e21a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantResources.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantResources.java
@@ -22,7 +22,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
public class TenantResources extends BaseResources<TenantInfo> {
- public TenantResources(MetadataStoreExtended store) {
- super(store, TenantInfo.class);
+ public TenantResources(MetadataStoreExtended store, int
operationTimeoutSec) {
+ super(store, TenantInfo.class, operationTimeoutSec);
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index cd0a56b..b016274 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2229,13 +2229,12 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
}
public CompletableFuture<PartitionedTopicMetadata>
fetchPartitionedTopicMetadataAsync(TopicName topicName) {
- // gets the number of partitions from the zk cache
- return
pulsar.getGlobalZkCache().getDataAsync(partitionedTopicPath(topicName), (key,
content) -> {
- return ObjectMapperFactory.getThreadLocal().readValue(content,
PartitionedTopicMetadata.class);
- }).thenApply(metadata -> {
- // if the partitioned topic is not found in zk, then the topic is
not partitioned
- return metadata.orElseGet(() -> new PartitionedTopicMetadata());
- });
+ // gets the number of partitions from the configuration cache
+ return
pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+ .getAsync(partitionedTopicPath(topicName)).thenApply(metadata
-> {
+ // if the partitioned topic is not found in zk, then the
topic is not partitioned
+ return metadata.orElseGet(() -> new
PartitionedTopicMetadata());
+ });
}
private static String partitionedTopicPath(TopicName topicName) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index ee157e7..d21d43c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -74,6 +74,7 @@ import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
@@ -755,6 +756,9 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
assertEquals(persistentTopics.getPartitionedTopicList(property,
cluster, namespace), Lists
.newArrayList(String.format("persistent://%s/%s/%s/%s",
property, cluster, namespace, topic)));
+ TopicName topicName = TopicName.get("persistent", property, cluster,
namespace, topic);
+ assertEquals(persistentTopics.getPartitionedTopicMetadata(topicName,
true, false).partitions, 5);
+
CountDownLatch notificationLatch = new CountDownLatch(2);
configurationCache.policiesCache().registerListener((path, data, stat)
-> {
notificationLatch.countDown();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index 3764a6a..7a5f5a1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -599,7 +599,7 @@ public class V1_AdminApiTest extends
MockedPulsarServiceBaseTest {
}
}
- @Test(invocationCount = 1)
+ @Test
public void namespaces() throws PulsarAdminException,
PulsarServerException, Exception {
admin.clusters().createCluster("usw", new ClusterData());
TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1",
"role2"),
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index 2838293..0d8f7ae 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -65,7 +65,7 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.admin.impl.BaseResources;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.namespace.OwnershipCache;
import org.apache.pulsar.broker.service.Topic;
@@ -97,7 +97,6 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.zookeeper.GlobalZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.mockito.Mockito;
import org.slf4j.Logger;
@@ -679,13 +678,9 @@ public class BrokerClientIntegrationTest extends
ProducerConsumerBase {
ClientCnx cnx = producer.cnx();
assertTrue(cnx.channel().isActive());
- // Need broker to throw InternalServerError. so, make global-zk
unavailable
- Field globalZkCacheField =
PulsarService.class.getDeclaredField("globalZkCache");
- globalZkCacheField.setAccessible(true);
- GlobalZooKeeperCache oldZkCache = (GlobalZooKeeperCache)
globalZkCacheField.get(pulsar);
- globalZkCacheField.set(pulsar, null);
-
- oldZkCache.close();
+ Field cacheField = BaseResources.class.getDeclaredField("cache");
+ cacheField.setAccessible(true);
+
cacheField.set(pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources(),
null);
try {
pulsarClient.newProducer().topic(topicName).create();
@@ -935,5 +930,4 @@ public class BrokerClientIntegrationTest extends
ProducerConsumerBase {
private static final class TestMessageObject{
private String value;
}
-
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index e108535..ecba1ce 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -832,8 +832,8 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
final String messagePredicate = "my-message-" + key + "-";
final int totalMessages = 6;
- final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" +
key;
- final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" +
key;
+ final String topicName1 = "persistent://my-property/my-ns/topic-1-" +
key;
+ final String topicName2 = "persistent://my-property/my-ns/topic-2-" +
key;
List<String> topicNames = Lists.newArrayList(topicName1, topicName2);
TenantInfo tenantInfo = createDefaultTenantInfo();
@@ -892,7 +892,7 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
@Test(timeOut = testTimeout)
public void
testConsumerDistributionInFailoverSubscriptionWhenUpdatePartitions() throws
Exception {
- final String topicName =
"persistent://prop/use/ns-abc/testConsumerDistributionInFailoverSubscriptionWhenUpdatePartitions";
+ final String topicName =
"persistent://my-property/my-ns/testConsumerDistributionInFailoverSubscriptionWhenUpdatePartitions";
final String subName = "failover-test";
TenantInfo tenantInfo = createDefaultTenantInfo();
admin.tenants().createTenant("prop", tenantInfo);
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
index 5290031..dde7875 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.client.cli;
+import static org.testng.Assert.assertEquals;
+
import java.util.List;
import java.util.Properties;
import java.util.UUID;
@@ -30,19 +32,19 @@ import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class PulsarClientToolTest extends BrokerTestBase {
- @BeforeClass
+ @BeforeMethod
@Override
public void setup() throws Exception {
super.internalSetup();
}
- @AfterClass
+ @AfterMethod(alwaysRun = true)
@Override
public void cleanup() throws Exception {
super.internalCleanup();
@@ -127,17 +129,15 @@ public class PulsarClientToolTest extends BrokerTestBase {
});
// Make sure subscription has been created
- while (true) {
+ retryStrategically((test) -> {
try {
- List<String> subscriptions =
admin.topics().getSubscriptions(topicName);
- if (subscriptions.size() == 1) {
- break;
- }
+ return admin.topics().getSubscriptions(topicName).size() == 1;
} catch (Exception e) {
+ return false;
}
- Thread.sleep(200);
- }
+ }, 10, 500);
+ assertEquals(admin.topics().getSubscriptions(topicName).size(), 1);
PulsarClientTool pulsarClientToolProducer = new
PulsarClientTool(properties);
String[] args = {"produce", "--messages", "Have a nice day", "-n",
Integer.toString(numberOfMessages), "-r",
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index d44cf1b..35e4392 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -193,6 +193,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory
{
} catch (PulsarClientException e) {
log.warn("Failed to close pulsar client when closing function
container factory", e);
}
+
pulsarAdmin.close();
// Shutdown instance cache
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/JSONMetadataSerdeSimpleType.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/JSONMetadataSerdeSimpleType.java
index d3f071b..ba1d969 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/JSONMetadataSerdeSimpleType.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/JSONMetadataSerdeSimpleType.java
@@ -19,9 +19,7 @@
package org.apache.pulsar.metadata.cache.impl;
import com.fasterxml.jackson.databind.JavaType;
-
import java.io.IOException;
-
import org.apache.pulsar.common.util.ObjectMapperFactory;
public class JSONMetadataSerdeSimpleType<T> implements MetadataSerde<T> {
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index f11ec69..76cf580 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting;
import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
@@ -44,6 +45,7 @@ import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index 848d170..916aa1a 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -33,6 +33,7 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
+import
org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
import
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.api.Notification;
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 8c6fcd9..d91856f 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -34,6 +34,7 @@ import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
+import
org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
import
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.api.Notification;
@@ -296,6 +297,8 @@ public class ZKMetadataStore extends AbstractMetadataStore
implements MetadataSt
return new BadVersionException(ex);
case NONODE:
return new NotFoundException(ex);
+ case NODEEXISTS:
+ return new AlreadyExistsException(ex);
default:
return new MetadataStoreException(ex);
}
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
index e4a2e9e..4ab45e3 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
+import
org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
import
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
diff --git
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
index 48618b8..9689552 100644
---
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
+++
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
@@ -494,7 +494,11 @@ public class LocalBookkeeperEnsemble {
LOG.debug("Local ZK/BK stopping ...");
for (BookieServer bookie : bs) {
- bookie.shutdown();
+ try {
+ bookie.shutdown();
+ } catch (Exception e) {
+ LOG.warn("failed to shutdown bookie", e);
+ }
}
zkc.close();