This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a080aebe015f18a1cbf94be50a57105bd096cade Author: fengyubiao <[email protected]> AuthorDate: Sun Feb 18 15:46:52 2024 +0800 [improve] [broker] Do not print an Error log when responding to `HTTP-404` when calling `Admin API` and the topic does not exist. (#21995) (cherry picked from commit 48b4481969cb6028186a7a84b8be8af90770674b) --- .../apache/pulsar/broker/admin/AdminResource.java | 11 +++ .../broker/admin/impl/PersistentTopicsBase.java | 87 +++++++++++----------- .../broker/admin/v2/NonPersistentTopics.java | 4 +- .../pulsar/broker/admin/v2/PersistentTopics.java | 14 ++-- .../pulsar/broker/admin/v3/Transactions.java | 16 ++++ 5 files changed, 80 insertions(+), 52 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 30526a0787d..f64e1d94507 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -798,6 +798,17 @@ public abstract class AdminResource extends PulsarWebResource { == Status.TEMPORARY_REDIRECT.getStatusCode(); } + protected static boolean isNotFoundException(Throwable ex) { + Throwable realCause = FutureUtil.unwrapCompletionException(ex); + return realCause instanceof WebApplicationException + && ((WebApplicationException) realCause).getResponse().getStatus() + == Status.NOT_FOUND.getStatusCode(); + } + + protected static boolean isNot307And404Exception(Throwable ex) { + return !isRedirectException(ex) && !isNotFoundException(ex); + } + protected static String getTopicNotFoundErrorMessage(String topic) { return String.format("Topic %s not found", topic); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index ed3896b1456..82711096701 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -838,7 +838,7 @@ public class PersistentTopicsBase extends AdminResource { } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to get partitioned metadata while unloading topic {}", clientAppId(), topicName, ex); } @@ -848,7 +848,7 @@ public class PersistentTopicsBase extends AdminResource { } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to validate the global namespace ownership while unloading topic {}", clientAppId(), topicName, ex); } @@ -1058,7 +1058,7 @@ public class PersistentTopicsBase extends AdminResource { })) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to unload topic {}, {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -1080,7 +1080,7 @@ public class PersistentTopicsBase extends AdminResource { })) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to unload tc {},{}", clientAppId(), topicName.getPartitionIndex(), ex); } @@ -1206,7 +1206,7 @@ public class PersistentTopicsBase extends AdminResource { } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. 
- if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to get partitioned topic metadata while get" + " subscriptions for topic {}", clientAppId(), topicName, ex); } @@ -1216,7 +1216,7 @@ public class PersistentTopicsBase extends AdminResource { } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to validate the global namespace/topic ownership while get subscriptions" + " for topic {}", clientAppId(), topicName, ex); } @@ -1225,7 +1225,7 @@ public class PersistentTopicsBase extends AdminResource { }) ).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to get subscriptions for {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -1264,7 +1264,7 @@ public class PersistentTopicsBase extends AdminResource { .thenAccept(topic -> asyncResponse.resume(Lists.newArrayList(topic.getSubscriptions().keys()))) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -1376,7 +1376,7 @@ public class PersistentTopicsBase extends AdminResource { } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to get partitioned metadata while get managed info for {}", clientAppId(), topicName, ex); } @@ -1386,7 +1386,7 @@ public class PersistentTopicsBase extends AdminResource { } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. 
- if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to validate the global namespace ownership while get managed info for {}", clientAppId(), topicName, ex); } @@ -1488,7 +1488,7 @@ public class PersistentTopicsBase extends AdminResource { }); }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to get partitioned internal stats for {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -1542,7 +1542,7 @@ public class PersistentTopicsBase extends AdminResource { }); }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to get partitioned internal stats for {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -1656,8 +1656,9 @@ public class PersistentTopicsBase extends AdminResource { "Subscription has active connected consumers")); } else { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { - log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, cause); + if (isNot307And404Exception(ex)) { + log.error("[{}] Failed to delete subscription {} {}", + clientAppId(), topicName, subName, cause); } asyncResponse.resume(new RestException(cause)); } @@ -1683,7 +1684,7 @@ public class PersistentTopicsBase extends AdminResource { }).exceptionally(ex -> { Throwable cause = ex.getCause(); // If the exception is not redirect exception we need to log it. 
- if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to update subscription {} {}", clientAppId(), topicName, subName, cause); } asyncResponse.resume(new RestException(cause)); @@ -1850,7 +1851,7 @@ public class PersistentTopicsBase extends AdminResource { } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to skip all messages for subscription {} on topic {}", clientAppId(), subName, topicName, ex); } @@ -1893,7 +1894,7 @@ public class PersistentTopicsBase extends AdminResource { } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to skip all messages for subscription {} on topic {}", clientAppId(), subName, topicName, ex); } @@ -1955,7 +1956,7 @@ public class PersistentTopicsBase extends AdminResource { }) ).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to skip {} messages {} {}", clientAppId(), numMessages, topicName, subName, ex); } @@ -2022,7 +2023,7 @@ public class PersistentTopicsBase extends AdminResource { ) ).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to expire messages for all subscription on topic {}", clientAppId(), topicName, ex); } @@ -2085,7 +2086,7 @@ public class PersistentTopicsBase extends AdminResource { }) ).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. 
- if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to expire messages for all subscription up to {} on {}", clientAppId(), expireTimeInSeconds, topicName, ex); } @@ -2290,7 +2291,7 @@ public class PersistentTopicsBase extends AdminResource { } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to create subscription {} on topic {}", clientAppId(), subscriptionName, topicName, ex); } @@ -2300,7 +2301,7 @@ public class PersistentTopicsBase extends AdminResource { } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to create subscription {} on topic {}", clientAppId(), subscriptionName, topicName, ex); } @@ -2431,7 +2432,7 @@ public class PersistentTopicsBase extends AdminResource { } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to update subscription {} from topic {}", clientAppId(), subName, topicName, ex); } @@ -2512,7 +2513,7 @@ public class PersistentTopicsBase extends AdminResource { }); }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.warn("[{}][{}] Failed to reset cursor on subscription {} to position {}", clientAppId(), topicName, subName, messageId, ex.getCause()); } @@ -2521,7 +2522,7 @@ public class PersistentTopicsBase extends AdminResource { }); }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. 
- if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.warn("[{}][{}] Failed to reset cursor on subscription {} to position {}", clientAppId(), topicName, subName, messageId, ex.getCause()); } @@ -3069,7 +3070,7 @@ public class PersistentTopicsBase extends AdminResource { } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to get backlog size for topic {}", clientAppId(), topicName, ex); } @@ -3079,7 +3080,7 @@ public class PersistentTopicsBase extends AdminResource { } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to get backlog size for topic {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -3087,7 +3088,7 @@ public class PersistentTopicsBase extends AdminResource { }); }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to validate global namespace ownership to get backlog size for topic " + "{}", clientAppId(), topicName, ex); } @@ -3599,7 +3600,7 @@ public class PersistentTopicsBase extends AdminResource { } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to terminate topic {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -3607,7 +3608,7 @@ public class PersistentTopicsBase extends AdminResource { }) ).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. 
- if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to terminate topic {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -3688,7 +3689,7 @@ public class PersistentTopicsBase extends AdminResource { ).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds, topicName, ex); } @@ -3798,7 +3799,7 @@ public class PersistentTopicsBase extends AdminResource { }); }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to expire messages up to {} on subscription {} to position {}", clientAppId(), topicName, subName, messageId, ex); } @@ -3953,7 +3954,7 @@ public class PersistentTopicsBase extends AdminResource { } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -3962,7 +3963,7 @@ public class PersistentTopicsBase extends AdminResource { } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to validate global namespace ownership to trigger compaction on topic {}", clientAppId(), topicName, ex); } @@ -3991,7 +3992,7 @@ public class PersistentTopicsBase extends AdminResource { } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. 
- if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to trigger compaction for {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -4028,7 +4029,7 @@ public class PersistentTopicsBase extends AdminResource { } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to trigger offload for {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -4045,7 +4046,7 @@ public class PersistentTopicsBase extends AdminResource { asyncResponse.resume(offloadProcessStatus); }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to offload status on topic {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -4518,7 +4519,7 @@ public class PersistentTopicsBase extends AdminResource { }); }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. 
- if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to get last messageId {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -4779,9 +4780,7 @@ public class PersistentTopicsBase extends AdminResource { protected void handleTopicPolicyException(String methodName, Throwable thr, AsyncResponse asyncResponse) { Throwable cause = thr.getCause(); - if (!(cause instanceof WebApplicationException) || !( - ((WebApplicationException) cause).getResponse().getStatus() == 307 - || ((WebApplicationException) cause).getResponse().getStatus() == 404)) { + if (isNot307And404Exception(cause)) { log.error("[{}] Failed to perform {} on topic {}", clientAppId(), methodName, topicName, cause); } @@ -4943,7 +4942,7 @@ public class PersistentTopicsBase extends AdminResource { resultFuture.exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", clientAppId(), enabled, topicName, subName, ex); } @@ -4988,7 +4987,7 @@ public class PersistentTopicsBase extends AdminResource { } ).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to set replicated subscription status on {} {}", clientAppId(), topicName, subName, ex); } @@ -5090,7 +5089,7 @@ public class PersistentTopicsBase extends AdminResource { } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. 
- if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to get replicated subscription status on {} {}", clientAppId(), topicName, subName, ex); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java index 854f76c3abf..b75ec765796 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java @@ -498,7 +498,7 @@ public class NonPersistentTopics extends PersistentTopics { } asyncResponse.resume(topicList); } catch (Exception e) { - if (!isRedirectException(e)) { + if (isNot307And404Exception(e)) { log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange, e); } @@ -506,7 +506,7 @@ public class NonPersistentTopics extends PersistentTopics { } } }).exceptionally(ex -> { - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange, ex); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 71c9dd64732..e2cfc86e8fd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -105,7 +105,7 @@ public class PersistentTopics extends PersistentTopicsBase { internalGetListAsync(Optional.ofNullable(bundle)) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to get topic list {}", clientAppId(), namespaceName, ex); } 
resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -907,7 +907,7 @@ public class PersistentTopics extends PersistentTopicsBase { internalGetPropertiesAsync(authoritative) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to get topic {} properties", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -1704,7 +1704,7 @@ public class PersistentTopics extends PersistentTopicsBase { .thenAccept(asyncResponse::resume) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}", clientAppId(), ledgerId, entryId, topicName, ex); } @@ -1748,9 +1748,11 @@ public class PersistentTopics extends PersistentTopicsBase { } }) .exceptionally(ex -> { - log.error("[{}] Failed to get message ID by timestamp {} from {}", - clientAppId(), timestamp, topicName, ex); + if (isNot307And404Exception(ex)) { + log.error("[{}] Failed to get message ID by timestamp {} from {}", + clientAppId(), timestamp, topicName, ex); + } resumeAsyncResponseExceptionally(asyncResponse, ex); return null; }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java index 9cb825b9f8e..a0faeb65459 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java @@ -90,6 +90,10 @@ public class Transactions extends TransactionsBase { Long.parseLong(leastSigBits)) .thenAccept(stat -> asyncResponse.resume(stat)) .exceptionally(ex -> { + if (isNot307And404Exception(ex)) { + log.error("[{}] Failed to get transaction 
state in transaction buffer {}", + clientAppId(), topicName, ex); + } resumeAsyncResponseExceptionally(asyncResponse, ex); return null; }); @@ -124,6 +128,10 @@ public class Transactions extends TransactionsBase { Long.parseLong(leastSigBits), subName) .thenAccept(stat -> asyncResponse.resume(stat)) .exceptionally(ex -> { + if (isNot307And404Exception(ex)) { + log.error("[{}] Failed to get transaction state in pending ack {}", + clientAppId(), topicName, ex); + } resumeAsyncResponseExceptionally(asyncResponse, ex); return null; }); @@ -154,6 +162,10 @@ public class Transactions extends TransactionsBase { internalGetTransactionBufferStats(authoritative) .thenAccept(stat -> asyncResponse.resume(stat)) .exceptionally(ex -> { + if (isNot307And404Exception(ex)) { + log.error("[{}] Failed to get transaction buffer stats in topic {}", + clientAppId(), topicName, ex); + } resumeAsyncResponseExceptionally(asyncResponse, ex); return null; }); @@ -185,6 +197,10 @@ public class Transactions extends TransactionsBase { internalGetPendingAckStats(authoritative, subName) .thenAccept(stats -> asyncResponse.resume(stats)) .exceptionally(ex -> { + if (isNot307And404Exception(ex)) { + log.error("[{}] Failed to get transaction pending ack stats in topic {}", + clientAppId(), topicName, ex); + } resumeAsyncResponseExceptionally(asyncResponse, ex); return null; });
