315157973 commented on code in PR #21211:
URL: https://github.com/apache/pulsar/pull/21211#discussion_r1335095861
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java:
##########
@@ -318,35 +318,37 @@ public static CompletableFuture<ByteBuf>
lookupTopicAsync(PulsarService pulsarSe
requestId,
shouldRedirectThroughServiceUrl(conf, lookupData)));
}
}).exceptionally(ex -> {
- if (ex instanceof CompletionException && ex.getCause()
instanceof IllegalStateException) {
- log.info("Failed to lookup {} for topic {} with error
{}", clientAppId,
- topicName.toString(),
ex.getCause().getMessage());
- } else {
- log.warn("Failed to lookup {} for topic {} with error
{}", clientAppId,
- topicName.toString(), ex.getMessage(), ex);
- }
- lookupfuture.complete(
-
newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(),
requestId));
- return null;
- });
+ handleLookupError(lookupfuture,
topicName.toString(), clientAppId, requestId, ex);
+ return null;
+ });
}
-
}).exceptionally(ex -> {
- if (ex instanceof CompletionException && ex.getCause() instanceof
IllegalStateException) {
- log.info("Failed to lookup {} for topic {} with error {}",
clientAppId, topicName.toString(),
- ex.getCause().getMessage());
- } else {
- log.warn("Failed to lookup {} for topic {} with error {}",
clientAppId, topicName.toString(),
- ex.getMessage(), ex);
- }
-
-
lookupfuture.complete(newLookupErrorResponse(ServerError.ServiceNotReady,
ex.getMessage(), requestId));
+ handleLookupError(lookupfuture, topicName.toString(), clientAppId,
requestId, ex);
return null;
});
return lookupfuture;
}
+ private static void handleLookupError(CompletableFuture<ByteBuf>
lookupFuture, String topicName, String clientAppId,
+ long requestId, Throwable ex){
+ final Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
+ final String errorMsg = unwrapEx.getMessage();
+ if (unwrapEx instanceof IllegalStateException) {
+ // Current broker still hold the bundle's lock, but the bundle is
being unloading.
+ log.info("Failed to lookup {} for topic {} with error {}",
clientAppId, topicName, errorMsg);
+
lookupFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
errorMsg, requestId));
Review Comment:
There may be a side effect here.
Because the connection can be reset, the previous producer could fail fast
when bundle unloaded and move to a new Broker.
This now causes each partition's producer to have to wait for a timeout.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]