This is an automated email from the ASF dual-hosted git repository.
nkurihar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0d1fe1821e0 [refactor][broker] Suppress error logging when message
expiration fails (#19778)
0d1fe1821e0 is described below
commit 0d1fe1821e030c0c0b53a7d47a2bf89151783eb4
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Wed Apr 5 15:45:59 2023 +0900
[refactor][broker] Suppress error logging when message expiration fails
(#19778)
---
.../broker/admin/impl/PersistentTopicsBase.java | 85 +++++++++++++++-------
1 file changed, 59 insertions(+), 26 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 035e32542ed..7347d6dbf20 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2098,11 +2098,15 @@ public class PersistentTopicsBase extends AdminResource {
FutureUtil.waitForAll(futures).handle((result,
exception) -> {
if (exception != null) {
- Throwable t = exception.getCause();
- log.error("[{}] Failed to expire messages
up to {} on {}",
- clientAppId(), expireTimeInSeconds,
- topicName, t);
- asyncResponse.resume(new RestException(t));
+ Throwable t =
FutureUtil.unwrapCompletionException(exception);
+ if (t instanceof PulsarAdminException) {
+ log.warn("[{}] Failed to expire
messages up to {} on {}: {}", clientAppId(),
+ expireTimeInSeconds,
topicName, t.toString());
+ } else {
+ log.error("[{}] Failed to expire
messages up to {} on {}", clientAppId(),
+ expireTimeInSeconds,
topicName, t);
+ }
+
resumeAsyncResponseExceptionally(asyncResponse, t);
return null;
}
asyncResponse.resume(Response.noContent().build());
@@ -2169,9 +2173,14 @@ public class PersistentTopicsBase extends AdminResource {
FutureUtil.waitForAll(futures).handle((result, exception)
-> {
if (exception != null) {
Throwable throwable =
FutureUtil.unwrapCompletionException(exception);
- log.error("[{}] Failed to expire messages for all
subscription up to {} on {}",
- clientAppId(), expireTimeInSeconds,
topicName, throwable);
- asyncResponse.resume(new RestException(throwable));
+ if (throwable instanceof RestException) {
+ log.warn("[{}] Failed to expire messages for
all subscription up to {} on {}: {}",
+ clientAppId(), expireTimeInSeconds,
topicName, throwable.toString());
+ } else {
+ log.error("[{}] Failed to expire messages for
all subscription up to {} on {}",
+ clientAppId(), expireTimeInSeconds,
topicName, throwable);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse,
throwable);
return null;
}
asyncResponse.resume(Response.noContent().build());
@@ -3927,17 +3936,24 @@ public class PersistentTopicsBase extends AdminResource {
FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
- Throwable t =
exception.getCause();
+ Throwable t =
FutureUtil.unwrapCompletionException(exception);
if (t instanceof
NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
getSubNotFoundErrorMessage(topicName.toString(),
subName)));
return null;
} else {
- log.error("[{}]
Failed to expire messages up "
- +
"to {} on {}", clientAppId(),
-
expireTimeInSeconds, topicName, t);
-
asyncResponse.resume(new RestException(t));
+ if (t instanceof
PulsarAdminException) {
+ log.warn("[{}]
Failed to expire messages up "
+ + "to
{} on {}: {}", clientAppId(),
+
expireTimeInSeconds, topicName,
+
t.toString());
+ } else {
+
log.error("[{}] Failed to expire messages up "
+ + "to
{} on {}", clientAppId(),
+
expireTimeInSeconds, topicName, t);
+ }
+
resumeAsyncResponseExceptionally(asyncResponse, t);
return null;
}
}
@@ -3955,12 +3971,18 @@ public class PersistentTopicsBase extends AdminResource {
}))
).exceptionally(ex -> {
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
- log.error("[{}] Failed to expire messages up to {} on {}",
clientAppId(),
- expireTimeInSeconds, topicName, ex);
+ if (!isRedirectException(cause)) {
+ if (cause instanceof RestException) {
+ log.warn("[{}] Failed to expire messages up to {} on {}:
{}", clientAppId(), expireTimeInSeconds,
+ topicName, cause.toString());
+ } else {
+ log.error("[{}] Failed to expire messages up to {} on {}",
clientAppId(), expireTimeInSeconds,
+ topicName, cause);
+ }
}
- resumeAsyncResponseExceptionally(asyncResponse, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, cause);
return null;
});
}
@@ -4068,7 +4090,7 @@ public class PersistentTopicsBase extends AdminResource {
future.thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
.thenCompose(__ -> validateTopicOperationAsync(topicName,
TopicOperation.EXPIRE_MESSAGES, subName))
.thenCompose(__ -> {
- log.info("[{}][{}] received expire messages on
subscription {} to position {}", clientAppId(),
+ log.info("[{}][{}] Received expire messages on
subscription {} to position {}", clientAppId(),
topicName, subName, messageId);
return
internalExpireMessagesNonPartitionedTopicByPosition(asyncResponse, subName,
messageId, isExcluded, batchIndex);
@@ -4134,16 +4156,22 @@ public class PersistentTopicsBase extends AdminResource {
+ topicName + " for subscription " +
subName + " due to ongoing"
+ " message expiration not finished or
invalid message position provided.");
}
+ } catch (RestException exception) {
+ throw exception;
} catch (Exception exception) {
- log.error("[{}] Failed to expire messages up to {} on
{} with subscription {} {}",
- clientAppId(), position, topicName, subName,
exception);
throw new RestException(exception);
}
asyncResponse.resume(Response.noContent().build());
}).exceptionally(e -> {
- log.error("[{}] Failed to expire messages up to {} on {}
with subscription {} {}",
- clientAppId(), messageId, topicName, subName, e);
- asyncResponse.resume(e);
+ Throwable throwable =
FutureUtil.unwrapCompletionException(e);
+ if (throwable instanceof RestException) {
+ log.warn("[{}] Failed to expire messages up to {} on
{} with subscription {}: {}",
+ clientAppId(), messageId, topicName, subName,
throwable.toString());
+ } else {
+ log.error("[{}] Failed to expire messages up to {} on
{} with subscription {}", clientAppId(),
+ messageId, topicName, subName, throwable);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, throwable);
return null;
});
} catch (Exception e) {
@@ -4152,9 +4180,14 @@ public class PersistentTopicsBase extends AdminResource {
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}).exceptionally(ex -> {
- Throwable cause = ex.getCause();
- log.error("[{}] Failed to expire messages up to {} on subscription
{} to position {}", clientAppId(),
- topicName, subName, messageId, cause);
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+ if (cause instanceof RestException) {
+ log.warn("[{}] Failed to expire messages up to {} on
subscription {} to position {}: {}", clientAppId(),
+ topicName, subName, messageId, cause.toString());
+ } else {
+ log.error("[{}] Failed to expire messages up to {} on
subscription {} to position {}", clientAppId(),
+ topicName, subName, messageId, cause);
+ }
resumeAsyncResponseExceptionally(asyncResponse, cause);
return null;
});