poorbarcode commented on code in PR #20330:
URL: https://github.com/apache/pulsar/pull/20330#discussion_r1206148081


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -431,6 +436,69 @@ protected 
CompletableFuture<TransactionPendingAckInternalStats> internalGetPendi
                 );
     }
 
+    protected CompletableFuture<TransactionBufferInternalStats> 
internalGetTransactionBufferInternalStats(
+            boolean authoritative, boolean metadata) {
+        TransactionBufferInternalStats transactionBufferInternalStats = new 
TransactionBufferInternalStats();
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenCompose(topic -> {
+                    TransactionBuffer.SnapshotType snapshotType = 
topic.getTransactionBuffer().getSnapshotType();
+                    if (snapshotType == null) {
+                        return FutureUtil.failedFuture(new 
RestException(NOT_FOUND,
+                                "Transaction buffer Snapshot for the topic 
does not exist"));
+                    } else if (snapshotType == 
TransactionBuffer.SnapshotType.Segment) {
+                        transactionBufferInternalStats.snapshotType = 
snapshotType.toString();
+                        this.topicName = 
TopicName.get(TopicDomain.persistent.toString(), namespaceName,
+                                
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
+                        return getExistingPersistentTopicAsync(authoritative)
+                                .thenCompose(segmentTopic ->
+                                        
segmentTopic.getManagedLedger().getManagedLedgerInternalStats(metadata)
+                                                
.thenApply(segmentInternalStats -> {
+                                                    SnapshotInternalStats 
segmentStats = new SnapshotInternalStats();
+                                                    
segmentStats.managedLedgerName = segmentTopic
+                                                            
.getManagedLedger().getName();
+                                                    
segmentStats.managedLedgerInternalStats = segmentInternalStats;
+                                                    
transactionBufferInternalStats.segmentInternalStats = segmentStats;
+                                                    return null;
+                                        }))
+                                .thenCompose(ignore -> {
+                                    this.topicName = 
TopicName.get(TopicDomain.persistent.toString(), namespaceName,
+                                            
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES);
+                                    return 
getExistingPersistentTopicAsync(authoritative)
+                                            .thenCompose(indexTopic ->  
indexTopic.getManagedLedger()
+                                                    
.getManagedLedgerInternalStats(metadata)
+                                                    
.thenApply(indexInternalStats -> {
+                                                        SnapshotInternalStats 
indexStats = new SnapshotInternalStats();
+                                                        
indexStats.managedLedgerName = indexTopic
+                                                                
.getManagedLedger().getName();
+                                                        
indexStats.managedLedgerInternalStats = indexInternalStats;
+                                                        
transactionBufferInternalStats
+                                                                
.segmentIndexInternalStats = indexStats;
+                                                        return null;
+                                                    }));
+                                }).thenApply(ignore -> 
transactionBufferInternalStats);
+                    } else if (snapshotType == 
TransactionBuffer.SnapshotType.Single) {
+                        transactionBufferInternalStats.snapshotType = 
snapshotType.toString();
+                        this.topicName = 
TopicName.get(TopicDomain.persistent.toString(), namespaceName,
+                                SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+                        return getExistingPersistentTopicAsync(authoritative)
+                                .thenCompose(singleTopic ->
+                                        
singleTopic.getManagedLedger().getManagedLedgerInternalStats(metadata)
+                                                .thenApply(singleInternalStats 
-> {

Review Comment:
   We can `new TransactionBufferInternalStats()` here, just like this:
   
   ```java
   getManagedLedgerInternalStats.thenApply(mlstats -> {
     TransactionBufferInternalStats txStats = new 
TransactionBufferInternalStats();
      ...
     return txStats;
   });
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -431,6 +436,69 @@ protected 
CompletableFuture<TransactionPendingAckInternalStats> internalGetPendi
                 );
     }
 
+    protected CompletableFuture<TransactionBufferInternalStats> 
internalGetTransactionBufferInternalStats(
+            boolean authoritative, boolean metadata) {
+        TransactionBufferInternalStats transactionBufferInternalStats = new 
TransactionBufferInternalStats();
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenCompose(topic -> {
+                    TransactionBuffer.SnapshotType snapshotType = 
topic.getTransactionBuffer().getSnapshotType();
+                    if (snapshotType == null) {
+                        return FutureUtil.failedFuture(new 
RestException(NOT_FOUND,
+                                "Transaction buffer Snapshot for the topic 
does not exist"));
+                    } else if (snapshotType == 
TransactionBuffer.SnapshotType.Segment) {
+                        transactionBufferInternalStats.snapshotType = 
snapshotType.toString();
+                        this.topicName = 
TopicName.get(TopicDomain.persistent.toString(), namespaceName,
+                                
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
+                        return getExistingPersistentTopicAsync(authoritative)

Review Comment:
   We can not confirm the owner of the topic 
"__transaction_buffer_snapshot_segments" is the current broker, right? 
   
   If yes, we should get the ml-stats of the topic 
"__transaction_buffer_snapshot_segments" by another method, it looks like this:
   
   ```java
   if (currentBrokerIsOwner) {
     //Get ML stats
   } else {
     //Send a request to get ML stats
   }
   
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -173,8 +174,8 @@ public CompletableFuture<Void> 
takeAbortedTxnsSnapshot(PositionImpl maxReadPosit
     }
 
     @Override
-    public long getLastSnapshotTimestamps() {
-        return this.lastSnapshotTimestamps;
+    public void generateSnapshotStats(TransactionBufferStats 
transactionBufferStats, boolean segmentStats) {
+        transactionBufferStats.lastSnapshotTimestamps = 
this.lastSnapshotTimestamps;

Review Comment:
   Please set `transactionBufferStats.totalAbortedTransactions` here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to