heesung-sn commented on code in PR #21211:
URL: https://github.com/apache/pulsar/pull/21211#discussion_r1332086204
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java:
##########
@@ -318,35 +318,37 @@ public static CompletableFuture<ByteBuf>
lookupTopicAsync(PulsarService pulsarSe
requestId,
shouldRedirectThroughServiceUrl(conf, lookupData)));
}
}).exceptionally(ex -> {
- if (ex instanceof CompletionException && ex.getCause()
instanceof IllegalStateException) {
- log.info("Failed to lookup {} for topic {} with error
{}", clientAppId,
- topicName.toString(),
ex.getCause().getMessage());
- } else {
- log.warn("Failed to lookup {} for topic {} with error
{}", clientAppId,
- topicName.toString(), ex.getMessage(), ex);
- }
- lookupfuture.complete(
-
newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(),
requestId));
- return null;
- });
+ handleLookupError(lookupfuture,
topicName.toString(), clientAppId, requestId, ex);
+ return null;
+ });
}
-
}).exceptionally(ex -> {
- if (ex instanceof CompletionException && ex.getCause() instanceof
IllegalStateException) {
- log.info("Failed to lookup {} for topic {} with error {}",
clientAppId, topicName.toString(),
- ex.getCause().getMessage());
- } else {
- log.warn("Failed to lookup {} for topic {} with error {}",
clientAppId, topicName.toString(),
- ex.getMessage(), ex);
- }
-
-
lookupfuture.complete(newLookupErrorResponse(ServerError.ServiceNotReady,
ex.getMessage(), requestId));
+ handleLookupError(lookupfuture, topicName.toString(), clientAppId,
requestId, ex);
return null;
});
return lookupfuture;
}
+ private static void handleLookupError(CompletableFuture<ByteBuf>
lookupFuture, String topicName, String clientAppId,
+ long requestId, Throwable ex){
+ final Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
+ final String errorMsg = unwrapEx.getMessage();
+ if (unwrapEx instanceof IllegalStateException) {
+ // Current broker still hold the bundle's lock, but the bundle is
being unloading.
+ log.info("Failed to lookup {} for topic {} with error {}",
clientAppId, topicName, errorMsg);
+
lookupFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
errorMsg, requestId));
Review Comment:
How do we know that an `IllegalStateException` always corresponds to `MetadataError`?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java:
##########
@@ -318,35 +318,37 @@ public static CompletableFuture<ByteBuf>
lookupTopicAsync(PulsarService pulsarSe
requestId,
shouldRedirectThroughServiceUrl(conf, lookupData)));
}
}).exceptionally(ex -> {
- if (ex instanceof CompletionException && ex.getCause()
instanceof IllegalStateException) {
- log.info("Failed to lookup {} for topic {} with error
{}", clientAppId,
- topicName.toString(),
ex.getCause().getMessage());
- } else {
- log.warn("Failed to lookup {} for topic {} with error
{}", clientAppId,
- topicName.toString(), ex.getMessage(), ex);
- }
- lookupfuture.complete(
-
newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(),
requestId));
- return null;
- });
+ handleLookupError(lookupfuture,
topicName.toString(), clientAppId, requestId, ex);
+ return null;
+ });
}
-
}).exceptionally(ex -> {
- if (ex instanceof CompletionException && ex.getCause() instanceof
IllegalStateException) {
- log.info("Failed to lookup {} for topic {} with error {}",
clientAppId, topicName.toString(),
- ex.getCause().getMessage());
- } else {
- log.warn("Failed to lookup {} for topic {} with error {}",
clientAppId, topicName.toString(),
- ex.getMessage(), ex);
- }
-
-
lookupfuture.complete(newLookupErrorResponse(ServerError.ServiceNotReady,
ex.getMessage(), requestId));
+ handleLookupError(lookupfuture, topicName.toString(), clientAppId,
requestId, ex);
return null;
});
return lookupfuture;
}
+ private static void handleLookupError(CompletableFuture<ByteBuf>
lookupFuture, String topicName, String clientAppId,
+ long requestId, Throwable ex){
+ final Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
+ final String errorMsg = unwrapEx.getMessage();
+ if (unwrapEx instanceof IllegalStateException) {
+ // Current broker still hold the bundle's lock, but the bundle is
being unloading.
+ log.info("Failed to lookup {} for topic {} with error {}",
clientAppId, topicName, errorMsg);
+
lookupFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
errorMsg, requestId));
+ } else if (unwrapEx instanceof MetadataStoreException){
+ // Load bundle ownership or acquire lock failed.
+ // Differ with "IllegalStateException", print warning log.
+ log.warn("Failed to lookup {} for topic {} with error {}",
clientAppId, topicName, errorMsg);
+
lookupFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
errorMsg, requestId));
+ } else {
+ log.warn("Failed to lookup {} for topic {} with error {}",
clientAppId, topicName, errorMsg);
+
lookupFuture.complete(newLookupErrorResponse(ServerError.ServiceNotReady,
errorMsg, requestId));
Review Comment:
Why do we need to return `ServiceNotReady` in this fallback branch?
Why not `UnknownError`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]