This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c00c3c1e2a76abcd243486b5ef2c14a27ceef5ef Author: Masahiro Sakamoto <[email protected]> AuthorDate: Wed Apr 5 15:45:59 2023 +0900 [refactor][broker] Suppress error logging when message expiration fails (#19778) (cherry picked from commit 0d1fe1821e030c0c0b53a7d47a2bf89151783eb4) --- .../broker/admin/impl/PersistentTopicsBase.java | 85 +++++++++++++++------- 1 file changed, 59 insertions(+), 26 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 264bebfe50e..d31092c5829 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2006,11 +2006,15 @@ public class PersistentTopicsBase extends AdminResource { FutureUtil.waitForAll(futures).handle((result, exception) -> { if (exception != null) { - Throwable t = exception.getCause(); - log.error("[{}] Failed to expire messages up to {} on {}", - clientAppId(), expireTimeInSeconds, - topicName, t); - asyncResponse.resume(new RestException(t)); + Throwable t = FutureUtil.unwrapCompletionException(exception); + if (t instanceof PulsarAdminException) { + log.warn("[{}] Failed to expire messages up to {} on {}: {}", clientAppId(), + expireTimeInSeconds, topicName, t.toString()); + } else { + log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), + expireTimeInSeconds, topicName, t); + } + resumeAsyncResponseExceptionally(asyncResponse, t); return null; } asyncResponse.resume(Response.noContent().build()); @@ -2077,9 +2081,14 @@ public class PersistentTopicsBase extends AdminResource { FutureUtil.waitForAll(futures).handle((result, exception) -> { if (exception != null) { Throwable throwable = FutureUtil.unwrapCompletionException(exception); - log.error("[{}] Failed to expire messages for all subscription up to {} on {}", - clientAppId(), expireTimeInSeconds, topicName, throwable); - asyncResponse.resume(new RestException(throwable)); + if (throwable instanceof RestException) { + log.warn("[{}] Failed to expire messages for all subscription up to {} on {}: {}", + clientAppId(), expireTimeInSeconds, topicName, throwable.toString()); + } else { + log.error("[{}] Failed to expire messages for all subscription up to {} on {}", + clientAppId(), expireTimeInSeconds, topicName, throwable); + } + resumeAsyncResponseExceptionally(asyncResponse, throwable); return null; } asyncResponse.resume(Response.noContent().build()); @@ -3846,17 +3855,24 @@ public class PersistentTopicsBase extends AdminResource { FutureUtil.waitForAll(futures).handle((result, exception) -> { if (exception != null) { - Throwable t = exception.getCause(); + Throwable t = FutureUtil.unwrapCompletionException(exception); if (t instanceof NotFoundException) { asyncResponse.resume(new RestException(Status.NOT_FOUND, getSubNotFoundErrorMessage(topicName.toString(), subName))); return null; } else { - log.error("[{}] Failed to expire messages up " - + "to {} on {}", clientAppId(), - expireTimeInSeconds, topicName, t); - asyncResponse.resume(new RestException(t)); + if (t instanceof PulsarAdminException) { + log.warn("[{}] Failed to expire messages up " + + "to {} on {}: {}", clientAppId(), + expireTimeInSeconds, topicName, + t.toString()); + } else { + log.error("[{}] Failed to expire messages up " + + "to {} on {}", clientAppId(), + expireTimeInSeconds, topicName, t); + } + resumeAsyncResponseExceptionally(asyncResponse, t); return null; } } @@ -3874,12 +3890,18 @@ public class PersistentTopicsBase extends AdminResource { })) ).exceptionally(ex -> { + Throwable cause = FutureUtil.unwrapCompletionException(ex); // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { - log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), - expireTimeInSeconds, topicName, ex); + if (!isRedirectException(cause)) { + if (cause instanceof RestException) { + log.warn("[{}] Failed to expire messages up to {} on {}: {}", clientAppId(), expireTimeInSeconds, + topicName, cause.toString()); + } else { + log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds, + topicName, cause); + } } - resumeAsyncResponseExceptionally(asyncResponse, ex); + resumeAsyncResponseExceptionally(asyncResponse, cause); return null; }); } @@ -3987,7 +4009,7 @@ public class PersistentTopicsBase extends AdminResource { future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES, subName)) .thenCompose(__ -> { - log.info("[{}][{}] received expire messages on subscription {} to position {}", clientAppId(), + log.info("[{}][{}] Received expire messages on subscription {} to position {}", clientAppId(), topicName, subName, messageId); return internalExpireMessagesNonPartitionedTopicByPosition(asyncResponse, subName, messageId, isExcluded, batchIndex); @@ -4053,16 +4075,22 @@ public class PersistentTopicsBase extends AdminResource { + topicName + " for subscription " + subName + " due to ongoing" + " message expiration not finished or invalid message position provided."); } + } catch (RestException exception) { + throw exception; } catch (Exception exception) { - log.error("[{}] Failed to expire messages up to {} on {} with subscription {} {}", - clientAppId(), position, topicName, subName, exception); throw new RestException(exception); } asyncResponse.resume(Response.noContent().build()); }).exceptionally(e -> { - log.error("[{}] Failed to expire messages up to {} on {} with subscription {} {}", - clientAppId(), messageId, topicName, subName, e); - asyncResponse.resume(e); + Throwable throwable = FutureUtil.unwrapCompletionException(e); + if (throwable instanceof RestException) { + log.warn("[{}] Failed to expire messages up to {} on {} with subscription {}: {}", + clientAppId(), messageId, topicName, subName, throwable.toString()); + } else { + log.error("[{}] Failed to expire messages up to {} on {} with subscription {}", clientAppId(), + messageId, topicName, subName, throwable); + } + resumeAsyncResponseExceptionally(asyncResponse, throwable); return null; }); } catch (Exception e) { @@ -4071,9 +4099,14 @@ public class PersistentTopicsBase extends AdminResource { resumeAsyncResponseExceptionally(asyncResponse, e); } }).exceptionally(ex -> { - Throwable cause = ex.getCause(); - log.error("[{}] Failed to expire messages up to {} on subscription {} to position {}", clientAppId(), - topicName, subName, messageId, cause); + Throwable cause = FutureUtil.unwrapCompletionException(ex); + if (cause instanceof RestException) { + log.warn("[{}] Failed to expire messages up to {} on subscription {} to position {}: {}", clientAppId(), + topicName, subName, messageId, cause.toString()); + } else { + log.error("[{}] Failed to expire messages up to {} on subscription {} to position {}", clientAppId(), + topicName, subName, messageId, cause); + } resumeAsyncResponseExceptionally(asyncResponse, cause); return null; });
