This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 142440c [Management] Optimize ``BrokerBase`` log. (#14258)
142440c is described below
commit 142440c2034688040036db9621f86f33c95438f4
Author: Qiang Zhao <[email protected]>
AuthorDate: Mon Feb 14 11:09:34 2022 +0800
[Management] Optimize ``BrokerBase`` log. (#14258)
* Optimize ``BrokerBase`` log.
Motivation
When a call to the admin REST API results in an exception, we should log
the error, because the log helps the user understand what happened.
Modification
- Add logs to the ``BrokersBase`` methods.
---
.../apache/pulsar/broker/admin/AdminResource.java | 15 +++++++++-
.../pulsar/broker/admin/impl/BrokersBase.java | 33 ++++++++++------------
2 files changed, 29 insertions(+), 19 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 924c943..300c89f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -851,4 +851,17 @@ public abstract class AdminResource extends
PulsarWebResource {
persistence.getBookkeeperAckQuorum()));
}
-}
+
+ /**
+ * Check current exception whether is redirect exception.
+ *
+ * @param ex The throwable.
+ * @return Whether is redirect exception
+ */
+ protected static boolean isRedirectException(Throwable ex) {
+ Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+ return realCause instanceof WebApplicationException
+ && ((WebApplicationException)
realCause).getResponse().getStatus()
+ == Status.TEMPORARY_REDIRECT.getStatusCode();
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 5c6350b..4530d8c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -38,7 +38,6 @@ import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
-import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
@@ -92,14 +91,15 @@ public class BrokersBase extends AdminResource {
validateSuperUserAccessAsync()
.thenCompose(__ -> validateClusterOwnershipAsync(cluster))
.thenCompose(__ ->
pulsar().getLoadManager().get().getAvailableBrokersAsync())
- .thenAccept(asyncResponse::resume)
- .exceptionally(ex -> {
- Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
- if (realCause instanceof WebApplicationException) {
- asyncResponse.resume(realCause);
- } else {
- asyncResponse.resume(new RestException(realCause));
+ .thenAccept(activeBrokers -> {
+ LOG.info("[{}] Successfully to get active brokers,
cluster={}", clientAppId(), cluster);
+ asyncResponse.resume(activeBrokers);
+ }).exceptionally(ex -> {
+ // If the exception is not redirect exception we need to
log it.
+ if (!isRedirectException(ex)) {
+ LOG.error("[{}] Fail to get active brokers,
cluster={}", clientAppId(), cluster, ex);
}
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
@@ -119,16 +119,12 @@ public class BrokersBase extends AdminResource {
LeaderBroker leaderBroker =
pulsar().getLeaderElectionService().getCurrentLeader()
.orElseThrow(() -> new
RestException(Status.NOT_FOUND, "Couldn't find leader broker"));
BrokerInfo brokerInfo =
BrokerInfo.builder().serviceUrl(leaderBroker.getServiceUrl()).build();
+ LOG.info("[{}] Successfully to get the information of the
leader broker.", clientAppId());
asyncResponse.resume(brokerInfo);
})
.exceptionally(ex -> {
- Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
- LOG.error("[{}] Failed to get the information of the
leader broker.", clientAppId(), realCause);
- if (realCause instanceof WebApplicationException) {
- asyncResponse.resume(realCause);
- } else {
- asyncResponse.resume(new RestException(realCause));
- }
+ LOG.error("[{}] Failed to get the information of the
leader broker.", clientAppId(), ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
@@ -268,7 +264,7 @@ public class BrokersBase extends AdminResource {
});
} else {
return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
- "Cannot update non-dynamic configuration"));
+ "Can't update non-dynamic configuration"));
}
}
@@ -333,7 +329,8 @@ public class BrokersBase extends AdminResource {
asyncResponse.resume("ok");
}).exceptionally(ex -> {
LOG.error("[{}] Fail to run health check.", clientAppId(),
ex);
- return handleCommonRestAsyncException(asyncResponse, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
});
}
@@ -427,7 +424,7 @@ public class BrokersBase extends AdminResource {
value = "if the value absent(value=0) means no concurrent
limitation.")
@QueryParam("maxConcurrentUnloadPerSec") int
maxConcurrentUnloadPerSec,
@QueryParam("forcedTerminateTopic") @DefaultValue("true") boolean
forcedTerminateTopic
- ) throws Exception {
+ ) {
validateSuperUserAccess();
doShutDownBrokerGracefully(maxConcurrentUnloadPerSec,
forcedTerminateTopic);
}