This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0edcaa0 [pulsar-broker] cluster resources use metadata-store api
(#9338)
0edcaa0 is described below
commit 0edcaa09150521a2a7e189de43d004ed799db2ee
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Thu Jan 28 12:20:10 2021 -0800
[pulsar-broker] cluster resources use metadata-store api (#9338)
* [pulsar-broker] Make tenant rest-api async and use metadata-store api
fix tests
fix intermittent test failure
* [pulsar-broker] cluster resources use metadata-store api
* fix test
---
.../org/apache/pulsar/broker/PulsarService.java | 18 +-
.../apache/pulsar/broker/admin/AdminResource.java | 6 +-
.../pulsar/broker/admin/impl/BaseResources.java | 152 ++++++++++
.../pulsar/broker/admin/impl/ClusterResources.java | 51 ++++
.../pulsar/broker/admin/impl/ClustersBase.java | 204 +++++---------
.../broker/admin/impl/NamespaceResources.java | 51 ++++
.../pulsar/broker/admin/impl/PulsarResources.java | 37 +++
.../pulsar/broker/admin/impl/TenantResources.java | 28 ++
.../pulsar/broker/admin/impl/TenantsBase.java | 307 ++++++++++++---------
.../pulsar/broker/web/PulsarWebResource.java | 157 ++++++++++-
.../apache/pulsar/broker/admin/AdminApiTest.java | 8 +-
.../org/apache/pulsar/broker/admin/AdminTest.java | 238 +++++++++++-----
.../broker/auth/MockedPulsarServiceBaseTest.java | 1 +
.../OwnerShipForCurrentServerTestBase.java | 2 +-
.../pulsar/broker/service/BrokerServiceTest.java | 8 +-
.../broker/transaction/TransactionTestBase.java | 2 +-
.../apache/pulsar/broker/web/WebServiceTest.java | 23 +-
.../metadata/api/MetadataStoreException.java | 4 +
.../metadata/cache/impl/MetadataCacheImpl.java | 10 +-
.../metadata/impl/AbstractMetadataStore.java | 7 +
.../MockedZooKeeperClientFactoryImpl.java | 2 +-
21 files changed, 958 insertions(+), 358 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index a017d28..3e03297 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -69,6 +69,7 @@ import
org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.ZookeeperSessionExpiredHandlers;
import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.admin.impl.PulsarResources;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
@@ -220,6 +221,8 @@ public class PulsarService implements AutoCloseable {
private MetadataStoreExtended localMetadataStore;
private CoordinationService coordinationService;
+ private MetadataStoreExtended configurationMetadataStore;
+ private PulsarResources pulsarResources;
public enum State {
Init, Started, Closed
@@ -280,6 +283,14 @@ public class PulsarService implements AutoCloseable {
new DefaultThreadFactory("zk-cache-callback"));
}
+    /**
+     * Creates the metadata-store client for the configuration store.
+     *
+     * <p>The store is created against {@code config.getConfigurationStoreServers()} with the
+     * ZooKeeper session timeout taken from the broker configuration and read-only operations disabled.
+     *
+     * @return a newly created {@link MetadataStoreExtended}
+     * @throws MetadataStoreException if the store cannot be created
+     */
+    public MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
+        MetadataStoreConfig storeConfig = MetadataStoreConfig.builder()
+                .sessionTimeoutMillis((int) config.getZooKeeperSessionTimeoutMillis())
+                .allowReadOnlyOperations(false)
+                .build();
+        return MetadataStoreExtended.create(config.getConfigurationStoreServers(), storeConfig);
+    }
+
/**
* Close the current pulsar service. All resources are released.
*/
@@ -396,6 +407,9 @@ public class PulsarService implements AutoCloseable {
if (localMetadataStore != null) {
localMetadataStore.close();
}
+ if (configurationMetadataStore != null) {
+ configurationMetadataStore.close();
+ }
state = State.Closed;
isClosedCondition.signalAll();
@@ -467,9 +481,11 @@ public class PulsarService implements AutoCloseable {
}
localMetadataStore = createLocalMetadataStore();
-
coordinationService = new
CoordinationServiceImpl(localMetadataStore);
+ configurationMetadataStore = createConfigurationMetadataStore();
+ pulsarResources = new PulsarResources(configurationMetadataStore);
+
orderedExecutor = OrderedExecutor.newBuilder()
.numThreads(config.getNumOrderedExecutorThreads())
.name("pulsar-ordered")
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 1b98ba1..1728c88 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -89,7 +89,7 @@ import org.slf4j.LoggerFactory;
public abstract class AdminResource extends PulsarWebResource {
private static final Logger log =
LoggerFactory.getLogger(AdminResource.class);
- private static final String POLICIES_READONLY_FLAG_PATH =
"/admin/flags/policies-readonly";
+ public static final String POLICIES_READONLY_FLAG_PATH =
"/admin/flags/policies-readonly";
public static final String PARTITIONED_TOPIC_PATH_ZNODE =
"partitioned-topics";
private static final String MANAGED_LEDGER_PATH_ZNODE = "/managed-ledgers";
@@ -169,7 +169,7 @@ public abstract class AdminResource extends
PulsarWebResource {
// This is a stub method for Mockito
@Override
- protected void validateSuperUserAccess() {
+ public void validateSuperUserAccess() {
super.validateSuperUserAccess();
}
@@ -740,7 +740,7 @@ public abstract class AdminResource extends
PulsarWebResource {
protected void validateClusterExists(String cluster) {
try {
- if (!clustersCache().get(path("clusters", cluster)).isPresent()) {
+ if (!clusterResources().get(path("clusters",
cluster)).isPresent()) {
throw new RestException(Status.PRECONDITION_FAILED, "Cluster "
+ cluster + " does not exist.");
}
} catch (Exception e) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java
new file mode 100644
index 0000000..07cd9c4
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.impl;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+import lombok.Getter;
+import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+
+/**
+ * Base class for all configuration resources to access configurations from
metadata-store.
+ *
+ * @param <T>
+ * type of configuration-resources.
+ */
+public class BaseResources<T> {
+
+ @Getter
+ private final MetadataStoreExtended store;
+ @Getter
+ private final MetadataCache<T> cache;
+
+ public BaseResources(MetadataStoreExtended store, Class<T> clazz) {
+ this.store = store;
+ this.cache = store.getMetadataCache(clazz);
+ }
+
+ public BaseResources(MetadataStoreExtended store, TypeReference<T>
typeRef) {
+ this.store = store;
+ this.cache = store.getMetadataCache(typeRef);
+ }
+
+ public List<String> getChildren(String path) throws MetadataStoreException
{
+ try {
+ return getChildrenAsync(path).get();
+ } catch (ExecutionException e) {
+ throw (e.getCause() instanceof MetadataStoreException) ?
(MetadataStoreException) e.getCause()
+ : new MetadataStoreException(e.getCause());
+ } catch (Exception e) {
+ throw new MetadataStoreException("Failed to get childeren of " +
path, e);
+ }
+ }
+
+ public CompletableFuture<List<String>> getChildrenAsync(String path) {
+ return cache.getChildren(path);
+ }
+
+ public Optional<T> get(String path) throws MetadataStoreException {
+ try {
+ return getAsync(path).get();
+ } catch (ExecutionException e) {
+ throw (e.getCause() instanceof MetadataStoreException) ?
(MetadataStoreException) e.getCause()
+ : new MetadataStoreException(e.getCause());
+ } catch (Exception e) {
+ throw new MetadataStoreException("Failed to get data from " +
path, e);
+ }
+ }
+
+ public CompletableFuture<Optional<T>> getAsync(String path) {
+ return cache.get(path);
+ }
+
+ public void set(String path, Function<T, T> modifyFunction) throws
MetadataStoreException {
+ try {
+ setAsync(path, modifyFunction).get();
+ } catch (ExecutionException e) {
+ throw (e.getCause() instanceof MetadataStoreException) ?
(MetadataStoreException) e.getCause()
+ : new MetadataStoreException(e.getCause());
+ } catch (Exception e) {
+ throw new MetadataStoreException("Failed to set data for " + path,
e);
+ }
+ }
+
+ public CompletableFuture<Void> setAsync(String path, Function<T, T>
modifyFunction) {
+ return cache.readModifyUpdate(path, modifyFunction);
+ }
+
+ public void create(String path, T data) throws MetadataStoreException {
+ create(path, t -> data);
+ }
+
+ public void create(String path, Function<Optional<T>, T> createFunction)
throws MetadataStoreException {
+ try {
+ createAsync(path, createFunction).get();
+ } catch (ExecutionException e) {
+ throw (e.getCause() instanceof MetadataStoreException) ?
(MetadataStoreException) e.getCause()
+ : new MetadataStoreException(e.getCause());
+ } catch (Exception e) {
+ throw new MetadataStoreException("Failed to create " + path, e);
+ }
+ }
+
+ public CompletableFuture<Void> createAsync(String path, T data) {
+ return createAsync(path, t -> data);
+ }
+
+ public CompletableFuture<Void> createAsync(String path,
Function<Optional<T>, T> createFunction) {
+ return cache.readModifyUpdateOrCreate(path, createFunction);
+ }
+
+ public void delete(String path) throws MetadataStoreException {
+ try {
+ deleteAsync(path).get();
+ } catch (ExecutionException e) {
+ throw (e.getCause() instanceof MetadataStoreException) ?
(MetadataStoreException) e.getCause()
+ : new MetadataStoreException(e.getCause());
+ } catch (Exception e) {
+ throw new MetadataStoreException("Failed to delete " + path, e);
+ }
+ }
+
+ public CompletableFuture<Void> deleteAsync(String path) {
+ return cache.delete(path);
+ }
+
+ public boolean exists(String path) throws MetadataStoreException {
+ try {
+ return existsAsync(path).get();
+ } catch (ExecutionException e) {
+ throw (e.getCause() instanceof MetadataStoreException) ?
(MetadataStoreException) e.getCause()
+ : new MetadataStoreException(e.getCause());
+ } catch (Exception e) {
+ throw new MetadataStoreException("Failed to check exist " + path,
e);
+ }
+ }
+
+ public CompletableFuture<Boolean> existsAsync(String path) {
+ return cache.exists(path);
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClusterResources.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClusterResources.java
new file mode 100644
index 0000000..d580f2e
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClusterResources.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.impl;
+
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Getter;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+
+/**
+ * Resources for {@link ClusterData} stored in the metadata-store, together with the
+ * {@link FailureDomainResources} created on the same store.
+ */
+public class ClusterResources extends BaseResources<ClusterData> {
+
+    public static final String CLUSTERS_ROOT = "/admin/clusters";
+    // Assigned once in the constructor and never replaced, hence final.
+    @Getter
+    private final FailureDomainResources failureDomainResources;
+
+    /**
+     * Creates the cluster resources and the nested failure-domain resources on the given store.
+     *
+     * @param store metadata-store backing both resources
+     */
+    public ClusterResources(MetadataStoreExtended store) {
+        super(store, ClusterData.class);
+        this.failureDomainResources = new FailureDomainResources(store, FailureDomain.class);
+    }
+
+    /**
+     * Lists the children of {@link #CLUSTERS_ROOT}.
+     *
+     * @return a new mutable set holding the child names
+     * @throws MetadataStoreException if the children cannot be read
+     */
+    public Set<String> list() throws MetadataStoreException {
+        return new HashSet<>(super.getChildren(CLUSTERS_ROOT));
+    }
+
+    /**
+     * Resources for {@link FailureDomain} values stored in the metadata-store.
+     */
+    public static class FailureDomainResources extends BaseResources<FailureDomain> {
+        public static final String FAILURE_DOMAIN = "failureDomain";
+
+        /**
+         * @param store metadata-store backing these resources
+         * @param clazz class of the stored values
+         */
+        public FailureDomainResources(MetadataStoreExtended store, Class<FailureDomain> clazz) {
+            super(store, clazz);
+        }
+    }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index d2714ec..b483114 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -20,8 +20,6 @@ package org.apache.pulsar.broker.admin.impl;
import static
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static
org.apache.pulsar.broker.namespace.NamespaceService.NAMESPACE_ISOLATION_POLICIES;
-import com.fasterxml.jackson.core.JsonGenerationException;
-import com.fasterxml.jackson.databind.JsonMappingException;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.swagger.annotations.ApiOperation;
@@ -30,7 +28,6 @@ import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Example;
import io.swagger.annotations.ExampleProperty;
-import java.io.IOException;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
@@ -51,9 +48,10 @@ import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
-import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.admin.AdminResource;
+import
org.apache.pulsar.broker.admin.impl.ClusterResources.FailureDomainResources;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.common.naming.Constants;
@@ -66,14 +64,11 @@ import
org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
+import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ClustersBase extends AdminResource {
+public class ClustersBase extends PulsarWebResource {
@GET
@ApiOperation(
@@ -87,7 +82,7 @@ public class ClustersBase extends AdminResource {
public Set<String> getClusters() throws Exception {
try {
// Remove "global" cluster from returned list
- Set<String> clusters = clustersListCache().get().stream()
+ Set<String> clusters = clusterResources().list().stream()
.filter(cluster ->
!Constants.GLOBAL_CLUSTER.equals(cluster)).collect(Collectors.toSet());
return clusters;
} catch (Exception e) {
@@ -119,7 +114,7 @@ public class ClustersBase extends AdminResource {
validateSuperUserAccess();
try {
- return clustersCache().get(path("clusters", cluster))
+ return clusterResources().get(path("clusters", cluster))
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Cluster does not exist"));
} catch (Exception e) {
log.error("[{}] Failed to get cluster {}", clientAppId(), cluster,
e);
@@ -171,11 +166,12 @@ public class ClustersBase extends AdminResource {
try {
NamedEntity.checkName(cluster);
- zkCreate(path("clusters", cluster),
jsonMapper().writeValueAsBytes(clusterData));
+ if (clusterResources().get(path("clusters", cluster)).isPresent())
{
+ log.warn("[{}] Failed to create already existing cluster {}",
clientAppId(), cluster);
+ throw new RestException(Status.CONFLICT, "Cluster already
exists");
+ }
+ clusterResources().create(path("clusters", cluster), clusterData);
log.info("[{}] Created cluster {}", clientAppId(), cluster);
- } catch (KeeperException.NodeExistsException e) {
- log.warn("[{}] Failed to create already existing cluster {}",
clientAppId(), cluster);
- throw new RestException(Status.CONFLICT, "Cluster already exists");
} catch (IllegalArgumentException e) {
log.warn("[{}] Failed to create cluster with invalid name {}",
clientAppId(), cluster, e);
throw new RestException(Status.PRECONDITION_FAILED, "Cluster name
is not valid");
@@ -222,23 +218,12 @@ public class ClustersBase extends AdminResource {
validatePoliciesReadOnlyAccess();
try {
- String clusterPath = path("clusters", cluster);
- Stat nodeStat = new Stat();
- byte[] content = globalZk().getData(clusterPath, null, nodeStat);
- ClusterData currentClusterData = null;
- if (content.length > 0) {
- currentClusterData = jsonMapper().readValue(content,
ClusterData.class);
- // only update cluster-url-data and not overwrite other
metadata such as peerClusterNames
- currentClusterData.update(clusterData);
- } else {
- currentClusterData = clusterData;
- }
- // Write back the new updated ClusterData into zookeeper
- globalZk().setData(clusterPath,
jsonMapper().writeValueAsBytes(currentClusterData),
- nodeStat.getVersion());
- globalZkCache().invalidate(clusterPath);
+ clusterResources().set(path("clusters", cluster), old -> {
+ old.update(clusterData);
+ return old;
+ });
log.info("[{}] Updated cluster {}", clientAppId(), cluster);
- } catch (KeeperException.NoNodeException e) {
+ } catch (NotFoundException e) {
log.warn("[{}] Failed to update cluster {}: Does not exist",
clientAppId(), cluster);
throw new RestException(Status.NOT_FOUND, "Cluster does not
exist");
} catch (Exception e) {
@@ -292,7 +277,7 @@ public class ClustersBase extends AdminResource {
throw new RestException(Status.PRECONDITION_FAILED,
cluster + " itself can't be part of
peer-list");
}
- clustersCache().get(path("clusters", peerCluster))
+ clusterResources().get(path("clusters", peerCluster))
.orElseThrow(() -> new
RestException(Status.PRECONDITION_FAILED,
"Peer cluster " + peerCluster + " does not
exist"));
} catch (RestException e) {
@@ -308,16 +293,12 @@ public class ClustersBase extends AdminResource {
}
try {
- String clusterPath = path("clusters", cluster);
- Stat nodeStat = new Stat();
- byte[] content = globalZk().getData(clusterPath, null, nodeStat);
- ClusterData currentClusterData = jsonMapper().readValue(content,
ClusterData.class);
- currentClusterData.setPeerClusterNames(peerClusterNames);
- // Write back the new updated ClusterData into zookeeper
- globalZk().setData(clusterPath,
jsonMapper().writeValueAsBytes(currentClusterData), nodeStat.getVersion());
- globalZkCache().invalidate(clusterPath);
+ clusterResources().set(path("clusters", cluster), old -> {
+ old.setPeerClusterNames(peerClusterNames);
+ return old;
+ });
log.info("[{}] Successfully added peer-cluster {} for {}",
clientAppId(), peerClusterNames, cluster);
- } catch (KeeperException.NoNodeException e) {
+ } catch (NotFoundException e) {
log.warn("[{}] Failed to update cluster {}: Does not exist",
clientAppId(), cluster);
throw new RestException(Status.NOT_FOUND, "Cluster does not
exist");
} catch (Exception e) {
@@ -347,15 +328,10 @@ public class ClustersBase extends AdminResource {
@PathParam("cluster") String cluster
) {
validateSuperUserAccess();
-
try {
- String clusterPath = path("clusters", cluster);
- byte[] content = globalZk().getData(clusterPath, null, null);
- ClusterData clusterData = jsonMapper().readValue(content,
ClusterData.class);
+ ClusterData clusterData = clusterResources().get(path("clusters",
cluster))
+ .orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Cluster does not exist"));
return clusterData.getPeerClusterNames();
- } catch (KeeperException.NoNodeException e) {
- log.warn("[{}] Failed to get cluster {}: Does not exist",
clientAppId(), cluster);
- throw new RestException(Status.NOT_FOUND, "Cluster does not
exist");
} catch (Exception e) {
log.error("[{}] Failed to get cluster {}", clientAppId(), cluster,
e);
throw new RestException(e);
@@ -388,12 +364,12 @@ public class ClustersBase extends AdminResource {
// Check that the cluster is not used by any property (eg: no
namespaces provisioned there)
boolean isClusterUsed = false;
try {
- for (String property : globalZk().getChildren(path(POLICIES),
false)) {
- if (globalZk().exists(path(POLICIES, property, cluster),
false) == null) {
+ for (String property :
tenantResources().getChildren(path(POLICIES))) {
+ if (!clusterResources().exists(path(POLICIES, property,
cluster))) {
continue;
}
- if (!globalZk().getChildren(path(POLICIES, property, cluster),
false).isEmpty()) {
+ if (!clusterResources().getChildren(path(POLICIES, property,
cluster)).isEmpty()) {
// We found a property that has at least a namespace in
this cluster
isClusterUsed = true;
break;
@@ -402,13 +378,12 @@ public class ClustersBase extends AdminResource {
// check the namespaceIsolationPolicies associated with the cluster
String path = path("clusters", cluster,
NAMESPACE_ISOLATION_POLICIES);
- Optional<NamespaceIsolationPolicies> nsIsolationPolicies =
namespaceIsolationPoliciesCache().get(path);
+ Optional<NamespaceIsolationPolicies> nsIsolationPolicies =
namespaceIsolationPolicies().getPolicies(path);
// Need to delete the isolation policies if present
if (nsIsolationPolicies.isPresent()) {
if (nsIsolationPolicies.get().getPolicies().isEmpty()) {
- globalZk().delete(path, -1);
- namespaceIsolationPoliciesCache().invalidate(path);
+ namespaceIsolationPolicies().delete(path);
} else {
isClusterUsed = true;
}
@@ -426,10 +401,9 @@ public class ClustersBase extends AdminResource {
try {
String clusterPath = path("clusters", cluster);
deleteFailureDomain(clusterPath);
- globalZk().delete(clusterPath, -1);
- globalZkCache().invalidate(clusterPath);
+ clusterResources().delete(clusterPath);
log.info("[{}] Deleted cluster {}", clientAppId(), cluster);
- } catch (KeeperException.NoNodeException e) {
+ } catch (NotFoundException e) {
log.warn("[{}] Failed to delete cluster {} - Does not exist",
clientAppId(), cluster);
throw new RestException(Status.NOT_FOUND, "Cluster does not
exist");
} catch (Exception e) {
@@ -441,16 +415,14 @@ public class ClustersBase extends AdminResource {
private void deleteFailureDomain(String clusterPath) {
try {
String failureDomain = joinPath(clusterPath,
ConfigurationCacheService.FAILURE_DOMAIN);
- if (globalZk().exists(failureDomain, false) == null) {
+ if (!clusterResources().exists(failureDomain)) {
return;
}
- for (String domain : globalZk().getChildren(failureDomain, false))
{
+ for (String domain :
clusterResources().getChildren(failureDomain)) {
String domainPath = joinPath(failureDomain, domain);
- globalZk().delete(domainPath, -1);
+ clusterResources().delete(domainPath);
}
- globalZk().delete(failureDomain, -1);
- failureDomainCache().clear();
- failureDomainListCache().clear();
+ clusterResources().delete(failureDomain);
} catch (Exception e) {
log.warn("Failed to delete failure-domain under cluster {}",
clusterPath);
throw new RestException(e);
@@ -478,13 +450,13 @@ public class ClustersBase extends AdminResource {
@PathParam("cluster") String cluster
) throws Exception {
validateSuperUserAccess();
- if (!clustersCache().get(path("clusters", cluster)).isPresent()) {
+ if (!clusterResources().exists(path("clusters", cluster))) {
throw new RestException(Status.NOT_FOUND, "Cluster " + cluster + "
does not exist.");
}
try {
- NamespaceIsolationPolicies nsIsolationPolicies =
namespaceIsolationPoliciesCache()
- .get(path("clusters", cluster,
NAMESPACE_ISOLATION_POLICIES))
+ NamespaceIsolationPolicies nsIsolationPolicies =
namespaceIsolationPolicies()
+ .getPolicies(path("clusters", cluster,
NAMESPACE_ISOLATION_POLICIES))
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"NamespaceIsolationPolicies for cluster " +
cluster + " does not exist"));
// construct the response to Namespace isolation data map
@@ -524,8 +496,8 @@ public class ClustersBase extends AdminResource {
validateClusterExists(cluster);
try {
- NamespaceIsolationPolicies nsIsolationPolicies =
namespaceIsolationPoliciesCache()
- .get(path("clusters", cluster,
NAMESPACE_ISOLATION_POLICIES))
+ NamespaceIsolationPolicies nsIsolationPolicies =
namespaceIsolationPolicies()
+ .getPolicies(path("clusters", cluster,
NAMESPACE_ISOLATION_POLICIES))
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"NamespaceIsolationPolicies for cluster " +
cluster + " does not exist"));
// construct the response to Namespace isolation data map
@@ -577,8 +549,8 @@ public class ClustersBase extends AdminResource {
throw new RestException(e);
}
try {
- Optional<NamespaceIsolationPolicies> nsPoliciesResult =
namespaceIsolationPoliciesCache()
- .get(nsIsolationPoliciesPath);
+ Optional<NamespaceIsolationPolicies> nsPoliciesResult =
namespaceIsolationPolicies()
+ .getPolicies(nsIsolationPoliciesPath);
if (!nsPoliciesResult.isPresent()) {
throw new RestException(Status.NOT_FOUND, "namespace-isolation
policies not found for " + cluster);
}
@@ -639,8 +611,8 @@ public class ClustersBase extends AdminResource {
final String nsIsolationPoliciesPath = AdminResource.path("clusters",
cluster, NAMESPACE_ISOLATION_POLICIES);
Map<String, NamespaceIsolationData> nsPolicies;
try {
- Optional<NamespaceIsolationPolicies> nsPoliciesResult =
namespaceIsolationPoliciesCache()
- .get(nsIsolationPoliciesPath);
+ Optional<NamespaceIsolationPolicies> nsPoliciesResult =
namespaceIsolationPolicies()
+ .getPolicies(nsIsolationPoliciesPath);
if (!nsPoliciesResult.isPresent()) {
throw new RestException(Status.NOT_FOUND, "namespace-isolation
policies not found for " + cluster);
}
@@ -710,21 +682,18 @@ public class ClustersBase extends AdminResource {
jsonInput =
ObjectMapperFactory.create().writeValueAsString(policyData);
String nsIsolationPolicyPath = path("clusters", cluster,
NAMESPACE_ISOLATION_POLICIES);
- NamespaceIsolationPolicies nsIsolationPolicies =
namespaceIsolationPoliciesCache()
- .get(nsIsolationPolicyPath).orElseGet(() -> {
+ NamespaceIsolationPolicies nsIsolationPolicies =
namespaceIsolationPolicies()
+ .getPolicies(nsIsolationPolicyPath).orElseGet(() -> {
try {
- this.createZnodeIfNotExist(nsIsolationPolicyPath,
Optional.of(Collections.emptyMap()));
+
namespaceIsolationPolicies().create(nsIsolationPolicyPath,
Collections.emptyMap());
return new NamespaceIsolationPolicies();
- } catch (KeeperException | InterruptedException e) {
+ } catch (Exception e) {
throw new RestException(e);
}
});
nsIsolationPolicies.setPolicy(policyName, policyData);
- globalZk().setData(nsIsolationPolicyPath,
jsonMapper().writeValueAsBytes(nsIsolationPolicies.getPolicies()),
- -1);
- // make sure that the cache content will be refreshed for the next
read access
-
namespaceIsolationPoliciesCache().invalidate(nsIsolationPolicyPath);
+ namespaceIsolationPolicies().set(nsIsolationPolicyPath, old ->
nsIsolationPolicies.getPolicies());
// whether or not make the isolation update on time.
if
(pulsar().getConfiguration().isEnableNamespaceIsolationUpdateOnTime()) {
@@ -738,7 +707,7 @@ public class ClustersBase extends AdminResource {
clientAppId(), cluster, policyName, iae);
asyncResponse.resume(new RestException(Status.BAD_REQUEST,
"Invalid format of input policy data. policy: " +
policyName + "; data: " + jsonInput));
- } catch (KeeperException.NoNodeException nne) {
+ } catch (NotFoundException nne) {
log.warn("[{}] Failed to update
clusters/{}/namespaceIsolationPolicies: Does not exist", clientAppId(),
cluster);
asyncResponse.resume(new RestException(Status.NOT_FOUND,
@@ -832,29 +801,6 @@ public class ClustersBase extends AdminResource {
});
}
- private boolean createZnodeIfNotExist(String path, Optional<Object> value)
- throws KeeperException, InterruptedException {
- // create persistent node on ZooKeeper
- if (globalZk().exists(path, false) == null) {
- // create all the intermediate nodes
- try {
- ZkUtils.createFullPathOptimistic(globalZk(), path,
- value.isPresent() ?
jsonMapper().writeValueAsBytes(value.get()) : null, Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- return true;
- } catch (KeeperException.NodeExistsException nee) {
- if (log.isDebugEnabled()) {
- log.debug("Other broker preempted the full path [{}]
already. Continue...", path);
- }
- } catch (JsonGenerationException e) {
- // ignore json error as it is empty hash
- } catch (JsonMappingException e) {
- } catch (IOException e) {
- }
- }
- return false;
- }
-
@DELETE
@Path("/{cluster}/namespaceIsolationPolicies/{policyName}")
@ApiOperation(
@@ -886,22 +832,19 @@ public class ClustersBase extends AdminResource {
try {
String nsIsolationPolicyPath = path("clusters", cluster,
NAMESPACE_ISOLATION_POLICIES);
- NamespaceIsolationPolicies nsIsolationPolicies =
namespaceIsolationPoliciesCache()
- .get(nsIsolationPolicyPath).orElseGet(() -> {
+ NamespaceIsolationPolicies nsIsolationPolicies =
namespaceIsolationPolicies()
+ .getPolicies(nsIsolationPolicyPath).orElseGet(() -> {
try {
- this.createZnodeIfNotExist(nsIsolationPolicyPath,
Optional.of(Collections.emptyMap()));
+
namespaceIsolationPolicies().create(nsIsolationPolicyPath,
Collections.emptyMap());
return new NamespaceIsolationPolicies();
- } catch (KeeperException | InterruptedException e) {
+ } catch (Exception e) {
throw new RestException(e);
}
});
nsIsolationPolicies.deletePolicy(policyName);
- globalZk().setData(nsIsolationPolicyPath,
jsonMapper().writeValueAsBytes(nsIsolationPolicies.getPolicies()),
- -1);
- // make sure that the cache content will be refreshed for the next
read access
-
namespaceIsolationPoliciesCache().invalidate(nsIsolationPolicyPath);
- } catch (KeeperException.NoNodeException nne) {
+ namespaceIsolationPolicies().set(nsIsolationPolicyPath, old ->
nsIsolationPolicies.getPolicies());
+ } catch (NotFoundException nne) {
log.warn("[{}] Failed to update
brokers/{}/namespaceIsolationPolicies: Does not exist", clientAppId(),
cluster);
throw new RestException(Status.NOT_FOUND,
@@ -949,15 +892,9 @@ public class ClustersBase extends AdminResource {
try {
String domainPath =
joinPath(pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT,
domainName);
- if (this.createZnodeIfNotExist(domainPath,
Optional.ofNullable(domain))) {
- // clear domains-children cache
- this.failureDomainListCache().clear();
- } else {
- globalZk().setData(domainPath,
jsonMapper().writeValueAsBytes(domain), -1);
- // make sure that the domain-cache will be refreshed for the
next read access
- failureDomainCache().invalidate(domainPath);
- }
- } catch (KeeperException.NoNodeException nne) {
+ FailureDomainResources failureDomainListCache =
clusterResources().getFailureDomainResources();
+ failureDomainListCache.create(domainPath, old -> domain);
+ } catch (NotFoundException nne) {
log.warn("[{}] Failed to update domain {}. clusters {} Does not
exist", clientAppId(), cluster,
domainName);
throw new RestException(Status.NOT_FOUND,
@@ -992,16 +929,17 @@ public class ClustersBase extends AdminResource {
Map<String, FailureDomain> domains = Maps.newHashMap();
try {
final String failureDomainRootPath =
pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT;
- for (String domainName : failureDomainListCache().get()) {
+ FailureDomainResources failureDomainListCache =
clusterResources().getFailureDomainResources();
+ for (String domainName :
failureDomainListCache.getChildren(failureDomainRootPath)) {
try {
- Optional<FailureDomain> domain = failureDomainCache()
+ Optional<FailureDomain> domain = failureDomainListCache
.get(joinPath(failureDomainRootPath, domainName));
domain.ifPresent(failureDomain -> domains.put(domainName,
failureDomain));
} catch (Exception e) {
log.warn("Failed to get domain {}", domainName, e);
}
}
- } catch (KeeperException.NoNodeException e) {
+ } catch (NotFoundException e) {
log.warn("[{}] Failure-domain is not configured for cluster {}",
clientAppId(), cluster, e);
return Collections.emptyMap();
} catch (Exception e) {
@@ -1041,7 +979,7 @@ public class ClustersBase extends AdminResource {
try {
final String failureDomainRootPath =
pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT;
- return failureDomainCache().get(joinPath(failureDomainRootPath,
domainName))
+ return
clusterResources().getFailureDomainResources().get(joinPath(failureDomainRootPath,
domainName))
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Domain " + domainName + " for cluster " + cluster
+ " does not exist"));
} catch (RestException re) {
@@ -1082,11 +1020,8 @@ public class ClustersBase extends AdminResource {
try {
final String domainPath =
joinPath(pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT,
domainName);
- globalZk().delete(domainPath, -1);
- // clear domain cache
- failureDomainCache().invalidate(domainPath);
- failureDomainListCache().clear();
- } catch (KeeperException.NoNodeException nne) {
+ clusterResources().getFailureDomainResources().delete(domainPath);
+ } catch (NotFoundException nne) {
log.warn("[{}] Domain {} does not exist in {}", clientAppId(),
domainName, cluster);
throw new RestException(Status.NOT_FOUND,
"Domain-name " + domainName + " or cluster " + cluster + "
does not exist");
@@ -1101,13 +1036,14 @@ public class ClustersBase extends AdminResource {
if (inputDomain != null && inputDomain.brokers != null) {
try {
final String failureDomainRootPath =
pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT;
- for (String domainName : failureDomainListCache().get()) {
+ for (String domainName :
clusterResources().getFailureDomainResources()
+ .getChildren(failureDomainRootPath)) {
if (inputDomainName.equals(domainName)) {
continue;
}
try {
Optional<FailureDomain> domain =
- failureDomainCache()
+ clusterResources().getFailureDomainResources()
.get(joinPath(failureDomainRootPath,
domainName));
if (domain.isPresent() && domain.get().brokers !=
null) {
List<String> duplicateBrokers =
domain.get().brokers.stream().parallel()
@@ -1124,7 +1060,7 @@ public class ClustersBase extends AdminResource {
log.warn("Failed to get domain {}", domainName, e);
}
}
- } catch (KeeperException.NoNodeException e) {
+ } catch (NotFoundException e) {
if (log.isDebugEnabled()) {
log.debug("[{}] Domain is not configured for cluster",
clientAppId(), e);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespaceResources.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespaceResources.java
new file mode 100644
index 0000000..966d421
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespaceResources.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.impl;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.util.Map;
+import java.util.Optional;
+import lombok.Getter;
+import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+
+public class NamespaceResources extends BaseResources<Policies> {
+ @Getter
+ private IsolationPolicyResources isolationPolicies;
+
+ public NamespaceResources(MetadataStoreExtended store) {
+ super(store, Policies.class);
+ isolationPolicies = new IsolationPolicyResources(store);
+ }
+
+ public static class IsolationPolicyResources extends
BaseResources<Map<String, NamespaceIsolationData>> {
+ public IsolationPolicyResources(MetadataStoreExtended store) {
+ super(store, new TypeReference<Map<String,
NamespaceIsolationData>>() {
+ });
+ }
+
+ public Optional<NamespaceIsolationPolicies> getPolicies(String path)
throws MetadataStoreException {
+ Optional<Map<String, NamespaceIsolationData>> data =
super.get(path);
+ return data.isPresent() ? Optional.of(new
NamespaceIsolationPolicies(data.get())) : Optional.empty();
+ }
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java
new file mode 100644
index 0000000..4384762
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PulsarResources.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.impl;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+
+@Getter(AccessLevel.PUBLIC)
+public class PulsarResources {
+
+ private TenantResources tenatResources;
+ private ClusterResources clusterResources;
+ private NamespaceResources namespaceResources;
+
+ public PulsarResources(MetadataStoreExtended configurationMetadataStore) {
+ tenatResources = new TenantResources(configurationMetadataStore);
+ clusterResources = new ClusterResources(configurationMetadataStore);
+ namespaceResources = new
NamespaceResources(configurationMetadataStore);
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantResources.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantResources.java
new file mode 100644
index 0000000..1a4fc38
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantResources.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.impl;
+
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+
+public class TenantResources extends BaseResources<TenantInfo> {
+ public TenantResources(MetadataStoreExtended store) {
+ super(store, TenantInfo.class);
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
index e1c270d..9c28046 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
@@ -24,8 +24,10 @@ import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
@@ -33,35 +35,45 @@ import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
+import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class TenantsBase extends AdminResource {
+public class TenantsBase extends PulsarWebResource {
+
+ private static final Logger log =
LoggerFactory.getLogger(TenantsBase.class);
@GET
@ApiOperation(value = "Get the list of existing tenants.", response =
String.class, responseContainer = "List")
@ApiResponses(value = { @ApiResponse(code = 403, message = "The requester
doesn't have admin permissions"),
@ApiResponse(code = 404, message = "Tenant doesn't exist") })
- public List<String> getTenants() {
- validateSuperUserAccess();
-
+ public void getTenants(@Suspended final AsyncResponse asyncResponse) {
+ final String clientAppId = clientAppId();
try {
- List<String> tenants = globalZk().getChildren(path(POLICIES),
false);
- tenants.sort(null);
- return tenants;
+ validateSuperUserAccess();
} catch (Exception e) {
- log.error("[{}] Failed to get tenants list", clientAppId(), e);
- throw new RestException(e);
+ asyncResponse.resume(e);
+ return;
}
+
tenantResources().getChildrenAsync(path(POLICIES)).whenComplete((tenants, e) ->
{
+ if (e != null) {
+ log.error("[{}] Failed to get tenants list", clientAppId, e);
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+ tenants.sort(null);
+ asyncResponse.resume(tenants);
+ });
}
@GET
@@ -69,18 +81,25 @@ public class TenantsBase extends AdminResource {
@ApiOperation(value = "Get the admin configuration for a given tenant.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "The requester
doesn't have admin permissions"),
@ApiResponse(code = 404, message = "Tenant does not exist") })
- public TenantInfo getTenantAdmin(
- @ApiParam(value = "The tenant name")
- @PathParam("tenant") String tenant) {
- validateSuperUserAccess();
-
+ public void getTenantAdmin(@Suspended final AsyncResponse asyncResponse,
+ @ApiParam(value = "The tenant name") @PathParam("tenant") String
tenant) {
+ final String clientAppId = clientAppId();
try {
- return tenantsCache().get(path(POLICIES, tenant))
- .orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Tenant does not exist"));
+ validateSuperUserAccess();
} catch (Exception e) {
- log.error("[{}] Failed to get tenant {}", clientAppId(), tenant,
e);
- throw new RestException(e);
+ asyncResponse.resume(e);
}
+
+ tenantResources().getAsync(path(POLICIES,
tenant)).whenComplete((tenantInfo, e) -> {
+ if (e != null) {
+ log.error("[{}] Failed to get Tenant {}", clientAppId,
e.getMessage());
+ asyncResponse.resume(new
RestException(Status.INTERNAL_SERVER_ERROR, "Failed to get Tenant"));
+ return;
+ }
+ boolean response = tenantInfo.isPresent() ?
asyncResponse.resume(tenantInfo.get())
+ : asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Tenant does not exist"));
+ return;
+ });
}
@PUT
@@ -91,103 +110,113 @@ public class TenantsBase extends AdminResource {
@ApiResponse(code = 412, message = "Tenant name is not valid"),
@ApiResponse(code = 412, message = "Clusters can not be empty"),
@ApiResponse(code = 412, message = "Clusters do not exist") })
- public void createTenant(
- @ApiParam(value = "The tenant name")
- @PathParam("tenant") String tenant,
- @ApiParam(value = "TenantInfo") TenantInfo config) {
- validateSuperUserAccess();
- validatePoliciesReadOnlyAccess();
- validateClusters(config);
+ public void createTenant(@Suspended final AsyncResponse asyncResponse,
+ @ApiParam(value = "The tenant name") @PathParam("tenant") String
tenant,
+ @ApiParam(value = "TenantInfo") TenantInfo tenantInfo) {
+ final String clientAppId = clientAppId();
try {
+ validateSuperUserAccess();
+ validatePoliciesReadOnlyAccess();
+ validateClusters(tenantInfo);
NamedEntity.checkName(tenant);
+ } catch (IllegalArgumentException e) {
+ log.warn("[{}] Failed to create tenant with invalid name {}",
clientAppId(), tenant, e);
+ asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Tenant name is not valid"));
+ return;
+ } catch (Exception e) {
+ asyncResponse.resume(e);
+ return;
+ }
+
+
tenantResources().getChildrenAsync(path(POLICIES)).whenComplete((tenants, e) ->
{
+ if (e != null) {
+ log.error("[{}] Failed to create tenant ", clientAppId,
e.getCause());
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
int maxTenants = pulsar().getConfiguration().getMaxTenants();
- //Due to the cost of distributed locks, no locks are added here.
- //In a concurrent scenario, the threshold will be exceeded.
+ // Due to the cost of distributed locks, no locks are added here.
+ // In a concurrent scenario, the threshold will be exceeded.
if (maxTenants > 0) {
- List<String> tenants = globalZk().getChildren(path(POLICIES),
false);
if (tenants != null && tenants.size() >= maxTenants) {
- throw new RestException(Status.PRECONDITION_FAILED,
"Exceed the maximum number of tenants");
+ asyncResponse.resume(
+ new RestException(Status.PRECONDITION_FAILED,
"Exceed the maximum number of tenants"));
+ return;
}
}
- zkCreate(path(POLICIES, tenant),
jsonMapper().writeValueAsBytes(config));
- log.info("[{}] Created tenant {}", clientAppId(), tenant);
- } catch (KeeperException.NodeExistsException e) {
- log.warn("[{}] Failed to create already existing tenant {}",
clientAppId(), tenant);
- throw new RestException(Status.CONFLICT, "Tenant already exists");
- } catch (IllegalArgumentException e) {
- log.warn("[{}] Failed to create tenant with invalid name {}",
clientAppId(), tenant, e);
- throw new RestException(Status.PRECONDITION_FAILED, "Tenant name
is not valid");
- } catch (Exception e) {
- log.error("[{}] Failed to create tenant {}", clientAppId(),
tenant, e);
- throw new RestException(e);
- }
+ tenantResources().existsAsync(path(POLICIES,
tenant)).thenAccept(exist ->{
+ if (exist) {
+ asyncResponse.resume(new RestException(Status.CONFLICT,
"Tenant already exist"));
+ return;
+ }
+ tenantResources().createAsync(path(POLICIES, tenant),
tenantInfo).thenAccept((r) -> {
+ log.info("[{}] Created tenant {}", clientAppId(), tenant);
+ asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to create tenant {}", clientAppId,
tenant, e);
+ asyncResponse.resume(new RestException(ex));
+ return null;
+ });
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to create tenant {}", clientAppId(),
tenant, e);
+ asyncResponse.resume(new RestException(ex));
+ return null;
+ });
+ });
}
@POST
@Path("/{tenant}")
@ApiOperation(value = "Update the admins for a tenant.",
- notes = "This operation requires Pulsar super-user privileges.")
+ notes = "This operation requires Pulsar super-user privileges.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "The requester
doesn't have admin permissions"),
@ApiResponse(code = 404, message = "Tenant does not exist"),
@ApiResponse(code = 409, message = "Tenant already exists"),
@ApiResponse(code = 412, message = "Clusters can not be empty"),
@ApiResponse(code = 412, message = "Clusters do not exist") })
- public void updateTenant(
- @ApiParam(value = "The tenant name")
- @PathParam("tenant") String tenant,
- @ApiParam(value = "TenantInfo") TenantInfo newTenantAdmin) {
- validateSuperUserAccess();
- validatePoliciesReadOnlyAccess();
- validateClusters(newTenantAdmin);
-
- Stat nodeStat = new Stat();
+ public void updateTenant(@Suspended final AsyncResponse asyncResponse,
+ @ApiParam(value = "The tenant name") @PathParam("tenant") String
tenant,
+ @ApiParam(value = "TenantInfo") TenantInfo newTenantAdmin) {
try {
- byte[] content = globalZk().getData(path(POLICIES, tenant), null,
nodeStat);
- TenantInfo oldTenantAdmin = jsonMapper().readValue(content,
TenantInfo.class);
- List<String> clustersWithActiveNamespaces = Lists.newArrayList();
- if (oldTenantAdmin.getAllowedClusters().size() >
newTenantAdmin.getAllowedClusters().size()) {
- // Get the colo(s) being removed from the list
-
oldTenantAdmin.getAllowedClusters().removeAll(newTenantAdmin.getAllowedClusters());
- log.debug("Following clusters are being removed : [{}]",
oldTenantAdmin.getAllowedClusters());
- for (String cluster : oldTenantAdmin.getAllowedClusters()) {
- if (Constants.GLOBAL_CLUSTER.equals(cluster)) {
- continue;
- }
- List<String> activeNamespaces = Lists.newArrayList();
- try {
- activeNamespaces =
globalZk().getChildren(path(POLICIES, tenant, cluster), false);
- if (activeNamespaces.size() != 0) {
- // There are active namespaces in this cluster
- clustersWithActiveNamespaces.add(cluster);
- }
- } catch (KeeperException.NoNodeException nne) {
- // Fine, some cluster does not have active namespace.
Move on!
- }
- }
- if (!clustersWithActiveNamespaces.isEmpty()) {
- // Throw an exception because colos being removed are
having active namespaces
- String msg = String.format(
- "Failed to update the tenant because active
namespaces are present in colos %s."
- + " Please delete those namespaces first",
- clustersWithActiveNamespaces);
- throw new RestException(Status.CONFLICT, msg);
- }
- }
- String tenantPath = path(POLICIES, tenant);
- globalZk().setData(tenantPath,
jsonMapper().writeValueAsBytes(newTenantAdmin), -1);
- globalZkCache().invalidate(tenantPath);
- log.info("[{}] updated tenant {}", clientAppId(), tenant);
- } catch (RestException re) {
- throw re;
- } catch (KeeperException.NoNodeException e) {
- log.warn("[{}] Failed to update tenant {}: does not exist",
clientAppId(), tenant);
- throw new RestException(Status.NOT_FOUND, "Tenant does not exist");
+ validateSuperUserAccess();
+ validatePoliciesReadOnlyAccess();
+ validateClusters(newTenantAdmin);
} catch (Exception e) {
- log.error("[{}] Failed to update tenant {}", clientAppId(),
tenant, e);
- throw new RestException(e);
+ asyncResponse.resume(e);
+ return;
}
+
+ final String clientAddId = clientAppId();
+ tenantResources().getAsync(path(POLICIES,
tenant)).thenAccept(tenantAdmin -> {
+ if (!tenantAdmin.isPresent()) {
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Tenant " + tenant + " not found"));
+ return;
+ }
+ TenantInfo oldTenantAdmin = tenantAdmin.get();
+ Set<String> newClusters = new
HashSet<>(newTenantAdmin.getAllowedClusters());
+ canUpdateCluster(tenant, oldTenantAdmin.getAllowedClusters(),
newClusters).thenApply(r -> {
+ tenantResources().setAsync(path(POLICIES, tenant), old -> {
+ return newTenantAdmin;
+ }).thenAccept(done -> {
+ log.info("Successfully updated tenant info {}", tenant);
+ asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(ex -> {
+ log.warn("Failed to update tenant {}", tenant,
ex.getCause());
+ asyncResponse.resume(new RestException(ex));
+ return null;
+ });
+ return null;
+ }).exceptionally(nsEx -> {
+ asyncResponse.resume(nsEx.getCause());
+ return null;
+ });
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to get tenant {}", clientAddId, tenant,
ex.getCause());
+ asyncResponse.resume(new RestException(ex));
+ return null;
+ });
}
@DELETE
@@ -196,47 +225,59 @@ public class TenantsBase extends AdminResource {
@ApiResponses(value = { @ApiResponse(code = 403, message = "The requester
doesn't have admin permissions"),
@ApiResponse(code = 404, message = "Tenant does not exist"),
@ApiResponse(code = 409, message = "The tenant still has active
namespaces") })
- public void deleteTenant(
- @PathParam("tenant")
- @ApiParam(value = "The tenant name")
- String tenant) {
- validateSuperUserAccess();
- validatePoliciesReadOnlyAccess();
-
- boolean isTenantEmpty;
+ public void deleteTenant(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") @ApiParam(value = "The tenant name") String
tenant) {
try {
- isTenantEmpty = getListOfNamespaces(tenant).isEmpty();
- } catch (KeeperException.NoNodeException e) {
- log.warn("[{}] Failed to delete tenant {}: does not exist",
clientAppId(), tenant);
- throw new RestException(Status.NOT_FOUND, "The tenant does not
exist");
+ validateSuperUserAccess();
+ validatePoliciesReadOnlyAccess();
} catch (Exception e) {
- log.error("[{}] Failed to get tenant status {}", clientAppId(),
tenant, e);
- throw new RestException(e);
+ asyncResponse.resume(e);
+ return;
}
- if (!isTenantEmpty) {
- log.warn("[{}] Failed to delete tenant {}: not empty",
clientAppId(), tenant);
- throw new RestException(Status.CONFLICT, "The tenant still has
active namespaces");
- }
-
- try {
- // First try to delete every cluster z-node
- for (String cluster : globalZk().getChildren(path(POLICIES,
tenant), false)) {
- globalZk().delete(path(POLICIES, tenant, cluster), -1);
+ tenantResources().existsAsync(path(POLICIES, tenant)).thenApply(exists
->{
+ if (!exists) {
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Tenant doesn't exist"));
+ return null;
}
-
- globalZk().delete(path(POLICIES, tenant), -1);
- log.info("[{}] Deleted tenant {}", clientAppId(), tenant);
- } catch (Exception e) {
- log.error("[{}] Failed to delete tenant {}", clientAppId(),
tenant, e);
- throw new RestException(e);
- }
+ return hasActiveNamespace(tenant).thenAccept(ns -> {
+ try {
+ // already fetched children and they should be in the cache
+ List<CompletableFuture<Void>> clusterList =
Lists.newArrayList();
+ for (String cluster :
tenantResources().getChildrenAsync(path(POLICIES, tenant)).get()) {
+
clusterList.add(tenantResources().deleteAsync(path(POLICIES, tenant, cluster)));
+ }
+ FutureUtil.waitForAll(clusterList).thenAccept(c -> {
+ tenantResources().deleteAsync(path(POLICIES,
tenant)).thenAccept(t -> {
+ log.info("[{}] Deleted tenant {}", clientAppId(),
tenant);
+ asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(ex -> {
+ log.error("Failed to delete tenant {}", tenant,
ex.getCause());
+ asyncResponse.resume(new RestException(ex));
+ return null;
+ });
+ }).exceptionally(ex -> {
+ log.error("Failed to delete clusters under tenant {}",
tenant, ex.getCause());
+ asyncResponse.resume(new RestException(ex));
+ return null;
+ });
+ log.info("[{}] Deleted tenant {}", clientAppId(), tenant);
+ } catch (Exception e) {
+ log.error("[{}] Failed to delete tenant {}",
clientAppId(), tenant, e);
+ asyncResponse.resume(new RestException(e));
+ }
+ }).exceptionally(ex -> {
+ log.error("Failed to delete tenant due to active namespace
{}", tenant, ex.getCause());
+ asyncResponse.resume(new RestException(ex));
+ return null;
+ });
+ });
}
private void validateClusters(TenantInfo info) {
// empty cluster shouldn't be allowed
- if (info == null || info.getAllowedClusters().stream()
- .filter(c ->
!StringUtils.isBlank(c)).collect(Collectors.toSet()).isEmpty()
+ if (info == null || info.getAllowedClusters().stream().filter(c ->
!StringUtils.isBlank(c))
+ .collect(Collectors.toSet()).isEmpty()
|| info.getAllowedClusters().stream().anyMatch(ac ->
StringUtils.isBlank(ac))) {
log.warn("[{}] Failed to validate due to clusters are empty",
clientAppId());
throw new RestException(Status.PRECONDITION_FAILED, "Clusters can
not be empty");
@@ -244,11 +285,11 @@ public class TenantsBase extends AdminResource {
List<String> nonexistentClusters;
try {
- Set<String> availableClusters = clustersListCache().get();
+ Set<String> availableClusters = clusterResources().list();
Set<String> allowedClusters = info.getAllowedClusters();
- nonexistentClusters = allowedClusters.stream()
- .filter(cluster -> !(availableClusters.contains(cluster) ||
Constants.GLOBAL_CLUSTER.equals(cluster)))
- .collect(Collectors.toList());
+ nonexistentClusters = allowedClusters.stream().filter(
+ cluster -> !(availableClusters.contains(cluster) ||
Constants.GLOBAL_CLUSTER.equals(cluster)))
+ .collect(Collectors.toList());
} catch (Exception e) {
log.error("[{}] Failed to get available clusters", clientAppId(),
e);
throw new RestException(e);
@@ -258,6 +299,4 @@ public class TenantsBase extends AdminResource {
throw new RestException(Status.PRECONDITION_FAILED, "Clusters do
not exist");
}
}
-
- private static final Logger log =
LoggerFactory.getLogger(TenantsBase.class);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index f10431c..1a307a8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -21,12 +21,16 @@ package org.apache.pulsar.broker.web;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isBlank;
+import static
org.apache.pulsar.broker.admin.AdminResource.POLICIES_READONLY_FLAG_PATH;
import static
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.BoundType;
+import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
+import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -44,6 +48,10 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.admin.impl.ClusterResources;
+import org.apache.pulsar.broker.admin.impl.NamespaceResources;
+import
org.apache.pulsar.broker.admin.impl.NamespaceResources.IsolationPolicyResources;
+import org.apache.pulsar.broker.admin.impl.TenantResources;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
@@ -65,7 +73,9 @@ import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.path.PolicyPath;
-import org.apache.zookeeper.KeeperException;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -169,7 +179,7 @@ public abstract class PulsarWebResource {
* @throws WebApplicationException
* if not authorized
*/
- protected void validateSuperUserAccess() {
+ public void validateSuperUserAccess() {
if (config().isAuthenticationEnabled()) {
String appId = clientAppId();
if (log.isDebugEnabled()) {
@@ -245,15 +255,8 @@ public abstract class PulsarWebResource {
(isClientAuthenticated(clientAppId)), clientAppId);
}
- TenantInfo tenantInfo;
-
- try {
- tenantInfo =
pulsar.getConfigurationCache().propertiesCache().get(path(POLICIES, tenant))
- .orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Tenant does not exist"));
- } catch (KeeperException.NoNodeException e) {
- log.warn("Failed to get tenant info data for non existing tenant
{}", tenant);
- throw new RestException(Status.NOT_FOUND, "Tenant does not exist");
- }
+ TenantInfo tenantInfo =
pulsar.getPulsarResources().getTenatResources().get(path(POLICIES, tenant))
+ .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Tenant
does not exist"));
if (pulsar.getConfiguration().isAuthenticationEnabled() &&
pulsar.getConfiguration().isAuthorizationEnabled()) {
if (!isClientAuthenticated(clientAppId)) {
@@ -308,7 +311,7 @@ public abstract class PulsarWebResource {
protected void validateClusterForTenant(String tenant, String cluster) {
TenantInfo tenantInfo;
try {
- tenantInfo =
pulsar().getConfigurationCache().propertiesCache().get(path(POLICIES, tenant))
+ tenantInfo =
pulsar().getPulsarResources().getTenatResources().get(path(POLICIES, tenant))
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Tenant does not exist"));
} catch (RestException e) {
log.warn("Failed to get tenant admin data for tenant {}", tenant);
@@ -859,4 +862,134 @@ public abstract class PulsarWebResource {
}
}
}
+
+ protected TenantResources tenantResources() {
+ return pulsar().getPulsarResources().getTenatResources();
+ }
+
+ protected ClusterResources clusterResources() {
+ return pulsar().getPulsarResources().getClusterResources();
+ }
+
+ protected NamespaceResources namespaceResources() {
+ return pulsar().getPulsarResources().getNamespaceResources();
+ }
+
+ protected IsolationPolicyResources namespaceIsolationPolicies(){
+ return namespaceResources().getIsolationPolicies();
+ }
+
+ public static ObjectMapper jsonMapper() {
+ return ObjectMapperFactory.getThreadLocal();
+ }
+
+ public void validatePoliciesReadOnlyAccess() {
+ try {
+ if
(clusterResources().existsAsync(AdminResource.POLICIES_READONLY_FLAG_PATH).get())
{
+ log.debug("Policies are read-only. Broker cannot do read-write operations");
+ throw new RestException(Status.FORBIDDEN, "Broker is forbidden to do read-write operations");
+ }
+ } catch (Exception e) {
+ log.warn("Unable to fetch read-only policy config {}",
POLICIES_READONLY_FLAG_PATH, e);
+ throw new RestException(e);
+ }
+ }
+
+ protected CompletableFuture<Void> hasActiveNamespace(String tenant) {
+ CompletableFuture<Void> activeNamespaceFuture = new
CompletableFuture<>();
+ tenantResources().getChildrenAsync(path(POLICIES,
tenant)).thenAccept(clusterOrNamespaceList -> {
+ if (clusterOrNamespaceList == null ||
clusterOrNamespaceList.isEmpty()) {
+ activeNamespaceFuture.complete(null);
+ return;
+ }
+ List<CompletableFuture<Void>> activeNamespaceListFuture =
Lists.newArrayList();
+ clusterOrNamespaceList.forEach(clusterOrNamespace -> {
+ // get list of active V1 namespace
+ CompletableFuture<Void> checkNs = new CompletableFuture<>();
+ activeNamespaceListFuture.add(checkNs);
+ tenantResources().getChildrenAsync(path(POLICIES, tenant,
clusterOrNamespace))
+ .whenComplete((children, ex) -> {
+ if (ex != null) {
+ checkNs.completeExceptionally(ex);
+ return;
+ }
+ if (children != null && !children.isEmpty()) {
+ checkNs.completeExceptionally(
+ new
RestException(Status.PRECONDITION_FAILED, "Tenant has active namespace"));
+ return;
+ }
+ String namespace = NamespaceName.get(tenant,
clusterOrNamespace).toString();
+ // if the length is 0 then this is probably a leftover cluster from namespace
+ // created
+ // with the v1 admin format (prop/cluster/ns) and then deleted, so no need to
+ // add it to the list
+ namespaceResources().getAsync(path(POLICIES,
namespace)).thenApply(data -> {
+ if (data.isPresent()) {
+ checkNs.completeExceptionally(new
RestException(Status.PRECONDITION_FAILED,
+ "Tenant has active namespace"));
+ } else {
+ checkNs.complete(null);
+ }
+ return null;
+ }).exceptionally(ex2 -> {
+ if (ex2.getCause() instanceof
MetadataStoreException.ContentDeserializationException) {
+ // it's not a valid namespace-node
+ checkNs.complete(null);
+ } else {
+ checkNs.completeExceptionally(
+ new
RestException(Status.INTERNAL_SERVER_ERROR, ex2.getCause()));
+ }
+ return null;
+ });
+ });
+ FutureUtil.waitForAll(activeNamespaceListFuture).thenAccept(r
-> {
+ activeNamespaceFuture.complete(null);
+ }).exceptionally(ex -> {
+ activeNamespaceFuture.completeExceptionally(ex.getCause());
+ return null;
+ });
+ });
+ }).exceptionally(ex -> {
+ activeNamespaceFuture.completeExceptionally(ex.getCause());
+ return null;
+ });
+ return activeNamespaceFuture;
+ }
+
+ protected void validateClusterExists(String cluster) {
+ try {
+ if (!clusterResources().get(path("clusters",
cluster)).isPresent()) {
+ throw new RestException(Status.PRECONDITION_FAILED, "Cluster "
+ cluster + " does not exist.");
+ }
+ } catch (Exception e) {
+ throw new RestException(e);
+ }
+ }
+
+ protected CompletableFuture<Void> canUpdateCluster(String tenant,
Set<String> oldClusters,
+ Set<String> newClusters) {
+ List<CompletableFuture<Void>> activeNamespaceFuture =
Lists.newArrayList();
+ for (String cluster : oldClusters) {
+ if (Constants.GLOBAL_CLUSTER.equals(cluster) ||
newClusters.contains(cluster)) {
+ continue;
+ }
+ CompletableFuture<Void> checkNs = new CompletableFuture<>();
+ activeNamespaceFuture.add(checkNs);
+ tenantResources().getChildrenAsync(path(POLICIES, tenant,
cluster)).whenComplete((activeNamespaces, ex) -> {
+ if (ex != null) {
+ log.warn("Failed to get namespaces under {}-{}, {}",
tenant, cluster, ex.getCause().getMessage());
+ checkNs.completeExceptionally(ex.getCause());
+ return;
+ }
+ if (activeNamespaces.size() > 0) {
+ log.warn("{}/{} Active-namespaces {}", tenant, cluster,
activeNamespaces);
+ checkNs.completeExceptionally(new
RestException(Status.PRECONDITION_FAILED, "Active namespaces"));
+ return;
+ }
+ checkNs.complete(null);
+ });
+ }
+ return activeNamespaceFuture.isEmpty() ?
CompletableFuture.completedFuture(null)
+ : FutureUtil.waitForAll(activeNamespaceFuture);
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 8179afa..268a4ca 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -339,7 +339,6 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
} catch (PulsarAdminException e) {
assertTrue(e instanceof NotFoundException);
}
-
// verify delete cluster failed
try {
admin.clusters().deleteCluster("test");
@@ -626,6 +625,13 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
@Test(enabled = true)
public void properties() throws PulsarAdminException {
+ try {
+ admin.tenants().getTenantInfo("does-not-exist");
+ fail("should have failed");
+ } catch (PulsarAdminException e) {
+ assertTrue(e instanceof NotFoundException);
+ }
+
Set<String> allowedClusters = Sets.newHashSet("test");
TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1",
"role2"), allowedClusters);
admin.tenants().updateTenant("prop-xyz", tenantInfo);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 047a174..f51eed1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -43,11 +43,15 @@ import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.TimeoutHandler;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
@@ -82,12 +86,13 @@ import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.common.stats.AllocatorStats;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
+import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooDefs.Ids;
import org.mockito.ArgumentCaptor;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -129,18 +134,15 @@ public class AdminTest extends
MockedPulsarServiceBaseTest {
clusters = spy(new Clusters());
clusters.setPulsar(pulsar);
- doReturn(mockZooKeeperGlobal).when(clusters).globalZk();
+ /*doReturn(mockZooKeeperGlobal).when(clusters).globalZk();
doReturn(configurationCache.clustersCache()).when(clusters).clustersCache();
doReturn(configurationCache.clustersListCache()).when(clusters).clustersListCache();
-
doReturn(configurationCache.namespaceIsolationPoliciesCache()).when(clusters).namespaceIsolationPoliciesCache();
+
doReturn(configurationCache.namespaceIsolationPoliciesCache()).when(clusters).namespaceIsolationPoliciesCache();*/
doReturn("test").when(clusters).clientAppId();
doNothing().when(clusters).validateSuperUserAccess();
properties = spy(new Properties());
- properties.setServletContext(new MockServletContext());
properties.setPulsar(pulsar);
- doReturn(mockZooKeeperGlobal).when(properties).globalZk();
-
doReturn(configurationCache.propertiesCache()).when(properties).tenantsCache();
doReturn("test").when(properties).clientAppId();
doNothing().when(properties).validateSuperUserAccess();
@@ -239,7 +241,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
clusters.createCluster("use", new
ClusterData("http://broker.messaging.use.example.com"));
verify(clusters, times(1)).validateSuperUserAccess();
// ensure to read from ZooKeeper directly
- clusters.clustersListCache().clear();
+ //clusters.clustersListCache().clear();
assertEquals(clusters.getClusters(), Lists.newArrayList("use"));
// Check creating existing cluster
@@ -329,6 +331,14 @@ public class AdminTest extends MockedPulsarServiceBaseTest
{
&& path.equals("/admin/clusters");
});
configurationCache.clustersListCache().clear();
+ // clear caches to load data from metadata-store again
+ MetadataCacheImpl<ClusterData> clusterCache =
(MetadataCacheImpl<ClusterData>) pulsar.getPulsarResources()
+ .getClusterResources().getCache();
+ MetadataCacheImpl isolationPolicyCache = (MetadataCacheImpl)
pulsar.getPulsarResources()
+ .getNamespaceResources().getIsolationPolicies().getCache();
+ AbstractMetadataStore store = (AbstractMetadataStore)
clusterCache.getStore();
+ clusterCache.invalidateAll();
+ store.invalidateAll();
try {
clusters.getClusters();
fail("should have failed");
@@ -351,6 +361,8 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
return op == MockZooKeeper.Op.GET
&& path.equals("/admin/clusters/test");
});
+ clusterCache.invalidateAll();
+ store.invalidateAll();
try {
clusters.updateCluster("test", new
ClusterData("http://broker.messaging.test.example.com"));
fail("should have failed");
@@ -386,6 +398,9 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
return op == MockZooKeeper.Op.GET
&&
path.equals("/admin/clusters/use/namespaceIsolationPolicies");
});
+ clusterCache.invalidateAll();
+ isolationPolicyCache.invalidateAll();
+ store.invalidateAll();
try {
clusters.deleteCluster("use");
fail("should have failed");
@@ -402,9 +417,19 @@ public class AdminTest extends MockedPulsarServiceBaseTest
{
}
}
+ Object asynRequests(Consumer<TestAsyncResponse> function) throws Exception
{
+ TestAsyncResponse ctx = new TestAsyncResponse();
+ function.accept(ctx);
+ ctx.latch.await();
+ if (ctx.e != null) {
+ throw (Exception) ctx.e;
+ }
+ return ctx.response;
+ }
@Test
- public void properties() throws Exception {
- assertEquals(properties.getTenants(), Lists.newArrayList());
+ public void properties() throws Throwable {
+ Object response = asynRequests(ctx -> properties.getTenants(ctx));
+ assertEquals(response, Lists.newArrayList());
verify(properties, times(1)).validateSuperUserAccess();
// create local cluster
@@ -413,29 +438,33 @@ public class AdminTest extends
MockedPulsarServiceBaseTest {
Set<String> allowedClusters = Sets.newHashSet();
allowedClusters.add(configClusterName);
TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1",
"role2"), allowedClusters);
- properties.createTenant("test-property", tenantInfo);
+ response = asynRequests(ctx -> properties.createTenant(ctx,
"test-property", tenantInfo));
verify(properties, times(2)).validateSuperUserAccess();
- assertEquals(properties.getTenants(),
Lists.newArrayList("test-property"));
+ response = asynRequests(ctx -> properties.getTenants(ctx));
+ assertEquals(response, Lists.newArrayList("test-property"));
verify(properties, times(3)).validateSuperUserAccess();
- assertEquals(properties.getTenantAdmin("test-property"), tenantInfo);
+ response = asynRequests(ctx -> properties.getTenantAdmin(ctx,
"test-property"));
+ assertEquals(response, tenantInfo);
verify(properties, times(4)).validateSuperUserAccess();
- TenantInfo newPropertyAdmin = new TenantInfo(Sets.newHashSet("role1",
"other-role"), allowedClusters);
- properties.updateTenant("test-property", newPropertyAdmin);
+ final TenantInfo newPropertyAdmin = new
TenantInfo(Sets.newHashSet("role1", "other-role"), allowedClusters);
+ response = asynRequests(ctx -> properties.updateTenant(ctx,
"test-property", newPropertyAdmin));
verify(properties, times(5)).validateSuperUserAccess();
// Wait for updateTenant to take effect
Thread.sleep(100);
- assertEquals(properties.getTenantAdmin("test-property"),
newPropertyAdmin);
- assertNotSame(properties.getTenantAdmin("test-property"), tenantInfo);
+ response = asynRequests(ctx -> properties.getTenantAdmin(ctx,
"test-property"));
+ assertEquals(response, newPropertyAdmin);
+ response = asynRequests(ctx -> properties.getTenantAdmin(ctx,
"test-property"));
+ assertNotSame(response, tenantInfo);
verify(properties, times(7)).validateSuperUserAccess();
// Check creating existing property
try {
- properties.createTenant("test-property", tenantInfo);
+ response = asynRequests(ctx -> properties.createTenant(ctx,
"test-property", tenantInfo));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.CONFLICT.getStatusCode());
@@ -443,14 +472,14 @@ public class AdminTest extends
MockedPulsarServiceBaseTest {
// Check non-existing property
try {
- properties.getTenantAdmin("non-existing");
+ response = asynRequests(ctx -> properties.getTenantAdmin(ctx,
"non-existing"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.NOT_FOUND.getStatusCode());
}
try {
- properties.updateTenant("xxx-non-existing", newPropertyAdmin);
+ response = asynRequests(ctx -> properties.updateTenant(ctx,
"xxx-non-existing", newPropertyAdmin));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.NOT_FOUND.getStatusCode());
@@ -458,93 +487,97 @@ public class AdminTest extends
MockedPulsarServiceBaseTest {
// Check deleting non-existing property
try {
- properties.deleteTenant("non-existing");
+ response = asynRequests(ctx -> properties.deleteTenant(ctx,
"non-existing"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.NOT_FOUND.getStatusCode());
}
+ // clear caches to load data from metadata-store again
+ MetadataCacheImpl<TenantInfo> cache = (MetadataCacheImpl<TenantInfo>)
pulsar.getPulsarResources()
+ .getTenatResources().getCache();
+ AbstractMetadataStore store = (AbstractMetadataStore) cache.getStore();
+ cache.invalidateAll();
+ store.invalidateAll();
// Test zk failures
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) ->
{
- return op == MockZooKeeper.Op.GET_CHILDREN
- && path.equals("/admin/policies");
- });
+ return op == MockZooKeeper.Op.GET_CHILDREN &&
path.equals("/admin/policies");
+ });
try {
- properties.getTenants();
+ response = asynRequests(ctx -> properties.getTenants(ctx));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.INTERNAL_SERVER_ERROR.getStatusCode());
}
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) ->
{
- return op == MockZooKeeper.Op.GET
- && path.equals("/admin/policies/my-tenant");
- });
+ return op == MockZooKeeper.Op.GET &&
path.equals("/admin/policies/my-tenant");
+ });
try {
- properties.getTenantAdmin("my-tenant");
+ response = asynRequests(ctx -> properties.getTenantAdmin(ctx,
"my-tenant"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.INTERNAL_SERVER_ERROR.getStatusCode());
}
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) ->
{
- return op == MockZooKeeper.Op.GET
- && path.equals("/admin/policies/my-tenant");
- });
+ return op == MockZooKeeper.Op.GET &&
path.equals("/admin/policies/my-tenant");
+ });
try {
- properties.updateTenant("my-tenant", newPropertyAdmin);
+ response = asynRequests(ctx -> properties.updateTenant(ctx,
"my-tenant", newPropertyAdmin));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.INTERNAL_SERVER_ERROR.getStatusCode());
}
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) ->
{
- return op == MockZooKeeper.Op.CREATE
- && path.equals("/admin/policies/test");
- });
+ return op == MockZooKeeper.Op.CREATE &&
path.equals("/admin/policies/test");
+ });
try {
- properties.createTenant("test", tenantInfo);
+ response = asynRequests(ctx -> properties.createTenant(ctx,
"test", tenantInfo));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.INTERNAL_SERVER_ERROR.getStatusCode());
}
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) ->
{
- return op == MockZooKeeper.Op.GET_CHILDREN
- && path.equals("/admin/policies/my-tenant");
- });
+ return op == MockZooKeeper.Op.GET_CHILDREN &&
path.equals("/admin/policies/test-property");
+ });
try {
- properties.deleteTenant("my-tenant");
+ cache.invalidateAll();
+ store.invalidateAll();
+ response = asynRequests(ctx -> properties.deleteTenant(ctx,
"test-property"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.INTERNAL_SERVER_ERROR.getStatusCode());
}
- properties.createTenant("error-property", tenantInfo);
+ response = asynRequests(ctx -> properties.createTenant(ctx,
"error-property", tenantInfo));
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) ->
{
- return op == MockZooKeeper.Op.DELETE
- && path.equals("/admin/policies/error-property");
- });
+ return op == MockZooKeeper.Op.DELETE &&
path.equals("/admin/policies/error-property");
+ });
try {
- properties.deleteTenant("error-property");
+ response = asynRequests(ctx -> properties.deleteTenant(ctx,
"error-property"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.INTERNAL_SERVER_ERROR.getStatusCode());
}
- properties.deleteTenant("test-property");
- properties.deleteTenant("error-property");
- assertEquals(properties.getTenants(), Lists.newArrayList());
+ response = asynRequests(ctx -> properties.deleteTenant(ctx,
"test-property"));
+ response = asynRequests(ctx -> properties.deleteTenant(ctx,
"error-property"));
+ response = Lists.newArrayList();
+ response = asynRequests(ctx -> properties.getTenants(ctx));
+ assertEquals(response, Lists.newArrayList());
// Create a namespace to test deleting a non-empty property
- newPropertyAdmin = new TenantInfo(Sets.newHashSet("role1",
"other-role"), Sets.newHashSet("use"));
- properties.createTenant("my-tenant", newPropertyAdmin);
+ TenantInfo newPropertyAdmin2 = new TenantInfo(Sets.newHashSet("role1",
"other-role"), Sets.newHashSet("use"));
+ response = asynRequests(ctx -> properties.createTenant(ctx,
"my-tenant", newPropertyAdmin2));
namespaces.createNamespace("my-tenant", "use", "my-namespace", new
BundlesData());
try {
- properties.deleteTenant("my-tenant");
+ response = asynRequests(ctx -> properties.deleteTenant(ctx,
"my-tenant"));
fail("should have failed");
} catch (RestException e) {
// Ok
@@ -552,7 +585,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
// Check name validation
try {
- properties.createTenant("test&", tenantInfo);
+ response = asynRequests(ctx -> properties.createTenant(ctx,
"test&", tenantInfo));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.PRECONDITION_FAILED.getStatusCode());
@@ -560,7 +593,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
// Check tenantInfo is null
try {
- properties.createTenant("tenant-config-is-null", null);
+ response = asynRequests(ctx -> properties.createTenant(ctx,
"tenant-config-is-null", null));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.PRECONDITION_FAILED.getStatusCode());
@@ -571,7 +604,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
Set<String> blankClusters = Sets.newHashSet(blankCluster);
TenantInfo tenantWithEmptyCluster = new
TenantInfo(Sets.newHashSet("role1", "role2"), blankClusters);
try {
- properties.createTenant("tenant-config-is-empty",
tenantWithEmptyCluster);
+ response = asynRequests(ctx -> properties.createTenant(ctx,
"tenant-config-is-empty", tenantWithEmptyCluster));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.PRECONDITION_FAILED.getStatusCode());
@@ -582,18 +615,18 @@ public class AdminTest extends
MockedPulsarServiceBaseTest {
containBlankClusters.add(configClusterName);
TenantInfo tenantContainEmptyCluster = new
TenantInfo(Sets.newHashSet(), containBlankClusters);
try {
- properties.createTenant("tenant-config-contain-empty",
tenantContainEmptyCluster);
+ response = asynRequests(ctx -> properties.createTenant(ctx,
"tenant-config-contain-empty", tenantContainEmptyCluster));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.PRECONDITION_FAILED.getStatusCode());
}
- AsyncResponse response = mock(AsyncResponse.class);
- namespaces.deleteNamespace(response, "my-tenant", "use",
"my-namespace", false, false);
+ AsyncResponse response2 = mock(AsyncResponse.class);
+ namespaces.deleteNamespace(response2, "my-tenant", "use",
"my-namespace", false, false);
ArgumentCaptor<Response> captor =
ArgumentCaptor.forClass(Response.class);
- verify(response, timeout(5000).times(1)).resume(captor.capture());
+ verify(response2, timeout(5000).times(1)).resume(captor.capture());
assertEquals(captor.getValue().getStatus(),
Status.NO_CONTENT.getStatusCode());
- properties.deleteTenant("my-tenant");
+ response = asynRequests(ctx -> properties.deleteTenant(ctx,
"my-tenant"));
}
@Test
@@ -654,9 +687,9 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
// create policies
TenantInfo admin = new TenantInfo();
admin.getAllowedClusters().add(cluster);
- mockZooKeeperGlobal.create(PulsarWebResource.path(POLICIES, property),
- ObjectMapperFactory.getThreadLocal().writeValueAsBytes(admin),
Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ ClusterData clusterData = new ClusterData(cluster);
+ clusters.createCluster(cluster, clusterData );
+ asynRequests(ctx -> properties.createTenant(ctx, property, admin));
// customized bandwidth for this namespace
double customizeBandwidth = 3000;
@@ -762,4 +795,85 @@ public class AdminTest extends MockedPulsarServiceBaseTest
{
}
+ static class TestAsyncResponse implements AsyncResponse {
+
+ Object response;
+ Throwable e;
+ CountDownLatch latch = new CountDownLatch(1);
+
+ @Override
+ public boolean resume(Object response) {
+ this.response = response;
+ latch.countDown();
+ return true;
+ }
+
+ @Override
+ public boolean resume(Throwable response) {
+ this.e = response;
+ latch.countDown();
+ return true;
+ }
+
+ @Override
+ public boolean cancel() {
+ return false;
+ }
+
+ @Override
+ public boolean cancel(int retryAfter) {
+ return false;
+ }
+
+ @Override
+ public boolean cancel(Date retryAfter) {
+ return false;
+ }
+
+ @Override
+ public boolean isSuspended() {
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return false;
+ }
+
+ @Override
+ public boolean setTimeout(long time, TimeUnit unit) {
+ return false;
+ }
+
+ @Override
+ public void setTimeoutHandler(TimeoutHandler handler) {
+
+ }
+
+ @Override
+ public Collection<Class<?>> register(Class<?> callback) {
+ return null;
+ }
+
+ @Override
+ public Map<Class<?>, Collection<Class<?>>> register(Class<?> callback,
Class<?>... callbacks) {
+ return null;
+ }
+
+ @Override
+ public Collection<Class<?>> register(Object callback) {
+ return null;
+ }
+
+ @Override
+ public Map<Class<?>, Collection<Class<?>>> register(Object callback,
Object... callbacks) {
+ return null;
+ }
+
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 4ad5145..6f68f71 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -276,6 +276,7 @@ public abstract class MockedPulsarServiceBaseTest {
doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory();
doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
doReturn(new
ZKMetadataStore(mockZooKeeper)).when(pulsar).createLocalMetadataStore();
+ doReturn(new
ZKMetadataStore(mockZooKeeperGlobal)).when(pulsar).createConfigurationMetadataStore();
Supplier<NamespaceService> namespaceServiceSupplier = () -> spy(new
NamespaceService(pulsar));
doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
index 1b0c7f3..ec9fde5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
@@ -129,7 +129,7 @@ public class OwnerShipForCurrentServerTestBase {
doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory();
doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
doReturn(new
ZKMetadataStore(mockZooKeeper)).when(pulsar).createLocalMetadataStore();
-
+ doReturn(new
ZKMetadataStore(mockZooKeeper)).when(pulsar).createConfigurationMetadataStore();
Supplier<NamespaceService> namespaceServiceSupplier = () -> spy(new
NamespaceService(pulsar));
doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 5688f97..12dcac5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -57,12 +57,12 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import
org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.client.admin.BrokerStats;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -785,7 +785,11 @@ public class BrokerServiceTest extends BrokerTestBase {
@Test
public void testTopicLoadingOnDisableNamespaceBundle() throws Exception {
final String namespace = "prop/disableBundle";
- admin.namespaces().createNamespace(namespace);
+ try {
+ admin.namespaces().createNamespace(namespace);
+ } catch (PulsarAdminException.ConflictException e) {
+ // Ok.. (if test fails intermittently and namespace is already created)
+ }
admin.namespaces().setNamespaceReplicationClusters(namespace,
Sets.newHashSet("test"));
// own namespace bundle
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index 6a2cfaa..6a5e254 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -133,7 +133,7 @@ public class TransactionTestBase {
doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory();
doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
doReturn(new
ZKMetadataStore(mockZooKeeper)).when(pulsar).createLocalMetadataStore();
-
+ doReturn(new
ZKMetadataStore(mockZooKeeper)).when(pulsar).createConfigurationMetadataStore();
Supplier<NamespaceService> namespaceServiceSupplier = () -> spy(new
NamespaceService(pulsar));
doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
index 04f9d81..b9eb85e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
@@ -44,6 +44,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.KeyManager;
@@ -57,6 +58,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.MockedBookKeeperClientFactory;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
@@ -67,9 +69,12 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.zookeeper.MockedZooKeeperClientFactoryImpl;
+import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
+import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.DefaultAsyncHttpClient;
@@ -323,8 +328,6 @@ public class WebServiceTest {
}
}
- MockedZooKeeperClientFactoryImpl zkFactory = new
MockedZooKeeperClientFactoryImpl();
-
private void setupEnv(boolean enableFilter, String minApiVersion, boolean
allowUnversionedClients,
boolean enableTls, boolean enableAuth, boolean allowInsecure,
double rateLimit) throws Exception {
if (pulsar != null) {
@@ -363,8 +366,20 @@ public class WebServiceTest {
}
pulsar = spy(new PulsarService(config));
- doReturn(zkFactory).when(pulsar).getZooKeeperClientFactory();
- doReturn(new
ZKMetadataStore(MockZooKeeper.newInstance())).when(pulsar).createLocalMetadataStore();
+ // mock zk
+ MockZooKeeper mockZooKeeper =
MockedPulsarServiceBaseTest.createMockZooKeeper();
+ ZooKeeperClientFactory mockZooKeeperClientFactory = new
ZooKeeperClientFactory() {
+
+ @Override
+ public CompletableFuture<ZooKeeper> create(String serverList,
SessionType sessionType,
+ int zkSessionTimeoutMillis) {
+ // Always return the same instance (so that we don't lose
the mock ZK content on broker restart)
+ return CompletableFuture.completedFuture(mockZooKeeper);
+ }
+ };
+
doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory();
+ doReturn(new
ZKMetadataStore(mockZooKeeper)).when(pulsar).createConfigurationMetadataStore();
+ doReturn(new
ZKMetadataStore(mockZooKeeper)).when(pulsar).createLocalMetadataStore();
doReturn(new
MockedBookKeeperClientFactory()).when(pulsar).newBookKeeperClientFactory();
pulsar.start();
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
index fbd5f77..d9ffda0 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
@@ -35,6 +35,10 @@ public class MetadataStoreException extends IOException {
super(msg);
}
+ public MetadataStoreException(String msg, Throwable t) {
+ super(msg, t);
+ }
+
/**
* Implementation is invalid
*/
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
index 6706108..8ad90df 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
@@ -23,7 +23,7 @@ import com.fasterxml.jackson.databind.JavaType;
import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
-
+import com.google.common.annotations.VisibleForTesting;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.List;
import java.util.Map;
@@ -34,7 +34,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
-
+import lombok.Getter;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
@@ -49,6 +49,7 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
private static final long CACHE_REFRESH_TIME_MILLIS =
TimeUnit.MINUTES.toMillis(5);
+ @Getter
private final MetadataStore store;
private final MetadataSerde<T> serde;
@@ -227,6 +228,11 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
objCache.synchronous().invalidate(path);
}
+ @VisibleForTesting
+ public void invalidateAll() {
+ objCache.synchronous().invalidateAll();
+ }
+
@Override
public void accept(Notification t) {
String path = t.getPath();
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 42293c7..a7ff703 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.type.TypeFactory;
import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -211,6 +212,12 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
executor.awaitTermination(10, TimeUnit.SECONDS);
}
+ @VisibleForTesting
+ public void invalidateAll() {
+ childrenCache.synchronous().invalidateAll();
+ existsCache.synchronous().invalidateAll();
+ }
+
protected static String parent(String path) {
int idx = path.lastIndexOf('/');
if (idx <= 0) {
diff --git
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/MockedZooKeeperClientFactoryImpl.java
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/MockedZooKeeperClientFactoryImpl.java
index 7a051ab..352db0b 100644
---
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/MockedZooKeeperClientFactoryImpl.java
+++
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/MockedZooKeeperClientFactoryImpl.java
@@ -33,7 +33,7 @@ import org.apache.zookeeper.data.ACL;
public class MockedZooKeeperClientFactoryImpl implements
ZooKeeperClientFactory {
- Queue<MockZooKeeper> createdInstances = new ConcurrentLinkedQueue<>();
+ public Queue<MockZooKeeper> createdInstances = new
ConcurrentLinkedQueue<>();
@Override
public CompletableFuture<ZooKeeper> create(String serverList, SessionType
sessionType, int zkSessionTimeoutMillis) {