poorbarcode commented on code in PR #21211:
URL: https://github.com/apache/pulsar/pull/21211#discussion_r1335207040
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java:
##########
@@ -318,35 +318,37 @@ public static CompletableFuture<ByteBuf>
lookupTopicAsync(PulsarService pulsarSe
requestId,
shouldRedirectThroughServiceUrl(conf, lookupData)));
}
}).exceptionally(ex -> {
- if (ex instanceof CompletionException && ex.getCause()
instanceof IllegalStateException) {
- log.info("Failed to lookup {} for topic {} with error
{}", clientAppId,
- topicName.toString(),
ex.getCause().getMessage());
- } else {
- log.warn("Failed to lookup {} for topic {} with error
{}", clientAppId,
- topicName.toString(), ex.getMessage(), ex);
- }
- lookupfuture.complete(
-
newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(),
requestId));
- return null;
- });
+ handleLookupError(lookupfuture,
topicName.toString(), clientAppId, requestId, ex);
+ return null;
+ });
}
-
}).exceptionally(ex -> {
- if (ex instanceof CompletionException && ex.getCause() instanceof
IllegalStateException) {
- log.info("Failed to lookup {} for topic {} with error {}",
clientAppId, topicName.toString(),
- ex.getCause().getMessage());
- } else {
- log.warn("Failed to lookup {} for topic {} with error {}",
clientAppId, topicName.toString(),
- ex.getMessage(), ex);
- }
-
-
lookupfuture.complete(newLookupErrorResponse(ServerError.ServiceNotReady,
ex.getMessage(), requestId));
+ handleLookupError(lookupfuture, topicName.toString(), clientAppId,
requestId, ex);
return null;
});
return lookupfuture;
}
+ private static void handleLookupError(CompletableFuture<ByteBuf>
lookupFuture, String topicName, String clientAppId,
+ long requestId, Throwable ex){
+ final Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
+ final String errorMsg = unwrapEx.getMessage();
+ if (unwrapEx instanceof IllegalStateException) {
+ // Current broker still hold the bundle's lock, but the bundle is
being unloading.
+ log.info("Failed to lookup {} for topic {} with error {}",
clientAppId, topicName, errorMsg);
+
lookupFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
errorMsg, requestId));
Review Comment:
> Because the connection can be reset, the previous producer could fail fast
when bundle unloaded and move to a new Broker.
The previous producer will finally receive a `CommandCloseProducer` and try
to reconnect even if the topic is closed without waiting client to disconnect,
right?
> This now causes each partition's producer to have to wait for a timeout.
The partition's producer will try to reconnect according to `backoff`'s
rules, which will not result in a timeout.
I also improve the test to ensure the producer and consumer are still
working, see `testLookupConnectionNotCloseIfGetUnloadingExOrMetadataEx`. Could
you explain the details?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]