mattisonchao commented on code in PR #15017:
URL: https://github.com/apache/pulsar/pull/15017#discussion_r843877969


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -67,216 +67,97 @@
 
     protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, 
boolean authoritative,
                                                Integer coordinatorId) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            if (coordinatorId != null) {
-                
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-                        authoritative);
-                TransactionMetadataStore transactionMetadataStore =
-                        
pulsar().getTransactionMetadataStoreService().getStores()
-                                
.get(TransactionCoordinatorID.get(coordinatorId));
-                if (transactionMetadataStore == null) {
-                    asyncResponse.resume(new RestException(NOT_FOUND,
-                            "Transaction coordinator not found! coordinator id 
: " + coordinatorId));
+        if (coordinatorId != null) {
+            
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+                    authoritative);
+            TransactionMetadataStore transactionMetadataStore =
+                    pulsar().getTransactionMetadataStoreService().getStores()
+                            .get(TransactionCoordinatorID.get(coordinatorId));
+            if (transactionMetadataStore == null) {
+                asyncResponse.resume(new RestException(NOT_FOUND,
+                        "Transaction coordinator not found! coordinator id : " 
+ coordinatorId));
+                return;
+            }
+            
asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
+        } else {
+            
getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+                    false, false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions == 0) {
+                    asyncResponse.resume(new 
RestException(Response.Status.NOT_FOUND,
+                            "Transaction coordinator not found"));
                     return;
                 }
-                
asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
-            } else {
-                
getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
-                        false, false).thenAccept(partitionMetadata -> {
-                    if (partitionMetadata.partitions == 0) {
-                        asyncResponse.resume(new 
RestException(Response.Status.NOT_FOUND,
-                                "Transaction coordinator not found"));
+                List<CompletableFuture<TransactionCoordinatorStats>> 
transactionMetadataStoreInfoFutures =
+                        Lists.newArrayList();
+                for (int i = 0; i < partitionMetadata.partitions; i++) {
+                    try {
+                        transactionMetadataStoreInfoFutures
+                                
.add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
+                    } catch (PulsarServerException e) {
+                        asyncResponse.resume(new RestException(e));
                         return;
                     }
-                    List<CompletableFuture<TransactionCoordinatorStats>> 
transactionMetadataStoreInfoFutures =
-                            Lists.newArrayList();
-                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                }
+                Map<Integer, TransactionCoordinatorStats> stats = new 
HashMap<>();
+                
FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result,
 e) -> {
+                    if (e != null) {
+                        asyncResponse.resume(new RestException(e));
+                        return;
+                    }
+
+                    for (int i = 0; i < 
transactionMetadataStoreInfoFutures.size(); i++) {
                         try {
-                            transactionMetadataStoreInfoFutures
-                                    
.add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
-                        } catch (PulsarServerException e) {
-                            asyncResponse.resume(new RestException(e));
+                            stats.put(i, 
transactionMetadataStoreInfoFutures.get(i).get());

Review Comment:
   use ``join`` to avoid catch checked exceptions?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -67,216 +67,97 @@
 
     protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, 
boolean authoritative,
                                                Integer coordinatorId) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            if (coordinatorId != null) {
-                
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-                        authoritative);
-                TransactionMetadataStore transactionMetadataStore =
-                        
pulsar().getTransactionMetadataStoreService().getStores()
-                                
.get(TransactionCoordinatorID.get(coordinatorId));
-                if (transactionMetadataStore == null) {
-                    asyncResponse.resume(new RestException(NOT_FOUND,
-                            "Transaction coordinator not found! coordinator id 
: " + coordinatorId));
+        if (coordinatorId != null) {
+            
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),

Review Comment:
   Looks like we should check this method exception or make it async.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -378,91 +259,87 @@ private void getTransactionMetadata(TxnMeta txnMeta,
     protected void internalGetSlowTransactions(AsyncResponse asyncResponse,
                                                boolean authoritative, long 
timeout, Integer coordinatorId) {
         try {
-            if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-                if (coordinatorId != null) {
-                    
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-                            authoritative);
-                    TransactionMetadataStore transactionMetadataStore =
-                            
pulsar().getTransactionMetadataStoreService().getStores()
-                                    
.get(TransactionCoordinatorID.get(coordinatorId));
-                    if (transactionMetadataStore == null) {
-                        asyncResponse.resume(new RestException(NOT_FOUND,
-                                "Transaction coordinator not found! 
coordinator id : " + coordinatorId));
+            if (coordinatorId != null) {
+                
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+                        authoritative);
+                TransactionMetadataStore transactionMetadataStore =
+                        
pulsar().getTransactionMetadataStoreService().getStores()
+                                
.get(TransactionCoordinatorID.get(coordinatorId));
+                if (transactionMetadataStore == null) {
+                    asyncResponse.resume(new RestException(NOT_FOUND,
+                            "Transaction coordinator not found! coordinator id 
: " + coordinatorId));
+                    return;
+                }
+                List<TxnMeta> transactions = 
transactionMetadataStore.getSlowTransactions(timeout);
+                List<CompletableFuture<TransactionMetadata>> 
completableFutures = new ArrayList<>();
+                for (TxnMeta txnMeta : transactions) {
+                    CompletableFuture<TransactionMetadata> completableFuture = 
new CompletableFuture<>();
+                    getTransactionMetadata(txnMeta, completableFuture);
+                    completableFutures.add(completableFuture);
+                }
+
+                FutureUtil.waitForAll(completableFutures).whenComplete((v, e) 
-> {
+                    if (e != null) {
+                        asyncResponse.resume(new RestException(e.getCause()));
                         return;
                     }
-                    List<TxnMeta> transactions = 
transactionMetadataStore.getSlowTransactions(timeout);
-                    List<CompletableFuture<TransactionMetadata>> 
completableFutures = new ArrayList<>();
-                    for (TxnMeta txnMeta : transactions) {
-                        CompletableFuture<TransactionMetadata> 
completableFuture = new CompletableFuture<>();
-                        getTransactionMetadata(txnMeta, completableFuture);
-                        completableFutures.add(completableFuture);
-                    }
 
-                    FutureUtil.waitForAll(completableFutures).whenComplete((v, 
e) -> {
+                    Map<String, TransactionMetadata> transactionMetadata = new 
HashMap<>();
+                    for (CompletableFuture<TransactionMetadata> future : 
completableFutures) {
+                        try {
+                            transactionMetadata.put(future.get().txnId, 
future.get());
+                        } catch (Exception exception) {
+                            asyncResponse.resume(new 
RestException(exception.getCause()));
+                            return;
+                        }
+                    }
+                    asyncResponse.resume(transactionMetadata);
+                });
+            } else {
+                
getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+                        false, false).thenAccept(partitionMetadata -> {
+                    if (partitionMetadata.partitions == 0) {
+                        asyncResponse.resume(new 
RestException(Response.Status.NOT_FOUND,
+                                "Transaction coordinator not found"));
+                        return;
+                    }
+                    List<CompletableFuture<Map<String, TransactionMetadata>>> 
completableFutures =
+                            Lists.newArrayList();
+                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                        try {
+                            completableFutures
+                                    
.add(pulsar().getAdminClient().transactions()
+                                            
.getSlowTransactionsByCoordinatorIdAsync(i, timeout,
+                                                    TimeUnit.MILLISECONDS));
+                        } catch (PulsarServerException e) {
+                            asyncResponse.resume(new RestException(e));
+                            return;
+                        }
+                    }
+                    Map<String, TransactionMetadata> transactionMetadataMaps = 
new HashMap<>();
+                    
FutureUtil.waitForAll(completableFutures).whenComplete((result, e) -> {
                         if (e != null) {
-                            asyncResponse.resume(new 
RestException(e.getCause()));
+                            asyncResponse.resume(new RestException(e));
                             return;
                         }
 
-                        Map<String, TransactionMetadata> transactionMetadata = 
new HashMap<>();
-                        for (CompletableFuture<TransactionMetadata> future : 
completableFutures) {
+                        for (CompletableFuture<Map<String, 
TransactionMetadata>> transactionMetadataMap
+                                : completableFutures) {
                             try {
-                                transactionMetadata.put(future.get().txnId, 
future.get());
+                                
transactionMetadataMaps.putAll(transactionMetadataMap.get());

Review Comment:
   use join to avoid catch checked exceptions?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -67,216 +67,97 @@
 
     protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, 
boolean authoritative,
                                                Integer coordinatorId) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            if (coordinatorId != null) {
-                
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-                        authoritative);
-                TransactionMetadataStore transactionMetadataStore =
-                        
pulsar().getTransactionMetadataStoreService().getStores()
-                                
.get(TransactionCoordinatorID.get(coordinatorId));
-                if (transactionMetadataStore == null) {
-                    asyncResponse.resume(new RestException(NOT_FOUND,
-                            "Transaction coordinator not found! coordinator id 
: " + coordinatorId));
+        if (coordinatorId != null) {
+            
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+                    authoritative);
+            TransactionMetadataStore transactionMetadataStore =
+                    pulsar().getTransactionMetadataStoreService().getStores()
+                            .get(TransactionCoordinatorID.get(coordinatorId));
+            if (transactionMetadataStore == null) {
+                asyncResponse.resume(new RestException(NOT_FOUND,
+                        "Transaction coordinator not found! coordinator id : " 
+ coordinatorId));
+                return;
+            }
+            
asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
+        } else {
+            
getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+                    false, false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions == 0) {
+                    asyncResponse.resume(new 
RestException(Response.Status.NOT_FOUND,
+                            "Transaction coordinator not found"));
                     return;
                 }
-                
asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
-            } else {
-                
getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
-                        false, false).thenAccept(partitionMetadata -> {
-                    if (partitionMetadata.partitions == 0) {
-                        asyncResponse.resume(new 
RestException(Response.Status.NOT_FOUND,
-                                "Transaction coordinator not found"));
+                List<CompletableFuture<TransactionCoordinatorStats>> 
transactionMetadataStoreInfoFutures =
+                        Lists.newArrayList();
+                for (int i = 0; i < partitionMetadata.partitions; i++) {
+                    try {
+                        transactionMetadataStoreInfoFutures
+                                
.add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
+                    } catch (PulsarServerException e) {
+                        asyncResponse.resume(new RestException(e));
                         return;
                     }
-                    List<CompletableFuture<TransactionCoordinatorStats>> 
transactionMetadataStoreInfoFutures =
-                            Lists.newArrayList();
-                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                }
+                Map<Integer, TransactionCoordinatorStats> stats = new 
HashMap<>();
+                
FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result,
 e) -> {
+                    if (e != null) {
+                        asyncResponse.resume(new RestException(e));
+                        return;
+                    }
+
+                    for (int i = 0; i < 
transactionMetadataStoreInfoFutures.size(); i++) {
                         try {
-                            transactionMetadataStoreInfoFutures
-                                    
.add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
-                        } catch (PulsarServerException e) {
-                            asyncResponse.resume(new RestException(e));
+                            stats.put(i, 
transactionMetadataStoreInfoFutures.get(i).get());
+                        } catch (Exception exception) {
+                            asyncResponse.resume(new 
RestException(exception.getCause()));
                             return;
                         }
                     }
-                    Map<Integer, TransactionCoordinatorStats> stats = new 
HashMap<>();
-                    
FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result,
 e) -> {
-                        if (e != null) {
-                            asyncResponse.resume(new RestException(e));
-                            return;
-                        }
-
-                        for (int i = 0; i < 
transactionMetadataStoreInfoFutures.size(); i++) {
-                            try {
-                                stats.put(i, 
transactionMetadataStoreInfoFutures.get(i).get());
-                            } catch (Exception exception) {
-                                asyncResponse.resume(new 
RestException(exception.getCause()));
-                                return;
-                            }
-                        }
 
-                        asyncResponse.resume(stats);
-                    });
-                }).exceptionally(ex -> {
-                    log.error("[{}] Failed to get transaction coordinator 
state.", clientAppId(), ex);
-                    resumeAsyncResponseExceptionally(asyncResponse, ex);
-                    return null;
+                    asyncResponse.resume(stats);
                 });
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                    "This Broker is not configured with 
transactionCoordinatorEnabled=true."));
+            }).exceptionally(ex -> {
+                log.error("[{}] Failed to get transaction coordinator state.", 
clientAppId(), ex);
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
         }
     }
 
-    protected void internalGetTransactionInPendingAckStats(AsyncResponse 
asyncResponse, boolean authoritative,
-                                                           long mostSigBits, 
long leastSigBits, String subName) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
-            CompletableFuture<Optional<Topic>> topicFuture = 
pulsar().getBrokerService()
-                    .getTopics().get(topicName.toString());
-            if (topicFuture != null) {
-                topicFuture.whenComplete((optionalTopic, e) -> {
-                    if (e != null) {
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-                    if (!optionalTopic.isPresent()) {
-                        asyncResponse.resume(new 
RestException(TEMPORARY_REDIRECT,
-                                "Topic is not owned by this broker!"));
-                        return;
-                    }
-                    Topic topicObject = optionalTopic.get();
-                    if (topicObject instanceof PersistentTopic) {
-                        asyncResponse.resume(((PersistentTopic) topicObject)
-                                .getTransactionInPendingAckStats(new 
TxnID(mostSigBits, leastSigBits), subName));
-                    } else {
-                        asyncResponse.resume(new RestException(BAD_REQUEST, 
"Topic is not a persistent topic!"));
-                    }
-                });
-            } else {
-                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, 
"Topic is not owned by this broker!"));
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                    "This Broker is not configured with 
transactionCoordinatorEnabled=true."));
-        }
+    protected CompletableFuture<TransactionInPendingAckStats> 
internalGetTransactionInPendingAckStats(
+            boolean authoritative, long mostSigBits, long leastSigBits, String 
subName) {
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenApply(topic -> topic.getTransactionInPendingAckStats(new 
TxnID(mostSigBits, leastSigBits),
+                        subName));
     }
 
-    protected void internalGetTransactionInBufferStats(AsyncResponse 
asyncResponse, boolean authoritative,
-                                                       long mostSigBits, long 
leastSigBits) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
-            CompletableFuture<Optional<Topic>> topicFuture = 
pulsar().getBrokerService()
-                    .getTopics().get(topicName.toString());
-            if (topicFuture != null) {
-                topicFuture.whenComplete((optionalTopic, e) -> {
-                    if (e != null) {
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-                    if (!optionalTopic.isPresent()) {
-                        asyncResponse.resume(new 
RestException(TEMPORARY_REDIRECT,
-                                "Topic is not owned by this broker!"));
-                        return;
-                    }
-                    Topic topicObject = optionalTopic.get();
-                    if (topicObject instanceof PersistentTopic) {
-                        TransactionInBufferStats transactionInBufferStats = 
((PersistentTopic) topicObject)
-                                .getTransactionInBufferStats(new 
TxnID(mostSigBits, leastSigBits));
-                        asyncResponse.resume(transactionInBufferStats);
-                    } else {
-                        asyncResponse.resume(new RestException(BAD_REQUEST, 
"Topic is not a persistent topic!"));
-                    }
-                });
-            } else {
-                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, 
"Topic is not owned by this broker!"));
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                    "This Broker is not configured with 
transactionCoordinatorEnabled=true."));
-        }
+    protected CompletableFuture<TransactionInBufferStats> 
internalGetTransactionInBufferStats(
+            boolean authoritative, long mostSigBits, long leastSigBits) {
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenApply(topic -> topic.getTransactionInBufferStats(new 
TxnID(mostSigBits, leastSigBits)));
     }
 
-    protected void internalGetTransactionBufferStats(AsyncResponse 
asyncResponse, boolean authoritative) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
-            CompletableFuture<Optional<Topic>> topicFuture = 
pulsar().getBrokerService()
-                    .getTopics().get(topicName.toString());
-            if (topicFuture != null) {
-                topicFuture.whenComplete((optionalTopic, e) -> {
-                    if (e != null) {
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-
-                    if (!optionalTopic.isPresent()) {
-                        asyncResponse.resume(new 
RestException(TEMPORARY_REDIRECT,
-                                "Topic is not owned by this broker!"));
-                        return;
-                    }
-                    Topic topicObject = optionalTopic.get();
-                    if (topicObject instanceof PersistentTopic) {
-                        asyncResponse.resume(((PersistentTopic) 
topicObject).getTransactionBufferStats());
-                    } else {
-                        asyncResponse.resume(new RestException(BAD_REQUEST, 
"Topic is not a persistent topic!"));
-                    }
-                });
-            } else {
-                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, 
"Topic is not owned by this broker!"));
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, 
"Broker don't support transaction!"));
-        }
+    protected CompletableFuture<TransactionBufferStats> 
internalGetTransactionBufferStats(boolean authoritative) {
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenApply(topic -> topic.getTransactionBufferStats());
     }
 
-    protected void internalGetPendingAckStats(AsyncResponse asyncResponse, 
boolean authoritative, String subName) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
-            CompletableFuture<Optional<Topic>> topicFuture = 
pulsar().getBrokerService()
-                    .getTopics().get(topicName.toString());
-            if (topicFuture != null) {
-                topicFuture.whenComplete((optionalTopic, e) -> {
-                    if (e != null) {
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-
-                    if (!optionalTopic.isPresent()) {
-                        asyncResponse.resume(new 
RestException(TEMPORARY_REDIRECT,
-                                "Topic is not owned by this broker!"));
-                        return;
-                    }
-                    Topic topicObject = optionalTopic.get();
-                    if (topicObject instanceof PersistentTopic) {
-                        asyncResponse.resume(((PersistentTopic) 
topicObject).getTransactionPendingAckStats(subName));
-                    } else {
-                        asyncResponse.resume(new RestException(BAD_REQUEST, 
"Topic is not a persistent topic!"));
-                    }
-                });
-            } else {
-                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, 
"Topic is not owned by this broker!"));
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, 
"Broker don't support transaction!"));
-        }
+    protected CompletableFuture<TransactionPendingAckStats> 
internalGetPendingAckStats(
+            boolean authoritative, String subName) {
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenApply(topic -> 
topic.getTransactionPendingAckStats(subName));
     }
 
     protected void internalGetTransactionMetadata(AsyncResponse asyncResponse,
                                                   boolean authoritative, int 
mostSigBits, long leastSigBits) {
         try {
-            if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-                
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits),
-                        authoritative);
-                CompletableFuture<TransactionMetadata> 
transactionMetadataFuture = new CompletableFuture<>();
-                TxnMeta txnMeta = pulsar().getTransactionMetadataStoreService()
-                        .getTxnMeta(new TxnID(mostSigBits, 
leastSigBits)).get();
-                getTransactionMetadata(txnMeta, transactionMetadataFuture);
-                asyncResponse.resume(transactionMetadataFuture.get(10, 
TimeUnit.SECONDS));
-            } else {
-                asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                        "This Broker is not configured with 
transactionCoordinatorEnabled=true."));
-            }
+            
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits),
+                    authoritative);
+            CompletableFuture<TransactionMetadata> transactionMetadataFuture = 
new CompletableFuture<>();
+            TxnMeta txnMeta = pulsar().getTransactionMetadataStoreService()
+                    .getTxnMeta(new TxnID(mostSigBits, leastSigBits)).get();

Review Comment:
   looks like this method uses an endless wait for "get()" here?



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java:
##########
@@ -429,6 +507,18 @@ public void testGetPendingAckInternalStats() throws 
Exception {
         assertNull(managedLedgerInternalStats.ledgers.get(0).metadata);
     }
 
+    @Test(timeOut = 20000)
+    public void testTransactionNotEnabled() throws Exception {
+        stopBroker();
+        conf.setTransactionCoordinatorEnabled(false);
+        super.internalSetup();
+        try {
+            admin.transactions().getCoordinatorInternalStats(1, false);

Review Comment:
   Assert.fail() here?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -378,91 +259,87 @@ private void getTransactionMetadata(TxnMeta txnMeta,
     protected void internalGetSlowTransactions(AsyncResponse asyncResponse,
                                                boolean authoritative, long 
timeout, Integer coordinatorId) {
         try {
-            if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-                if (coordinatorId != null) {
-                    
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-                            authoritative);
-                    TransactionMetadataStore transactionMetadataStore =
-                            
pulsar().getTransactionMetadataStoreService().getStores()
-                                    
.get(TransactionCoordinatorID.get(coordinatorId));
-                    if (transactionMetadataStore == null) {
-                        asyncResponse.resume(new RestException(NOT_FOUND,
-                                "Transaction coordinator not found! 
coordinator id : " + coordinatorId));
+            if (coordinatorId != null) {
+                
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+                        authoritative);
+                TransactionMetadataStore transactionMetadataStore =
+                        
pulsar().getTransactionMetadataStoreService().getStores()
+                                
.get(TransactionCoordinatorID.get(coordinatorId));
+                if (transactionMetadataStore == null) {
+                    asyncResponse.resume(new RestException(NOT_FOUND,
+                            "Transaction coordinator not found! coordinator id 
: " + coordinatorId));
+                    return;
+                }
+                List<TxnMeta> transactions = 
transactionMetadataStore.getSlowTransactions(timeout);
+                List<CompletableFuture<TransactionMetadata>> 
completableFutures = new ArrayList<>();
+                for (TxnMeta txnMeta : transactions) {
+                    CompletableFuture<TransactionMetadata> completableFuture = 
new CompletableFuture<>();
+                    getTransactionMetadata(txnMeta, completableFuture);
+                    completableFutures.add(completableFuture);
+                }
+
+                FutureUtil.waitForAll(completableFutures).whenComplete((v, e) 
-> {
+                    if (e != null) {
+                        asyncResponse.resume(new RestException(e.getCause()));
                         return;
                     }
-                    List<TxnMeta> transactions = 
transactionMetadataStore.getSlowTransactions(timeout);
-                    List<CompletableFuture<TransactionMetadata>> 
completableFutures = new ArrayList<>();
-                    for (TxnMeta txnMeta : transactions) {
-                        CompletableFuture<TransactionMetadata> 
completableFuture = new CompletableFuture<>();
-                        getTransactionMetadata(txnMeta, completableFuture);
-                        completableFutures.add(completableFuture);
-                    }
 
-                    FutureUtil.waitForAll(completableFutures).whenComplete((v, 
e) -> {
+                    Map<String, TransactionMetadata> transactionMetadata = new 
HashMap<>();
+                    for (CompletableFuture<TransactionMetadata> future : 
completableFutures) {
+                        try {
+                            transactionMetadata.put(future.get().txnId, 
future.get());

Review Comment:
   use join to avoid catch checked exceptions?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -378,91 +259,87 @@ private void getTransactionMetadata(TxnMeta txnMeta,
     protected void internalGetSlowTransactions(AsyncResponse asyncResponse,
                                                boolean authoritative, long 
timeout, Integer coordinatorId) {
         try {
-            if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-                if (coordinatorId != null) {
-                    
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-                            authoritative);
-                    TransactionMetadataStore transactionMetadataStore =
-                            
pulsar().getTransactionMetadataStoreService().getStores()
-                                    
.get(TransactionCoordinatorID.get(coordinatorId));
-                    if (transactionMetadataStore == null) {
-                        asyncResponse.resume(new RestException(NOT_FOUND,
-                                "Transaction coordinator not found! 
coordinator id : " + coordinatorId));
+            if (coordinatorId != null) {
+                
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),

Review Comment:
   Looks like we should check this method exception or make it async.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -472,74 +349,75 @@ protected void internalGetSlowTransactions(AsyncResponse 
asyncResponse,
     protected void internalGetCoordinatorInternalStats(AsyncResponse 
asyncResponse, boolean authoritative,
                                                        boolean metadata, int 
coordinatorId) {
         try {
-            if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-                TopicName topicName = 
TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId);
-                validateTopicOwnership(topicName, authoritative);
-                TransactionMetadataStore metadataStore = 
pulsar().getTransactionMetadataStoreService()
-                        
.getStores().get(TransactionCoordinatorID.get(coordinatorId));
-                if (metadataStore == null) {
-                    asyncResponse.resume(new RestException(NOT_FOUND,
-                            "Transaction coordinator not found! coordinator id 
: " + coordinatorId));
-                    return;
-                }
-                if (metadataStore instanceof MLTransactionMetadataStore) {
-                    ManagedLedger managedLedger = 
((MLTransactionMetadataStore) metadataStore).getManagedLedger();
-                    TransactionCoordinatorInternalStats 
transactionCoordinatorInternalStats =
-                            new TransactionCoordinatorInternalStats();
-                    TransactionLogStats transactionLogStats = new 
TransactionLogStats();
-                    transactionLogStats.managedLedgerName = 
managedLedger.getName();
-                    transactionLogStats.managedLedgerInternalStats =
-                            
managedLedger.getManagedLedgerInternalStats(metadata).get();
-                    transactionCoordinatorInternalStats.transactionLogStats = 
transactionLogStats;
-                    asyncResponse.resume(transactionCoordinatorInternalStats);
-                } else {
-                    asyncResponse.resume(new RestException(METHOD_NOT_ALLOWED,
-                            "Broker don't use MLTransactionMetadataStore!"));
-                }
+            TopicName topicName = 
TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId);
+            validateTopicOwnership(topicName, authoritative);
+            TransactionMetadataStore metadataStore = 
pulsar().getTransactionMetadataStoreService()
+                    
.getStores().get(TransactionCoordinatorID.get(coordinatorId));
+            if (metadataStore == null) {
+                asyncResponse.resume(new RestException(NOT_FOUND,
+                        "Transaction coordinator not found! coordinator id : " 
+ coordinatorId));
+                return;
+            }
+            if (metadataStore instanceof MLTransactionMetadataStore) {
+                ManagedLedger managedLedger = ((MLTransactionMetadataStore) 
metadataStore).getManagedLedger();
+                TransactionCoordinatorInternalStats 
transactionCoordinatorInternalStats =
+                        new TransactionCoordinatorInternalStats();
+                TransactionLogStats transactionLogStats = new 
TransactionLogStats();
+                transactionLogStats.managedLedgerName = 
managedLedger.getName();
+                transactionLogStats.managedLedgerInternalStats =
+                        
managedLedger.getManagedLedgerInternalStats(metadata).get();

Review Comment:
   looks like this method uses an endless wait for "get()" here?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -472,74 +349,75 @@ protected void internalGetSlowTransactions(AsyncResponse 
asyncResponse,
     protected void internalGetCoordinatorInternalStats(AsyncResponse 
asyncResponse, boolean authoritative,
                                                        boolean metadata, int 
coordinatorId) {
         try {
-            if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-                TopicName topicName = 
TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId);
-                validateTopicOwnership(topicName, authoritative);
-                TransactionMetadataStore metadataStore = 
pulsar().getTransactionMetadataStoreService()
-                        
.getStores().get(TransactionCoordinatorID.get(coordinatorId));
-                if (metadataStore == null) {
-                    asyncResponse.resume(new RestException(NOT_FOUND,
-                            "Transaction coordinator not found! coordinator id 
: " + coordinatorId));
-                    return;
-                }
-                if (metadataStore instanceof MLTransactionMetadataStore) {
-                    ManagedLedger managedLedger = 
((MLTransactionMetadataStore) metadataStore).getManagedLedger();
-                    TransactionCoordinatorInternalStats 
transactionCoordinatorInternalStats =
-                            new TransactionCoordinatorInternalStats();
-                    TransactionLogStats transactionLogStats = new 
TransactionLogStats();
-                    transactionLogStats.managedLedgerName = 
managedLedger.getName();
-                    transactionLogStats.managedLedgerInternalStats =
-                            
managedLedger.getManagedLedgerInternalStats(metadata).get();
-                    transactionCoordinatorInternalStats.transactionLogStats = 
transactionLogStats;
-                    asyncResponse.resume(transactionCoordinatorInternalStats);
-                } else {
-                    asyncResponse.resume(new RestException(METHOD_NOT_ALLOWED,
-                            "Broker don't use MLTransactionMetadataStore!"));
-                }
+            TopicName topicName = 
TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId);
+            validateTopicOwnership(topicName, authoritative);

Review Comment:
   Looks like we should check this method exception or make it async.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -168,6 +209,7 @@ public void getTransactionMetadata(@Suspended final 
AsyncResponse asyncResponse,
                                        @DefaultValue("false") boolean 
authoritative,
                                        @PathParam("mostSigBits") String 
mostSigBits,
                                        @PathParam("leastSigBits") String 
leastSigBits) {
+        checkTransactionCoordinatorEnabled();

Review Comment:
   Looks like we should catch this method exception.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -67,216 +67,97 @@
 
     protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, 
boolean authoritative,
                                                Integer coordinatorId) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            if (coordinatorId != null) {
-                
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-                        authoritative);
-                TransactionMetadataStore transactionMetadataStore =
-                        
pulsar().getTransactionMetadataStoreService().getStores()
-                                
.get(TransactionCoordinatorID.get(coordinatorId));
-                if (transactionMetadataStore == null) {
-                    asyncResponse.resume(new RestException(NOT_FOUND,
-                            "Transaction coordinator not found! coordinator id 
: " + coordinatorId));
+        if (coordinatorId != null) {
+            
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+                    authoritative);
+            TransactionMetadataStore transactionMetadataStore =
+                    pulsar().getTransactionMetadataStoreService().getStores()
+                            .get(TransactionCoordinatorID.get(coordinatorId));
+            if (transactionMetadataStore == null) {
+                asyncResponse.resume(new RestException(NOT_FOUND,
+                        "Transaction coordinator not found! coordinator id : " 
+ coordinatorId));
+                return;
+            }
+            
asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
+        } else {
+            
getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+                    false, false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions == 0) {
+                    asyncResponse.resume(new 
RestException(Response.Status.NOT_FOUND,
+                            "Transaction coordinator not found"));
                     return;
                 }
-                
asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
-            } else {
-                
getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
-                        false, false).thenAccept(partitionMetadata -> {
-                    if (partitionMetadata.partitions == 0) {
-                        asyncResponse.resume(new 
RestException(Response.Status.NOT_FOUND,
-                                "Transaction coordinator not found"));
+                List<CompletableFuture<TransactionCoordinatorStats>> 
transactionMetadataStoreInfoFutures =
+                        Lists.newArrayList();
+                for (int i = 0; i < partitionMetadata.partitions; i++) {
+                    try {
+                        transactionMetadataStoreInfoFutures
+                                
.add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
+                    } catch (PulsarServerException e) {
+                        asyncResponse.resume(new RestException(e));
                         return;
                     }
-                    List<CompletableFuture<TransactionCoordinatorStats>> 
transactionMetadataStoreInfoFutures =
-                            Lists.newArrayList();
-                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                }
+                Map<Integer, TransactionCoordinatorStats> stats = new 
HashMap<>();
+                
FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result,
 e) -> {
+                    if (e != null) {
+                        asyncResponse.resume(new RestException(e));
+                        return;
+                    }
+
+                    for (int i = 0; i < 
transactionMetadataStoreInfoFutures.size(); i++) {
                         try {
-                            transactionMetadataStoreInfoFutures
-                                    
.add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
-                        } catch (PulsarServerException e) {
-                            asyncResponse.resume(new RestException(e));
+                            stats.put(i, 
transactionMetadataStoreInfoFutures.get(i).get());
+                        } catch (Exception exception) {
+                            asyncResponse.resume(new 
RestException(exception.getCause()));
                             return;
                         }
                     }
-                    Map<Integer, TransactionCoordinatorStats> stats = new 
HashMap<>();
-                    
FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result,
 e) -> {
-                        if (e != null) {
-                            asyncResponse.resume(new RestException(e));
-                            return;
-                        }
-
-                        for (int i = 0; i < 
transactionMetadataStoreInfoFutures.size(); i++) {
-                            try {
-                                stats.put(i, 
transactionMetadataStoreInfoFutures.get(i).get());
-                            } catch (Exception exception) {
-                                asyncResponse.resume(new 
RestException(exception.getCause()));
-                                return;
-                            }
-                        }
 
-                        asyncResponse.resume(stats);
-                    });
-                }).exceptionally(ex -> {
-                    log.error("[{}] Failed to get transaction coordinator 
state.", clientAppId(), ex);
-                    resumeAsyncResponseExceptionally(asyncResponse, ex);
-                    return null;
+                    asyncResponse.resume(stats);
                 });
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                    "This Broker is not configured with 
transactionCoordinatorEnabled=true."));
+            }).exceptionally(ex -> {
+                log.error("[{}] Failed to get transaction coordinator state.", 
clientAppId(), ex);
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
         }
     }
 
-    protected void internalGetTransactionInPendingAckStats(AsyncResponse 
asyncResponse, boolean authoritative,
-                                                           long mostSigBits, 
long leastSigBits, String subName) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
-            CompletableFuture<Optional<Topic>> topicFuture = 
pulsar().getBrokerService()
-                    .getTopics().get(topicName.toString());
-            if (topicFuture != null) {
-                topicFuture.whenComplete((optionalTopic, e) -> {
-                    if (e != null) {
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-                    if (!optionalTopic.isPresent()) {
-                        asyncResponse.resume(new 
RestException(TEMPORARY_REDIRECT,
-                                "Topic is not owned by this broker!"));
-                        return;
-                    }
-                    Topic topicObject = optionalTopic.get();
-                    if (topicObject instanceof PersistentTopic) {
-                        asyncResponse.resume(((PersistentTopic) topicObject)
-                                .getTransactionInPendingAckStats(new 
TxnID(mostSigBits, leastSigBits), subName));
-                    } else {
-                        asyncResponse.resume(new RestException(BAD_REQUEST, 
"Topic is not a persistent topic!"));
-                    }
-                });
-            } else {
-                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, 
"Topic is not owned by this broker!"));
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                    "This Broker is not configured with 
transactionCoordinatorEnabled=true."));
-        }
+    protected CompletableFuture<TransactionInPendingAckStats> 
internalGetTransactionInPendingAckStats(
+            boolean authoritative, long mostSigBits, long leastSigBits, String 
subName) {
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenApply(topic -> topic.getTransactionInPendingAckStats(new 
TxnID(mostSigBits, leastSigBits),
+                        subName));
     }
 
-    protected void internalGetTransactionInBufferStats(AsyncResponse 
asyncResponse, boolean authoritative,
-                                                       long mostSigBits, long 
leastSigBits) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
-            CompletableFuture<Optional<Topic>> topicFuture = 
pulsar().getBrokerService()
-                    .getTopics().get(topicName.toString());
-            if (topicFuture != null) {
-                topicFuture.whenComplete((optionalTopic, e) -> {
-                    if (e != null) {
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-                    if (!optionalTopic.isPresent()) {
-                        asyncResponse.resume(new 
RestException(TEMPORARY_REDIRECT,
-                                "Topic is not owned by this broker!"));
-                        return;
-                    }
-                    Topic topicObject = optionalTopic.get();
-                    if (topicObject instanceof PersistentTopic) {
-                        TransactionInBufferStats transactionInBufferStats = 
((PersistentTopic) topicObject)
-                                .getTransactionInBufferStats(new 
TxnID(mostSigBits, leastSigBits));
-                        asyncResponse.resume(transactionInBufferStats);
-                    } else {
-                        asyncResponse.resume(new RestException(BAD_REQUEST, 
"Topic is not a persistent topic!"));
-                    }
-                });
-            } else {
-                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, 
"Topic is not owned by this broker!"));
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                    "This Broker is not configured with 
transactionCoordinatorEnabled=true."));
-        }
+    protected CompletableFuture<TransactionInBufferStats> 
internalGetTransactionInBufferStats(
+            boolean authoritative, long mostSigBits, long leastSigBits) {
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenApply(topic -> topic.getTransactionInBufferStats(new 
TxnID(mostSigBits, leastSigBits)));
     }
 
-    protected void internalGetTransactionBufferStats(AsyncResponse 
asyncResponse, boolean authoritative) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
-            CompletableFuture<Optional<Topic>> topicFuture = 
pulsar().getBrokerService()
-                    .getTopics().get(topicName.toString());
-            if (topicFuture != null) {
-                topicFuture.whenComplete((optionalTopic, e) -> {
-                    if (e != null) {
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-
-                    if (!optionalTopic.isPresent()) {
-                        asyncResponse.resume(new 
RestException(TEMPORARY_REDIRECT,
-                                "Topic is not owned by this broker!"));
-                        return;
-                    }
-                    Topic topicObject = optionalTopic.get();
-                    if (topicObject instanceof PersistentTopic) {
-                        asyncResponse.resume(((PersistentTopic) 
topicObject).getTransactionBufferStats());
-                    } else {
-                        asyncResponse.resume(new RestException(BAD_REQUEST, 
"Topic is not a persistent topic!"));
-                    }
-                });
-            } else {
-                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, 
"Topic is not owned by this broker!"));
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, 
"Broker don't support transaction!"));
-        }
+    protected CompletableFuture<TransactionBufferStats> 
internalGetTransactionBufferStats(boolean authoritative) {
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenApply(topic -> topic.getTransactionBufferStats());
     }
 
-    protected void internalGetPendingAckStats(AsyncResponse asyncResponse, 
boolean authoritative, String subName) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
-            CompletableFuture<Optional<Topic>> topicFuture = 
pulsar().getBrokerService()
-                    .getTopics().get(topicName.toString());
-            if (topicFuture != null) {
-                topicFuture.whenComplete((optionalTopic, e) -> {
-                    if (e != null) {
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-
-                    if (!optionalTopic.isPresent()) {
-                        asyncResponse.resume(new 
RestException(TEMPORARY_REDIRECT,
-                                "Topic is not owned by this broker!"));
-                        return;
-                    }
-                    Topic topicObject = optionalTopic.get();
-                    if (topicObject instanceof PersistentTopic) {
-                        asyncResponse.resume(((PersistentTopic) 
topicObject).getTransactionPendingAckStats(subName));
-                    } else {
-                        asyncResponse.resume(new RestException(BAD_REQUEST, 
"Topic is not a persistent topic!"));
-                    }
-                });
-            } else {
-                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, 
"Topic is not owned by this broker!"));
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, 
"Broker don't support transaction!"));
-        }
+    protected CompletableFuture<TransactionPendingAckStats> 
internalGetPendingAckStats(
+            boolean authoritative, String subName) {
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenApply(topic -> 
topic.getTransactionPendingAckStats(subName));
     }
 
     protected void internalGetTransactionMetadata(AsyncResponse asyncResponse,
                                                   boolean authoritative, int 
mostSigBits, long leastSigBits) {
         try {
-            if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-                
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits),
-                        authoritative);
-                CompletableFuture<TransactionMetadata> 
transactionMetadataFuture = new CompletableFuture<>();
-                TxnMeta txnMeta = pulsar().getTransactionMetadataStoreService()
-                        .getTxnMeta(new TxnID(mostSigBits, 
leastSigBits)).get();
-                getTransactionMetadata(txnMeta, transactionMetadataFuture);
-                asyncResponse.resume(transactionMetadataFuture.get(10, 
TimeUnit.SECONDS));
-            } else {
-                asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                        "This Broker is not configured with 
transactionCoordinatorEnabled=true."));
-            }
+            
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits),

Review Comment:
   Looks like we should check this method exception or make it async.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -61,6 +61,7 @@ public void getCoordinatorStats(@Suspended final 
AsyncResponse asyncResponse,
                                     @QueryParam("authoritative")
                                     @DefaultValue("false") boolean 
authoritative,
                                     @QueryParam("coordinatorId") Integer 
coordinatorId) {
+        checkTransactionCoordinatorEnabled();

Review Comment:
   Looks like we should catch this method exception.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -205,6 +248,7 @@ public void getCoordinatorInternalStats(@Suspended final 
AsyncResponse asyncResp
                                             @DefaultValue("false") boolean 
authoritative,
                                             @PathParam("coordinatorId") String 
coordinatorId,
                                             @QueryParam("metadata") 
@DefaultValue("false") boolean metadata) {
+        checkTransactionCoordinatorEnabled();

Review Comment:
   Looks like we should catch this method exception.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -188,6 +230,7 @@ public void getSlowTransactions(@Suspended final 
AsyncResponse asyncResponse,
                                     @DefaultValue("false") boolean 
authoritative,
                                     @PathParam("timeout") String timeout,
                                     @QueryParam("coordinatorId") Integer 
coordinatorId) {
+        checkTransactionCoordinatorEnabled();

Review Comment:
   Looks like we should catch this method exception.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to