This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 71cfe3a Ensure the handling of PartitionMetadataRequest is async
end-to-end (#5307)
71cfe3a is described below
commit 71cfe3a6e0b8c21da714902a07722afa96036a17
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Oct 8 08:51:37 2019 -0700
Ensure the handling of PartitionMetadataRequest is async end-to-end (#5307)
* Ensure the handling of PartitionMetadataRequest is async end-to-end
* Fixed z-node path
---
.../apache/pulsar/broker/ServiceConfiguration.java | 18 +++
.../broker/cache/ConfigurationCacheService.java | 12 +-
.../apache/pulsar/broker/admin/AdminResource.java | 147 ++-------------------
.../broker/admin/impl/PersistentTopicsBase.java | 7 +-
.../broker/cache/LocalZooKeeperCacheService.java | 4 +
.../pulsar/broker/namespace/NamespaceService.java | 67 ++++++----
.../pulsar/broker/service/BrokerService.java | 74 +++++++++++
.../apache/pulsar/broker/service/ServerCnx.java | 61 ++++-----
8 files changed, 189 insertions(+), 201 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 5ed90ce..4f6acb0 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1382,4 +1382,22 @@ public class ServiceConfiguration implements
PulsarConfiguration {
public Optional<Integer> getWebServicePortTls() {
return webServicePortTls;
}
+
+ public boolean isDefaultTopicTypePartitioned() {
+ return
TopicType.PARTITIONED.toString().equals(allowAutoTopicCreationType);
+ }
+
+ enum TopicType {
+ PARTITIONED("partitioned"),
+ NON_PARTITIONED("non-partitioned");
+ private String type;
+
+ TopicType(String type) {
+ this.type = type;
+ }
+
+ public String toString() {
+ return type;
+ }
+ }
}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
index 4ac6cab..06781ff 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
@@ -67,6 +67,8 @@ public class ConfigurationCacheService {
public static final String POLICIES_ROOT = "/admin/policies";
private static final String CLUSTERS_ROOT = "/admin/clusters";
+ public static final String PARTITIONED_TOPICS_ROOT =
"/admin/partitioned-topics";
+
public ConfigurationCacheService(ZooKeeperCache cache) throws
PulsarServerException {
this(cache, null);
}
@@ -98,7 +100,7 @@ public class ConfigurationCacheService {
};
this.clustersListCache = new ZooKeeperChildrenCache(cache,
CLUSTERS_ROOT);
-
+
CLUSTER_FAILURE_DOMAIN_ROOT = CLUSTERS_ROOT + "/" +
configuredClusterName + "/" + FAILURE_DOMAIN;
if (isNotBlank(configuredClusterName)) {
createFailureDomainRoot(cache.getZooKeeper(),
CLUSTER_FAILURE_DOMAIN_ROOT);
@@ -114,7 +116,7 @@ public class ConfigurationCacheService {
}));
}
};
-
+
this.failureDomainCache = new ZooKeeperDataCache<FailureDomain>(cache)
{
@Override
public FailureDomain deserialize(String path, byte[] content)
throws Exception {
@@ -169,7 +171,7 @@ public class ConfigurationCacheService {
public ZooKeeperCache cache() {
return cache;
}
-
+
public ZooKeeperDataCache<TenantInfo> propertiesCache() {
return this.propertiesCache;
}
@@ -189,7 +191,7 @@ public class ConfigurationCacheService {
public ZooKeeperChildrenCache failureDomainListCache() {
return this.failureDomainListCache;
}
-
+
public ZooKeeper getZooKeeper() {
return this.cache.getZooKeeper();
}
@@ -197,7 +199,7 @@ public class ConfigurationCacheService {
public ZooKeeperDataCache<NamespaceIsolationPolicies>
namespaceIsolationPoliciesCache() {
return this.namespaceIsolationPoliciesCache;
}
-
+
public ZooKeeperDataCache<FailureDomain> failureDomainCache() {
return this.failureDomainCache;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 39ff495..02ebd06 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -18,12 +18,13 @@
*/
package org.apache.pulsar.broker.admin;
-import com.fasterxml.jackson.core.JsonProcessingException;
import static com.google.common.base.Preconditions.checkArgument;
import static
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-import org.apache.pulsar.common.api.proto.PulsarApi;
import static org.apache.pulsar.common.util.Codec.decode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+
import java.net.MalformedURLException;
import java.net.URI;
import java.util.List;
@@ -43,13 +44,13 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.common.naming.TopicDomain;
-import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BundlesData;
@@ -77,13 +78,9 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-
public abstract class AdminResource extends PulsarWebResource {
private static final Logger log =
LoggerFactory.getLogger(AdminResource.class);
private static final String POLICIES_READONLY_FLAG_PATH =
"/admin/flags/policies-readonly";
- private static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000;
public static final String PARTITIONED_TOPIC_PATH_ZNODE =
"partitioned-topics";
protected ZooKeeper globalZk() {
@@ -521,12 +518,11 @@ public abstract class AdminResource extends
PulsarWebResource {
throw new RestException(e);
}
- String path = path(PARTITIONED_TOPIC_PATH_ZNODE,
namespaceName.toString(), domain(), topicName.getEncodedLocalName());
PartitionedTopicMetadata partitionMetadata;
if (checkAllowAutoCreation) {
- partitionMetadata =
fetchPartitionedTopicMetadataCheckAllowAutoCreation(pulsar(), path, topicName);
+ partitionMetadata =
fetchPartitionedTopicMetadataCheckAllowAutoCreation(pulsar(), topicName);
} else {
- partitionMetadata = fetchPartitionedTopicMetadata(pulsar(), path);
+ partitionMetadata = fetchPartitionedTopicMetadata(pulsar(),
topicName);
}
if (log.isDebugEnabled()) {
@@ -536,9 +532,9 @@ public abstract class AdminResource extends
PulsarWebResource {
return partitionMetadata;
}
- protected static PartitionedTopicMetadata
fetchPartitionedTopicMetadata(PulsarService pulsar, String path) {
+ protected static PartitionedTopicMetadata
fetchPartitionedTopicMetadata(PulsarService pulsar, TopicName topicName) {
try {
- return fetchPartitionedTopicMetadataAsync(pulsar, path).get();
+ return
pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).get();
} catch (Exception e) {
if (e.getCause() instanceof RestException) {
throw (RestException) e;
@@ -547,37 +543,10 @@ public abstract class AdminResource extends
PulsarWebResource {
}
}
- protected static CompletableFuture<PartitionedTopicMetadata>
fetchPartitionedTopicMetadataAsync(
- PulsarService pulsar, String path) {
- CompletableFuture<PartitionedTopicMetadata> metadataFuture = new
CompletableFuture<>();
- try {
- // gets the number of partitions from the zk cache
- pulsar.getGlobalZkCache().getDataAsync(path, new
Deserializer<PartitionedTopicMetadata>() {
- @Override
- public PartitionedTopicMetadata deserialize(String key, byte[]
content) throws Exception {
- return jsonMapper().readValue(content,
PartitionedTopicMetadata.class);
- }
- }).thenAccept(metadata -> {
- // if the partitioned topic is not found in zk, then the topic
is not partitioned
- if (metadata.isPresent()) {
- metadataFuture.complete(metadata.get());
- } else {
- metadataFuture.complete(new PartitionedTopicMetadata());
- }
- }).exceptionally(ex -> {
- metadataFuture.completeExceptionally(ex);
- return null;
- });
- } catch (Exception e) {
- metadataFuture.completeExceptionally(e);
- }
- return metadataFuture;
- }
-
protected static PartitionedTopicMetadata
fetchPartitionedTopicMetadataCheckAllowAutoCreation(
- PulsarService pulsar, String path, TopicName topicName) {
+ PulsarService pulsar, TopicName topicName) {
try {
- return
fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(pulsar, path,
topicName)
+ return
pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName)
.get();
} catch (Exception e) {
if (e.getCause() instanceof RestException) {
@@ -587,85 +556,7 @@ public abstract class AdminResource extends
PulsarWebResource {
}
}
- protected static CompletableFuture<PartitionedTopicMetadata>
fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(
- PulsarService pulsar, String path, TopicName topicName) {
- CompletableFuture<PartitionedTopicMetadata> metadataFuture = new
CompletableFuture<>();
- try {
- boolean allowAutoTopicCreation =
pulsar.getConfiguration().isAllowAutoTopicCreation();
- String topicType =
pulsar.getConfiguration().getAllowAutoTopicCreationType();
- boolean topicExist;
- try {
- topicExist = pulsar.getNamespaceService()
- .getListOfTopics(topicName.getNamespaceObject(),
PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
- .join()
- .contains(topicName.toString());
- } catch (Exception e) {
- log.warn("Unexpected error while getting list of topics.
topic={}. Error: {}",
- topicName, e.getMessage(), e);
- throw new RestException(e);
- }
- fetchPartitionedTopicMetadataAsync(pulsar,
path).whenCompleteAsync((metadata, ex) -> {
- if (ex != null) {
- metadataFuture.completeExceptionally(ex);
- // If topic is already exist, creating partitioned topic
is not allowed.
- } else if (metadata.partitions == 0 && !topicExist &&
allowAutoTopicCreation &&
- TopicType.PARTITIONED.toString().equals(topicType)) {
- createDefaultPartitionedTopicAsync(pulsar,
path).whenComplete((defaultMetadata, e) -> {
- if (e == null) {
- metadataFuture.complete(defaultMetadata);
- } else if (e instanceof KeeperException) {
- try {
-
Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
- if (!pulsar.getGlobalZkCache().exists(path)){
- metadataFuture.completeExceptionally(e);
- return;
- }
- } catch (InterruptedException | KeeperException
exc) {
- metadataFuture.completeExceptionally(exc);
- return;
- }
- fetchPartitionedTopicMetadataAsync(pulsar,
path).whenComplete((metadata2, ex2) -> {
- if (ex2 != null) {
- metadataFuture.completeExceptionally(ex2);
- } else {
- metadataFuture.complete(metadata2);
- }
- });
- } else {
- metadataFuture.completeExceptionally(e);
- }
- });
- } else {
- metadataFuture.complete(metadata);
- }
- });
- } catch (Exception e) {
- metadataFuture.completeExceptionally(e);
- }
- return metadataFuture;
- }
-
- protected static CompletableFuture<PartitionedTopicMetadata>
createDefaultPartitionedTopicAsync(
- PulsarService pulsar, String path) {
- int defaultNumPartitions =
pulsar.getConfiguration().getDefaultNumPartitions();
- checkArgument(defaultNumPartitions > 0, "Default number of partitions
should be more than 0");
- PartitionedTopicMetadata configMetadata = new
PartitionedTopicMetadata(defaultNumPartitions);
- CompletableFuture<PartitionedTopicMetadata> partitionedTopicFuture =
new CompletableFuture<>();
- try {
- byte[] content = jsonMapper().writeValueAsBytes(configMetadata);
-
ZkUtils.createFullPathOptimistic(pulsar.getGlobalZkCache().getZooKeeper(),
path, content,
- ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- // we wait for the data to be synced in all quorums and the
observers
- Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
- partitionedTopicFuture.complete(configMetadata);
- } catch (JsonProcessingException | KeeperException |
InterruptedException e) {
- log.error("Failed to create default partitioned topic.", e);
- partitionedTopicFuture.completeExceptionally(e);
- }
- return partitionedTopicFuture;
- }
-
- protected void validateClusterExists(String cluster) {
+ protected void validateClusterExists(String cluster) {
try {
if (!clustersCache().get(path("clusters", cluster)).isPresent()) {
throw new RestException(Status.PRECONDITION_FAILED, "Cluster "
+ cluster + " does not exist.");
@@ -730,18 +621,4 @@ public abstract class AdminResource extends
PulsarWebResource {
partitionedTopics.sort(null);
return partitionedTopics;
}
-
- enum TopicType {
- PARTITIONED("partitioned"),
- NON_PARTITIONED("non-partitioned");
- private String type;
-
- TopicType(String type) {
- this.type = type;
- }
-
- public String toString() {
- return type;
- }
- }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index ba5ce25..0545274 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -115,7 +115,7 @@ import org.slf4j.LoggerFactory;
public class PersistentTopicsBase extends AdminResource {
private static final Logger log =
LoggerFactory.getLogger(PersistentTopicsBase.class);
- protected static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000;
+ public static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000;
private static final int OFFLINE_TOPIC_STAT_TTL_MINS = 10;
private static final String DEPRECATED_CLIENT_VERSION_PREFIX =
"Pulsar-CPP-v";
private static final Version LEAST_SUPPORTED_CLIENT_VERSION_PREFIX =
Version.forIntegers(1, 21);
@@ -1670,7 +1670,8 @@ public class PersistentTopicsBase extends AdminResource {
// serve/redirect request else fail partitioned-metadata-request
so, client fails while creating
// producer/consumer
checkLocalOrGetPeerReplicationCluster(pulsar,
topicName.getNamespaceObject())
- .thenCompose(res ->
fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(pulsar, path,
topicName))
+ .thenCompose(res -> pulsar.getBrokerService()
+
.fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
.thenAccept(metadata -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Total number of partitions for
topic {} is {}", clientAppId, topicName,
@@ -1786,7 +1787,7 @@ public class PersistentTopicsBase extends AdminResource {
private CompletableFuture<Void> createSubscriptions(TopicName topicName,
int numPartitions) {
String path = path(PARTITIONED_TOPIC_PATH_ZNODE,
topicName.getPersistenceNamingEncoding());
CompletableFuture<Void> result = new CompletableFuture<>();
- fetchPartitionedTopicMetadataAsync(pulsar(),
path).thenAccept(partitionMetadata -> {
+
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).thenAccept(partitionMetadata
-> {
if (partitionMetadata.partitions <= 1) {
result.completeExceptionally(new
RestException(Status.CONFLICT, "Topic is not partitioned topic"));
return;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
index 4b28cad..625f805 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
@@ -247,4 +247,8 @@ public class LocalZooKeeperCacheService {
public ZooKeeperChildrenCache managedLedgerListCache() {
return this.managedLedgerListCache;
}
+
+ public CompletableFuture<Boolean> managedLedgerExists(String
persistentPath) {
+ return cache.existsAsync(MANAGED_LEDGER_ROOT + "/" + persistentPath,
cache);
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 83e3ade..7070e54 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.broker.service.Topic;
import
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
@@ -873,6 +874,17 @@ public class NamespaceService {
});
}
+ public CompletableFuture<Boolean> checkTopicExists(TopicName topic) {
+ if (topic.isPersistent()) {
+ return pulsar.getLocalZkCacheService()
+ .managedLedgerExists(topic.getPersistenceNamingEncoding());
+ } else {
+ return pulsar.getBrokerService()
+ .getTopicIfExists(topic.toString())
+ .thenApply(optTopic -> optTopic.isPresent());
+ }
+ }
+
public CompletableFuture<List<String>> getListOfTopics(NamespaceName
namespaceName, Mode mode) {
switch (mode) {
case ALL:
@@ -905,37 +917,36 @@ public class NamespaceService {
}
public CompletableFuture<List<String>>
getListOfNonPersistentTopics(NamespaceName namespaceName) {
- ClusterData peerClusterData;
- try {
- peerClusterData =
PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsar, namespaceName)
-
.get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException
e) {
- throw new RuntimeException("Failed to contact peer replication
cluster.", e);
- }
-
- // if peer-cluster-data is present it means namespace is owned by that
peer-cluster and request should be
- // redirect to the peer-cluster
- if (peerClusterData != null) {
- return getNonPersistentTopicsFromPeerCluster(peerClusterData,
namespaceName);
- }
- // Non-persistent topics don't have managed ledgers so we have to
retrieve them from local cache.
- List<String> topics = Lists.newArrayList();
- synchronized (pulsar.getBrokerService().getMultiLayerTopicMap()) {
- if
(pulsar.getBrokerService().getMultiLayerTopicMap().containsKey(namespaceName.toString()))
{
-
pulsar.getBrokerService().getMultiLayerTopicMap().get(namespaceName.toString()).values()
- .forEach(bundle -> {
- bundle.forEach((topicName, topic) -> {
- if (topic instanceof NonPersistentTopic &&
((NonPersistentTopic)topic).isActive()) {
- topics.add(topicName);
+ return PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsar,
namespaceName)
+ .thenCompose(peerClusterData -> {
+ // if peer-cluster-data is present it means namespace is
owned by that peer-cluster and request
+ // should be redirect to the peer-cluster
+ if (peerClusterData != null) {
+ return
getNonPersistentTopicsFromPeerCluster(peerClusterData, namespaceName);
+ } else {
+ // Non-persistent topics don't have managed ledgers so
we have to retrieve them from local
+ // cache.
+ List<String> topics = Lists.newArrayList();
+ synchronized
(pulsar.getBrokerService().getMultiLayerTopicMap()) {
+ if
(pulsar.getBrokerService().getMultiLayerTopicMap()
+ .containsKey(namespaceName.toString())) {
+
pulsar.getBrokerService().getMultiLayerTopicMap().get(namespaceName.toString()).values()
+ .forEach(bundle -> {
+ bundle.forEach((topicName, topic)
-> {
+ if (topic instanceof
NonPersistentTopic
+ &&
((NonPersistentTopic) topic).isActive()) {
+ topics.add(topicName);
+ }
+ });
+ });
}
- });
- });
- }
- }
+ }
- topics.sort(null);
- return CompletableFuture.completedFuture(topics);
+ topics.sort(null);
+ return CompletableFuture.completedFuture(topics);
+ }
+ });
}
private CompletableFuture<List<String>>
getNonPersistentTopicsFromPeerCluster(ClusterData peerClusterData,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index fa3c14e..490a691 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -69,6 +69,7 @@ import java.util.function.Predicate;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
+
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
@@ -77,6 +78,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
@@ -84,8 +86,10 @@ import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerLoader;
import org.apache.pulsar.broker.loadbalance.LoadManager;
@@ -119,6 +123,7 @@ import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
@@ -139,6 +144,7 @@ import org.apache.pulsar.zookeeper.ZooKeeperCacheListener;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@@ -1611,6 +1617,74 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
}
+ public CompletableFuture<PartitionedTopicMetadata>
fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(TopicName topicName) {
+ return pulsar.getNamespaceService().checkTopicExists(topicName)
+ .thenCompose(topicExists -> {
+ return fetchPartitionedTopicMetadataAsync(topicName)
+ .thenCompose(metadata -> {
+ // If topic is already exist, creating
partitioned topic is not allowed.
+ if (metadata.partitions == 0
+ && !topicExists
+ &&
pulsar.getConfiguration().isAllowAutoTopicCreation()
+ &&
pulsar.getConfiguration().isDefaultTopicTypePartitioned()) {
+ return
pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName);
+ } else {
+ return
CompletableFuture.completedFuture(metadata);
+ }
+ });
+ });
+ }
+
+ @SuppressWarnings("deprecation")
+ private CompletableFuture<PartitionedTopicMetadata>
createDefaultPartitionedTopicAsync(TopicName topicName) {
+ int defaultNumPartitions =
pulsar.getConfiguration().getDefaultNumPartitions();
+ checkArgument(defaultNumPartitions > 0, "Default number of partitions
should be more than 0");
+
+ PartitionedTopicMetadata configMetadata = new
PartitionedTopicMetadata(defaultNumPartitions);
+ CompletableFuture<PartitionedTopicMetadata> partitionedTopicFuture =
new CompletableFuture<>();
+
+ try {
+ byte[] content =
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configMetadata);
+
+
ZkUtils.asyncCreateFullPathOptimistic(pulsar.getGlobalZkCache().getZooKeeper(),
+ partitionedTopicPath(topicName), content,
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc,
path1, ctx, name) -> {
+ if (rc == KeeperException.Code.OK.intValue()) {
+ // we wait for the data to be synced in all
quorums and the observers
+ executor().schedule(
+ SafeRunnable.safeRun(() ->
partitionedTopicFuture.complete(configMetadata)),
+
PersistentTopicsBase.PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS,
TimeUnit.MILLISECONDS);
+ } else {
+
partitionedTopicFuture.completeExceptionally(KeeperException.create(rc));
+ }
+ }, null);
+
+ } catch (Exception e) {
+ log.error("Failed to create default partitioned topic.", e);
+ return FutureUtil.failedFuture(e);
+ }
+
+ return partitionedTopicFuture;
+ }
+
+ public CompletableFuture<PartitionedTopicMetadata>
fetchPartitionedTopicMetadataAsync(TopicName topicName) {
+ // gets the number of partitions from the zk cache
+ return
pulsar.getGlobalZkCache().getDataAsync(partitionedTopicPath(topicName), (key,
content) -> {
+ return ObjectMapperFactory.getThreadLocal().readValue(content,
PartitionedTopicMetadata.class);
+ }).thenApply(metadata -> {
+ // if the partitioned topic is not found in zk, then the topic is
not partitioned
+ return metadata.orElseGet(() -> new PartitionedTopicMetadata());
+ });
+ }
+
+ private static String partitionedTopicPath(TopicName topicName) {
+ return String.format("%s/%s/%s/%s",
+ ConfigurationCacheService.PARTITIONED_TOPICS_ROOT,
+ topicName.getNamespace(),
+ topicName.getDomain(),
+ topicName.getEncodedLocalName());
+ }
+
public OrderedExecutor getTopicOrderedExecutor() {
return topicOrderedExecutor;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 7a3a3d5..005f615 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -340,40 +340,41 @@ public class ServerCnx extends PulsarHandler {
}
String finalOriginalPrincipal = originalPrincipal;
isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
- if (isProxyAuthorized) {
+ if (isProxyAuthorized) {
getPartitionedTopicMetadata(getBrokerService().pulsar(),
- authRole,
finalOriginalPrincipal, authenticationData,
+ authRole, finalOriginalPrincipal,
authenticationData,
topicName).handle((metadata, ex) -> {
- if (ex == null) {
- int partitions = metadata.partitions;
-
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId));
+ if (ex == null) {
+ int partitions = metadata.partitions;
+
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId));
+ } else {
+ if (ex instanceof PulsarClientException) {
+ log.warn("Failed to authorize {} at
[{}] on topic {} : {}", getRole(),
+ remoteAddress, topicName,
ex.getMessage());
+
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(
+
ServerError.AuthorizationError, ex.getMessage(), requestId));
} else {
- if (ex instanceof
PulsarClientException) {
- log.warn("Failed to authorize {}
at [{}] on topic {} : {}", getRole(),
- remoteAddress, topicName,
ex.getMessage());
-
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(
-
ServerError.AuthorizationError, ex.getMessage(), requestId));
- } else {
- log.warn("Failed to get
Partitioned Metadata [{}] {}: {}", remoteAddress,
- topicName,
ex.getMessage(), ex);
- ServerError error = (ex instanceof
RestException)
- && ((RestException)
ex).getResponse().getStatus() < 500
- ?
ServerError.MetadataError : ServerError.ServiceNotReady;
-
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(error,
- ex.getMessage(),
requestId));
- }
+ log.warn("Failed to get Partitioned
Metadata [{}] {}: {}", remoteAddress,
+ topicName, ex.getMessage(),
ex);
+ ServerError error = (ex instanceof
RestException)
+ && ((RestException)
ex).getResponse().getStatus() < 500
+ ?
ServerError.MetadataError
+ :
ServerError.ServiceNotReady;
+
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(error,
+ ex.getMessage(), requestId));
}
- lookupSemaphore.release();
- return null;
- });
- } else {
- final String msg = "Proxy Client is not authorized to
Get Partition Metadata";
- log.warn("[{}] {} with role {} on topic {}",
remoteAddress, msg, authRole, topicName);
- ctx.writeAndFlush(
-
Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg,
requestId));
- lookupSemaphore.release();
- }
- return null;
+ }
+ lookupSemaphore.release();
+ return null;
+ });
+ } else {
+ final String msg = "Proxy Client is not authorized to Get
Partition Metadata";
+ log.warn("[{}] {} with role {} on topic {}",
remoteAddress, msg, authRole, topicName);
+ ctx.writeAndFlush(
+
Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg,
requestId));
+ lookupSemaphore.release();
+ }
+ return null;
}).exceptionally(ex -> {
final String msg = "Exception occured while trying to
authorize get Partition Metadata";
log.warn("[{}] {} with role {} on topic {}", remoteAddress,
msg, authRole, topicName);