Technoboy- commented on code in PR #15017:
URL: https://github.com/apache/pulsar/pull/15017#discussion_r843901280
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -378,91 +259,87 @@ private void getTransactionMetadata(TxnMeta txnMeta,
protected void internalGetSlowTransactions(AsyncResponse asyncResponse,
boolean authoritative, long
timeout, Integer coordinatorId) {
try {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- if (coordinatorId != null) {
-
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
- authoritative);
- TransactionMetadataStore transactionMetadataStore =
-
pulsar().getTransactionMetadataStoreService().getStores()
-
.get(TransactionCoordinatorID.get(coordinatorId));
- if (transactionMetadataStore == null) {
- asyncResponse.resume(new RestException(NOT_FOUND,
- "Transaction coordinator not found!
coordinator id : " + coordinatorId));
+ if (coordinatorId != null) {
+
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+ authoritative);
+ TransactionMetadataStore transactionMetadataStore =
+
pulsar().getTransactionMetadataStoreService().getStores()
+
.get(TransactionCoordinatorID.get(coordinatorId));
+ if (transactionMetadataStore == null) {
+ asyncResponse.resume(new RestException(NOT_FOUND,
+ "Transaction coordinator not found! coordinator id
: " + coordinatorId));
+ return;
+ }
+ List<TxnMeta> transactions =
transactionMetadataStore.getSlowTransactions(timeout);
+ List<CompletableFuture<TransactionMetadata>>
completableFutures = new ArrayList<>();
+ for (TxnMeta txnMeta : transactions) {
+ CompletableFuture<TransactionMetadata> completableFuture =
new CompletableFuture<>();
+ getTransactionMetadata(txnMeta, completableFuture);
+ completableFutures.add(completableFuture);
+ }
+
+ FutureUtil.waitForAll(completableFutures).whenComplete((v, e)
-> {
+ if (e != null) {
+ asyncResponse.resume(new RestException(e.getCause()));
return;
}
- List<TxnMeta> transactions =
transactionMetadataStore.getSlowTransactions(timeout);
- List<CompletableFuture<TransactionMetadata>>
completableFutures = new ArrayList<>();
- for (TxnMeta txnMeta : transactions) {
- CompletableFuture<TransactionMetadata>
completableFuture = new CompletableFuture<>();
- getTransactionMetadata(txnMeta, completableFuture);
- completableFutures.add(completableFuture);
- }
- FutureUtil.waitForAll(completableFutures).whenComplete((v,
e) -> {
+ Map<String, TransactionMetadata> transactionMetadata = new
HashMap<>();
+ for (CompletableFuture<TransactionMetadata> future :
completableFutures) {
+ try {
+ transactionMetadata.put(future.get().txnId,
future.get());
+ } catch (Exception exception) {
+ asyncResponse.resume(new
RestException(exception.getCause()));
+ return;
+ }
+ }
+ asyncResponse.resume(transactionMetadata);
+ });
+ } else {
+
getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+ false, false).thenAccept(partitionMetadata -> {
+ if (partitionMetadata.partitions == 0) {
+ asyncResponse.resume(new
RestException(Response.Status.NOT_FOUND,
+ "Transaction coordinator not found"));
+ return;
+ }
+ List<CompletableFuture<Map<String, TransactionMetadata>>>
completableFutures =
+ Lists.newArrayList();
+ for (int i = 0; i < partitionMetadata.partitions; i++) {
+ try {
+ completableFutures
+
.add(pulsar().getAdminClient().transactions()
+
.getSlowTransactionsByCoordinatorIdAsync(i, timeout,
+ TimeUnit.MILLISECONDS));
+ } catch (PulsarServerException e) {
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+ }
+ Map<String, TransactionMetadata> transactionMetadataMaps =
new HashMap<>();
+
FutureUtil.waitForAll(completableFutures).whenComplete((result, e) -> {
if (e != null) {
- asyncResponse.resume(new
RestException(e.getCause()));
+ asyncResponse.resume(new RestException(e));
return;
}
- Map<String, TransactionMetadata> transactionMetadata =
new HashMap<>();
- for (CompletableFuture<TransactionMetadata> future :
completableFutures) {
+ for (CompletableFuture<Map<String,
TransactionMetadata>> transactionMetadataMap
+ : completableFutures) {
try {
- transactionMetadata.put(future.get().txnId,
future.get());
+
transactionMetadataMaps.putAll(transactionMetadataMap.get());
Review Comment:
In this patch, the only change to this method is the removal of the
`pulsar().getConfig().isTransactionCoordinatorEnabled()` check.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]