This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 0fbb7fd1b2a [improve] [broker] Do not print an Error log when
responding to `HTTP-404` when calling `Admin API` and the topic does not exist.
(#21995)
0fbb7fd1b2a is described below
commit 0fbb7fd1b2ab15d00ca248c80a90edce4365cb8e
Author: fengyubiao <[email protected]>
AuthorDate: Sun Feb 18 15:46:52 2024 +0800
[improve] [broker] Do not print an Error log when responding to `HTTP-404`
when calling `Admin API` and the topic does not exist. (#21995)
---
.../apache/pulsar/broker/admin/AdminResource.java | 4 +
.../broker/admin/impl/PersistentTopicsBase.java | 88 +++++++++++-----------
.../broker/admin/impl/SchemasResourceBase.java | 2 +-
.../broker/admin/v2/NonPersistentTopics.java | 6 +-
.../pulsar/broker/admin/v2/PersistentTopics.java | 36 ++++-----
.../pulsar/broker/admin/v3/Transactions.java | 12 +--
6 files changed, 75 insertions(+), 73 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 1526ae18a90..2ceec189975 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -834,6 +834,10 @@ public abstract class AdminResource extends
PulsarWebResource {
== Status.NOT_FOUND.getStatusCode();
}
+ protected static boolean isNot307And404Exception(Throwable ex) {
+ return !isRedirectException(ex) && !isNotFoundException(ex);
+ }
+
protected static String getTopicNotFoundErrorMessage(String topic) {
return String.format("Topic %s not found", topic);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 379d6675b57..0cdb140c7c3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -874,7 +874,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we
need to log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get partitioned
metadata while unloading topic {}",
clientAppId(), topicName, ex);
}
@@ -884,7 +884,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to validate the global namespace
ownership while unloading topic {}",
clientAppId(), topicName, ex);
}
@@ -1052,7 +1052,7 @@ public class PersistentTopicsBase extends AdminResource {
}))
.exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to unload topic {}, {}",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -1074,7 +1074,7 @@ public class PersistentTopicsBase extends AdminResource {
}))
.exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to unload tc {},{}",
clientAppId(),
topicName.getPartitionIndex(), ex);
}
@@ -1176,7 +1176,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we
need to log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get partitioned
topic metadata while get"
+ " subscriptions for topic {}",
clientAppId(), topicName, ex);
}
@@ -1186,7 +1186,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to validate the global
namespace/topic ownership while get subscriptions"
+ " for topic {}", clientAppId(), topicName,
ex);
}
@@ -1195,7 +1195,7 @@ public class PersistentTopicsBase extends AdminResource {
})
).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get subscriptions for {}",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -1234,7 +1234,7 @@ public class PersistentTopicsBase extends AdminResource {
.thenAccept(topic -> asyncResponse.resume(new
ArrayList<>(topic.getSubscriptions().keys())))
.exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get list of subscriptions
for {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -1343,7 +1343,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get partitioned metadata
while get managed info for {}",
clientAppId(), topicName, ex);
}
@@ -1353,7 +1353,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to validate the global namespace
ownership while get managed info for {}",
clientAppId(), topicName, ex);
}
@@ -1472,7 +1472,7 @@ public class PersistentTopicsBase extends AdminResource {
});
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get partitioned internal stats for
{}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -1527,7 +1527,7 @@ public class PersistentTopicsBase extends AdminResource {
});
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get partitioned internal stats for
{}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -1655,7 +1655,7 @@ public class PersistentTopicsBase extends AdminResource {
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to analyze subscription backlog
{} {}",
clientAppId(), topicName, subName, cause);
}
@@ -1682,7 +1682,7 @@ public class PersistentTopicsBase extends AdminResource {
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to update subscription {} {}",
clientAppId(), topicName, subName, cause);
}
asyncResponse.resume(new RestException(cause));
@@ -1711,7 +1711,7 @@ public class PersistentTopicsBase extends AdminResource {
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to update subscription {} {}",
clientAppId(), topicName, subName, cause);
}
asyncResponse.resume(new RestException(cause));
@@ -1880,7 +1880,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log
it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to skip all messages for
subscription {} on topic {}",
clientAppId(), subName, topicName, ex);
}
@@ -1924,7 +1924,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to skip all messages for
subscription {} on topic {}",
clientAppId(), subName, topicName, ex);
}
@@ -1988,7 +1988,7 @@ public class PersistentTopicsBase extends AdminResource {
})
).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to skip {} messages {} {}",
clientAppId(), numMessages, topicName,
subName, ex);
}
@@ -2058,7 +2058,7 @@ public class PersistentTopicsBase extends AdminResource {
)
).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to expire messages for all subscription
on topic {}", clientAppId(), topicName,
ex);
}
@@ -2125,7 +2125,7 @@ public class PersistentTopicsBase extends AdminResource {
})
).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to expire messages for all subscription
up to {} on {}", clientAppId(),
expireTimeInSeconds, topicName, ex);
}
@@ -2332,7 +2332,7 @@ public class PersistentTopicsBase extends AdminResource {
})).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to create subscription {} on
topic {}",
clientAppId(), subscriptionName, topicName,
ex);
}
@@ -2342,7 +2342,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to create subscription {} on topic {}",
clientAppId(), subscriptionName, topicName, ex);
}
@@ -2473,7 +2473,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to update subscription {} from topic
{}",
clientAppId(), subName, topicName, ex);
}
@@ -2513,7 +2513,7 @@ public class PersistentTopicsBase extends AdminResource {
})
.exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to analyze back log of
subscription {} from topic {}",
clientAppId(), subName, topicName, ex);
}
@@ -2598,7 +2598,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to update subscription {} from topic
{}",
clientAppId(), subName, topicName, ex);
}
@@ -2684,7 +2684,7 @@ public class PersistentTopicsBase extends AdminResource {
});
}).exceptionally(ex -> {
// If the exception is not redirect exception we need
to log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.warn("[{}][{}] Failed to reset cursor on
subscription {} to position {}",
clientAppId(), topicName, subName,
messageId, ex.getCause());
}
@@ -2693,7 +2693,7 @@ public class PersistentTopicsBase extends AdminResource {
});
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.warn("[{}][{}] Failed to reset cursor on subscription {}
to position {}",
clientAppId(), topicName, subName, messageId,
ex.getCause());
}
@@ -3329,7 +3329,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get backlog size for topic
{}", clientAppId(),
topicName, ex);
}
@@ -3337,7 +3337,7 @@ public class PersistentTopicsBase extends AdminResource {
return null;
})).exceptionally(ex -> {
// If the exception is not redirect exception we need
to log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to validate global
namespace ownership "
+ "to get backlog size for topic {}",
clientAppId(), topicName, ex);
}
@@ -3875,7 +3875,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to terminate topic {}",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -3883,7 +3883,7 @@ public class PersistentTopicsBase extends AdminResource {
})
).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to terminate topic {}", clientAppId(),
topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -3973,7 +3973,7 @@ public class PersistentTopicsBase extends AdminResource {
).exceptionally(ex -> {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(cause)) {
+ if (isNot307And404Exception(cause)) {
if (cause instanceof RestException) {
log.warn("[{}] Failed to expire messages up to {} on {}:
{}", clientAppId(), expireTimeInSeconds,
topicName, cause.toString());
@@ -4088,7 +4088,7 @@ public class PersistentTopicsBase extends AdminResource {
messageId, isExcluded, batchIndex);
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to expire messages up to {} on
subscription {} to position {}",
clientAppId(), topicName, subName, messageId,
ex);
}
@@ -4238,7 +4238,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to trigger compaction on topic
{}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -4247,7 +4247,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to validate global namespace ownership
to trigger compaction on topic {}",
clientAppId(), topicName, ex);
}
@@ -4276,7 +4276,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to trigger compaction for {}",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -4312,7 +4312,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to trigger offload for {}",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -4329,7 +4329,7 @@ public class PersistentTopicsBase extends AdminResource {
asyncResponse.resume(offloadProcessStatus);
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to offload status on topic {}",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -4606,7 +4606,7 @@ public class PersistentTopicsBase extends AdminResource {
});
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get last messageId {}",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -4984,9 +4984,7 @@ public class PersistentTopicsBase extends AdminResource {
protected void handleTopicPolicyException(String methodName, Throwable
thr, AsyncResponse asyncResponse) {
Throwable cause = thr.getCause();
- if (!(cause instanceof WebApplicationException) || !(
- ((WebApplicationException) cause).getResponse().getStatus() ==
307
- || ((WebApplicationException)
cause).getResponse().getStatus() == 404)) {
+ if (isNot307And404Exception(cause)) {
log.error("[{}] Failed to perform {} on topic {}",
clientAppId(), methodName, topicName, cause);
}
@@ -5112,7 +5110,7 @@ public class PersistentTopicsBase extends AdminResource {
resultFuture.exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.warn("[{}] Failed to change replicated subscription status
to {} - {} {}", clientAppId(), enabled,
topicName, subName, ex);
}
@@ -5159,7 +5157,7 @@ public class PersistentTopicsBase extends AdminResource {
}
).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to set replicated subscription
status on {} {}", clientAppId(),
topicName, subName, ex);
}
@@ -5260,7 +5258,7 @@ public class PersistentTopicsBase extends AdminResource {
}
resultFuture.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get replicated subscription status
on {} {}", clientAppId(),
topicName, subName, ex);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
index 454b8f0fac8..286366c8b58 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
@@ -238,7 +238,7 @@ public class SchemasResourceBase extends AdminResource {
protected boolean shouldPrintErrorLog(Throwable ex) {
- return !isRedirectException(ex) && !isNotFoundException(ex);
+ return isNot307And404Exception(ex);
}
private static final Logger log =
LoggerFactory.getLogger(SchemasResourceBase.class);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index d4795393f9b..a22ad4b242f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -132,7 +132,7 @@ public class NonPersistentTopics extends PersistentTopics {
})
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get internal stats for topic
{}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -479,7 +479,7 @@ public class NonPersistentTopics extends PersistentTopics {
}
asyncResponse.resume(topicList);
}).exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to list topics on
namespace bundle {}/{}", clientAppId(),
namespaceName, bundleRange, ex);
}
@@ -488,7 +488,7 @@ public class NonPersistentTopics extends PersistentTopics {
});
}
}).exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to list topics on namespace bundle
{}/{}", clientAppId(),
namespaceName, bundleRange, ex);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 9ccbc0ecba1..f0f42a8ff4a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -119,7 +119,7 @@ public class PersistentTopics extends PersistentTopicsBase {
internalGetListAsync(Optional.ofNullable(bundle))
.thenAccept(topicList ->
asyncResponse.resume(filterSystemTopic(topicList, includeSystemTopic)))
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get topic list {}",
clientAppId(), namespaceName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -150,7 +150,7 @@ public class PersistentTopics extends PersistentTopicsBase {
.thenAccept(partitionedTopicList -> asyncResponse.resume(
filterSystemTopic(partitionedTopicList,
includeSystemTopic)))
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get partitioned topic list
{}", clientAppId(), namespaceName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -335,7 +335,7 @@ public class PersistentTopics extends PersistentTopicsBase {
internalCreateNonPartitionedTopicAsync(authoritative, properties)
.thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to create non-partitioned topic
{}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -825,7 +825,7 @@ public class PersistentTopics extends PersistentTopicsBase {
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}][{}] Failed to update partition to {}",
clientAppId(), topicName, numPartitions, ex);
}
@@ -934,7 +934,7 @@ public class PersistentTopics extends PersistentTopicsBase {
internalGetPropertiesAsync(authoritative)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get topic {} properties",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -970,7 +970,7 @@ public class PersistentTopics extends PersistentTopicsBase {
internalUpdatePropertiesAsync(authoritative, properties)
.thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to update topic {} properties",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -1004,7 +1004,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
internalRemovePropertiesAsync(authoritative, key)
.thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to remove key {} in properties
on topic {}",
clientAppId(), key, topicName, ex);
}
@@ -1125,7 +1125,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
} else if (isManagedLedgerNotFoundException(t)) {
ex = new RestException(Response.Status.NOT_FOUND,
getTopicNotFoundErrorMessage(topicName.toString()));
- } else if (!isRedirectException(ex)) {
+ } else if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to delete topic {}",
clientAppId(), topicName, t);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -1209,7 +1209,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get stats for {}",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -1243,7 +1243,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
internalGetInternalStatsAsync(authoritative, metadata)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get internal stats for topic
{}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -1892,7 +1892,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
internalPeekNthMessageAsync(decode(encodedSubName), messagePosition,
authoritative)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get peek nth message for
topic {} subscription {}", clientAppId(),
topicName, decode(encodedSubName), ex);
}
@@ -1934,7 +1934,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
internalExamineMessageAsync(initialPosition, messagePosition,
authoritative)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to examine a specific message
on the topic {}", clientAppId(), topicName,
ex);
}
@@ -1976,7 +1976,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get message with ledgerId {}
entryId {} from {}",
clientAppId(), ledgerId, entryId, topicName,
ex);
}
@@ -2020,7 +2020,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
}
})
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get message ID by timestamp
{} from {}",
clientAppId(), timestamp, topicName, ex);
}
@@ -2055,7 +2055,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
log.warn("[{}] Failed to get topic backlog {}:
Namespace does not exist", clientAppId(),
namespaceName);
ex = new RestException(Response.Status.NOT_FOUND,
"Namespace does not exist");
- } else if (!isRedirectException(ex)) {
+ } else if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get estimated backlog for
topic {}", clientAppId(), encodedTopic, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -3092,7 +3092,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
internalTerminateAsync(authoritative)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to terminated topic {}",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -3188,7 +3188,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
internalCompactionStatusAsync(authoritative)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get the status of a
compaction operation for the topic {}",
clientAppId(), topicName, ex);
}
@@ -3327,7 +3327,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
validateTopicName(tenant, namespace, encodedTopic);
internalTrimTopic(asyncResponse, authoritative).exceptionally(ex
-> {
// If the exception is not redirect exception we need to log
it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to trim topic {}", clientAppId(),
topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
index 1bdc2255085..667d8ce581e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
@@ -105,7 +105,7 @@ public class Transactions extends TransactionsBase {
Long.parseLong(leastSigBits))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get transaction state in
transaction buffer {}",
clientAppId(), topicName, ex);
}
@@ -143,7 +143,7 @@ public class Transactions extends TransactionsBase {
Long.parseLong(leastSigBits), subName)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get transaction state in
pending ack {}",
clientAppId(), topicName, ex);
}
@@ -181,7 +181,7 @@ public class Transactions extends TransactionsBase {
internalGetTransactionBufferStats(authoritative, lowWaterMarks,
segmentStats)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get transaction buffer
stats in topic {}",
clientAppId(), topicName, ex);
}
@@ -217,7 +217,7 @@ public class Transactions extends TransactionsBase {
internalGetPendingAckStats(authoritative, subName, lowWaterMarks)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get transaction pending
ack stats in topic {}",
clientAppId(), topicName, ex);
}
@@ -314,7 +314,7 @@ public class Transactions extends TransactionsBase {
internalGetPendingAckInternalStats(authoritative, subName,
metadata)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get pending ack internal
stats {}",
clientAppId(), topicName, ex);
}
@@ -365,7 +365,7 @@ public class Transactions extends TransactionsBase {
internalGetTransactionBufferInternalStats(authoritative, metadata)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get transaction buffer
internal stats {}",
clientAppId(), topicName, ex);
}