This is an automated email from the ASF dual-hosted git repository.

zhangmingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 19120e6b00e log  transaction rest api error on broker side (#15081)
19120e6b00e is described below

commit 19120e6b00e4176b8cf72058ee524eb4d10fc759
Author: gaozhangmin <[email protected]>
AuthorDate: Wed Apr 13 13:28:06 2022 +0800

    log  transaction rest api error on broker side (#15081)
    
    Co-authored-by: gavingaozhangmin <[email protected]>
---
 .../pulsar/broker/admin/v3/Transactions.java       | 31 +++++++++++++++++-----
 1 file changed, 25 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
index 9cb825b9f8e..7b2c99ff9d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
@@ -88,8 +88,12 @@ public class Transactions extends TransactionsBase {
             validateTopicName(tenant, namespace, encodedTopic);
             internalGetTransactionInBufferStats(authoritative, Long.parseLong(mostSigBits),
                     Long.parseLong(leastSigBits))
-                    .thenAccept(stat -> asyncResponse.resume(stat))
+                    .thenAccept(asyncResponse::resume)
                     .exceptionally(ex -> {
+                        if (!isRedirectException(ex)) {
+                            log.error("[{}] Failed to get transaction state in transaction buffer {}",
+                                    clientAppId(), topicName, ex);
+                        }
                         resumeAsyncResponseExceptionally(asyncResponse, ex);
                         return null;
                     });
@@ -122,8 +126,12 @@ public class Transactions extends TransactionsBase {
             validateTopicName(tenant, namespace, encodedTopic);
             internalGetTransactionInPendingAckStats(authoritative, Long.parseLong(mostSigBits),
                     Long.parseLong(leastSigBits), subName)
-                    .thenAccept(stat -> asyncResponse.resume(stat))
+                    .thenAccept(asyncResponse::resume)
                     .exceptionally(ex -> {
+                        if (!isRedirectException(ex)) {
+                            log.error("[{}] Failed to get transaction state in pending ack {}",
+                                    clientAppId(), topicName, ex);
+                        }
                         resumeAsyncResponseExceptionally(asyncResponse, ex);
                         return null;
                     });
@@ -152,8 +160,12 @@ public class Transactions extends TransactionsBase {
             checkTransactionCoordinatorEnabled();
             validateTopicName(tenant, namespace, encodedTopic);
             internalGetTransactionBufferStats(authoritative)
-                    .thenAccept(stat -> asyncResponse.resume(stat))
+                    .thenAccept(asyncResponse::resume)
                     .exceptionally(ex -> {
+                        if (!isRedirectException(ex)) {
+                            log.error("[{}] Failed to get transaction buffer stats in topic {}",
+                                    clientAppId(), topicName, ex);
+                        }
                         resumeAsyncResponseExceptionally(asyncResponse, ex);
                         return null;
                     });
@@ -183,8 +195,12 @@ public class Transactions extends TransactionsBase {
             checkTransactionCoordinatorEnabled();
             validateTopicName(tenant, namespace, encodedTopic);
             internalGetPendingAckStats(authoritative, subName)
-                    .thenAccept(stats -> asyncResponse.resume(stats))
+                    .thenAccept(asyncResponse::resume)
                     .exceptionally(ex -> {
+                        if (!isRedirectException(ex)) {
+                            log.error("[{}] Failed to get transaction pending ack stats in topic {}",
+                                    clientAppId(), topicName, ex);
+                        }
                         resumeAsyncResponseExceptionally(asyncResponse, ex);
                         return null;
                     });
@@ -276,10 +292,13 @@ public class Transactions extends TransactionsBase {
             checkTransactionCoordinatorEnabled();
             validateTopicName(tenant, namespace, encodedTopic);
             internalGetPendingAckInternalStats(authoritative, subName, metadata)
-                    .thenAccept(stats -> asyncResponse.resume(stats))
+                    .thenAccept(asyncResponse::resume)
                     .exceptionally(ex -> {
+                        if (!isRedirectException(ex)) {
+                            log.error("[{}] Failed to get pending ack internal stats {}",
+                                    clientAppId(), topicName, ex);
+                        }
                         Throwable cause = FutureUtil.unwrapCompletionException(ex);
-                        log.error("[{}] Failed to get pending ack internal stats {}", clientAppId(), topicName, cause);
                         if (cause instanceof BrokerServiceException.ServiceUnitNotReadyException) {
                             asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, cause));
                         } else if (cause instanceof BrokerServiceException.NotAllowedException) {

Reply via email to