Technoboy- commented on code in PR #15017:
URL: https://github.com/apache/pulsar/pull/15017#discussion_r843901280


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -378,91 +259,87 @@ private void getTransactionMetadata(TxnMeta txnMeta,
     protected void internalGetSlowTransactions(AsyncResponse asyncResponse,
                                                boolean authoritative, long 
timeout, Integer coordinatorId) {
         try {
-            if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-                if (coordinatorId != null) {
-                    
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-                            authoritative);
-                    TransactionMetadataStore transactionMetadataStore =
-                            
pulsar().getTransactionMetadataStoreService().getStores()
-                                    
.get(TransactionCoordinatorID.get(coordinatorId));
-                    if (transactionMetadataStore == null) {
-                        asyncResponse.resume(new RestException(NOT_FOUND,
-                                "Transaction coordinator not found! 
coordinator id : " + coordinatorId));
+            if (coordinatorId != null) {
+                
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+                        authoritative);
+                TransactionMetadataStore transactionMetadataStore =
+                        
pulsar().getTransactionMetadataStoreService().getStores()
+                                
.get(TransactionCoordinatorID.get(coordinatorId));
+                if (transactionMetadataStore == null) {
+                    asyncResponse.resume(new RestException(NOT_FOUND,
+                            "Transaction coordinator not found! coordinator id 
: " + coordinatorId));
+                    return;
+                }
+                List<TxnMeta> transactions = 
transactionMetadataStore.getSlowTransactions(timeout);
+                List<CompletableFuture<TransactionMetadata>> 
completableFutures = new ArrayList<>();
+                for (TxnMeta txnMeta : transactions) {
+                    CompletableFuture<TransactionMetadata> completableFuture = 
new CompletableFuture<>();
+                    getTransactionMetadata(txnMeta, completableFuture);
+                    completableFutures.add(completableFuture);
+                }
+
+                FutureUtil.waitForAll(completableFutures).whenComplete((v, e) 
-> {
+                    if (e != null) {
+                        asyncResponse.resume(new RestException(e.getCause()));
                         return;
                     }
-                    List<TxnMeta> transactions = 
transactionMetadataStore.getSlowTransactions(timeout);
-                    List<CompletableFuture<TransactionMetadata>> 
completableFutures = new ArrayList<>();
-                    for (TxnMeta txnMeta : transactions) {
-                        CompletableFuture<TransactionMetadata> 
completableFuture = new CompletableFuture<>();
-                        getTransactionMetadata(txnMeta, completableFuture);
-                        completableFutures.add(completableFuture);
-                    }
 
-                    FutureUtil.waitForAll(completableFutures).whenComplete((v, 
e) -> {
+                    Map<String, TransactionMetadata> transactionMetadata = new 
HashMap<>();
+                    for (CompletableFuture<TransactionMetadata> future : 
completableFutures) {
+                        try {
+                            transactionMetadata.put(future.get().txnId, 
future.get());
+                        } catch (Exception exception) {
+                            asyncResponse.resume(new 
RestException(exception.getCause()));
+                            return;
+                        }
+                    }
+                    asyncResponse.resume(transactionMetadata);
+                });
+            } else {
+                
getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+                        false, false).thenAccept(partitionMetadata -> {
+                    if (partitionMetadata.partitions == 0) {
+                        asyncResponse.resume(new 
RestException(Response.Status.NOT_FOUND,
+                                "Transaction coordinator not found"));
+                        return;
+                    }
+                    List<CompletableFuture<Map<String, TransactionMetadata>>> 
completableFutures =
+                            Lists.newArrayList();
+                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                        try {
+                            completableFutures
+                                    
.add(pulsar().getAdminClient().transactions()
+                                            
.getSlowTransactionsByCoordinatorIdAsync(i, timeout,
+                                                    TimeUnit.MILLISECONDS));
+                        } catch (PulsarServerException e) {
+                            asyncResponse.resume(new RestException(e));
+                            return;
+                        }
+                    }
+                    Map<String, TransactionMetadata> transactionMetadataMaps = 
new HashMap<>();
+                    
FutureUtil.waitForAll(completableFutures).whenComplete((result, e) -> {
                         if (e != null) {
-                            asyncResponse.resume(new 
RestException(e.getCause()));
+                            asyncResponse.resume(new RestException(e));
                             return;
                         }
 
-                        Map<String, TransactionMetadata> transactionMetadata = 
new HashMap<>();
-                        for (CompletableFuture<TransactionMetadata> future : 
completableFutures) {
+                        for (CompletableFuture<Map<String, 
TransactionMetadata>> transactionMetadataMap
+                                : completableFutures) {
                             try {
-                                transactionMetadata.put(future.get().txnId, 
future.get());
+                                
transactionMetadataMaps.putAll(transactionMetadataMap.get());

Review Comment:
   In this patch, for this method, only remove 
`pulsar().getConfig().isTransactionCoordinatorEnabled()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to