This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 48b4481969c [improve] [broker] Do not print an Error log when
responding to `HTTP-404` when calling `Admin API` and the topic does not exist.
(#21995)
48b4481969c is described below
commit 48b4481969cb6028186a7a84b8be8af90770674b
Author: fengyubiao <[email protected]>
AuthorDate: Sun Feb 18 15:46:52 2024 +0800
[improve] [broker] Do not print an Error log when responding to `HTTP-404`
when calling `Admin API` and the topic does not exist. (#21995)
---
.../apache/pulsar/broker/admin/AdminResource.java | 4 +
.../broker/admin/impl/PersistentTopicsBase.java | 88 +++++++++++-----------
.../broker/admin/impl/SchemasResourceBase.java | 2 +-
.../broker/admin/v2/NonPersistentTopics.java | 6 +-
.../pulsar/broker/admin/v2/PersistentTopics.java | 36 ++++-----
.../pulsar/broker/admin/v3/Transactions.java | 12 +--
6 files changed, 75 insertions(+), 73 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 54bc0077103..8eba6cc7b05 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -834,6 +834,10 @@ public abstract class AdminResource extends
PulsarWebResource {
== Status.NOT_FOUND.getStatusCode();
}
+ protected static boolean isNot307And404Exception(Throwable ex) {
+ return !isRedirectException(ex) && !isNotFoundException(ex);
+ }
+
protected static String getTopicNotFoundErrorMessage(String topic) {
return String.format("Topic %s not found", topic);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 1731d4c73db..638a12d3b97 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -876,7 +876,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we
need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get partitioned
metadata while unloading topic {}",
clientAppId(), topicName, ex);
}
@@ -886,7 +886,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to validate the global namespace
ownership while unloading topic {}",
clientAppId(), topicName, ex);
}
@@ -1057,7 +1057,7 @@ public class PersistentTopicsBase extends AdminResource {
}))
.exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to unload topic {}, {}",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -1079,7 +1079,7 @@ public class PersistentTopicsBase extends AdminResource {
}))
.exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to unload tc {},{}",
clientAppId(),
topicName.getPartitionIndex(), ex);
}
@@ -1181,7 +1181,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we
need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get partitioned
topic metadata while get"
+ " subscriptions for topic {}",
clientAppId(), topicName, ex);
}
@@ -1191,7 +1191,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to validate the global
namespace/topic ownership while get subscriptions"
+ " for topic {}", clientAppId(), topicName,
ex);
}
@@ -1200,7 +1200,7 @@ public class PersistentTopicsBase extends AdminResource {
})
).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get subscriptions for {}",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -1239,7 +1239,7 @@ public class PersistentTopicsBase extends AdminResource {
.thenAccept(topic -> asyncResponse.resume(new
ArrayList<>(topic.getSubscriptions().keys())))
.exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get list of subscriptions
for {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -1348,7 +1348,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get partitioned metadata
while get managed info for {}",
clientAppId(), topicName, ex);
}
@@ -1358,7 +1358,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to validate the global namespace
ownership while get managed info for {}",
clientAppId(), topicName, ex);
}
@@ -1477,7 +1477,7 @@ public class PersistentTopicsBase extends AdminResource {
});
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get partitioned internal stats for
{}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -1532,7 +1532,7 @@ public class PersistentTopicsBase extends AdminResource {
});
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get partitioned internal stats for
{}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -1660,7 +1660,7 @@ public class PersistentTopicsBase extends AdminResource {
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to analyze subscription backlog
{} {}",
clientAppId(), topicName, subName, cause);
}
@@ -1687,7 +1687,7 @@ public class PersistentTopicsBase extends AdminResource {
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to update subscription {} {}",
clientAppId(), topicName, subName, cause);
}
asyncResponse.resume(new RestException(cause));
@@ -1716,7 +1716,7 @@ public class PersistentTopicsBase extends AdminResource {
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to update subscription {} {}",
clientAppId(), topicName, subName, cause);
}
asyncResponse.resume(new RestException(cause));
@@ -1885,7 +1885,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log
it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to skip all messages for
subscription {} on topic {}",
clientAppId(), subName, topicName, ex);
}
@@ -1929,7 +1929,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to skip all messages for
subscription {} on topic {}",
clientAppId(), subName, topicName, ex);
}
@@ -1993,7 +1993,7 @@ public class PersistentTopicsBase extends AdminResource {
})
).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to skip {} messages {} {}",
clientAppId(), numMessages, topicName,
subName, ex);
}
@@ -2063,7 +2063,7 @@ public class PersistentTopicsBase extends AdminResource {
)
).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to expire messages for all subscription
on topic {}", clientAppId(), topicName,
ex);
}
@@ -2130,7 +2130,7 @@ public class PersistentTopicsBase extends AdminResource {
})
).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to expire messages for all subscription
up to {} on {}", clientAppId(),
expireTimeInSeconds, topicName, ex);
}
@@ -2337,7 +2337,7 @@ public class PersistentTopicsBase extends AdminResource {
})).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to create subscription {} on
topic {}",
clientAppId(), subscriptionName, topicName,
ex);
}
@@ -2347,7 +2347,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to create subscription {} on topic {}",
clientAppId(), subscriptionName, topicName, ex);
}
@@ -2478,7 +2478,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to update subscription {} from topic
{}",
clientAppId(), subName, topicName, ex);
}
@@ -2518,7 +2518,7 @@ public class PersistentTopicsBase extends AdminResource {
})
.exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to analyze back log of
subscription {} from topic {}",
clientAppId(), subName, topicName, ex);
}
@@ -2603,7 +2603,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to update subscription {} from topic
{}",
clientAppId(), subName, topicName, ex);
}
@@ -2689,7 +2689,7 @@ public class PersistentTopicsBase extends AdminResource {
});
}).exceptionally(ex -> {
// If the exception is not redirect exception we need
to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.warn("[{}][{}] Failed to reset cursor on
subscription {} to position {}",
clientAppId(), topicName, subName,
messageId, ex.getCause());
}
@@ -2698,7 +2698,7 @@ public class PersistentTopicsBase extends AdminResource {
});
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.warn("[{}][{}] Failed to reset cursor on subscription {}
to position {}",
clientAppId(), topicName, subName, messageId,
ex.getCause());
}
@@ -3334,7 +3334,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get backlog size for topic
{}", clientAppId(),
topicName, ex);
}
@@ -3342,7 +3342,7 @@ public class PersistentTopicsBase extends AdminResource {
return null;
})).exceptionally(ex -> {
// If the exception is not redirect exception we need
to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to validate global
namespace ownership "
+ "to get backlog size for topic {}",
clientAppId(), topicName, ex);
}
@@ -3908,7 +3908,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to terminate topic {}",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -3916,7 +3916,7 @@ public class PersistentTopicsBase extends AdminResource {
})
).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to terminate topic {}", clientAppId(),
topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -4006,7 +4006,7 @@ public class PersistentTopicsBase extends AdminResource {
).exceptionally(ex -> {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(cause)) {
+ if (!isNot307And404Exception(cause)) {
if (cause instanceof RestException) {
log.warn("[{}] Failed to expire messages up to {} on {}:
{}", clientAppId(), expireTimeInSeconds,
topicName, cause.toString());
@@ -4121,7 +4121,7 @@ public class PersistentTopicsBase extends AdminResource {
messageId, isExcluded, batchIndex);
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to expire messages up to {} on
subscription {} to position {}",
clientAppId(), topicName, subName, messageId,
ex);
}
@@ -4271,7 +4271,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to trigger compaction on topic
{}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -4280,7 +4280,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to validate global namespace ownership
to trigger compaction on topic {}",
clientAppId(), topicName, ex);
}
@@ -4309,7 +4309,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to trigger compaction for {}",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -4345,7 +4345,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to trigger offload for {}",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -4362,7 +4362,7 @@ public class PersistentTopicsBase extends AdminResource {
asyncResponse.resume(offloadProcessStatus);
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to offload status on topic {}",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -4639,7 +4639,7 @@ public class PersistentTopicsBase extends AdminResource {
});
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get last messageId {}",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -5017,9 +5017,7 @@ public class PersistentTopicsBase extends AdminResource {
protected void handleTopicPolicyException(String methodName, Throwable
thr, AsyncResponse asyncResponse) {
Throwable cause = thr.getCause();
- if (!(cause instanceof WebApplicationException) || !(
- ((WebApplicationException) cause).getResponse().getStatus() ==
307
- || ((WebApplicationException)
cause).getResponse().getStatus() == 404)) {
+ if (isNot307And404Exception(cause)) {
log.error("[{}] Failed to perform {} on topic {}",
clientAppId(), methodName, topicName, cause);
}
@@ -5145,7 +5143,7 @@ public class PersistentTopicsBase extends AdminResource {
resultFuture.exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.warn("[{}] Failed to change replicated subscription status
to {} - {} {}", clientAppId(), enabled,
topicName, subName, ex);
}
@@ -5192,7 +5190,7 @@ public class PersistentTopicsBase extends AdminResource {
}
).exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to set replicated subscription
status on {} {}", clientAppId(),
topicName, subName, ex);
}
@@ -5293,7 +5291,7 @@ public class PersistentTopicsBase extends AdminResource {
}
resultFuture.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get replicated subscription status
on {} {}", clientAppId(),
topicName, subName, ex);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
index 454b8f0fac8..286366c8b58 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
@@ -238,7 +238,7 @@ public class SchemasResourceBase extends AdminResource {
protected boolean shouldPrintErrorLog(Throwable ex) {
- return !isRedirectException(ex) && !isNotFoundException(ex);
+ return isNot307And404Exception(ex);
}
private static final Logger log =
LoggerFactory.getLogger(SchemasResourceBase.class);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index d4795393f9b..a22ad4b242f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -132,7 +132,7 @@ public class NonPersistentTopics extends PersistentTopics {
})
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get internal stats for topic
{}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -479,7 +479,7 @@ public class NonPersistentTopics extends PersistentTopics {
}
asyncResponse.resume(topicList);
}).exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to list topics on
namespace bundle {}/{}", clientAppId(),
namespaceName, bundleRange, ex);
}
@@ -488,7 +488,7 @@ public class NonPersistentTopics extends PersistentTopics {
});
}
}).exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to list topics on namespace bundle
{}/{}", clientAppId(),
namespaceName, bundleRange, ex);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 97531cf8ab0..32667fcf1eb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -119,7 +119,7 @@ public class PersistentTopics extends PersistentTopicsBase {
internalGetListAsync(Optional.ofNullable(bundle))
.thenAccept(topicList ->
asyncResponse.resume(filterSystemTopic(topicList, includeSystemTopic)))
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get topic list {}",
clientAppId(), namespaceName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -150,7 +150,7 @@ public class PersistentTopics extends PersistentTopicsBase {
.thenAccept(partitionedTopicList -> asyncResponse.resume(
filterSystemTopic(partitionedTopicList,
includeSystemTopic)))
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get partitioned topic list
{}", clientAppId(), namespaceName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -335,7 +335,7 @@ public class PersistentTopics extends PersistentTopicsBase {
internalCreateNonPartitionedTopicAsync(authoritative, properties)
.thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to create non-partitioned topic
{}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -825,7 +825,7 @@ public class PersistentTopics extends PersistentTopicsBase {
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}][{}] Failed to update partition to {}",
clientAppId(), topicName, numPartitions, ex);
}
@@ -934,7 +934,7 @@ public class PersistentTopics extends PersistentTopicsBase {
internalGetPropertiesAsync(authoritative)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get topic {} properties",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -970,7 +970,7 @@ public class PersistentTopics extends PersistentTopicsBase {
internalUpdatePropertiesAsync(authoritative, properties)
.thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to update topic {} properties",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -1004,7 +1004,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
internalRemovePropertiesAsync(authoritative, key)
.thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to remove key {} in properties
on topic {}",
clientAppId(), key, topicName, ex);
}
@@ -1125,7 +1125,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
} else if (isManagedLedgerNotFoundException(t)) {
ex = new RestException(Response.Status.NOT_FOUND,
getTopicNotFoundErrorMessage(topicName.toString()));
- } else if (!isRedirectException(ex)) {
+ } else if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to delete topic {}",
clientAppId(), topicName, t);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -1209,7 +1209,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get stats for {}",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -1243,7 +1243,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
internalGetInternalStatsAsync(authoritative, metadata)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get internal stats for topic
{}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -1892,7 +1892,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
internalPeekNthMessageAsync(decode(encodedSubName), messagePosition,
authoritative)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get peek nth message for
topic {} subscription {}", clientAppId(),
topicName, decode(encodedSubName), ex);
}
@@ -1934,7 +1934,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
internalExamineMessageAsync(initialPosition, messagePosition,
authoritative)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to examine a specific message
on the topic {}", clientAppId(), topicName,
ex);
}
@@ -1976,7 +1976,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
// If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get message with ledgerId {}
entryId {} from {}",
clientAppId(), ledgerId, entryId, topicName,
ex);
}
@@ -2020,7 +2020,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
}
})
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get message ID by timestamp
{} from {}",
clientAppId(), timestamp, topicName, ex);
}
@@ -2055,7 +2055,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
log.warn("[{}] Failed to get topic backlog {}:
Namespace does not exist", clientAppId(),
namespaceName);
ex = new RestException(Response.Status.NOT_FOUND,
"Namespace does not exist");
- } else if (!isRedirectException(ex)) {
+ } else if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get estimated backlog for
topic {}", clientAppId(), encodedTopic, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -3173,7 +3173,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
internalTerminateAsync(authoritative)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to terminated topic {}",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -3269,7 +3269,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
internalCompactionStatusAsync(authoritative)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get the status of a
compaction operation for the topic {}",
clientAppId(), topicName, ex);
}
@@ -3408,7 +3408,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
validateTopicName(tenant, namespace, encodedTopic);
internalTrimTopic(asyncResponse, authoritative).exceptionally(ex
-> {
// If the exception is not redirect exception we need to log
it.
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to trim topic {}", clientAppId(),
topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
index 83ee03b2e4f..c2a54987ea2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
@@ -105,7 +105,7 @@ public class Transactions extends TransactionsBase {
Long.parseLong(leastSigBits))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get transaction state in
transaction buffer {}",
clientAppId(), topicName, ex);
}
@@ -143,7 +143,7 @@ public class Transactions extends TransactionsBase {
Long.parseLong(leastSigBits), subName)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get transaction state in
pending ack {}",
clientAppId(), topicName, ex);
}
@@ -181,7 +181,7 @@ public class Transactions extends TransactionsBase {
internalGetTransactionBufferStats(authoritative, lowWaterMarks,
segmentStats)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get transaction buffer
stats in topic {}",
clientAppId(), topicName, ex);
}
@@ -217,7 +217,7 @@ public class Transactions extends TransactionsBase {
internalGetPendingAckStats(authoritative, subName, lowWaterMarks)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get transaction pending
ack stats in topic {}",
clientAppId(), topicName, ex);
}
@@ -314,7 +314,7 @@ public class Transactions extends TransactionsBase {
internalGetPendingAckInternalStats(authoritative, subName,
metadata)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get pending ack internal
stats {}",
clientAppId(), topicName, ex);
}
@@ -365,7 +365,7 @@ public class Transactions extends TransactionsBase {
internalGetTransactionBufferInternalStats(authoritative, metadata)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get transaction buffer
internal stats {}",
clientAppId(), topicName, ex);
}