liangyepianzhou commented on code in PR #20330: URL: https://github.com/apache/pulsar/pull/20330#discussion_r1218163871
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java: ########## @@ -431,6 +437,80 @@ protected CompletableFuture<TransactionPendingAckInternalStats> internalGetPendi ); } + protected CompletableFuture<TransactionBufferInternalStats> internalGetTransactionBufferInternalStats( + boolean authoritative, boolean metadata) { + TransactionBufferInternalStats transactionBufferInternalStats = new TransactionBufferInternalStats(); + return getExistingPersistentTopicAsync(authoritative) + .thenCompose(topic -> { + TransactionBuffer.SnapshotType snapshotType = topic.getTransactionBuffer().getSnapshotType(); + if (snapshotType == null) { + return FutureUtil.failedFuture(new RestException(NOT_FOUND, + "Transaction buffer Snapshot for the topic does not exist")); + } else if (snapshotType == TransactionBuffer.SnapshotType.Segment) { + transactionBufferInternalStats.snapshotType = snapshotType.toString(); + this.topicName = TopicName.get(TopicDomain.persistent.toString(), namespaceName, + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS); + return getTxnSnapshotInternalStats(topicName, authoritative, metadata) + .thenApply(snapshotInternalStats -> { + transactionBufferInternalStats.segmentInternalStats = snapshotInternalStats; + return transactionBufferInternalStats; + }).thenCompose(ignore -> { + this.topicName = TopicName.get(TopicDomain.persistent.toString(), namespaceName, + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES); + return getTxnSnapshotInternalStats(topicName, authoritative, metadata) + .thenApply(indexStats -> { + transactionBufferInternalStats.segmentIndexInternalStats = indexStats; + return transactionBufferInternalStats; + }); + }); + } else if (snapshotType == TransactionBuffer.SnapshotType.Single) { + transactionBufferInternalStats.snapshotType = snapshotType.toString(); + this.topicName = TopicName.get(TopicDomain.persistent.toString(), namespaceName, + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT); + return getTxnSnapshotInternalStats(topicName, authoritative, metadata) + .thenApply(snapshotInternalStats -> { + transactionBufferInternalStats.singleSnapshotInternalStats = snapshotInternalStats; + return transactionBufferInternalStats; + }); + } + return FutureUtil.failedFuture(new RestException(INTERNAL_SERVER_ERROR, "Unknown SnapshotType " + + snapshotType)); + }); + } + + private CompletableFuture<SnapshotInternalStats> getTxnSnapshotInternalStats(TopicName topicName, Review Comment: Oh, thanks for the reminder. I add a `testGetTransactionBufferInternalStatsInMultiBroker` to cover the case of system topic existing in multiple brokers. And the case of a single broker had been covered by `testGetTransactionBufferInternalStats`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org