This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9aed73653e1 [fix] [broker] response not-found error if topic does not
exist when calling getPartitionedTopicMetadata (#22838)
9aed73653e1 is described below
commit 9aed73653e1f706e3517072cce4a352d0838f8d7
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jun 17 23:39:08 2024 +0800
[fix] [broker] response not-found error if topic does not exist when
calling getPartitionedTopicMetadata (#22838)
---
.../broker/admin/impl/PersistentTopicsBase.java | 21 +-
.../broker/admin/v2/NonPersistentTopics.java | 16 +-
.../pulsar/broker/lookup/TopicLookupBase.java | 22 +-
.../pulsar/broker/namespace/NamespaceService.java | 101 +++-
.../pulsar/broker/namespace/TopicExistsInfo.java | 82 +++
.../pulsar/broker/service/BrokerService.java | 117 ++--
.../apache/pulsar/broker/service/ServerCnx.java | 81 +--
.../admin/GetPartitionMetadataMultiBrokerTest.java | 222 ++++++++
.../broker/admin/GetPartitionMetadataTest.java | 608 +++++++++++----------
.../org/apache/pulsar/broker/admin/TopicsTest.java | 13 +-
.../broker/lookup/http/HttpTopicLookupv2Test.java | 19 +-
.../broker/namespace/NamespaceServiceTest.java | 7 +-
.../apache/pulsar/broker/service/TopicGCTest.java | 2 +
.../pulsar/client/impl/ConsumerBuilderImpl.java | 37 +-
14 files changed, 899 insertions(+), 449 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 2f2a899950a..beb8ecc8d79 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -561,13 +561,13 @@ public class PersistentTopicsBase extends AdminResource {
// is a non-partitioned topic so we shouldn't check if
the topic exists.
return
pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName)
.thenCompose(brokerAllowAutoTopicCreation -> {
- if (checkAllowAutoCreation) {
+ if (checkAllowAutoCreation &&
brokerAllowAutoTopicCreation) {
// Whether it exists or not, auto create a
non-partitioned topic by client.
return
CompletableFuture.completedFuture(metadata);
} else {
// If it does not exist, response a Not Found
error.
// Otherwise, response a non-partitioned
metadata.
- return
internalCheckTopicExists(topicName).thenApply(__ -> metadata);
+ return
internalCheckNonPartitionedTopicExists(topicName).thenApply(__ -> metadata);
}
});
}
@@ -715,6 +715,17 @@ public class PersistentTopicsBase extends AdminResource {
protected CompletableFuture<Void> internalCheckTopicExists(TopicName
topicName) {
return pulsar().getNamespaceService().checkTopicExists(topicName)
+ .thenAccept(info -> {
+ boolean exists = info.isExists();
+ info.recycle();
+ if (!exists) {
+ throw new RestException(Status.NOT_FOUND,
getTopicNotFoundErrorMessage(topicName.toString()));
+ }
+ });
+ }
+
+ protected CompletableFuture<Void>
internalCheckNonPartitionedTopicExists(TopicName topicName) {
+ return
pulsar().getNamespaceService().checkNonPartitionedTopicExists(topicName)
.thenAccept(exist -> {
if (!exist) {
throw new RestException(Status.NOT_FOUND,
getTopicNotFoundErrorMessage(topicName.toString()));
@@ -5338,8 +5349,10 @@ public class PersistentTopicsBase extends AdminResource {
"Only persistent topic can be set as shadow
topic"));
}
futures.add(pulsar().getNamespaceService().checkTopicExists(shadowTopicName)
- .thenAccept(isExists -> {
- if (!isExists) {
+ .thenAccept(info -> {
+ boolean exists = info.isExists();
+ info.recycle();
+ if (!exists) {
throw new
RestException(Status.PRECONDITION_FAILED,
"Shadow topic [" + shadowTopic + "]
not exists.");
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 5a7ea1b7632..9f58aa4ca9d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -98,8 +98,20 @@ public class NonPersistentTopics extends PersistentTopics {
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
@ApiParam(value = "Is check configuration required to
automatically create topic")
@QueryParam("checkAllowAutoCreation") @DefaultValue("false")
boolean checkAllowAutoCreation) {
- super.getPartitionedMetadata(asyncResponse, tenant, namespace,
encodedTopic, authoritative,
- checkAllowAutoCreation);
+ validateTopicName(tenant, namespace, encodedTopic);
+ validateTopicOwnershipAsync(topicName,
authoritative).whenComplete((__, ex) -> {
+ if (ex != null) {
+ Throwable actEx = FutureUtil.unwrapCompletionException(ex);
+ if (isNot307And404Exception(actEx)) {
+ log.error("[{}] Failed to get internal stats for topic
{}", clientAppId(), topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, actEx);
+ } else {
+ // "super.getPartitionedMetadata" will handle error itself.
+ super.getPartitionedMetadata(asyncResponse, tenant, namespace,
encodedTopic, authoritative,
+ checkAllowAutoCreation);
+ }
+ });
}
@GET
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index 7b2c7774148..9a05c3d992a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -67,16 +67,22 @@ public class TopicLookupBase extends PulsarWebResource {
.thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(topicName.getNamespaceObject()))
.thenCompose(__ -> validateTopicOperationAsync(topicName,
TopicOperation.LOOKUP, null))
.thenCompose(__ -> {
+ // Case-1: Non-persistent topic.
// Currently, it's hard to check the
non-persistent-non-partitioned topic, because it only exists
// in the broker, it doesn't have metadata. If the topic
is non-persistent and non-partitioned,
- // we'll return the true flag.
- CompletableFuture<Boolean> existFuture =
(!topicName.isPersistent() && !topicName.isPartitioned())
- ? CompletableFuture.completedFuture(true)
- :
pulsar().getNamespaceService().checkTopicExists(topicName)
- .thenCompose(exists -> exists ?
CompletableFuture.completedFuture(true)
- :
pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName));
-
- return existFuture;
+ // we'll return the true flag. So either it is a
partitioned topic or not, the result will be true.
+ if (!topicName.isPersistent()) {
+ return CompletableFuture.completedFuture(true);
+ }
+ // Case-2: Persistent topic.
+ return
pulsar().getNamespaceService().checkTopicExists(topicName).thenCompose(info -> {
+ boolean exists = info.isExists();
+ info.recycle();
+ if (exists) {
+ return CompletableFuture.completedFuture(true);
+ }
+ return
pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName);
+ });
})
.thenCompose(exist -> {
if (!exist) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 80559b736c6..9df2b09204c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -51,6 +51,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.StringUtils;
@@ -72,6 +73,7 @@ import
org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -123,6 +125,7 @@ import org.slf4j.LoggerFactory;
*
* @see org.apache.pulsar.broker.PulsarService
*/
+@Slf4j
public class NamespaceService implements AutoCloseable {
private static final Logger LOG =
LoggerFactory.getLogger(NamespaceService.class);
@@ -1400,40 +1403,86 @@ public class NamespaceService implements AutoCloseable {
});
}
- public CompletableFuture<Boolean> checkTopicExists(TopicName topic) {
- CompletableFuture<Boolean> future;
- // If the topic is persistent and the name includes `-partition-`,
find the topic from the managed/ledger.
- if (topic.isPersistent() && topic.isPartitioned()) {
- future =
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
+ /***
+ * Check topic exists( partitioned or non-partitioned ).
+ */
+ public CompletableFuture<TopicExistsInfo> checkTopicExists(TopicName
topic) {
+ return pulsar.getBrokerService()
+
.fetchPartitionedTopicMetadataAsync(TopicName.get(topic.toString()))
+ .thenCompose(metadata -> {
+ if (metadata.partitions > 0) {
+ return CompletableFuture.completedFuture(
+
TopicExistsInfo.newPartitionedTopicExists(metadata.partitions));
+ }
+ return checkNonPartitionedTopicExists(topic)
+ .thenApply(b -> b ?
TopicExistsInfo.newNonPartitionedTopicExists()
+ : TopicExistsInfo.newTopicNotExists());
+ });
+ }
+
+ /***
+ * Check non-partitioned topic exists.
+ */
+ public CompletableFuture<Boolean> checkNonPartitionedTopicExists(TopicName
topic) {
+ if (topic.isPersistent()) {
+ return
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
} else {
- future = CompletableFuture.completedFuture(false);
+ return
checkNonPersistentNonPartitionedTopicExists(topic.toString());
}
+ }
- return future.thenCompose(found -> {
- if (found != null && found) {
- return CompletableFuture.completedFuture(true);
+ /**
+ * Regarding non-persistent topic, we do not know whether it exists or
not. Redirect the request to the ownership
+ * broker of this topic. HTTP API has implemented the mechanism that
redirect to ownership broker, so just call
+ * HTTP API here.
+ */
+ public CompletableFuture<Boolean>
checkNonPersistentNonPartitionedTopicExists(String topic) {
+ TopicName topicName = TopicName.get(topic);
+ // "non-partitioned & non-persistent" topics only exist on the owner
broker.
+ return checkTopicOwnership(TopicName.get(topic)).thenCompose(isOwned
-> {
+ // The current broker is the owner.
+ if (isOwned) {
+ CompletableFuture<Optional<Topic>> nonPersistentTopicFuture =
pulsar.getBrokerService()
+ .getTopic(topic, false);
+ if (nonPersistentTopicFuture != null) {
+ return
nonPersistentTopicFuture.thenApply(Optional::isPresent);
+ } else {
+ return CompletableFuture.completedFuture(false);
+ }
}
- return pulsar.getBrokerService()
-
.fetchPartitionedTopicMetadataAsync(TopicName.get(topic.getPartitionedTopicName()))
- .thenCompose(metadata -> {
- if (metadata.partitions > 0) {
- return CompletableFuture.completedFuture(true);
- }
-
- if (topic.isPersistent()) {
- return
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
- } else {
- // The non-partitioned non-persistent topic only
exist in the broker topics.
- CompletableFuture<Optional<Topic>>
nonPersistentTopicFuture =
-
pulsar.getBrokerService().getTopics().get(topic.toString());
- if (nonPersistentTopicFuture == null) {
+ // Forward to the owner broker.
+ PulsarClientImpl pulsarClient;
+ try {
+ pulsarClient = (PulsarClientImpl) pulsar.getClient();
+ } catch (Exception ex) {
+ // This error will never occur.
+ log.error("{} Failed to get partition metadata due to create
internal admin client fails", topic, ex);
+ return FutureUtil.failedFuture(ex);
+ }
+ LookupOptions lookupOptions =
LookupOptions.builder().readOnly(false).authoritative(true).build();
+ return getBrokerServiceUrlAsync(TopicName.get(topic),
lookupOptions)
+ .thenCompose(lookupResult -> {
+ if (!lookupResult.isPresent()) {
+ log.error("{} Failed to get partition metadata due can
not find the owner broker", topic);
+ return FutureUtil.failedFuture(new
ServiceUnitNotReadyException(
+ "No broker was available to own " +
topicName));
+ }
+ return
pulsarClient.getLookup(lookupResult.get().getLookupData().getBrokerUrl())
+ .getPartitionedTopicMetadata(topicName, false)
+ .thenApply(metadata -> true)
+ .exceptionallyCompose(ex -> {
+ Throwable actEx =
FutureUtil.unwrapCompletionException(ex);
+ if (actEx instanceof
PulsarClientException.NotFoundException
+ || actEx instanceof
PulsarClientException.TopicDoesNotExistException
+ || actEx instanceof
PulsarAdminException.NotFoundException) {
return
CompletableFuture.completedFuture(false);
} else {
- return
nonPersistentTopicFuture.thenApply(Optional::isPresent);
+ log.error("{} Failed to get partition metadata
due to redirecting fails", topic, ex);
+ return CompletableFuture.failedFuture(ex);
}
- }
- });
+ });
+ });
});
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/TopicExistsInfo.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/TopicExistsInfo.java
new file mode 100644
index 00000000000..1c3f117719e
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/TopicExistsInfo.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.namespace;
+
+import io.netty.util.Recycler;
+import lombok.Getter;
+import org.apache.pulsar.common.policies.data.TopicType;
+
+/**
+ * Outcome of a topic existence check: whether the topic exists and, when it is partitioned, how many
+ * partitions it has.
+ *
+ * <p>Results describing a partitioned topic are obtained from a Netty {@link Recycler}; call
+ * {@link #recycle()} once the values have been read so the instance can be reused. The "non-partitioned
+ * topic exists" and "topic does not exist" results are shared singletons for which {@link #recycle()} does
+ * nothing.
+ */
+public class TopicExistsInfo {
+
+    private static final Recycler<TopicExistsInfo> RECYCLER = new Recycler<>() {
+        @Override
+        protected TopicExistsInfo newObject(Handle<TopicExistsInfo> handle) {
+            return new TopicExistsInfo(handle);
+        }
+    };
+
+    /** Shared result: the topic exists and has no partitions. */
+    private static final TopicExistsInfo NON_PARTITIONED_EXISTS = new TopicExistsInfo(true, 0);
+
+    /** Shared result: the topic does not exist. */
+    private static final TopicExistsInfo NOT_EXISTS = new TopicExistsInfo(false, 0);
+
+    /** Recycler handle of a pooled instance; {@code null} for the shared singletons. */
+    private final Recycler.Handle<TopicExistsInfo> handle;
+
+    @Getter
+    private int partitions;
+    @Getter
+    private boolean exists;
+
+    /** Creates a pooled instance bound to the given recycler handle. */
+    private TopicExistsInfo(Recycler.Handle<TopicExistsInfo> handle) {
+        this.handle = handle;
+    }
+
+    /** Creates a non-pooled instance with fixed values; used for the shared singletons. */
+    private TopicExistsInfo(boolean exists, int partitions) {
+        this.handle = null;
+        this.partitions = partitions;
+        this.exists = exists;
+    }
+
+    /**
+     * Returns a pooled result saying that a partitioned topic exists.
+     *
+     * @param partitions the number of partitions; must not be {@code null}
+     * @return a result taken from the recycler, to be released with {@link #recycle()}
+     */
+    public static TopicExistsInfo newPartitionedTopicExists(Integer partitions) {
+        TopicExistsInfo info = RECYCLER.get();
+        info.exists = true;
+        info.partitions = partitions.intValue();
+        return info;
+    }
+
+    /**
+     * Returns the shared result saying that a non-partitioned topic exists.
+     *
+     * @return a singleton with {@code exists == true} and {@code partitions == 0}
+     */
+    public static TopicExistsInfo newNonPartitionedTopicExists() {
+        return NON_PARTITIONED_EXISTS;
+    }
+
+    /**
+     * Returns the shared result saying that the topic does not exist.
+     *
+     * @return a singleton with {@code exists == false} and {@code partitions == 0}
+     */
+    public static TopicExistsInfo newTopicNotExists() {
+        return NOT_EXISTS;
+    }
+
+    /**
+     * Resets this result and hands it back to the recycler. Does nothing for the shared singletons.
+     */
+    public void recycle() {
+        // Only pooled instances carry a handle. The shared singletons are built with a null handle and
+        // must never be reset or returned to the pool.
+        if (handle == null) {
+            return;
+        }
+        exists = false;
+        partitions = 0;
+        handle.recycle(this);
+    }
+
+    /**
+     * Derives the topic type from the partition count.
+     *
+     * @return {@link TopicType#PARTITIONED} when {@code partitions > 0}, otherwise
+     *         {@link TopicType#NON_PARTITIONED} (this is also the answer when the topic does not exist)
+     */
+    public TopicType getTopicType() {
+        return this.partitions > 0 ? TopicType.PARTITIONED : TopicType.NON_PARTITIONED;
+    }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 82d7fad3874..6ecd0a1ba60 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -3178,65 +3178,66 @@ public class BrokerService implements Closeable {
if (pulsar.getNamespaceService() == null) {
return FutureUtil.failedFuture(new NamingException("namespace
service is not ready"));
}
- return
pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject())
- .thenCompose(policies ->
pulsar.getNamespaceService().checkTopicExists(topicName)
- .thenCompose(topicExists ->
fetchPartitionedTopicMetadataAsync(topicName)
- .thenCompose(metadata -> {
- CompletableFuture<PartitionedTopicMetadata>
future = new CompletableFuture<>();
-
- // There are a couple of potentially blocking
calls, which we cannot make from the
- // MetadataStore callback thread.
- pulsar.getExecutor().execute(() -> {
- // If topic is already exist, creating
partitioned topic is not allowed.
-
- if (metadata.partitions == 0
- && !topicExists
- && !topicName.isPartitioned()
- && pulsar.getBrokerService()
-
.isDefaultTopicTypePartitioned(topicName, policies)) {
-
isAllowAutoTopicCreationAsync(topicName, policies).thenAccept(allowed -> {
- if (allowed) {
- pulsar.getBrokerService()
-
.createDefaultPartitionedTopicAsync(topicName, policies)
- .thenAccept(md ->
future.complete(md))
- .exceptionally(ex -> {
- if (ex.getCause()
- instanceof
MetadataStoreException
-
.AlreadyExistsException) {
- log.info("[{}]
The partitioned topic is already"
- + "
created, try to refresh the cache and read"
- + "
again.", topicName);
- // The
partitioned topic might be created concurrently
-
fetchPartitionedTopicMetadataAsync(topicName, true)
-
.whenComplete((metadata2, ex2) -> {
- if
(ex2 == null) {
-
future.complete(metadata2);
- }
else {
-
future.completeExceptionally(ex2);
- }
- });
- } else {
-
log.error("[{}] operation of creating partitioned"
- + "
topic metadata failed",
-
topicName, ex);
-
future.completeExceptionally(ex);
- }
- return null;
- });
- } else {
- future.complete(metadata);
- }
- }).exceptionally(ex -> {
- future.completeExceptionally(ex);
- return null;
- });
- } else {
- future.complete(metadata);
- }
- });
+ return
pulsar.getNamespaceService().checkTopicExists(topicName).thenComposeAsync(topicExistsInfo
-> {
+ final boolean topicExists = topicExistsInfo.isExists();
+ final TopicType topicType = topicExistsInfo.getTopicType();
+ final Integer partitions = topicExistsInfo.getPartitions();
+ topicExistsInfo.recycle();
+
+ // Topic exists.
+ if (topicExists) {
+ if (topicType.equals(TopicType.PARTITIONED)) {
+ return CompletableFuture.completedFuture(new
PartitionedTopicMetadata(partitions));
+ }
+ return CompletableFuture.completedFuture(new
PartitionedTopicMetadata(0));
+ }
- return future;
- })));
+ // Try created if allowed to create a partitioned topic
automatically.
+ return
pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject())
+ .thenComposeAsync(policies -> {
+ return isAllowAutoTopicCreationAsync(topicName,
policies).thenComposeAsync(allowed -> {
+ // Not Allow auto-creation.
+ if (!allowed) {
+ // Do not change the original behavior, or default
return a non-partitioned topic.
+ return CompletableFuture.completedFuture(new
PartitionedTopicMetadata(0));
+ }
+
+ // Allow auto create non-partitioned topic.
+ boolean autoCreatePartitionedTopic =
pulsar.getBrokerService()
+ .isDefaultTopicTypePartitioned(topicName,
policies);
+ if (!autoCreatePartitionedTopic ||
topicName.isPartitioned()) {
+ return CompletableFuture.completedFuture(new
PartitionedTopicMetadata(0));
+ }
+
+ // Create partitioned metadata.
+ return
pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName,
policies)
+ .exceptionallyCompose(ex -> {
+ // The partitioned topic might be created
concurrently.
+ if (ex.getCause() instanceof
MetadataStoreException.AlreadyExistsException) {
+ log.info("[{}] The partitioned topic is
already created, try to refresh the cache"
+ + " and read again.", topicName);
+
CompletableFuture<PartitionedTopicMetadata> recheckFuture =
+
fetchPartitionedTopicMetadataAsync(topicName, true);
+ recheckFuture.exceptionally(ex2 -> {
+ // Just for printing a log if error
occurs.
+ log.error("[{}] Fetch partitioned
topic metadata failed", topicName, ex);
+ return null;
+ });
+ return recheckFuture;
+ } else {
+ log.error("[{}] operation of creating
partitioned topic metadata failed",
+ topicName, ex);
+ return CompletableFuture.failedFuture(ex);
+ }
+ });
+ }, pulsar.getExecutor()).exceptionallyCompose(ex -> {
+ log.error("[{}] operation of get partitioned metadata
failed due to calling"
+ + " isAllowAutoTopicCreationAsync
failed",
+ topicName, ex);
+ return CompletableFuture.failedFuture(ex);
+ });
+ }, pulsar.getExecutor());
+ }, pulsar.getExecutor());
}
@SuppressWarnings("deprecation")
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 6901097bbbb..b184f794949 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -84,8 +84,7 @@ import org.apache.pulsar.broker.limiter.ConnectionController;
import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.namespace.LookupOptions;
-import org.apache.pulsar.broker.resources.NamespaceResources;
-import org.apache.pulsar.broker.resources.TopicResources;
+import org.apache.pulsar.broker.namespace.NamespaceService;
import
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
@@ -161,6 +160,7 @@ import
org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.CommandUtils;
@@ -614,58 +614,33 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
if (isAuthorized) {
// Get if exists, respond not found error if not exists.
getBrokerService().isAllowAutoTopicCreationAsync(topicName).thenAccept(brokerAllowAutoCreate
-> {
- boolean autoCreateIfNotExist =
partitionMetadata.isMetadataAutoCreationEnabled();
+ boolean autoCreateIfNotExist =
partitionMetadata.isMetadataAutoCreationEnabled()
+ && brokerAllowAutoCreate;
if (!autoCreateIfNotExist) {
- final NamespaceResources namespaceResources =
getBrokerService().pulsar()
-
.getPulsarResources().getNamespaceResources();
- final TopicResources topicResources =
getBrokerService().pulsar().getPulsarResources()
- .getTopicResources();
- namespaceResources.getPartitionedTopicResources()
- .getPartitionedTopicMetadataAsync(topicName,
false)
- .handle((metadata, getMetadataEx) -> {
- if (getMetadataEx != null) {
- log.error("{} {} Failed to get
partition metadata", topicName,
- ServerCnx.this.toString(),
getMetadataEx);
- writeAndFlush(
-
Commands.newPartitionMetadataResponse(ServerError.MetadataError,
- "Failed to get
partition metadata",
- requestId));
- } else if (metadata.isPresent()) {
-
commandSender.sendPartitionMetadataResponse(metadata.get().partitions,
- requestId);
- } else if (topicName.isPersistent()) {
-
topicResources.persistentTopicExists(topicName).thenAccept(exists -> {
- if (exists) {
-
commandSender.sendPartitionMetadataResponse(0, requestId);
- return;
- }
-
writeAndFlush(Commands.newPartitionMetadataResponse(
- ServerError.TopicNotFound,
"", requestId));
- }).exceptionally(ex -> {
- log.error("{} {} Failed to get
partition metadata", topicName,
- ServerCnx.this.toString(),
ex);
- writeAndFlush(
-
Commands.newPartitionMetadataResponse(ServerError.MetadataError,
- "Failed to check
partition metadata",
- requestId));
- return null;
- });
- } else {
- // Regarding non-persistent topic, we
do not know whether it exists or not.
- // Just return a non-partitioned
metadata if partitioned metadata does not
- // exist.
- // Broker will respond a not found
error when doing subscribing or producing if
- // broker not allow to auto create
topics.
-
commandSender.sendPartitionMetadataResponse(0, requestId);
- }
- return null;
- }).whenComplete((ignore, ignoreEx) -> {
- lookupSemaphore.release();
- if (ignoreEx != null) {
- log.error("{} {} Failed to handle
partition metadata request", topicName,
- ServerCnx.this.toString(),
ignoreEx);
- }
- });
+ NamespaceService namespaceService =
getBrokerService().getPulsar().getNamespaceService();
+
namespaceService.checkTopicExists(topicName).thenAccept(topicExistsInfo -> {
+ lookupSemaphore.release();
+ if (!topicExistsInfo.isExists()) {
+
writeAndFlush(Commands.newPartitionMetadataResponse(
+ ServerError.TopicNotFound, "",
requestId));
+ } else if
(topicExistsInfo.getTopicType().equals(TopicType.PARTITIONED)) {
+
commandSender.sendPartitionMetadataResponse(topicExistsInfo.getPartitions(),
+ requestId);
+ } else {
+
commandSender.sendPartitionMetadataResponse(0, requestId);
+ }
+ // release resources.
+ topicExistsInfo.recycle();
+ }).exceptionally(ex -> {
+ lookupSemaphore.release();
+ log.error("{} {} Failed to get partition
metadata", topicName,
+ ServerCnx.this.toString(), ex);
+ writeAndFlush(
+
Commands.newPartitionMetadataResponse(ServerError.MetadataError,
+ "Failed to get partition
metadata",
+ requestId));
+ return null;
+ });
} else {
// Get if exists, create a new one if not exists.
unsafeGetPartitionedTopicMetadataAsync(getBrokerService().pulsar(), topicName)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java
new file mode 100644
index 00000000000..28cf91ee165
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import java.net.URL;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Runs the {@link GetPartitionMetadataTest} scenarios with a second broker in play.
+ *
+ * <p>{@link #setupBrokers()} starts an additional {@link PulsarService} together with an admin client, an
+ * HTTP-lookup client and a binary-lookup client pointing at it. The hooks overridden below add those
+ * clients to the set under test, sum the lookup permits of both brokers and apply auto-creation settings
+ * to both brokers.
+ */
+@Test(groups = "broker-admin")
+@Slf4j
+public class GetPartitionMetadataMultiBrokerTest extends GetPartitionMetadataTest {
+
+    private PulsarService pulsar2;
+    private URL url2;
+    private PulsarAdmin admin2;
+    private PulsarClientImpl clientWithHttpLookup2;
+    private PulsarClientImpl clientWitBinaryLookup2;
+
+    @Override
+    @BeforeClass(alwaysRun = true)
+    protected void setup() throws Exception {
+        super.setup();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        super.cleanup();
+    }
+
+    /**
+     * Closes the clients, the admin and the service of the second broker, then delegates to the parent
+     * cleanup. Each reference is cleared after closing, so calling this twice does not close anything twice.
+     */
+    @Override
+    protected void cleanupBrokers() throws Exception {
+        // Cleanup broker2.
+        if (clientWithHttpLookup2 != null) {
+            clientWithHttpLookup2.close();
+            clientWithHttpLookup2 = null;
+        }
+        if (clientWitBinaryLookup2 != null) {
+            clientWitBinaryLookup2.close();
+            clientWitBinaryLookup2 = null;
+        }
+        if (admin2 != null) {
+            admin2.close();
+            admin2 = null;
+        }
+        if (pulsar2 != null) {
+            pulsar2.close();
+            pulsar2 = null;
+        }
+
+        // Super cleanup.
+        super.cleanupBrokers();
+    }
+
+    /**
+     * Runs the parent broker setup, then starts the second broker from {@code conf} (after calling
+     * {@code doInitConf()}) and creates an admin client plus one HTTP-lookup and one binary-lookup client
+     * for it.
+     */
+    @Override
+    protected void setupBrokers() throws Exception {
+        super.setupBrokers();
+        doInitConf();
+        pulsar2 = new PulsarService(conf);
+        pulsar2.start();
+        url2 = new URL(pulsar2.getWebServiceAddress());
+        admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build();
+        clientWithHttpLookup2 =
+                (PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar2.getWebServiceAddress()).build();
+        clientWitBinaryLookup2 =
+                (PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar2.getBrokerServiceUrl()).build();
+    }
+
+    /** Returns the HTTP-lookup and binary-lookup clients of both brokers. */
+    @Override
+    protected PulsarClientImpl[] getClientsToTest() {
+        return new PulsarClientImpl[] {clientWithHttpLookup1, clientWitBinaryLookup1,
+                clientWithHttpLookup2, clientWitBinaryLookup2};
+    }
+
+    /**
+     * Returns the clients of both brokers that use the requested lookup type.
+     *
+     * @param isUsingHttpLookup {@code true} for the HTTP-lookup clients, {@code false} for the
+     *                          binary-lookup clients
+     */
+    protected PulsarClientImpl[] getClientsToTest(boolean isUsingHttpLookup) {
+        if (isUsingHttpLookup) {
+            return new PulsarClientImpl[]{clientWithHttpLookup1, clientWithHttpLookup2};
+        } else {
+            return new PulsarClientImpl[]{clientWitBinaryLookup1, clientWitBinaryLookup2};
+        }
+    }
+
+    /** Returns the sum of the available lookup-request permits of both brokers. */
+    @Override
+    protected int getLookupRequestPermits() {
+        return pulsar1.getBrokerService().getLookupRequestSemaphore().availablePermits()
+                + pulsar2.getBrokerService().getLookupRequestSemaphore().availablePermits();
+    }
+
+    /**
+     * Asserts that the first three partitions of the given topic are not listed in {@code public/default}.
+     *
+     * @param topicNameStr the full name of the topic whose partitions must not exist
+     */
+    protected void verifyPartitionsNeverCreated(String topicNameStr) throws Exception {
+        TopicName topicName = TopicName.get(topicNameStr);
+        try {
+            List<String> topicList = admin1.topics().getList("public/default");
+            for (int i = 0; i < 3; i++) {
+                // The list holds topic names as strings, so compare against the string form of the
+                // partition name; comparing against a non-String object could never match.
+                assertFalse(topicList.contains(topicName.getPartition(i).toString()));
+            }
+        } catch (Exception ex) {
+            // If the namespace bundle has not been loaded yet, it means no non-persistent topic was created. So
+            // this behavior is also correct.
+            // This error is not expected, a separate PR is needed to fix this issue.
+            assertTrue(ex.getMessage().contains("Failed to find ownership for"));
+        }
+    }
+
+    /**
+     * Asserts that the given topic is not listed in {@code public/default}.
+     *
+     * @param topicNameStr the full name of the topic that must not exist
+     */
+    protected void verifyNonPartitionedTopicNeverCreated(String topicNameStr) throws Exception {
+        TopicName topicName = TopicName.get(topicNameStr);
+        try {
+            List<String> topicList = admin1.topics().getList("public/default");
+            assertFalse(topicList.contains(topicName.getPartitionedTopicName()));
+        } catch (Exception ex) {
+            // If the namespace bundle has not been loaded yet, it means no non-persistent topic was created. So
+            // this behavior is also correct.
+            // This error is not expected, a separate PR is needed to fix this issue.
+            assertTrue(ex.getMessage().contains("Failed to find ownership for"));
+        }
+    }
+
+    /** Applies the given topic auto-creation settings to both brokers. */
+    protected void modifyTopicAutoCreation(boolean allowAutoTopicCreation,
+                                           TopicType allowAutoTopicCreationType,
+                                           int defaultNumPartitions) throws Exception {
+        doModifyTopicAutoCreation(admin1, pulsar1, allowAutoTopicCreation, allowAutoTopicCreationType,
+                defaultNumPartitions);
+        doModifyTopicAutoCreation(admin2, pulsar2, allowAutoTopicCreation, allowAutoTopicCreationType,
+                defaultNumPartitions);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Test(dataProvider = "topicDomains")
+    public void testAutoCreatingMetadataWhenCallingOldAPI(TopicDomain topicDomain) throws Exception {
+        super.testAutoCreatingMetadataWhenCallingOldAPI(topicDomain);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Test(dataProvider = "autoCreationParamsAll", enabled = false)
+    public void testGetMetadataIfNonPartitionedTopicExists(boolean configAllowAutoTopicCreation,
+                                                           boolean paramMetadataAutoCreationEnabled,
+                                                           boolean isUsingHttpLookup,
+                                                           TopicDomain topicDomain) throws Exception {
+        super.testGetMetadataIfNonPartitionedTopicExists(configAllowAutoTopicCreation,
+                paramMetadataAutoCreationEnabled, isUsingHttpLookup, topicDomain);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Test(dataProvider = "autoCreationParamsAll")
+    public void testGetMetadataIfPartitionedTopicExists(boolean configAllowAutoTopicCreation,
+                                                        boolean paramMetadataAutoCreationEnabled,
+                                                        boolean isUsingHttpLookup,
+                                                        TopicDomain topicDomain) throws Exception {
+        // Delegate to the partitioned-topic scenario of the parent, matching this method's name.
+        super.testGetMetadataIfPartitionedTopicExists(configAllowAutoTopicCreation,
+                paramMetadataAutoCreationEnabled, isUsingHttpLookup, topicDomain);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Test(dataProvider = "clients")
+    public void testAutoCreatePartitionedTopic(boolean isUsingHttpLookup, TopicDomain topicDomain) throws Exception {
+        super.testAutoCreatePartitionedTopic(isUsingHttpLookup, topicDomain);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Test(dataProvider = "clients")
+    public void testAutoCreateNonPartitionedTopic(boolean isUsingHttpLookup, TopicDomain topicDomain)
+            throws Exception {
+        super.testAutoCreateNonPartitionedTopic(isUsingHttpLookup, topicDomain);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Test(dataProvider = "autoCreationParamsNotAllow")
+    public void testGetMetadataIfNotAllowedCreate(boolean configAllowAutoTopicCreation,
+                                                  boolean paramMetadataAutoCreationEnabled,
+                                                  boolean isUsingHttpLookup) throws Exception {
+        super.testGetMetadataIfNotAllowedCreate(configAllowAutoTopicCreation, paramMetadataAutoCreationEnabled,
+                isUsingHttpLookup);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Test(dataProvider = "autoCreationParamsNotAllow")
+    public void testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(boolean configAllowAutoTopicCreation,
+                                                  boolean paramMetadataAutoCreationEnabled,
+                                                  boolean isUsingHttpLookup) throws Exception {
+        super.testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(configAllowAutoTopicCreation,
+                paramMetadataAutoCreationEnabled, isUsingHttpLookup);
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
index 51f643d2b78..bf99b172829 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
@@ -22,70 +22,150 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import com.google.common.collect.Sets;
+import java.net.URL;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
-import java.util.concurrent.Semaphore;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.ClusterDataImpl;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.awaitility.Awaitility;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker-admin")
@Slf4j
-public class GetPartitionMetadataTest extends ProducerConsumerBase {
+public class GetPartitionMetadataTest {
- private static final String DEFAULT_NS = "public/default";
+ protected static final String DEFAULT_NS = "public/default";
- private PulsarClientImpl clientWithHttpLookup;
- private PulsarClientImpl clientWitBinaryLookup;
+ protected String clusterName = "c1";
- @Override
+ protected LocalBookkeeperEnsemble bkEnsemble;
+
+ protected ServiceConfiguration conf = new ServiceConfiguration();
+
+ protected PulsarService pulsar1;
+ protected URL url1;
+ protected PulsarAdmin admin1;
+ protected PulsarClientImpl clientWithHttpLookup1;
+ protected PulsarClientImpl clientWitBinaryLookup1;
+
+ @BeforeClass(alwaysRun = true)
protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- clientWithHttpLookup =
- (PulsarClientImpl)
PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
- clientWitBinaryLookup =
- (PulsarClientImpl)
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+ bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble.start();
+ // Start broker.
+ setupBrokers();
+ // Create default NS.
+ admin1.clusters().createCluster(clusterName, new ClusterDataImpl());
+
admin1.tenants().createTenant(NamespaceName.get(DEFAULT_NS).getTenant(),
+ new TenantInfoImpl(Collections.emptySet(),
Sets.newHashSet(clusterName)));
+ admin1.namespaces().createNamespace(DEFAULT_NS);
}
- @Override
- @AfterMethod(alwaysRun = true)
+ @AfterClass(alwaysRun = true)
protected void cleanup() throws Exception {
- super.internalCleanup();
- if (clientWithHttpLookup != null) {
- clientWithHttpLookup.close();
+ cleanupBrokers();
+ if (bkEnsemble != null) {
+ bkEnsemble.stop();
+ bkEnsemble = null;
+ }
+ }
+
+ protected void cleanupBrokers() throws Exception {
+ // Cleanup broker1: its clients, its admin and the broker service itself.
+ if (clientWithHttpLookup1 != null) {
+ clientWithHttpLookup1.close();
+ clientWithHttpLookup1 = null;
+ }
+ if (clientWitBinaryLookup1 != null) {
+ clientWitBinaryLookup1.close();
+ clientWitBinaryLookup1 = null;
}
- if (clientWitBinaryLookup != null) {
- clientWitBinaryLookup.close();
+ if (admin1 != null) {
+ admin1.close();
+ admin1 = null;
}
+ if (pulsar1 != null) {
+ pulsar1.close();
+ pulsar1 = null;
+ }
+ // Reset configs.
+ conf = new ServiceConfiguration();
+ }
+
+ protected void setupBrokers() throws Exception {
+ doInitConf();
+ // Start broker.
+ pulsar1 = new PulsarService(conf);
+ pulsar1.start();
+ url1 = new URL(pulsar1.getWebServiceAddress());
+ admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build();
+ clientWithHttpLookup1 =
+ (PulsarClientImpl)
PulsarClient.builder().serviceUrl(pulsar1.getWebServiceAddress()).build();
+ clientWitBinaryLookup1 =
+ (PulsarClientImpl)
PulsarClient.builder().serviceUrl(pulsar1.getBrokerServiceUrl()).build();
}
- @Override
- protected void doInitConf() throws Exception {
- super.doInitConf();
+ protected void doInitConf() {
+ conf.setClusterName(clusterName);
+ conf.setAdvertisedAddress("localhost");
+ conf.setBrokerServicePort(Optional.of(0));
+ conf.setWebServicePort(Optional.of(0));
+ conf.setMetadataStoreUrl("zk:127.0.0.1:" +
bkEnsemble.getZookeeperPort());
+ conf.setConfigurationMetadataStoreUrl("zk:127.0.0.1:" +
bkEnsemble.getZookeeperPort() + "/foo");
+ conf.setBrokerDeleteInactiveTopicsEnabled(false);
+ conf.setBrokerShutdownTimeoutMs(0L);
+ conf.setLoadBalancerSheddingEnabled(false);
}
- private LookupService getLookupService(boolean isUsingHttpLookup) {
+ protected PulsarClientImpl[] getClientsToTest() {
+ return new PulsarClientImpl[] {clientWithHttpLookup1,
clientWitBinaryLookup1};
+ }
+
+ protected PulsarClientImpl[] getClientsToTest(boolean isUsingHttpLookup) {
if (isUsingHttpLookup) {
- return clientWithHttpLookup.getLookup();
+ return new PulsarClientImpl[] {clientWithHttpLookup1};
} else {
- return clientWitBinaryLookup.getLookup();
+ return new PulsarClientImpl[] {clientWitBinaryLookup1};
}
+
+ }
+
+ protected int getLookupRequestPermits() {
+ return
pulsar1.getBrokerService().getLookupRequestSemaphore().availablePermits();
+ }
+
+ protected void verifyPartitionsNeverCreated(String topicNameStr) throws
Exception {
+ TopicName topicName = TopicName.get(topicNameStr);
+ List<String> topicList = admin1.topics().getList("public/default");
+ for (int i = 0; i < 3; i++) {
+ assertFalse(topicList.contains(topicName.getPartition(i)));
+ }
+ }
+
+ protected void verifyNonPartitionedTopicNeverCreated(String topicNameStr)
throws Exception {
+ TopicName topicName = TopicName.get(topicNameStr);
+ List<String> topicList = admin1.topics().getList("public/default");
+ assertFalse(topicList.contains(topicName.getPartitionedTopicName()));
}
@DataProvider(name = "topicDomains")
@@ -96,43 +176,53 @@ public class GetPartitionMetadataTest extends
ProducerConsumerBase {
};
}
- @Test(dataProvider = "topicDomains")
- public void testAutoCreatingMetadataWhenCallingOldAPI(TopicDomain
topicDomain) throws Exception {
- conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
- conf.setDefaultNumPartitions(3);
- conf.setAllowAutoTopicCreation(true);
- setup();
-
- Semaphore semaphore =
pulsar.getBrokerService().getLookupRequestSemaphore();
- int lookupPermitsBefore = semaphore.availablePermits();
-
- // HTTP client.
- final String tp1 = BrokerTestUtil.newUniqueName(topicDomain.value() +
"://" + DEFAULT_NS + "/tp");
- clientWithHttpLookup.getPartitionsForTopic(tp1).join();
- Optional<PartitionedTopicMetadata> metadata1 =
pulsar.getPulsarResources().getNamespaceResources()
- .getPartitionedTopicResources()
- .getPartitionedTopicMetadataAsync(TopicName.get(tp1),
true).join();
- assertTrue(metadata1.isPresent());
- assertEquals(metadata1.get().partitions, 3);
-
- // Binary client.
- final String tp2 = BrokerTestUtil.newUniqueName(topicDomain.value() +
"://" + DEFAULT_NS + "/tp");
- clientWitBinaryLookup.getPartitionsForTopic(tp2).join();
- Optional<PartitionedTopicMetadata> metadata2 =
pulsar.getPulsarResources().getNamespaceResources()
- .getPartitionedTopicResources()
- .getPartitionedTopicMetadataAsync(TopicName.get(tp2),
true).join();
- assertTrue(metadata2.isPresent());
- assertEquals(metadata2.get().partitions, 3);
-
- // Verify: lookup semaphore has been releases.
+ protected static void doModifyTopicAutoCreation(PulsarAdmin admin1,
PulsarService pulsar1,
+ boolean
allowAutoTopicCreation, TopicType allowAutoTopicCreationType,
+ int defaultNumPartitions)
throws Exception {
+ admin1.brokers().updateDynamicConfiguration(
+ "allowAutoTopicCreation", allowAutoTopicCreation + "");
+ admin1.brokers().updateDynamicConfiguration(
+ "allowAutoTopicCreationType", allowAutoTopicCreationType + "");
+ admin1.brokers().updateDynamicConfiguration(
+ "defaultNumPartitions", defaultNumPartitions + "");
Awaitility.await().untilAsserted(() -> {
- int lookupPermitsAfter = semaphore.availablePermits();
- assertEquals(lookupPermitsAfter, lookupPermitsBefore);
+
assertEquals(pulsar1.getConfiguration().isAllowAutoTopicCreation(),
allowAutoTopicCreation);
+
assertEquals(pulsar1.getConfiguration().getAllowAutoTopicCreationType(),
allowAutoTopicCreationType);
+ assertEquals(pulsar1.getConfiguration().getDefaultNumPartitions(),
defaultNumPartitions);
});
+ }
- // Cleanup.
- admin.topics().deletePartitionedTopic(tp1, false);
- admin.topics().deletePartitionedTopic(tp2, false);
+ protected void modifyTopicAutoCreation(boolean allowAutoTopicCreation,
+ TopicType
allowAutoTopicCreationType,
+ int defaultNumPartitions) throws
Exception {
+ doModifyTopicAutoCreation(admin1, pulsar1, allowAutoTopicCreation,
allowAutoTopicCreationType,
+ defaultNumPartitions);
+ }
+
+ @Test(dataProvider = "topicDomains")
+ public void testAutoCreatingMetadataWhenCallingOldAPI(TopicDomain
topicDomain) throws Exception {
+ modifyTopicAutoCreation(true, TopicType.PARTITIONED, 3);
+
+ int lookupPermitsBefore = getLookupRequestPermits();
+
+ for (PulsarClientImpl client : getClientsToTest()) {
+ // Verify: the behavior of topic creation.
+ final String tp = BrokerTestUtil.newUniqueName(topicDomain.value()
+ "://" + DEFAULT_NS + "/tp");
+ client.getPartitionsForTopic(tp).join();
+ Optional<PartitionedTopicMetadata> metadata1 =
pulsar1.getPulsarResources().getNamespaceResources()
+ .getPartitionedTopicResources()
+ .getPartitionedTopicMetadataAsync(TopicName.get(tp),
true).join();
+ assertTrue(metadata1.isPresent());
+ assertEquals(metadata1.get().partitions, 3);
+
+ // Verify: lookup semaphore has been released.
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
+ });
+
+ // Cleanup.
+ admin1.topics().deletePartitionedTopic(tp, false);
+ }
}
@DataProvider(name = "autoCreationParamsAll")
@@ -163,40 +253,32 @@ public class GetPartitionMetadataTest extends
ProducerConsumerBase {
boolean
paramMetadataAutoCreationEnabled,
boolean
isUsingHttpLookup,
TopicDomain
topicDomain) throws Exception {
- conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
- conf.setDefaultNumPartitions(3);
- conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation);
- setup();
+ modifyTopicAutoCreation(configAllowAutoTopicCreation,
TopicType.PARTITIONED, 3);
- Semaphore semaphore =
pulsar.getBrokerService().getLookupRequestSemaphore();
- int lookupPermitsBefore = semaphore.availablePermits();
+ int lookupPermitsBefore = getLookupRequestPermits();
- LookupService lookup = getLookupService(isUsingHttpLookup);
// Create topic.
- final String topicNameStr =
BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
- final TopicName topicName = TopicName.get(topicNameStr);
- admin.topics().createNonPartitionedTopic(topicNameStr);
- // Verify.
- PulsarClient client =
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
- PartitionedTopicMetadata response =
- lookup.getPartitionedTopicMetadata(topicName,
paramMetadataAutoCreationEnabled).join();
- assertEquals(response.partitions, 0);
- List<String> partitionedTopics =
admin.topics().getPartitionedTopicList("public/default");
- assertFalse(partitionedTopics.contains(topicNameStr));
- List<String> topicList = admin.topics().getList("public/default");
- for (int i = 0; i < 3; i++) {
- assertFalse(topicList.contains(topicName.getPartition(i)));
- }
+ final String topicNameStr =
BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp_");
+ admin1.topics().createNonPartitionedTopic(topicNameStr);
+
+ PulsarClientImpl[] clientArray = getClientsToTest(isUsingHttpLookup);
+ for (PulsarClientImpl client : clientArray) {
+ // Verify: the result of get partitioned topic metadata.
+ PartitionedTopicMetadata response =
+ client.getPartitionedTopicMetadata(topicNameStr,
paramMetadataAutoCreationEnabled).join();
+ assertEquals(response.partitions, 0);
+ List<String> partitionedTopics =
admin1.topics().getPartitionedTopicList("public/default");
+ assertFalse(partitionedTopics.contains(topicNameStr));
+ verifyPartitionsNeverCreated(topicNameStr);
- // Verify: lookup semaphore has been releases.
- Awaitility.await().untilAsserted(() -> {
- int lookupPermitsAfter = semaphore.availablePermits();
- assertEquals(lookupPermitsAfter, lookupPermitsBefore);
- });
+ // Verify: lookup semaphore has been released.
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
+ });
+ }
// Cleanup.
- client.close();
- admin.topics().delete(topicNameStr, false);
+ admin1.topics().delete(topicNameStr, false);
}
@Test(dataProvider = "autoCreationParamsAll")
@@ -204,36 +286,30 @@ public class GetPartitionMetadataTest extends
ProducerConsumerBase {
boolean
paramMetadataAutoCreationEnabled,
boolean
isUsingHttpLookup,
TopicDomain
topicDomain) throws Exception {
- conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
- conf.setDefaultNumPartitions(3);
- conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation);
- setup();
+ modifyTopicAutoCreation(configAllowAutoTopicCreation,
TopicType.PARTITIONED, 3);
- Semaphore semaphore =
pulsar.getBrokerService().getLookupRequestSemaphore();
- int lookupPermitsBefore = semaphore.availablePermits();
+ int lookupPermitsBefore = getLookupRequestPermits();
- LookupService lookup = getLookupService(isUsingHttpLookup);
// Create topic.
final String topicNameStr =
BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
- final TopicName topicName = TopicName.get(topicNameStr);
- admin.topics().createPartitionedTopic(topicNameStr, 3);
- // Verify.
- PulsarClient client =
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
- PartitionedTopicMetadata response =
- lookup.getPartitionedTopicMetadata(topicName,
paramMetadataAutoCreationEnabled).join();
- assertEquals(response.partitions, 3);
- List<String> topicList = admin.topics().getList("public/default");
- assertFalse(topicList.contains(topicNameStr));
-
- // Verify: lookup semaphore has been releases.
- Awaitility.await().untilAsserted(() -> {
- int lookupPermitsAfter = semaphore.availablePermits();
- assertEquals(lookupPermitsAfter, lookupPermitsBefore);
- });
+ admin1.topics().createPartitionedTopic(topicNameStr, 3);
+
+ PulsarClientImpl[] clientArray = getClientsToTest(isUsingHttpLookup);
+ for (PulsarClientImpl client : clientArray) {
+ // Verify: the result of get partitioned topic metadata.
+ PartitionedTopicMetadata response =
+ client.getPartitionedTopicMetadata(topicNameStr,
paramMetadataAutoCreationEnabled).join();
+ assertEquals(response.partitions, 3);
+ verifyNonPartitionedTopicNeverCreated(topicNameStr);
+
+ // Verify: lookup semaphore has been released.
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
+ });
+ }
// Cleanup.
- client.close();
- admin.topics().deletePartitionedTopic(topicNameStr, false);
+ admin1.topics().deletePartitionedTopic(topicNameStr, false);
}
@DataProvider(name = "clients")
@@ -247,76 +323,96 @@ public class GetPartitionMetadataTest extends
ProducerConsumerBase {
@Test(dataProvider = "clients")
public void testAutoCreatePartitionedTopic(boolean isUsingHttpLookup,
TopicDomain topicDomain) throws Exception {
- conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
- conf.setDefaultNumPartitions(3);
- conf.setAllowAutoTopicCreation(true);
- setup();
-
- Semaphore semaphore =
pulsar.getBrokerService().getLookupRequestSemaphore();
- int lookupPermitsBefore = semaphore.availablePermits();
-
- LookupService lookup = getLookupService(isUsingHttpLookup);
- // Create topic.
- final String topicNameStr =
BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
- final TopicName topicName = TopicName.get(topicNameStr);
- // Verify.
- PulsarClient client =
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
- PartitionedTopicMetadata response =
lookup.getPartitionedTopicMetadata(topicName, true).join();
- assertEquals(response.partitions, 3);
- List<String> partitionedTopics =
admin.topics().getPartitionedTopicList("public/default");
- assertTrue(partitionedTopics.contains(topicNameStr));
- List<String> topicList = admin.topics().getList("public/default");
- assertFalse(topicList.contains(topicNameStr));
- for (int i = 0; i < 3; i++) {
+ modifyTopicAutoCreation(true, TopicType.PARTITIONED, 3);
+
+ int lookupPermitsBefore = getLookupRequestPermits();
+
+ PulsarClientImpl[] clientArray = getClientsToTest(isUsingHttpLookup);
+ for (PulsarClientImpl client : clientArray) {
+ // Case-1: normal topic.
+ final String topicNameStr =
BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
+ // Verify: the result of get partitioned topic metadata.
+ PartitionedTopicMetadata response =
client.getPartitionedTopicMetadata(topicNameStr, true).join();
+ assertEquals(response.partitions, 3);
+ // Verify: the behavior of topic creation.
+ List<String> partitionedTopics =
admin1.topics().getPartitionedTopicList("public/default");
+ assertTrue(partitionedTopics.contains(topicNameStr));
+ verifyNonPartitionedTopicNeverCreated(topicNameStr);
// The API "getPartitionedTopicMetadata" only creates the
partitioned metadata, it will not create the
// partitions.
- assertFalse(topicList.contains(topicName.getPartition(i)));
+ verifyPartitionsNeverCreated(topicNameStr);
+
+ // Case-2: topic with suffix "-partition-1".
+ final String topicNameStrWithSuffix = BrokerTestUtil.newUniqueName(
+ topicDomain.value() + "://" + DEFAULT_NS + "/tp") +
"-partition-1";
+ // Verify: the result of get partitioned topic metadata.
+ PartitionedTopicMetadata response2 =
+ client.getPartitionedTopicMetadata(topicNameStrWithSuffix,
true).join();
+ assertEquals(response2.partitions, 0);
+ // Verify: the behavior of topic creation.
+ List<String> partitionedTopics2 =
+ admin1.topics().getPartitionedTopicList("public/default");
+ assertFalse(partitionedTopics2.contains(topicNameStrWithSuffix));
+ assertFalse(partitionedTopics2.contains(
+
TopicName.get(topicNameStrWithSuffix).getPartitionedTopicName()));
+
+ // Verify: lookup semaphore has been released.
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
+ });
+ // Cleanup.
+ admin1.topics().deletePartitionedTopic(topicNameStr, false);
+ try {
+ admin1.topics().delete(topicNameStrWithSuffix, false);
+ } catch (Exception ex) {}
}
- // Verify: lookup semaphore has been releases.
- Awaitility.await().untilAsserted(() -> {
- int lookupPermitsAfter = semaphore.availablePermits();
- assertEquals(lookupPermitsAfter, lookupPermitsBefore);
- });
-
- // Cleanup.
- client.close();
- admin.topics().deletePartitionedTopic(topicNameStr, false);
}
@Test(dataProvider = "clients")
public void testAutoCreateNonPartitionedTopic(boolean isUsingHttpLookup,
TopicDomain topicDomain) throws Exception {
- conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
- conf.setAllowAutoTopicCreation(true);
- setup();
-
- Semaphore semaphore =
pulsar.getBrokerService().getLookupRequestSemaphore();
- int lookupPermitsBefore = semaphore.availablePermits();
-
- LookupService lookup = getLookupService(isUsingHttpLookup);
- // Create topic.
- final String topicNameStr =
BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
- final TopicName topicName = TopicName.get(topicNameStr);
- // Verify.
- PulsarClient client =
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
- PartitionedTopicMetadata response =
lookup.getPartitionedTopicMetadata(topicName, true).join();
- assertEquals(response.partitions, 0);
- List<String> partitionedTopics =
admin.topics().getPartitionedTopicList("public/default");
- assertFalse(partitionedTopics.contains(topicNameStr));
- List<String> topicList = admin.topics().getList("public/default");
- assertFalse(topicList.contains(topicNameStr));
-
- // Verify: lookup semaphore has been releases.
- Awaitility.await().untilAsserted(() -> {
- int lookupPermitsAfter = semaphore.availablePermits();
- assertEquals(lookupPermitsAfter, lookupPermitsBefore);
- });
-
- // Cleanup.
- client.close();
- try {
- admin.topics().delete(topicNameStr, false);
- } catch (Exception ex) {}
+ modifyTopicAutoCreation(true, TopicType.NON_PARTITIONED, 3);
+
+ int lookupPermitsBefore = getLookupRequestPermits();
+
+ PulsarClientImpl[] clientArray = getClientsToTest(isUsingHttpLookup);
+ for (PulsarClientImpl client : clientArray) {
+ // Case 1: normal topic.
+ final String topicNameStr =
BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
+ // Verify: the result of get partitioned topic metadata.
+ PartitionedTopicMetadata response =
client.getPartitionedTopicMetadata(topicNameStr, true).join();
+ assertEquals(response.partitions, 0);
+ // Verify: the behavior of topic creation.
+ List<String> partitionedTopics =
admin1.topics().getPartitionedTopicList("public/default");
+ assertFalse(partitionedTopics.contains(topicNameStr));
+ verifyPartitionsNeverCreated(topicNameStr);
+
+ // Case-2: topic with suffix "-partition-1".
+ final String topicNameStrWithSuffix = BrokerTestUtil.newUniqueName(
+ topicDomain.value() + "://" + DEFAULT_NS + "/tp") +
"-partition-1";
+ // Verify: the result of get partitioned topic metadata.
+ PartitionedTopicMetadata response2 =
+ client.getPartitionedTopicMetadata(topicNameStrWithSuffix,
true).join();
+ assertEquals(response2.partitions, 0);
+ // Verify: the behavior of topic creation.
+ List<String> partitionedTopics2 =
+ admin1.topics().getPartitionedTopicList("public/default");
+ assertFalse(partitionedTopics2.contains(topicNameStrWithSuffix));
+ assertFalse(partitionedTopics2.contains(
+
TopicName.get(topicNameStrWithSuffix).getPartitionedTopicName()));
+
+ // Verify: lookup semaphore has been released.
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
+ });
+ // Cleanup.
+ try {
+ admin1.topics().delete(topicNameStr, false);
+ } catch (Exception ex) {}
+ try {
+ admin1.topics().delete(topicNameStrWithSuffix, false);
+ } catch (Exception ex) {}
+ }
}
@DataProvider(name = "autoCreationParamsNotAllow")
@@ -336,64 +432,38 @@ public class GetPartitionMetadataTest extends
ProducerConsumerBase {
public void testGetMetadataIfNotAllowedCreate(boolean
configAllowAutoTopicCreation,
boolean
paramMetadataAutoCreationEnabled,
boolean isUsingHttpLookup)
throws Exception {
- if (!configAllowAutoTopicCreation && paramMetadataAutoCreationEnabled)
{
- // These test cases are for the following PR.
- // Which was described in the Motivation of
https://github.com/apache/pulsar/pull/22206.
- return;
- }
- conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
- conf.setDefaultNumPartitions(3);
- conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation);
- setup();
-
- Semaphore semaphore =
pulsar.getBrokerService().getLookupRequestSemaphore();
- int lookupPermitsBefore = semaphore.availablePermits();
-
- LookupService lookup = getLookupService(isUsingHttpLookup);
- // Define topic.
- final String topicNameStr =
BrokerTestUtil.newUniqueName("persistent://" + DEFAULT_NS + "/tp");
- final TopicName topicName = TopicName.get(topicNameStr);
- // Verify.
- PulsarClient client =
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
- try {
- lookup.getPartitionedTopicMetadata(TopicName.get(topicNameStr),
paramMetadataAutoCreationEnabled).join();
- fail("Expect a not found exception");
- } catch (Exception e) {
- log.warn("", e);
- Throwable unwrapEx = FutureUtil.unwrapCompletionException(e);
- assertTrue(unwrapEx instanceof
PulsarClientException.TopicDoesNotExistException
- || unwrapEx instanceof
PulsarClientException.NotFoundException);
- }
+ modifyTopicAutoCreation(configAllowAutoTopicCreation,
TopicType.PARTITIONED, 3);
- List<String> partitionedTopics =
admin.topics().getPartitionedTopicList("public/default");
-
pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources().partitionedTopicExists(topicName);
- assertFalse(partitionedTopics.contains(topicNameStr));
- List<String> topicList = admin.topics().getList("public/default");
- assertFalse(topicList.contains(topicNameStr));
- for (int i = 0; i < 3; i++) {
- assertFalse(topicList.contains(topicName.getPartition(i)));
- }
+ int lookupPermitsBefore = getLookupRequestPermits();
- // Verify: lookup semaphore has been releases.
- Awaitility.await().untilAsserted(() -> {
- int lookupPermitsAfter = semaphore.availablePermits();
- assertEquals(lookupPermitsAfter, lookupPermitsBefore);
- });
-
- // Cleanup.
- client.close();
- }
+ PulsarClientImpl[] clientArray = getClientsToTest(isUsingHttpLookup);
+ for (PulsarClientImpl client : clientArray) {
+ // Define topic.
+ final String topicNameStr =
BrokerTestUtil.newUniqueName("persistent://" + DEFAULT_NS + "/tp");
+ final TopicName topicName = TopicName.get(topicNameStr);
+ // Verify: the result of get partitioned topic metadata.
+ try {
+ client.getPartitionedTopicMetadata(topicNameStr,
paramMetadataAutoCreationEnabled)
+ .join();
+ fail("Expect a not found exception");
+ } catch (Exception e) {
+ Throwable unwrapEx = FutureUtil.unwrapCompletionException(e);
+ assertTrue(unwrapEx instanceof
PulsarClientException.TopicDoesNotExistException
+ || unwrapEx instanceof
PulsarClientException.NotFoundException);
+ }
+ // Verify: the behavior of topic creation.
+ List<String> partitionedTopics =
admin1.topics().getPartitionedTopicList("public/default");
+
pulsar1.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+ .partitionedTopicExists(topicName);
+ assertFalse(partitionedTopics.contains(topicNameStr));
+ verifyNonPartitionedTopicNeverCreated(topicNameStr);
+ verifyPartitionsNeverCreated(topicNameStr);
- @DataProvider(name = "autoCreationParamsForNonPersistentTopic")
- public Object[][] autoCreationParamsForNonPersistentTopic(){
- return new Object[][]{
- // configAllowAutoTopicCreation,
paramCreateIfAutoCreationEnabled, isUsingHttpLookup.
- {true, true, true},
- {true, true, false},
- {false, true, true},
- {false, true, false},
- {false, false, true}
- };
+ // Verify: lookup semaphore has been released.
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
+ });
+ }
}
/**
@@ -408,66 +478,46 @@ public class GetPartitionMetadataTest extends
ProducerConsumerBase {
* param-auto-create = false
* HTTP API: not found error
* binary API: not support
- * This test only guarantees that the behavior is the same as before. The
following separated PR will fix the
- * incorrect behavior.
+ * After PIP-344, the behavior is the same as for persistent topics,
as described in PIP-344.
*/
- @Test(dataProvider = "autoCreationParamsForNonPersistentTopic")
- public void testGetNonPersistentMetadataIfNotAllowedCreate(boolean
configAllowAutoTopicCreation,
+ @Test(dataProvider = "autoCreationParamsNotAllow")
+ public void testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(boolean
configAllowAutoTopicCreation,
boolean
paramMetadataAutoCreationEnabled,
boolean isUsingHttpLookup)
throws Exception {
- conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
- conf.setDefaultNumPartitions(3);
- conf.setAllowAutoTopicCreation(configAllowAutoTopicCreation);
- setup();
-
- Semaphore semaphore =
pulsar.getBrokerService().getLookupRequestSemaphore();
- int lookupPermitsBefore = semaphore.availablePermits();
-
- LookupService lookup = getLookupService(isUsingHttpLookup);
- // Define topic.
- final String topicNameStr =
BrokerTestUtil.newUniqueName("non-persistent://" + DEFAULT_NS + "/tp");
- final TopicName topicName = TopicName.get(topicNameStr);
- // Verify.
- // Regarding non-persistent topic, we do not know whether it exists or
not.
- // Broker will return a non-partitioned metadata if partitioned
metadata does not exist.
- PulsarClient client =
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
-
- if (!configAllowAutoTopicCreation && !paramMetadataAutoCreationEnabled
&& isUsingHttpLookup) {
+ modifyTopicAutoCreation(configAllowAutoTopicCreation,
TopicType.PARTITIONED, 3);
+
+ int lookupPermitsBefore = getLookupRequestPermits();
+
+ PulsarClientImpl[] clientArray = getClientsToTest(isUsingHttpLookup);
+ for (PulsarClientImpl client : clientArray) {
+ // Define topic.
+ final String topicNameStr =
BrokerTestUtil.newUniqueName("non-persistent://" + DEFAULT_NS + "/tp");
+ final TopicName topicName = TopicName.get(topicNameStr);
+ // Verify: the result of get partitioned topic metadata.
try {
-
lookup.getPartitionedTopicMetadata(TopicName.get(topicNameStr),
paramMetadataAutoCreationEnabled)
+ PartitionedTopicMetadata topicMetadata = client
+ .getPartitionedTopicMetadata(topicNameStr,
paramMetadataAutoCreationEnabled)
.join();
- Assert.fail("Expected a not found ex");
+ log.info("Get topic metadata: {}", topicMetadata.partitions);
+ fail("Expected a not found ex");
} catch (Exception ex) {
- // Cleanup.
- client.close();
- return;
+ Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
+ assertTrue(unwrapEx instanceof
PulsarClientException.TopicDoesNotExistException
+ || unwrapEx instanceof
PulsarClientException.NotFoundException);
}
- }
- PartitionedTopicMetadata metadata = lookup
- .getPartitionedTopicMetadata(TopicName.get(topicNameStr),
paramMetadataAutoCreationEnabled).join();
- if (configAllowAutoTopicCreation && paramMetadataAutoCreationEnabled) {
- assertEquals(metadata.partitions, 3);
- } else {
- assertEquals(metadata.partitions, 0);
- }
-
- List<String> partitionedTopics =
admin.topics().getPartitionedTopicList("public/default");
-
pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
- .partitionedTopicExists(topicName);
- if (configAllowAutoTopicCreation && paramMetadataAutoCreationEnabled) {
- assertTrue(partitionedTopics.contains(topicNameStr));
- } else {
+ // Verify: the behavior of topic creation.
+ List<String> partitionedTopics =
admin1.topics().getPartitionedTopicList("public/default");
+
pulsar1.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+ .partitionedTopicExists(topicName);
assertFalse(partitionedTopics.contains(topicNameStr));
+ verifyNonPartitionedTopicNeverCreated(topicNameStr);
+ verifyPartitionsNeverCreated(topicNameStr);
}
// Verify: lookup semaphore has been releases.
Awaitility.await().untilAsserted(() -> {
- int lookupPermitsAfter = semaphore.availablePermits();
- assertEquals(lookupPermitsAfter, lookupPermitsBefore);
+ assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
});
-
- // Cleanup.
- client.close();
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
index 9aa29f08c5c..c9457e1a888 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
@@ -56,6 +56,7 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.namespace.TopicExistsInfo;
import org.apache.pulsar.broker.rest.Topics;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -357,9 +358,12 @@ public class TopicsTest extends
MockedPulsarServiceBaseTest {
CompletableFuture future = new CompletableFuture();
future.completeExceptionally(new BrokerServiceException("Fake
Exception"));
CompletableFuture existFuture = new CompletableFuture();
- existFuture.complete(true);
+ existFuture.complete(TopicExistsInfo.newNonPartitionedTopicExists());
doReturn(future).when(nameSpaceService).getBrokerServiceUrlAsync(any(), any());
doReturn(existFuture).when(nameSpaceService).checkTopicExists(any());
+ CompletableFuture existBooleanFuture = new CompletableFuture();
+ existBooleanFuture.complete(false);
+
doReturn(existBooleanFuture).when(nameSpaceService).checkNonPartitionedTopicExists(any());
doReturn(nameSpaceService).when(pulsar).getNamespaceService();
AsyncResponse asyncResponse = mock(AsyncResponse.class);
ProducerMessages producerMessages = new ProducerMessages();
@@ -370,7 +374,7 @@ public class TopicsTest extends MockedPulsarServiceBaseTest
{
topics.produceOnPersistentTopic(asyncResponse, testTenant,
testNamespace, testTopicName, false, producerMessages);
ArgumentCaptor<RestException> responseCaptor =
ArgumentCaptor.forClass(RestException.class);
verify(asyncResponse,
timeout(5000).times(1)).resume(responseCaptor.capture());
- Assert.assertEquals(responseCaptor.getValue().getMessage(), "Can't
find owner of given topic.");
+
Assert.assertTrue(responseCaptor.getValue().getMessage().contains(topicName + "
not found"));
}
@Test
@@ -378,8 +382,11 @@ public class TopicsTest extends
MockedPulsarServiceBaseTest {
String topicName = "persistent://" + testTenant + "/" + testNamespace
+ "/" + testTopicName;
NamespaceService nameSpaceService = mock(NamespaceService.class);
CompletableFuture existFuture = new CompletableFuture();
- existFuture.complete(false);
+ existFuture.complete(TopicExistsInfo.newTopicNotExists());
+ CompletableFuture existBooleanFuture = new CompletableFuture();
+ existBooleanFuture.complete(false);
doReturn(existFuture).when(nameSpaceService).checkTopicExists(any());
+
doReturn(existBooleanFuture).when(nameSpaceService).checkNonPartitionedTopicExists(any());
doReturn(nameSpaceService).when(pulsar).getNamespaceService();
AsyncResponse asyncResponse = mock(AsyncResponse.class);
ProducerMessages producerMessages = new ProducerMessages();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
index 7004eae29b5..ab492de055b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.broker.lookup.NamespaceData;
import org.apache.pulsar.broker.lookup.RedirectData;
import org.apache.pulsar.broker.lookup.v1.TopicLookup;
import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.namespace.TopicExistsInfo;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
@@ -149,9 +150,12 @@ public class HttpTopicLookupv2Test {
config.setAuthorizationEnabled(true);
NamespaceService namespaceService = pulsar.getNamespaceService();
- CompletableFuture<Boolean> future = new CompletableFuture<>();
- future.complete(false);
+ CompletableFuture<TopicExistsInfo> future = new CompletableFuture<>();
+ future.complete(TopicExistsInfo.newTopicNotExists());
doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class));
+ CompletableFuture<Boolean> booleanFuture = new CompletableFuture<>();
+ booleanFuture.complete(false);
+
doReturn(booleanFuture).when(namespaceService).checkNonPartitionedTopicExists(any(TopicName.class));
AsyncResponse asyncResponse1 = mock(AsyncResponse.class);
destLookup.lookupTopicAsync(asyncResponse1,
TopicDomain.persistent.value(), "myprop", "usc", "ns2", "topic_not_exist",
false, null, null);
@@ -260,9 +264,12 @@ public class HttpTopicLookupv2Test {
policies3Future.complete(Optional.of(policies3));
doReturn(policies3Future).when(namespaceResources).getPoliciesAsync(namespaceName2);
NamespaceService namespaceService = pulsar.getNamespaceService();
- CompletableFuture<Boolean> future = new CompletableFuture<>();
- future.complete(false);
+ CompletableFuture<TopicExistsInfo> future = new CompletableFuture<>();
+ future.complete(TopicExistsInfo.newTopicNotExists());
doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class));
+ CompletableFuture<Boolean> booleanFuture = new CompletableFuture<>();
+ booleanFuture.complete(false);
+
doReturn(booleanFuture).when(namespaceService).checkNonPartitionedTopicExists(any(TopicName.class));
destLookup.lookupTopicAsync(asyncResponse,
TopicDomain.persistent.value(), property, cluster, ns2,
"invalid-localCluster", false, null, null);
verify(asyncResponse).resume(arg.capture());
@@ -294,8 +301,8 @@ public class HttpTopicLookupv2Test {
doReturn(uri).when(uriInfo).getRequestUri();
config.setAuthorizationEnabled(true);
NamespaceService namespaceService = pulsar.getNamespaceService();
- CompletableFuture<Boolean> future = new CompletableFuture<>();
- future.complete(false);
+ CompletableFuture<TopicExistsInfo> future = new CompletableFuture<>();
+ future.complete(TopicExistsInfo.newTopicNotExists());
doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class));
// Get the current semaphore first
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index a0313ef7436..0b0d38a071e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -815,14 +815,15 @@ public class NamespaceServiceTest extends BrokerTestBase {
String topic = topicDomain + "://prop/ns-abc/" + UUID.randomUUID();
admin.topics().createNonPartitionedTopic(topic);
Awaitility.await().untilAsserted(() -> {
-
assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(topic)).get());
+
assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(topic)).get().isExists());
});
String partitionedTopic = topicDomain + "://prop/ns-abc/" +
UUID.randomUUID();
admin.topics().createPartitionedTopic(partitionedTopic, 5);
Awaitility.await().untilAsserted(() -> {
-
assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(partitionedTopic)).get());
-
assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(partitionedTopic
+ "-partition-2")).get());
+
assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(partitionedTopic)).get().isExists());
+ assertTrue(pulsar.getNamespaceService()
+ .checkTopicExists(TopicName.get(partitionedTopic +
"-partition-2")).get().isExists());
});
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java
index 7790940c132..8fdf0723ea8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -99,6 +100,7 @@ public class TopicGCTest extends ProducerConsumerBase {
Consumer<String> consumerAllPartition =
pulsarClient.newConsumer(Schema.STRING).topic(topic)
.subscriptionName(subscription).isAckReceiptEnabled(true).subscribe();
Message<String> msg = consumerAllPartition.receive(2,
TimeUnit.SECONDS);
+ assertNotNull(msg);
String receivedMsgValue = msg.getValue();
log.info("received msg: {}", receivedMsgValue);
consumerAllPartition.acknowledge(msg);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 7735f66e783..4d6cf96a010 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -32,6 +32,7 @@ import lombok.AccessLevel;
import lombok.Getter;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -58,7 +59,6 @@ import
org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;
import org.apache.pulsar.client.util.RetryMessageUtil;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.FutureUtil;
@Getter(AccessLevel.PUBLIC)
@@ -104,6 +104,31 @@ public class ConsumerBuilderImpl<T> implements
ConsumerBuilder<T> {
}
}
+ private CompletableFuture<Boolean> checkDlqAlreadyExists(String topic) {
+ CompletableFuture<Boolean> existsFuture = new CompletableFuture<>();
+ client.getPartitionedTopicMetadata(topic, false).thenAccept(metadata
-> {
+ TopicName topicName = TopicName.get(topic);
+ if (topicName.isPersistent()) {
+ // The metadata lookup succeeded, so the persistent topic exists (partitioned or not).
+ existsFuture.complete(true);
+ } else {
+ // A non-persistent topic is reported as existing only if it is a
partitioned topic.
+ existsFuture.complete(metadata != null && metadata.partitions
> 0);
+ }
+ }).exceptionally(ex -> {
+ Throwable actEx = FutureUtil.unwrapCompletionException(ex);
+ if (actEx instanceof PulsarClientException.NotFoundException
+ || actEx instanceof
PulsarClientException.TopicDoesNotExistException
+ || actEx instanceof
PulsarAdminException.NotFoundException) {
+ existsFuture.complete(false);
+ } else {
+ existsFuture.completeExceptionally(ex);
+ }
+ return null;
+ });
+ return existsFuture;
+ }
+
@Override
public CompletableFuture<Consumer<T>> subscribeAsync() {
if (conf.getTopicNames().isEmpty() && conf.getTopicsPattern() == null)
{
@@ -135,20 +160,18 @@ public class ConsumerBuilderImpl<T> implements
ConsumerBuilder<T> {
DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy();
if (deadLetterPolicy == null ||
StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())
||
StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) {
- CompletableFuture<PartitionedTopicMetadata>
retryLetterTopicMetadata =
-
client.getPartitionedTopicMetadata(oldRetryLetterTopic, true);
- CompletableFuture<PartitionedTopicMetadata>
deadLetterTopicMetadata =
- client.getPartitionedTopicMetadata(oldDeadLetterTopic,
true);
+ CompletableFuture<Boolean> retryLetterTopicMetadata =
checkDlqAlreadyExists(oldRetryLetterTopic);
+ CompletableFuture<Boolean> deadLetterTopicMetadata =
checkDlqAlreadyExists(oldDeadLetterTopic);
applyDLQConfig =
CompletableFuture.allOf(retryLetterTopicMetadata, deadLetterTopicMetadata)
.thenAccept(__ -> {
String retryLetterTopic = topicFirst + "-" +
conf.getSubscriptionName()
+
RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
String deadLetterTopic = topicFirst + "-" +
conf.getSubscriptionName()
+ RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
- if (retryLetterTopicMetadata.join().partitions >
0) {
+ if (retryLetterTopicMetadata.join()) {
retryLetterTopic = oldRetryLetterTopic;
}
- if (deadLetterTopicMetadata.join().partitions > 0)
{
+ if (deadLetterTopicMetadata.join()) {
deadLetterTopic = oldDeadLetterTopic;
}
if (deadLetterPolicy == null) {