This is an automated email from the ASF dual-hosted git repository.
zhangmingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 19120e6b00e log transaction rest api error on broker side (#15081)
19120e6b00e is described below
commit 19120e6b00e4176b8cf72058ee524eb4d10fc759
Author: gaozhangmin <[email protected]>
AuthorDate: Wed Apr 13 13:28:06 2022 +0800
log transaction rest api error on broker side (#15081)
Co-authored-by: gavingaozhangmin <[email protected]>
---
.../pulsar/broker/admin/v3/Transactions.java | 31 +++++++++++++++++-----
1 file changed, 25 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
index 9cb825b9f8e..7b2c99ff9d5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
@@ -88,8 +88,12 @@ public class Transactions extends TransactionsBase {
validateTopicName(tenant, namespace, encodedTopic);
internalGetTransactionInBufferStats(authoritative,
Long.parseLong(mostSigBits),
Long.parseLong(leastSigBits))
- .thenAccept(stat -> asyncResponse.resume(stat))
+ .thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get transaction state in
transaction buffer {}",
+ clientAppId(), topicName, ex);
+ }
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
@@ -122,8 +126,12 @@ public class Transactions extends TransactionsBase {
validateTopicName(tenant, namespace, encodedTopic);
internalGetTransactionInPendingAckStats(authoritative,
Long.parseLong(mostSigBits),
Long.parseLong(leastSigBits), subName)
- .thenAccept(stat -> asyncResponse.resume(stat))
+ .thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get transaction state in
pending ack {}",
+ clientAppId(), topicName, ex);
+ }
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
@@ -152,8 +160,12 @@ public class Transactions extends TransactionsBase {
checkTransactionCoordinatorEnabled();
validateTopicName(tenant, namespace, encodedTopic);
internalGetTransactionBufferStats(authoritative)
- .thenAccept(stat -> asyncResponse.resume(stat))
+ .thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get transaction buffer
stats in topic {}",
+ clientAppId(), topicName, ex);
+ }
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
@@ -183,8 +195,12 @@ public class Transactions extends TransactionsBase {
checkTransactionCoordinatorEnabled();
validateTopicName(tenant, namespace, encodedTopic);
internalGetPendingAckStats(authoritative, subName)
- .thenAccept(stats -> asyncResponse.resume(stats))
+ .thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get transaction pending
ack stats in topic {}",
+ clientAppId(), topicName, ex);
+ }
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
@@ -276,10 +292,13 @@ public class Transactions extends TransactionsBase {
checkTransactionCoordinatorEnabled();
validateTopicName(tenant, namespace, encodedTopic);
internalGetPendingAckInternalStats(authoritative, subName,
metadata)
- .thenAccept(stats -> asyncResponse.resume(stats))
+ .thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get pending ack internal
stats {}",
+ clientAppId(), topicName, ex);
+ }
Throwable cause =
FutureUtil.unwrapCompletionException(ex);
- log.error("[{}] Failed to get pending ack internal
stats {}", clientAppId(), topicName, cause);
if (cause instanceof
BrokerServiceException.ServiceUnitNotReadyException) {
asyncResponse.resume(new
RestException(SERVICE_UNAVAILABLE, cause));
} else if (cause instanceof
BrokerServiceException.NotAllowedException) {