codelipenghui commented on a change in pull request #8818:
URL: https://github.com/apache/pulsar/pull/8818#discussion_r548529963



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -715,6 +720,11 @@ protected static PartitionedTopicMetadata 
fetchPartitionedTopicMetadataCheckAllo
                     .get();
         } catch (Exception e) {
             if (e.getCause() instanceof RestException) {
+                // Since CompletableFuture wrappers exception, so we will lost 
the Status
+                // rebuild the Status if exception message contains "Topic not 
exist."
+                if (e.getCause().getMessage().contains("Topic not exist.")) {
+                    throw new 
org.apache.pulsar.broker.web.RestException(Response.Status.NOT_FOUND, "Topic 
not exist.");
+                }

Review comment:
       Same as the above comment.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -702,6 +702,11 @@ protected static PartitionedTopicMetadata 
fetchPartitionedTopicMetadata(PulsarSe
             return 
pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).get();
         } catch (Exception e) {
             if (e.getCause() instanceof RestException) {
+                // Since CompletableFuture wrappers exception, so we will lost 
the Status
+                // rebuild the Status if exception message contains "Topic not 
exist."
+                if (e.getCause().getMessage().contains("Topic not exist.")) {
+                    throw new 
org.apache.pulsar.broker.web.RestException(Response.Status.NOT_FOUND, "Topic 
not exist.");
+                }

Review comment:
       Could you please clarify why `throw (RestException) e.getCause()` does 
not work here? If e.getCause is instance of RestException, I think we can throw 
it directly. I might be missing something here.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -456,10 +456,17 @@ protected void internalCreateNonPartitionedTopic(boolean 
authoritative) {
 
         validateTopicOwnership(topicName, authoritative);
 
-        PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(topicName, authoritative, false);
-        if (partitionMetadata.partitions > 0) {
-            log.warn("[{}] Partitioned topic with the same name already exists 
{}", clientAppId(), topicName);
-            throw new RestException(Status.CONFLICT, "This topic already 
exists");
+        PartitionedTopicMetadata partitionMetadata = null;
+        try {
+            partitionMetadata = getPartitionedTopicMetadata(topicName, 
authoritative, false);
+            if (partitionMetadata.partitions > 0) {
+                log.warn("[{}] Partitioned topic with the same name already 
exists {}", clientAppId(), topicName);
+                throw new RestException(Status.CONFLICT, "This topic already 
exists");
+            }
+        } catch (RestException restException) {
+            if (Status.NOT_FOUND.getStatusCode() != 
restException.getResponse().getStatus()) {
+                throw restException;

Review comment:
       I think it's not correct handling here, If got 404 here, it means the 
partitioned metadata does not exist right? If the partitioned metadata does 
exist, why should we prevent the non-partitioned topic creation?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
##########
@@ -1174,13 +1174,19 @@ public ServiceUnitId getServiceUnitId(TopicName 
topicName) throws Exception {
     }
 
     private CompletableFuture<List<String>> getPartitionsForTopic(TopicName 
topicName) {
-        return 
pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).thenCompose(meta
 -> {
+        CompletableFuture<List<String>> future = new CompletableFuture<>();
+        
pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).whenComplete((meta,
 t) -> {
+            if (t != null) {
+                future.completeExceptionally(t);
+                return;
+            }
             List<String> result = Lists.newArrayList();
             for (int i = 0; i < meta.partitions; i++) {
                 result.add(topicName.getPartition(i).toString());
             }
-            return CompletableFuture.completedFuture(result);
+            future.complete(result);
         });
+        return future;

Review comment:
       What is the difference between these changes? Why we change the 
`thenCompose` to `whenComplete`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to