This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 942c5c5f4d8 [Broker] Make 
PersistentTopicsBase#internalGetPartitionedMetadata async (#14153)
942c5c5f4d8 is described below

commit 942c5c5f4d801d03b63631c6bf0a2e1654ef1c56
Author: Jiwei Guo <[email protected]>
AuthorDate: Thu May 5 19:26:31 2022 +0800

    [Broker] Make PersistentTopicsBase#internalGetPartitionedMetadata async 
(#14153)
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 50 +++-------------
 .../broker/admin/impl/PersistentTopicsBase.java    | 68 ++++++++++++----------
 .../broker/admin/v1/NonPersistentTopics.java       | 21 ++++---
 .../pulsar/broker/admin/v1/PersistentTopics.java   | 15 ++++-
 .../broker/admin/v2/NonPersistentTopics.java       |  7 ++-
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 13 ++++-
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 56 +++++++++++-------
 .../org/apache/pulsar/broker/admin/TopicsTest.java |  2 +-
 .../pulsar/broker/admin/v1/V1_AdminApi2Test.java   |  7 ++-
 9 files changed, 127 insertions(+), 112 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index ef167829f8d..ec502e134f2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -427,21 +427,20 @@ public abstract class AdminResource extends 
PulsarWebResource {
         this.servletContext = servletContext;
     }
 
+    protected PartitionedTopicMetadata getPartitionedTopicMetadata(TopicName 
topicName,
+                                                                   boolean 
authoritative,
+                                                                   boolean 
checkAllowAutoCreation) {
+        return sync(() -> getPartitionedTopicMetadataAsync(topicName, 
authoritative, checkAllowAutoCreation));
+    }
+
     protected CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadataAsync(
             TopicName topicName, boolean authoritative, boolean 
checkAllowAutoCreation) {
-        try {
-            validateClusterOwnership(topicName.getCluster());
-        } catch (Exception e) {
-            return FutureUtil.failedFuture(e);
-        }
-
         // validates global-namespace contains local/peer cluster: if 
peer/local cluster present then lookup can
         // serve/redirect request else fail partitioned-metadata-request so, 
client fails while creating
         // producer/consumer
-        return 
validateGlobalNamespaceOwnershipAsync(topicName.getNamespaceObject())
-                .thenRun(() -> {
-                    validateTopicOperation(topicName, TopicOperation.LOOKUP);
-                })
+        return validateClusterOwnershipAsync(topicName.getCluster())
+                .thenCompose(__ -> 
validateGlobalNamespaceOwnershipAsync(topicName.getNamespaceObject()))
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.LOOKUP))
                 .thenCompose(__ -> {
                     if (checkAllowAutoCreation) {
                         return pulsar().getBrokerService()
@@ -452,37 +451,6 @@ public abstract class AdminResource extends 
PulsarWebResource {
                 });
     }
 
-    protected PartitionedTopicMetadata getPartitionedTopicMetadata(TopicName 
topicName,
-            boolean authoritative, boolean checkAllowAutoCreation) {
-        validateClusterOwnership(topicName.getCluster());
-        // validates global-namespace contains local/peer cluster: if 
peer/local cluster present then lookup can
-        // serve/redirect request else fail partitioned-metadata-request so, 
client fails while creating
-        // producer/consumer
-        validateGlobalNamespaceOwnership(topicName.getNamespaceObject());
-
-        try {
-            validateTopicOperation(topicName, TopicOperation.LOOKUP);
-        } catch (Exception e) {
-            // unknown error marked as internal server error
-            log.warn("Unexpected error while authorizing lookup. topic={}, 
role={}. Error: {}", topicName,
-                    clientAppId(), e.getMessage(), e);
-            throw new RestException(e);
-        }
-
-        PartitionedTopicMetadata partitionMetadata;
-        if (checkAllowAutoCreation) {
-            partitionMetadata = 
fetchPartitionedTopicMetadataCheckAllowAutoCreation(pulsar(), topicName);
-        } else {
-            partitionMetadata = fetchPartitionedTopicMetadata(pulsar(), 
topicName);
-        }
-
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] Total number of partitions for topic {} is {}", 
clientAppId(), topicName,
-                    partitionMetadata.partitions);
-        }
-        return partitionMetadata;
-    }
-
     protected static PartitionedTopicMetadata 
