This is an automated email from the ASF dual-hosted git repository.

nkurihar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d1fe1821e0 [refactor][broker] Suppress error logging when message expiration fails (#19778)
0d1fe1821e0 is described below

commit 0d1fe1821e030c0c0b53a7d47a2bf89151783eb4
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Wed Apr 5 15:45:59 2023 +0900

    [refactor][broker] Suppress error logging when message expiration fails (#19778)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 85 +++++++++++++++-------
 1 file changed, 59 insertions(+), 26 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 035e32542ed..7347d6dbf20 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2098,11 +2098,15 @@ public class PersistentTopicsBase extends AdminResource {
 
                             FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
                                 if (exception != null) {
-                                    Throwable t = exception.getCause();
-                                    log.error("[{}] Failed to expire messages 
up to {} on {}",
-                                            clientAppId(), expireTimeInSeconds,
-                                            topicName, t);
-                                    asyncResponse.resume(new RestException(t));
+                                    Throwable t = 
FutureUtil.unwrapCompletionException(exception);
+                                    if (t instanceof PulsarAdminException) {
+                                        log.warn("[{}] Failed to expire 
messages up to {} on {}: {}", clientAppId(),
+                                                expireTimeInSeconds, 
topicName, t.toString());
+                                    } else {
+                                        log.error("[{}] Failed to expire 
messages up to {} on {}", clientAppId(),
+                                                expireTimeInSeconds, 
topicName, t);
+                                    }
+                                    
resumeAsyncResponseExceptionally(asyncResponse, t);
                                     return null;
                                 }
                                 
asyncResponse.resume(Response.noContent().build());
@@ -2169,9 +2173,14 @@ public class PersistentTopicsBase extends AdminResource {
                     FutureUtil.waitForAll(futures).handle((result, exception) 
-> {
                         if (exception != null) {
                             Throwable throwable = 
FutureUtil.unwrapCompletionException(exception);
-                            log.error("[{}] Failed to expire messages for all 
subscription up to {} on {}",
-                                    clientAppId(), expireTimeInSeconds, 
topicName, throwable);
-                            asyncResponse.resume(new RestException(throwable));
+                            if (throwable instanceof RestException) {
+                                log.warn("[{}] Failed to expire messages for 
all subscription up to {} on {}: {}",
+                                        clientAppId(), expireTimeInSeconds, 
topicName, throwable.toString());
+                            } else {
+                                log.error("[{}] Failed to expire messages for 
all subscription up to {} on {}",
+                                        clientAppId(), expireTimeInSeconds, 
topicName, throwable);
+                            }
+                            resumeAsyncResponseExceptionally(asyncResponse, 
throwable);
                             return null;
                         }
                         asyncResponse.resume(Response.noContent().build());
@@ -3927,17 +3936,24 @@ public class PersistentTopicsBase extends AdminResource {
 
                                                 
FutureUtil.waitForAll(futures).handle((result, exception) -> {
                                                     if (exception != null) {
-                                                        Throwable t = 
exception.getCause();
+                                                        Throwable t = 
FutureUtil.unwrapCompletionException(exception);
                                                         if (t instanceof 
NotFoundException) {
                                                             
asyncResponse.resume(new RestException(Status.NOT_FOUND,
                                                                     
getSubNotFoundErrorMessage(topicName.toString(),
                                                                             
subName)));
                                                             return null;
                                                         } else {
-                                                            log.error("[{}] 
Failed to expire messages up "
-                                                                            + 
"to {} on {}", clientAppId(),
-                                                                    
expireTimeInSeconds, topicName, t);
-                                                            
asyncResponse.resume(new RestException(t));
+                                                            if (t instanceof 
PulsarAdminException) {
+                                                                log.warn("[{}] 
Failed to expire messages up "
+                                                                        + "to 
{} on {}: {}", clientAppId(),
+                                                                               
 expireTimeInSeconds, topicName,
+                                                                               
         t.toString());
+                                                            } else {
+                                                                
log.error("[{}] Failed to expire messages up "
+                                                                        + "to 
{} on {}", clientAppId(),
+                                                                               
 expireTimeInSeconds, topicName, t);
+                                                            }
+                                                            
resumeAsyncResponseExceptionally(asyncResponse, t);
                                                             return null;
                                                         }
                                                     }
@@ -3955,12 +3971,18 @@ public class PersistentTopicsBase extends AdminResource {
                                 }))
 
         ).exceptionally(ex -> {
+            Throwable cause = FutureUtil.unwrapCompletionException(ex);
             // If the exception is not redirect exception we need to log it.
-            if (!isRedirectException(ex)) {
-                log.error("[{}] Failed to expire messages up to {} on {}", 
clientAppId(),
-                        expireTimeInSeconds, topicName, ex);
+            if (!isRedirectException(cause)) {
+                if (cause instanceof RestException) {
+                    log.warn("[{}] Failed to expire messages up to {} on {}: 
{}", clientAppId(), expireTimeInSeconds,
+                            topicName, cause.toString());
+                } else {
+                    log.error("[{}] Failed to expire messages up to {} on {}", 
clientAppId(), expireTimeInSeconds,
+                            topicName, cause);
+                }
             }
-            resumeAsyncResponseExceptionally(asyncResponse, ex);
+            resumeAsyncResponseExceptionally(asyncResponse, cause);
             return null;
         });
     }
@@ -4068,7 +4090,7 @@ public class PersistentTopicsBase extends AdminResource {
         future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
                 .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.EXPIRE_MESSAGES, subName))
                 .thenCompose(__ -> {
-                    log.info("[{}][{}] received expire messages on 
subscription {} to position {}", clientAppId(),
+                    log.info("[{}][{}] Received expire messages on 
subscription {} to position {}", clientAppId(),
                             topicName, subName, messageId);
                     return 
internalExpireMessagesNonPartitionedTopicByPosition(asyncResponse, subName,
                             messageId, isExcluded, batchIndex);
@@ -4134,16 +4156,22 @@ public class PersistentTopicsBase extends AdminResource {
                                     + topicName + " for subscription " + 
subName + " due to ongoing"
                                     + " message expiration not finished or 
invalid message position provided.");
                         }
+                    } catch (RestException exception) {
+                        throw exception;
                     } catch (Exception exception) {
-                        log.error("[{}] Failed to expire messages up to {} on 
{} with subscription {} {}",
-                                clientAppId(), position, topicName, subName, 
exception);
                         throw new RestException(exception);
                     }
                     asyncResponse.resume(Response.noContent().build());
                 }).exceptionally(e -> {
-                    log.error("[{}] Failed to expire messages up to {} on {} 
with subscription {} {}",
-                            clientAppId(), messageId, topicName, subName, e);
-                    asyncResponse.resume(e);
+                    Throwable throwable = 
FutureUtil.unwrapCompletionException(e);
+                    if (throwable instanceof RestException) {
+                        log.warn("[{}] Failed to expire messages up to {} on 
{} with subscription {}: {}",
+                                clientAppId(), messageId, topicName, subName, 
throwable.toString());
+                    } else {
+                        log.error("[{}] Failed to expire messages up to {} on 
{} with subscription {}", clientAppId(),
+                                messageId, topicName, subName, throwable);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, throwable);
                     return null;
                 });
             } catch (Exception e) {
@@ -4152,9 +4180,14 @@ public class PersistentTopicsBase extends AdminResource {
                 resumeAsyncResponseExceptionally(asyncResponse, e);
             }
         }).exceptionally(ex -> {
-            Throwable cause = ex.getCause();
-            log.error("[{}] Failed to expire messages up to {} on subscription 
{} to position {}", clientAppId(),
-                    topicName, subName, messageId, cause);
+            Throwable cause = FutureUtil.unwrapCompletionException(ex);
+            if (cause instanceof RestException) {
+                log.warn("[{}] Failed to expire messages up to {} on 
subscription {} to position {}: {}", clientAppId(),
+                        topicName, subName, messageId, cause.toString());
+            } else {
+                log.error("[{}] Failed to expire messages up to {} on 
subscription {} to position {}", clientAppId(),
+                        topicName, subName, messageId, cause);
+            }
             resumeAsyncResponseExceptionally(asyncResponse, cause);
             return null;
         });

Reply via email to