poorbarcode commented on code in PR #20330: URL: https://github.com/apache/pulsar/pull/20330#discussion_r1206148081
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java: ########## @@ -431,6 +436,69 @@ protected CompletableFuture<TransactionPendingAckInternalStats> internalGetPendi ); } + protected CompletableFuture<TransactionBufferInternalStats> internalGetTransactionBufferInternalStats( + boolean authoritative, boolean metadata) { + TransactionBufferInternalStats transactionBufferInternalStats = new TransactionBufferInternalStats(); + return getExistingPersistentTopicAsync(authoritative) + .thenCompose(topic -> { + TransactionBuffer.SnapshotType snapshotType = topic.getTransactionBuffer().getSnapshotType(); + if (snapshotType == null) { + return FutureUtil.failedFuture(new RestException(NOT_FOUND, + "Transaction buffer Snapshot for the topic does not exist")); + } else if (snapshotType == TransactionBuffer.SnapshotType.Segment) { + transactionBufferInternalStats.snapshotType = snapshotType.toString(); + this.topicName = TopicName.get(TopicDomain.persistent.toString(), namespaceName, + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS); + return getExistingPersistentTopicAsync(authoritative) + .thenCompose(segmentTopic -> + segmentTopic.getManagedLedger().getManagedLedgerInternalStats(metadata) + .thenApply(segmentInternalStats -> { + SnapshotInternalStats segmentStats = new SnapshotInternalStats(); + segmentStats.managedLedgerName = segmentTopic + .getManagedLedger().getName(); + segmentStats.managedLedgerInternalStats = segmentInternalStats; + transactionBufferInternalStats.segmentInternalStats = segmentStats; + return null; + })) + .thenCompose(ignore -> { + this.topicName = TopicName.get(TopicDomain.persistent.toString(), namespaceName, + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES); + return getExistingPersistentTopicAsync(authoritative) + .thenCompose(indexTopic -> indexTopic.getManagedLedger() + .getManagedLedgerInternalStats(metadata) + .thenApply(indexInternalStats -> { + SnapshotInternalStats indexStats = new 
SnapshotInternalStats(); + indexStats.managedLedgerName = indexTopic + .getManagedLedger().getName(); + indexStats.managedLedgerInternalStats = indexInternalStats; + transactionBufferInternalStats + .segmentIndexInternalStats = indexStats; + return null; + })); + }).thenApply(ignore -> transactionBufferInternalStats); + } else if (snapshotType == TransactionBuffer.SnapshotType.Single) { + transactionBufferInternalStats.snapshotType = snapshotType.toString(); + this.topicName = TopicName.get(TopicDomain.persistent.toString(), namespaceName, + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT); + return getExistingPersistentTopicAsync(authoritative) + .thenCompose(singleTopic -> + singleTopic.getManagedLedger().getManagedLedgerInternalStats(metadata) + .thenApply(singleInternalStats -> { Review Comment: We can `new TransactionBufferInternalStats()` here, just like this: ```java getManagedLedgerInternalStats.thenApply(mlstats -> { TransactionBufferInternalStats txStats = new TransactionBufferInternalStats(); ... 
return txStats; }); ``` ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java: ########## @@ -431,6 +436,69 @@ protected CompletableFuture<TransactionPendingAckInternalStats> internalGetPendi ); } + protected CompletableFuture<TransactionBufferInternalStats> internalGetTransactionBufferInternalStats( + boolean authoritative, boolean metadata) { + TransactionBufferInternalStats transactionBufferInternalStats = new TransactionBufferInternalStats(); + return getExistingPersistentTopicAsync(authoritative) + .thenCompose(topic -> { + TransactionBuffer.SnapshotType snapshotType = topic.getTransactionBuffer().getSnapshotType(); + if (snapshotType == null) { + return FutureUtil.failedFuture(new RestException(NOT_FOUND, + "Transaction buffer Snapshot for the topic does not exist")); + } else if (snapshotType == TransactionBuffer.SnapshotType.Segment) { + transactionBufferInternalStats.snapshotType = snapshotType.toString(); + this.topicName = TopicName.get(TopicDomain.persistent.toString(), namespaceName, + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS); + return getExistingPersistentTopicAsync(authoritative) Review Comment: We cannot be sure that the owner of the topic "__transaction_buffer_snapshot_segments" is the current broker, right? 
If that is the case, we should get the managed-ledger (ML) stats of the topic "__transaction_buffer_snapshot_segments" another way; it would look something like this: ```java if (currentBrokerIsOwner) { //Get ML stats } else { //Send a request to get ML stats } ``` ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java: ########## @@ -173,8 +174,8 @@ public CompletableFuture<Void> takeAbortedTxnsSnapshot(PositionImpl maxReadPosit } @Override - public long getLastSnapshotTimestamps() { - return this.lastSnapshotTimestamps; + public void generateSnapshotStats(TransactionBufferStats transactionBufferStats, boolean segmentStats) { + transactionBufferStats.lastSnapshotTimestamps = this.lastSnapshotTimestamps; Review Comment: Please set `transactionBufferStats.totalAbortedTransactions` here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org