fetchPartitionedTopicMetadata(PulsarService pulsar, TopicName topicName) {
         try {
             return 
pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).get();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index cf1b3d71528..aa3fa6011dc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -546,27 +546,37 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean 
authoritative,
                                                                       boolean 
checkAllowAutoCreation) {
-        PartitionedTopicMetadata metadata = 
getPartitionedTopicMetadata(topicName,
-                authoritative, checkAllowAutoCreation);
-        if (metadata.partitions == 0 && !checkAllowAutoCreation) {
-            // The topic may be a non-partitioned topic, so check if it exists 
here.
-            // However, when checkAllowAutoCreation is true, the client will 
create the topic if it doesn't exist.
-            // In this case, `partitions == 0` means the automatically created 
topic is a non-partitioned topic so we
-            // shouldn't check if the topic exists.
-            try {
-                if 
(!pulsar().getNamespaceService().checkTopicExists(topicName).get()) {
-                    throw new RestException(Status.NOT_FOUND,
-                            new PulsarClientException.NotFoundException("Topic 
not exist"));
-                }
-            } catch (InterruptedException | ExecutionException e) {
-                log.error("Failed to check if topic '{}' exists", topicName, 
e);
-                throw new RestException(Status.INTERNAL_SERVER_ERROR, "Failed 
to get topic metadata");
-            }
-        }
-        if (metadata.partitions > 1) {
-            validateClientVersion();
-        }
-        return metadata;
+        return sync(() -> internalGetPartitionedMetadataAsync(authoritative, 
checkAllowAutoCreation));
+    }
+
+    protected CompletableFuture<PartitionedTopicMetadata> 
internalGetPartitionedMetadataAsync(
+                                                                          
boolean authoritative,
+                                                                          
boolean checkAllowAutoCreation) {
+        return getPartitionedTopicMetadataAsync(topicName, authoritative, 
checkAllowAutoCreation)
+                .thenCompose(metadata -> {
+                    CompletableFuture<Void> ret;
+                    if (metadata.partitions == 0 && !checkAllowAutoCreation) {
+                        // The topic may be a non-partitioned topic, so check 
if it exists here.
+                        // However, when checkAllowAutoCreation is true, the 
client will create the topic if
+                        // it doesn't exist. In this case, `partitions == 0` 
means the automatically created topic
+                        // is a non-partitioned topic so we shouldn't check if 
the topic exists.
+                        ret = internalCheckTopicExists(topicName);
+                    } else if (metadata.partitions > 1) {
+                        ret = internalValidateClientVersionAsync();
+                    } else {
+                        ret = CompletableFuture.completedFuture(null);
+                    }
+                    return ret.thenApply(__ -> metadata);
+                });
+    }
+
+    protected CompletableFuture<Void> internalCheckTopicExists(TopicName 
topicName) {
+        return pulsar().getNamespaceService().checkTopicExists(topicName)
+                .thenAccept(exist -> {
+                    if (!exist) {
+                        throw new RestException(Status.NOT_FOUND, "Topic not 
exist");
+                    }
+                });
     }
 
     protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse,
@@ -4135,15 +4145,15 @@ public class PersistentTopicsBase extends AdminResource 
{
     // Pulsar client-java lib always passes user-agent as X-Java-$version.
     // However, cpp-client older than v1.20 (PR #765) never used to pass it.
     // So, request without user-agent and Pulsar-CPP-vX (X < 1.21) must be 
rejected
-    private void validateClientVersion() {
+    protected CompletableFuture<Void> internalValidateClientVersionAsync() {
         if (!pulsar().getConfiguration().isClientLibraryVersionCheckEnabled()) 
{
-            return;
+            return CompletableFuture.completedFuture(null);
         }
         final String userAgent = httpRequest.getHeader("User-Agent");
         if (StringUtils.isBlank(userAgent)) {
-            throw new RestException(Status.METHOD_NOT_ALLOWED,
+            return FutureUtil.failedFuture(new 
RestException(Status.METHOD_NOT_ALLOWED,
                     "Client lib is not compatible to"
-                            + " access partitioned metadata: version in 
user-agent is not present");
+                            + " access partitioned metadata: version in 
user-agent is not present"));
         }
         // Version < 1.20 for cpp-client is not allowed
         if (userAgent.contains(DEPRECATED_CLIENT_VERSION_PREFIX)) {
@@ -4154,18 +4164,16 @@ public class PersistentTopicsBase extends AdminResource 
{
                 if (splits != null && splits.length > 1) {
                     if 
(LEAST_SUPPORTED_CLIENT_VERSION_PREFIX.getMajorVersion() > 
Integer.parseInt(splits[0])
                             || 
LEAST_SUPPORTED_CLIENT_VERSION_PREFIX.getMinorVersion() > 
Integer.parseInt(splits[1])) {
-                        throw new RestException(Status.METHOD_NOT_ALLOWED,
+                        return FutureUtil.failedFuture(new 
RestException(Status.METHOD_NOT_ALLOWED,
                                 "Client lib is not compatible to access 
partitioned metadata: version " + userAgent
-                                        + " is not supported");
+                                        + " is not supported"));
                     }
                 }
-            } catch (RestException re) {
-                throw re;
             } catch (Exception e) {
                 log.warn("[{}] Failed to parse version {} ", clientAppId(), 
userAgent);
             }
         }
-        return;
+        return CompletableFuture.completedFuture(null);
     }
 
     /**
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index dc466be86d6..93e49dc70b1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -51,7 +51,6 @@ import org.apache.pulsar.common.naming.Constants;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -77,16 +76,16 @@ public class NonPersistentTopics extends PersistentTopics {
     @ApiResponses(value = {
             @ApiResponse(code = 307, message = "Current broker doesn't serve 
the namespace of this topic"),
             @ApiResponse(code = 403, message = "Don't have admin permission")})
-    public PartitionedTopicMetadata 
getPartitionedMetadata(@PathParam("property") String property,
-                                                           
@PathParam("cluster") String cluster,
-                                                           
@PathParam("namespace") String namespace,
-                                                           @PathParam("topic") 
@Encoded String encodedTopic,
-                                                           
@QueryParam("authoritative") @DefaultValue("false")
-                                                                       boolean 
authoritative,
-                                                           
@QueryParam("checkAllowAutoCreation") @DefaultValue("false")
-                                                                       boolean 
checkAllowAutoCreation) {
-        validateTopicName(property, cluster, namespace, encodedTopic);
-        return getPartitionedTopicMetadata(topicName, authoritative, 
checkAllowAutoCreation);
+    public void getPartitionedMetadata(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property,
+            @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
+            @QueryParam("checkAllowAutoCreation") @DefaultValue("false") 
boolean checkAllowAutoCreation) {
+        super.getPartitionedMetadata(asyncResponse, property, cluster, 
namespace, encodedTopic, authoritative,
+                checkAllowAutoCreation);
     }
 
     @GET
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 8e5107a6bd8..bb92498b2e7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -48,7 +48,6 @@ import 
org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.ResetCursorData;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -283,13 +282,23 @@ public class PersistentTopics extends 
PersistentTopicsBase {
     @ApiResponses(value = {
             @ApiResponse(code = 307, message = "Current broker doesn't serve 
the namespace of this topic"),
             @ApiResponse(code = 403, message = "Don't have admin permission") 
})
-    public PartitionedTopicMetadata 
getPartitionedMetadata(@PathParam("property") String property,
+    public void getPartitionedMetadata(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property,
             @PathParam("cluster") String cluster, @PathParam("namespace") 
String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
             @QueryParam("checkAllowAutoCreation") @DefaultValue("false") 
boolean checkAllowAutoCreation) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        return internalGetPartitionedMetadata(authoritative, 
checkAllowAutoCreation);
+        internalGetPartitionedMetadataAsync(authoritative, 
checkAllowAutoCreation)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get partitioned metadata 
topic {}", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @DELETE
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 66867b68426..5cf78278350 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -54,7 +54,6 @@ import 
org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -88,7 +87,8 @@ public class NonPersistentTopics extends PersistentTopics {
             @ApiResponse(code = 500, message = "Internal server error"),
             @ApiResponse(code = 503, message = "Failed to validate cluster 
configuration")
     })
-    public PartitionedTopicMetadata getPartitionedMetadata(
+    public void getPartitionedMetadata(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -99,7 +99,8 @@ public class NonPersistentTopics extends PersistentTopics {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
             @ApiParam(value = "Is check configuration required to 
automatically create topic")
             @QueryParam("checkAllowAutoCreation") @DefaultValue("false") 
boolean checkAllowAutoCreation) {
-        return super.getPartitionedMetadata(tenant, namespace, encodedTopic, 
authoritative, checkAllowAutoCreation);
+        super.getPartitionedMetadata(asyncResponse, tenant, namespace, 
encodedTopic, authoritative,
+                checkAllowAutoCreation);
     }
 
     @GET
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index a70bf7f58ea..4095530340c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -881,7 +881,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 412, message = "Partitioned topic name is 
invalid"),
             @ApiResponse(code = 500, message = "Internal server error")
     })
-    public PartitionedTopicMetadata getPartitionedMetadata(
+    public void getPartitionedMetadata(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -893,7 +894,15 @@ public class PersistentTopics extends PersistentTopicsBase 
{
             @ApiParam(value = "Is check configuration required to 
automatically create topic")
             @QueryParam("checkAllowAutoCreation") @DefaultValue("false") 
boolean checkAllowAutoCreation) {
         validateTopicName(tenant, namespace, encodedTopic);
-        return internalGetPartitionedMetadata(authoritative, 
checkAllowAutoCreation);
+        internalGetPartitionedMetadataAsync(authoritative, 
checkAllowAutoCreation)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get partitioned metadata 
topic {}", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @DELETE
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 382e90e9321..b1bc4ab5486 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -405,58 +405,74 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         ArgumentCaptor<RestException> errorCaptor = 
ArgumentCaptor.forClass(RestException.class);
         verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
         Assert.assertTrue(errorCaptor.getValue().getMessage().contains("zero 
partitions"));
-
+        response = mock(AsyncResponse.class);
         final String nonPartitionTopic2 = "secondary-non-partitioned-topic";
         persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, 
nonPartitionTopic2, true, null);
-        Assert.assertEquals(persistentTopics
-                        .getPartitionedMetadata(testTenant, testNamespace, 
nonPartitionTopic, true, false).partitions,
-                0);
-
-        Assert.assertEquals(persistentTopics
-                        .getPartitionedMetadata(testTenant, testNamespace, 
nonPartitionTopic, true, true).partitions,
-                0);
+        persistentTopics.getPartitionedMetadata(response, testTenant, 
testNamespace, nonPartitionTopic, true, false);
+        ArgumentCaptor<PartitionedTopicMetadata> responseCaptor2 = 
ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor2.capture());
+        Assert.assertEquals(responseCaptor2.getValue().partitions, 0);
+        response = mock(AsyncResponse.class);
+        responseCaptor2 = 
ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+        persistentTopics.getPartitionedMetadata(response, testTenant, 
testNamespace, nonPartitionTopic, true, true);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor2.capture());
+        Assert.assertEquals(responseCaptor2.getValue().partitions, 0);
     }
 
     @Test
     public void testCreateNonPartitionedTopic() {
         final String topicName = "standard-topic-partition-a";
+        AsyncResponse response = mock(AsyncResponse.class);
         persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, 
topicName, true, null);
-        PartitionedTopicMetadata pMetadata = 
persistentTopics.getPartitionedMetadata(
+        persistentTopics.getPartitionedMetadata(response,
                 testTenant, testNamespace, topicName, true, false);
-        Assert.assertEquals(pMetadata.partitions, 0);
+        ArgumentCaptor<PartitionedTopicMetadata> responseCaptor = 
ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().partitions, 0);
 
-        PartitionedTopicMetadata metadata = 
persistentTopics.getPartitionedMetadata(
+        response = mock(AsyncResponse.class);
+        responseCaptor = 
ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+        persistentTopics.getPartitionedMetadata(response,
                 testTenant, testNamespace, topicName, true, true);
-        Assert.assertEquals(metadata.partitions, 0);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().partitions, 0);
         final String topicName2 = "standard-topic-partition-b";
         Map<String, String> topicMetadata = Maps.newHashMap();
         topicMetadata.put("key1", "value1");
         persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, 
topicName2, true, topicMetadata);
-        PartitionedTopicMetadata pMetadata2 = 
persistentTopics.getPartitionedMetadata(
+        response = mock(AsyncResponse.class);
+        responseCaptor = 
ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+        persistentTopics.getPartitionedMetadata(response,
                 testTenant, testNamespace, topicName2, true, false);
-        Assert.assertNull(pMetadata2.properties);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertNull(responseCaptor.getValue().properties);
     }
 
     @Test
     public void testCreatePartitionedTopic() {
         AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<PartitionedTopicMetadata> responseCaptor = 
ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
         final String topicName = "standard-partitioned-topic-a";
         persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, topicName, 2, true);
         Awaitility.await().untilAsserted(() -> {
-            PartitionedTopicMetadata pMetadata = 
persistentTopics.getPartitionedMetadata(
+            persistentTopics.getPartitionedMetadata(response,
                     testTenant, testNamespace, topicName, true, false);
-            Assert.assertNull(pMetadata.properties);
+            verify(response, 
timeout(5000).atLeast(1)).resume(responseCaptor.capture());
+            Assert.assertNull(responseCaptor.getValue().properties);
         });
+        AsyncResponse response2 = mock(AsyncResponse.class);
+        ArgumentCaptor<PartitionedTopicMetadata> responseCaptor2 = 
ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
         final String topicName2 = "standard-partitioned-topic-b";
         Map<String, String> topicMetadata = Maps.newHashMap();
         topicMetadata.put("key1", "value1");
         PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(2, 
topicMetadata);
-        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, topicName2, metadata, true);
+        persistentTopics.createPartitionedTopic(response2, testTenant, 
testNamespace, topicName2, metadata, true);
         Awaitility.await().untilAsserted(() -> {
-            PartitionedTopicMetadata pMetadata2 = 
persistentTopics.getPartitionedMetadata(
+            persistentTopics.getPartitionedMetadata(response2,
                     testTenant, testNamespace, topicName2, true, false);
-            Assert.assertEquals(pMetadata2.properties.size(), 1);
-            Assert.assertEquals(pMetadata2.properties, topicMetadata);
+            verify(response2, 
timeout(5000).atLeast(1)).resume(responseCaptor2.capture());
+            Assert.assertEquals(responseCaptor2.getValue().properties.size(), 
1);
+            Assert.assertEquals(responseCaptor2.getValue().properties, 
topicMetadata);
         });
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
index 4edfbfd17db..82d7571912b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
@@ -383,7 +383,7 @@ public class TopicsTest extends MockedPulsarServiceBaseTest 
{
         topics.produceOnPersistentTopic(asyncResponse, testTenant, 
testNamespace, testTopicName, false, producerMessages);
         ArgumentCaptor<RestException> responseCaptor = 
ArgumentCaptor.forClass(RestException.class);
         verify(asyncResponse, 
timeout(5000).times(1)).resume(responseCaptor.capture());
-        Assert.assertEquals(responseCaptor.getValue().getMessage(), "Fail to 
publish message: Topic not exist");
+        
Assert.assertTrue(responseCaptor.getValue().getMessage().contains("Topic not 
exist"));
     }
 
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApi2Test.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApi2Test.java
index c74d5ee7733..f62130ec381 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApi2Test.java
@@ -256,7 +256,12 @@ public class V1_AdminApi2Test extends 
MockedPulsarServiceBaseTest {
 
         // test partitioned-topic
         final String partitionedTopicName = 
"non-persistent://prop-xyz/use/ns1/paritioned";
-        
assertEquals(admin.nonPersistentTopics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
 0);
+        try {
+            
admin.nonPersistentTopics().getPartitionedTopicMetadata(partitionedTopicName);
+            fail("Should have failed");
+        } catch (Exception ex) {
+            assertTrue(ex instanceof PulsarAdminException.NotFoundException);
+        }
         
admin.nonPersistentTopics().createPartitionedTopic(partitionedTopicName, 5);
         
assertEquals(admin.nonPersistentTopics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
 5);
     }

Reply via email to