aloyszhang commented on a change in pull request #13055:
URL: https://github.com/apache/pulsar/pull/13055#discussion_r763814414
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
##########
@@ -80,55 +80,63 @@ protected void internalLookupTopicAsync(TopicName
topicName, boolean authoritati
return;
}
- CompletableFuture<Optional<LookupResult>> lookupFuture =
pulsar().getNamespaceService()
- .getBrokerServiceUrlAsync(topicName,
-
LookupOptions.builder().advertisedListenerName(listenerName)
-
.authoritative(authoritative).loadTopicsInBundle(false).build());
-
- lookupFuture.thenAccept(optionalResult -> {
- if (optionalResult == null || !optionalResult.isPresent()) {
- log.warn("No broker was found available for topic {}",
topicName);
- completeLookupResponseExceptionally(asyncResponse,
- new
WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
+
pulsar().getNamespaceService().checkTopicExists(topicName).thenAccept(exist -> {
+ if (!exist &&
!pulsar().getBrokerService().isAllowAutoTopicCreation(topicName)) {
+ completeLookupResponseExceptionally(asyncResponse, new
RestException(Response.Status.NOT_FOUND,
+ "Topic not found."));
return;
}
+ CompletableFuture<Optional<LookupResult>> lookupFuture =
pulsar().getNamespaceService()
+ .getBrokerServiceUrlAsync(topicName,
+
LookupOptions.builder().advertisedListenerName(listenerName)
+
.authoritative(authoritative).loadTopicsInBundle(false).build());
- LookupResult result = optionalResult.get();
- // We have found either a broker that owns the topic, or a broker
to which we should redirect the client to
- if (result.isRedirect()) {
- boolean newAuthoritative = result.isAuthoritativeRedirect();
- URI redirect;
- try {
- String redirectUrl = isRequestHttps() ?
result.getLookupData().getHttpUrlTls()
- : result.getLookupData().getHttpUrl();
- checkNotNull(redirectUrl, "Redirected cluster's service
url is not configured");
- String lookupPath = topicName.isV2() ? LOOKUP_PATH_V2 :
LOOKUP_PATH_V1;
- String path = String.format("%s%s%s?authoritative=%s",
- redirectUrl, lookupPath,
topicName.getLookupName(), newAuthoritative);
- path = listenerName == null ? path : path +
"&listenerName=" + listenerName;
- redirect = new URI(path);
- } catch (URISyntaxException | NullPointerException e) {
- log.error("Error in preparing redirect url for {}: {}",
topicName, e.getMessage(), e);
- completeLookupResponseExceptionally(asyncResponse, e);
+ lookupFuture.thenAccept(optionalResult -> {
+ if (optionalResult == null || !optionalResult.isPresent()) {
+ log.warn("No broker was found available for topic {}",
topicName);
+ completeLookupResponseExceptionally(asyncResponse,
+ new
WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
return;
}
- if (log.isDebugEnabled()) {
- log.debug("Redirect lookup for topic {} to {}", topicName,
redirect);
- }
- completeLookupResponseExceptionally(asyncResponse,
- new
WebApplicationException(Response.temporaryRedirect(redirect).build()));
- } else {
- // Found broker owning the topic
- if (log.isDebugEnabled()) {
- log.debug("Lookup succeeded for topic {} -- broker: {}",
topicName, result.getLookupData());
+ LookupResult result = optionalResult.get();
+ // We have found either a broker that owns the topic, or a
broker to
+ // which we should redirect the client to
+ if (result.isRedirect()) {
+ boolean newAuthoritative =
result.isAuthoritativeRedirect();
+ URI redirect;
+ try {
+ String redirectUrl = isRequestHttps() ?
result.getLookupData().getHttpUrlTls()
+ : result.getLookupData().getHttpUrl();
+ checkNotNull(redirectUrl, "Redirected cluster's
service url is not configured");
+ String lookupPath = topicName.isV2() ? LOOKUP_PATH_V2
: LOOKUP_PATH_V1;
+ String path = String.format("%s%s%s?authoritative=%s",
+ redirectUrl, lookupPath,
topicName.getLookupName(), newAuthoritative);
+ path = listenerName == null ? path : path +
"&listenerName=" + listenerName;
+ redirect = new URI(path);
+ } catch (URISyntaxException | NullPointerException e) {
+ log.error("Error in preparing redirect url for {}:
{}", topicName, e.getMessage(), e);
+ completeLookupResponseExceptionally(asyncResponse, e);
+ return;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Redirect lookup for topic {} to {}",
topicName, redirect);
+ }
+ completeLookupResponseExceptionally(asyncResponse,
+ new
WebApplicationException(Response.temporaryRedirect(redirect).build()));
+
+ } else {
+ // Found broker owning the topic
+ if (log.isDebugEnabled()) {
+ log.debug("Lookup succeeded for topic {} -- broker:
{}", topicName, result.getLookupData());
+ }
+ completeLookupResponseSuccessfully(asyncResponse,
result.getLookupData());
}
- completeLookupResponseSuccessfully(asyncResponse,
result.getLookupData());
- }
- }).exceptionally(exception -> {
- log.warn("Failed to lookup broker for topic {}: {}", topicName,
exception.getMessage(), exception);
- completeLookupResponseExceptionally(asyncResponse, exception);
- return null;
+ }).exceptionally(exception -> {
+ log.warn("Failed to lookup broker for topic {}: {}",
topicName, exception.getMessage(), exception);
+ completeLookupResponseExceptionally(asyncResponse, exception);
+ return null;
+ });
});
Review comment:
I got it: you mean that `checkTopicExists` may throw an exception.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]