aloyszhang commented on a change in pull request #13055:
URL: https://github.com/apache/pulsar/pull/13055#discussion_r763718139



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
##########
@@ -80,55 +80,63 @@ protected void internalLookupTopicAsync(TopicName 
topicName, boolean authoritati
             return;
         }
 
-        CompletableFuture<Optional<LookupResult>> lookupFuture = 
pulsar().getNamespaceService()
-                .getBrokerServiceUrlAsync(topicName,
-                        
LookupOptions.builder().advertisedListenerName(listenerName)
-                                
.authoritative(authoritative).loadTopicsInBundle(false).build());
-
-        lookupFuture.thenAccept(optionalResult -> {
-            if (optionalResult == null || !optionalResult.isPresent()) {
-                log.warn("No broker was found available for topic {}", 
topicName);
-                completeLookupResponseExceptionally(asyncResponse,
-                        new 
WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
+        
pulsar().getNamespaceService().checkTopicExists(topicName).thenAccept(exist -> {
+            if (!exist && 
!pulsar().getBrokerService().isAllowAutoTopicCreation(topicName)) {
+                completeLookupResponseExceptionally(asyncResponse, new 
RestException(Response.Status.NOT_FOUND,
+                        "Topic not found."));
                 return;
             }
+            CompletableFuture<Optional<LookupResult>> lookupFuture = 
pulsar().getNamespaceService()
+                    .getBrokerServiceUrlAsync(topicName,
+                            
LookupOptions.builder().advertisedListenerName(listenerName)
+                                    
.authoritative(authoritative).loadTopicsInBundle(false).build());
 
-            LookupResult result = optionalResult.get();
-            // We have found either a broker that owns the topic, or a broker 
to which we should redirect the client to
-            if (result.isRedirect()) {
-                boolean newAuthoritative = result.isAuthoritativeRedirect();
-                URI redirect;
-                try {
-                    String redirectUrl = isRequestHttps() ? 
result.getLookupData().getHttpUrlTls()
-                            : result.getLookupData().getHttpUrl();
-                    checkNotNull(redirectUrl, "Redirected cluster's service 
url is not configured");
-                    String lookupPath = topicName.isV2() ? LOOKUP_PATH_V2 : 
LOOKUP_PATH_V1;
-                    String path = String.format("%s%s%s?authoritative=%s",
-                            redirectUrl, lookupPath, 
topicName.getLookupName(), newAuthoritative);
-                    path = listenerName == null ? path : path + 
"&listenerName=" + listenerName;
-                    redirect = new URI(path);
-                } catch (URISyntaxException | NullPointerException e) {
-                    log.error("Error in preparing redirect url for {}: {}", 
topicName, e.getMessage(), e);
-                    completeLookupResponseExceptionally(asyncResponse, e);
+            lookupFuture.thenAccept(optionalResult -> {
+                if (optionalResult == null || !optionalResult.isPresent()) {
+                    log.warn("No broker was found available for topic {}", 
topicName);
+                    completeLookupResponseExceptionally(asyncResponse,
+                            new 
WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
                     return;
                 }
-                if (log.isDebugEnabled()) {
-                    log.debug("Redirect lookup for topic {} to {}", topicName, 
redirect);
-                }
-                completeLookupResponseExceptionally(asyncResponse,
-                        new 
WebApplicationException(Response.temporaryRedirect(redirect).build()));
 
-            } else {
-                // Found broker owning the topic
-                if (log.isDebugEnabled()) {
-                    log.debug("Lookup succeeded for topic {} -- broker: {}", 
topicName, result.getLookupData());
+                LookupResult result = optionalResult.get();
+                // We have found either a broker that owns the topic, or a 
broker to
+                // which we should redirect the client to
+                if (result.isRedirect()) {
+                    boolean newAuthoritative = 
result.isAuthoritativeRedirect();
+                    URI redirect;
+                    try {
+                        String redirectUrl = isRequestHttps() ? 
result.getLookupData().getHttpUrlTls()
+                                : result.getLookupData().getHttpUrl();
+                        checkNotNull(redirectUrl, "Redirected cluster's 
service url is not configured");
+                        String lookupPath = topicName.isV2() ? LOOKUP_PATH_V2 
: LOOKUP_PATH_V1;
+                        String path = String.format("%s%s%s?authoritative=%s",
+                                redirectUrl, lookupPath, 
topicName.getLookupName(), newAuthoritative);
+                        path = listenerName == null ? path : path + 
"&listenerName=" + listenerName;
+                        redirect = new URI(path);
+                    } catch (URISyntaxException | NullPointerException e) {
+                        log.error("Error in preparing redirect url for {}: 
{}", topicName, e.getMessage(), e);
+                        completeLookupResponseExceptionally(asyncResponse, e);
+                        return;
+                    }
+                    if (log.isDebugEnabled()) {
+                        log.debug("Redirect lookup for topic {} to {}", 
topicName, redirect);
+                    }
+                    completeLookupResponseExceptionally(asyncResponse,
+                            new 
WebApplicationException(Response.temporaryRedirect(redirect).build()));
+
+                } else {
+                    // Found broker owning the topic
+                    if (log.isDebugEnabled()) {
+                        log.debug("Lookup succeeded for topic {} -- broker: 
{}", topicName, result.getLookupData());
+                    }
+                    completeLookupResponseSuccessfully(asyncResponse, 
result.getLookupData());
                 }
-                completeLookupResponseSuccessfully(asyncResponse, 
result.getLookupData());
-            }
-        }).exceptionally(exception -> {
-            log.warn("Failed to lookup broker for topic {}: {}", topicName, 
exception.getMessage(), exception);
-            completeLookupResponseExceptionally(asyncResponse, exception);
-            return null;
+            }).exceptionally(exception -> {
+                log.warn("Failed to lookup broker for topic {}: {}", 
topicName, exception.getMessage(), exception);
+                completeLookupResponseExceptionally(asyncResponse, exception);
+                return null;
+            });
         });

Review comment:
       which completableFuture do you mean?
    The `lookupFuture` already has an exceptionally




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to