mattisonchao commented on code in PR #15017:
URL: https://github.com/apache/pulsar/pull/15017#discussion_r843877969
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -67,216 +67,97 @@
protected void internalGetCoordinatorStats(AsyncResponse asyncResponse,
boolean authoritative,
Integer coordinatorId) {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- if (coordinatorId != null) {
-
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
- authoritative);
- TransactionMetadataStore transactionMetadataStore =
-
pulsar().getTransactionMetadataStoreService().getStores()
-
.get(TransactionCoordinatorID.get(coordinatorId));
- if (transactionMetadataStore == null) {
- asyncResponse.resume(new RestException(NOT_FOUND,
- "Transaction coordinator not found! coordinator id
: " + coordinatorId));
+ if (coordinatorId != null) {
+
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+ authoritative);
+ TransactionMetadataStore transactionMetadataStore =
+ pulsar().getTransactionMetadataStoreService().getStores()
+ .get(TransactionCoordinatorID.get(coordinatorId));
+ if (transactionMetadataStore == null) {
+ asyncResponse.resume(new RestException(NOT_FOUND,
+ "Transaction coordinator not found! coordinator id : "
+ coordinatorId));
+ return;
+ }
+
asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
+ } else {
+
getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+ false, false).thenAccept(partitionMetadata -> {
+ if (partitionMetadata.partitions == 0) {
+ asyncResponse.resume(new
RestException(Response.Status.NOT_FOUND,
+ "Transaction coordinator not found"));
return;
}
-
asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
- } else {
-
getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
- false, false).thenAccept(partitionMetadata -> {
- if (partitionMetadata.partitions == 0) {
- asyncResponse.resume(new
RestException(Response.Status.NOT_FOUND,
- "Transaction coordinator not found"));
+ List<CompletableFuture<TransactionCoordinatorStats>>
transactionMetadataStoreInfoFutures =
+ Lists.newArrayList();
+ for (int i = 0; i < partitionMetadata.partitions; i++) {
+ try {
+ transactionMetadataStoreInfoFutures
+
.add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
+ } catch (PulsarServerException e) {
+ asyncResponse.resume(new RestException(e));
return;
}
- List<CompletableFuture<TransactionCoordinatorStats>>
transactionMetadataStoreInfoFutures =
- Lists.newArrayList();
- for (int i = 0; i < partitionMetadata.partitions; i++) {
+ }
+ Map<Integer, TransactionCoordinatorStats> stats = new
HashMap<>();
+
FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result,
e) -> {
+ if (e != null) {
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+
+ for (int i = 0; i <
transactionMetadataStoreInfoFutures.size(); i++) {
try {
- transactionMetadataStoreInfoFutures
-
.add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
- } catch (PulsarServerException e) {
- asyncResponse.resume(new RestException(e));
+ stats.put(i,
transactionMetadataStoreInfoFutures.get(i).get());
Review Comment:
Use ``join()`` instead of ``get()`` here to avoid having to catch checked exceptions?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -67,216 +67,97 @@
protected void internalGetCoordinatorStats(AsyncResponse asyncResponse,
boolean authoritative,
Integer coordinatorId) {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- if (coordinatorId != null) {
-
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
- authoritative);
- TransactionMetadataStore transactionMetadataStore =
-
pulsar().getTransactionMetadataStoreService().getStores()
-
.get(TransactionCoordinatorID.get(coordinatorId));
- if (transactionMetadataStore == null) {
- asyncResponse.resume(new RestException(NOT_FOUND,
- "Transaction coordinator not found! coordinator id
: " + coordinatorId));
+ if (coordinatorId != null) {
+
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
Review Comment:
It looks like we should handle the exception this method can throw, or make the call async.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -378,91 +259,87 @@ private void getTransactionMetadata(TxnMeta txnMeta,
protected void internalGetSlowTransactions(AsyncResponse asyncResponse,
boolean authoritative, long
timeout, Integer coordinatorId) {
try {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- if (coordinatorId != null) {
-
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
- authoritative);
- TransactionMetadataStore transactionMetadataStore =
-
pulsar().getTransactionMetadataStoreService().getStores()
-
.get(TransactionCoordinatorID.get(coordinatorId));
- if (transactionMetadataStore == null) {
- asyncResponse.resume(new RestException(NOT_FOUND,
- "Transaction coordinator not found!
coordinator id : " + coordinatorId));
+ if (coordinatorId != null) {
+
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+ authoritative);
+ TransactionMetadataStore transactionMetadataStore =
+
pulsar().getTransactionMetadataStoreService().getStores()
+
.get(TransactionCoordinatorID.get(coordinatorId));
+ if (transactionMetadataStore == null) {
+ asyncResponse.resume(new RestException(NOT_FOUND,
+ "Transaction coordinator not found! coordinator id
: " + coordinatorId));
+ return;
+ }
+ List<TxnMeta> transactions =
transactionMetadataStore.getSlowTransactions(timeout);
+ List<CompletableFuture<TransactionMetadata>>
completableFutures = new ArrayList<>();
+ for (TxnMeta txnMeta : transactions) {
+ CompletableFuture<TransactionMetadata> completableFuture =
new CompletableFuture<>();
+ getTransactionMetadata(txnMeta, completableFuture);
+ completableFutures.add(completableFuture);
+ }
+
+ FutureUtil.waitForAll(completableFutures).whenComplete((v, e)
-> {
+ if (e != null) {
+ asyncResponse.resume(new RestException(e.getCause()));
return;
}
- List<TxnMeta> transactions =
transactionMetadataStore.getSlowTransactions(timeout);
- List<CompletableFuture<TransactionMetadata>>
completableFutures = new ArrayList<>();
- for (TxnMeta txnMeta : transactions) {
- CompletableFuture<TransactionMetadata>
completableFuture = new CompletableFuture<>();
- getTransactionMetadata(txnMeta, completableFuture);
- completableFutures.add(completableFuture);
- }
- FutureUtil.waitForAll(completableFutures).whenComplete((v,
e) -> {
+ Map<String, TransactionMetadata> transactionMetadata = new
HashMap<>();
+ for (CompletableFuture<TransactionMetadata> future :
completableFutures) {
+ try {
+ transactionMetadata.put(future.get().txnId,
future.get());
+ } catch (Exception exception) {
+ asyncResponse.resume(new
RestException(exception.getCause()));
+ return;
+ }
+ }
+ asyncResponse.resume(transactionMetadata);
+ });
+ } else {
+
getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+ false, false).thenAccept(partitionMetadata -> {
+ if (partitionMetadata.partitions == 0) {
+ asyncResponse.resume(new
RestException(Response.Status.NOT_FOUND,
+ "Transaction coordinator not found"));
+ return;
+ }
+ List<CompletableFuture<Map<String, TransactionMetadata>>>
completableFutures =
+ Lists.newArrayList();
+ for (int i = 0; i < partitionMetadata.partitions; i++) {
+ try {
+ completableFutures
+
.add(pulsar().getAdminClient().transactions()
+
.getSlowTransactionsByCoordinatorIdAsync(i, timeout,
+ TimeUnit.MILLISECONDS));
+ } catch (PulsarServerException e) {
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+ }
+ Map<String, TransactionMetadata> transactionMetadataMaps =
new HashMap<>();
+
FutureUtil.waitForAll(completableFutures).whenComplete((result, e) -> {
if (e != null) {
- asyncResponse.resume(new
RestException(e.getCause()));
+ asyncResponse.resume(new RestException(e));
return;
}
- Map<String, TransactionMetadata> transactionMetadata =
new HashMap<>();
- for (CompletableFuture<TransactionMetadata> future :
completableFutures) {
+ for (CompletableFuture<Map<String,
TransactionMetadata>> transactionMetadataMap
+ : completableFutures) {
try {
- transactionMetadata.put(future.get().txnId,
future.get());
+
transactionMetadataMaps.putAll(transactionMetadataMap.get());
Review Comment:
Use `join()` instead of `get()` here to avoid having to catch checked exceptions?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -67,216 +67,97 @@
protected void internalGetCoordinatorStats(AsyncResponse asyncResponse,
boolean authoritative,
Integer coordinatorId) {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- if (coordinatorId != null) {
-
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
- authoritative);
- TransactionMetadataStore transactionMetadataStore =
-
pulsar().getTransactionMetadataStoreService().getStores()
-
.get(TransactionCoordinatorID.get(coordinatorId));
- if (transactionMetadataStore == null) {
- asyncResponse.resume(new RestException(NOT_FOUND,
- "Transaction coordinator not found! coordinator id
: " + coordinatorId));
+ if (coordinatorId != null) {
+
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+ authoritative);
+ TransactionMetadataStore transactionMetadataStore =
+ pulsar().getTransactionMetadataStoreService().getStores()
+ .get(TransactionCoordinatorID.get(coordinatorId));
+ if (transactionMetadataStore == null) {
+ asyncResponse.resume(new RestException(NOT_FOUND,
+ "Transaction coordinator not found! coordinator id : "
+ coordinatorId));
+ return;
+ }
+
asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
+ } else {
+
getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+ false, false).thenAccept(partitionMetadata -> {
+ if (partitionMetadata.partitions == 0) {
+ asyncResponse.resume(new
RestException(Response.Status.NOT_FOUND,
+ "Transaction coordinator not found"));
return;
}
-
asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
- } else {
-
getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
- false, false).thenAccept(partitionMetadata -> {
- if (partitionMetadata.partitions == 0) {
- asyncResponse.resume(new
RestException(Response.Status.NOT_FOUND,
- "Transaction coordinator not found"));
+ List<CompletableFuture<TransactionCoordinatorStats>>
transactionMetadataStoreInfoFutures =
+ Lists.newArrayList();
+ for (int i = 0; i < partitionMetadata.partitions; i++) {
+ try {
+ transactionMetadataStoreInfoFutures
+
.add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
+ } catch (PulsarServerException e) {
+ asyncResponse.resume(new RestException(e));
return;
}
- List<CompletableFuture<TransactionCoordinatorStats>>
transactionMetadataStoreInfoFutures =
- Lists.newArrayList();
- for (int i = 0; i < partitionMetadata.partitions; i++) {
+ }
+ Map<Integer, TransactionCoordinatorStats> stats = new
HashMap<>();
+
FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result,
e) -> {
+ if (e != null) {
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+
+ for (int i = 0; i <
transactionMetadataStoreInfoFutures.size(); i++) {
try {
- transactionMetadataStoreInfoFutures
-
.add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
- } catch (PulsarServerException e) {
- asyncResponse.resume(new RestException(e));
+ stats.put(i,
transactionMetadataStoreInfoFutures.get(i).get());
+ } catch (Exception exception) {
+ asyncResponse.resume(new
RestException(exception.getCause()));
return;
}
}
- Map<Integer, TransactionCoordinatorStats> stats = new
HashMap<>();
-
FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result,
e) -> {
- if (e != null) {
- asyncResponse.resume(new RestException(e));
- return;
- }
-
- for (int i = 0; i <
transactionMetadataStoreInfoFutures.size(); i++) {
- try {
- stats.put(i,
transactionMetadataStoreInfoFutures.get(i).get());
- } catch (Exception exception) {
- asyncResponse.resume(new
RestException(exception.getCause()));
- return;
- }
- }
- asyncResponse.resume(stats);
- });
- }).exceptionally(ex -> {
- log.error("[{}] Failed to get transaction coordinator
state.", clientAppId(), ex);
- resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
+ asyncResponse.resume(stats);
});
- }
- } else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
- "This Broker is not configured with
transactionCoordinatorEnabled=true."));
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to get transaction coordinator state.",
clientAppId(), ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
}
- protected void internalGetTransactionInPendingAckStats(AsyncResponse
asyncResponse, boolean authoritative,
- long mostSigBits,
long leastSigBits, String subName) {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- validateTopicOwnership(topicName, authoritative);
- CompletableFuture<Optional<Topic>> topicFuture =
pulsar().getBrokerService()
- .getTopics().get(topicName.toString());
- if (topicFuture != null) {
- topicFuture.whenComplete((optionalTopic, e) -> {
- if (e != null) {
- asyncResponse.resume(new RestException(e));
- return;
- }
- if (!optionalTopic.isPresent()) {
- asyncResponse.resume(new
RestException(TEMPORARY_REDIRECT,
- "Topic is not owned by this broker!"));
- return;
- }
- Topic topicObject = optionalTopic.get();
- if (topicObject instanceof PersistentTopic) {
- asyncResponse.resume(((PersistentTopic) topicObject)
- .getTransactionInPendingAckStats(new
TxnID(mostSigBits, leastSigBits), subName));
- } else {
- asyncResponse.resume(new RestException(BAD_REQUEST,
"Topic is not a persistent topic!"));
- }
- });
- } else {
- asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
"Topic is not owned by this broker!"));
- }
- } else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
- "This Broker is not configured with
transactionCoordinatorEnabled=true."));
- }
+ protected CompletableFuture<TransactionInPendingAckStats>
internalGetTransactionInPendingAckStats(
+ boolean authoritative, long mostSigBits, long leastSigBits, String
subName) {
+ return getExistingPersistentTopicAsync(authoritative)
+ .thenApply(topic -> topic.getTransactionInPendingAckStats(new
TxnID(mostSigBits, leastSigBits),
+ subName));
}
- protected void internalGetTransactionInBufferStats(AsyncResponse
asyncResponse, boolean authoritative,
- long mostSigBits, long
leastSigBits) {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- validateTopicOwnership(topicName, authoritative);
- CompletableFuture<Optional<Topic>> topicFuture =
pulsar().getBrokerService()
- .getTopics().get(topicName.toString());
- if (topicFuture != null) {
- topicFuture.whenComplete((optionalTopic, e) -> {
- if (e != null) {
- asyncResponse.resume(new RestException(e));
- return;
- }
- if (!optionalTopic.isPresent()) {
- asyncResponse.resume(new
RestException(TEMPORARY_REDIRECT,
- "Topic is not owned by this broker!"));
- return;
- }
- Topic topicObject = optionalTopic.get();
- if (topicObject instanceof PersistentTopic) {
- TransactionInBufferStats transactionInBufferStats =
((PersistentTopic) topicObject)
- .getTransactionInBufferStats(new
TxnID(mostSigBits, leastSigBits));
- asyncResponse.resume(transactionInBufferStats);
- } else {
- asyncResponse.resume(new RestException(BAD_REQUEST,
"Topic is not a persistent topic!"));
- }
- });
- } else {
- asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
"Topic is not owned by this broker!"));
- }
- } else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
- "This Broker is not configured with
transactionCoordinatorEnabled=true."));
- }
+ protected CompletableFuture<TransactionInBufferStats>
internalGetTransactionInBufferStats(
+ boolean authoritative, long mostSigBits, long leastSigBits) {
+ return getExistingPersistentTopicAsync(authoritative)
+ .thenApply(topic -> topic.getTransactionInBufferStats(new
TxnID(mostSigBits, leastSigBits)));
}
- protected void internalGetTransactionBufferStats(AsyncResponse
asyncResponse, boolean authoritative) {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- validateTopicOwnership(topicName, authoritative);
- CompletableFuture<Optional<Topic>> topicFuture =
pulsar().getBrokerService()
- .getTopics().get(topicName.toString());
- if (topicFuture != null) {
- topicFuture.whenComplete((optionalTopic, e) -> {
- if (e != null) {
- asyncResponse.resume(new RestException(e));
- return;
- }
-
- if (!optionalTopic.isPresent()) {
- asyncResponse.resume(new
RestException(TEMPORARY_REDIRECT,
- "Topic is not owned by this broker!"));
- return;
- }
- Topic topicObject = optionalTopic.get();
- if (topicObject instanceof PersistentTopic) {
- asyncResponse.resume(((PersistentTopic)
topicObject).getTransactionBufferStats());
- } else {
- asyncResponse.resume(new RestException(BAD_REQUEST,
"Topic is not a persistent topic!"));
- }
- });
- } else {
- asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
"Topic is not owned by this broker!"));
- }
- } else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
"Broker don't support transaction!"));
- }
+ protected CompletableFuture<TransactionBufferStats>
internalGetTransactionBufferStats(boolean authoritative) {
+ return getExistingPersistentTopicAsync(authoritative)
+ .thenApply(topic -> topic.getTransactionBufferStats());
}
- protected void internalGetPendingAckStats(AsyncResponse asyncResponse,
boolean authoritative, String subName) {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- validateTopicOwnership(topicName, authoritative);
- CompletableFuture<Optional<Topic>> topicFuture =
pulsar().getBrokerService()
- .getTopics().get(topicName.toString());
- if (topicFuture != null) {
- topicFuture.whenComplete((optionalTopic, e) -> {
- if (e != null) {
- asyncResponse.resume(new RestException(e));
- return;
- }
-
- if (!optionalTopic.isPresent()) {
- asyncResponse.resume(new
RestException(TEMPORARY_REDIRECT,
- "Topic is not owned by this broker!"));
- return;
- }
- Topic topicObject = optionalTopic.get();
- if (topicObject instanceof PersistentTopic) {
- asyncResponse.resume(((PersistentTopic)
topicObject).getTransactionPendingAckStats(subName));
- } else {
- asyncResponse.resume(new RestException(BAD_REQUEST,
"Topic is not a persistent topic!"));
- }
- });
- } else {
- asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
"Topic is not owned by this broker!"));
- }
- } else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
"Broker don't support transaction!"));
- }
+ protected CompletableFuture<TransactionPendingAckStats>
internalGetPendingAckStats(
+ boolean authoritative, String subName) {
+ return getExistingPersistentTopicAsync(authoritative)
+ .thenApply(topic ->
topic.getTransactionPendingAckStats(subName));
}
protected void internalGetTransactionMetadata(AsyncResponse asyncResponse,
boolean authoritative, int
mostSigBits, long leastSigBits) {
try {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits),
- authoritative);
- CompletableFuture<TransactionMetadata>
transactionMetadataFuture = new CompletableFuture<>();
- TxnMeta txnMeta = pulsar().getTransactionMetadataStoreService()
- .getTxnMeta(new TxnID(mostSigBits,
leastSigBits)).get();
- getTransactionMetadata(txnMeta, transactionMetadataFuture);
- asyncResponse.resume(transactionMetadataFuture.get(10,
TimeUnit.SECONDS));
- } else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
- "This Broker is not configured with
transactionCoordinatorEnabled=true."));
- }
+
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits),
+ authoritative);
+ CompletableFuture<TransactionMetadata> transactionMetadataFuture =
new CompletableFuture<>();
+ TxnMeta txnMeta = pulsar().getTransactionMetadataStoreService()
+ .getTxnMeta(new TxnID(mostSigBits, leastSigBits)).get();
Review Comment:
It looks like the `get()` call here has no timeout and could wait indefinitely?
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java:
##########
@@ -429,6 +507,18 @@ public void testGetPendingAckInternalStats() throws
Exception {
assertNull(managedLedgerInternalStats.ledgers.get(0).metadata);
}
+ @Test(timeOut = 20000)
+ public void testTransactionNotEnabled() throws Exception {
+ stopBroker();
+ conf.setTransactionCoordinatorEnabled(false);
+ super.internalSetup();
+ try {
+ admin.transactions().getCoordinatorInternalStats(1, false);
Review Comment:
Should we add `Assert.fail()` here, so the test fails if the call does not throw?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -378,91 +259,87 @@ private void getTransactionMetadata(TxnMeta txnMeta,
protected void internalGetSlowTransactions(AsyncResponse asyncResponse,
boolean authoritative, long
timeout, Integer coordinatorId) {
try {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- if (coordinatorId != null) {
-
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
- authoritative);
- TransactionMetadataStore transactionMetadataStore =
-
pulsar().getTransactionMetadataStoreService().getStores()
-
.get(TransactionCoordinatorID.get(coordinatorId));
- if (transactionMetadataStore == null) {
- asyncResponse.resume(new RestException(NOT_FOUND,
- "Transaction coordinator not found!
coordinator id : " + coordinatorId));
+ if (coordinatorId != null) {
+
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+ authoritative);
+ TransactionMetadataStore transactionMetadataStore =
+
pulsar().getTransactionMetadataStoreService().getStores()
+
.get(TransactionCoordinatorID.get(coordinatorId));
+ if (transactionMetadataStore == null) {
+ asyncResponse.resume(new RestException(NOT_FOUND,
+ "Transaction coordinator not found! coordinator id
: " + coordinatorId));
+ return;
+ }
+ List<TxnMeta> transactions =
transactionMetadataStore.getSlowTransactions(timeout);
+ List<CompletableFuture<TransactionMetadata>>
completableFutures = new ArrayList<>();
+ for (TxnMeta txnMeta : transactions) {
+ CompletableFuture<TransactionMetadata> completableFuture =
new CompletableFuture<>();
+ getTransactionMetadata(txnMeta, completableFuture);
+ completableFutures.add(completableFuture);
+ }
+
+ FutureUtil.waitForAll(completableFutures).whenComplete((v, e)
-> {
+ if (e != null) {
+ asyncResponse.resume(new RestException(e.getCause()));
return;
}
- List<TxnMeta> transactions =
transactionMetadataStore.getSlowTransactions(timeout);
- List<CompletableFuture<TransactionMetadata>>
completableFutures = new ArrayList<>();
- for (TxnMeta txnMeta : transactions) {
- CompletableFuture<TransactionMetadata>
completableFuture = new CompletableFuture<>();
- getTransactionMetadata(txnMeta, completableFuture);
- completableFutures.add(completableFuture);
- }
- FutureUtil.waitForAll(completableFutures).whenComplete((v,
e) -> {
+ Map<String, TransactionMetadata> transactionMetadata = new
HashMap<>();
+ for (CompletableFuture<TransactionMetadata> future :
completableFutures) {
+ try {
+ transactionMetadata.put(future.get().txnId,
future.get());
Review Comment:
Use `join()` instead of `get()` here to avoid having to catch checked exceptions?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -378,91 +259,87 @@ private void getTransactionMetadata(TxnMeta txnMeta,
protected void internalGetSlowTransactions(AsyncResponse asyncResponse,
boolean authoritative, long
timeout, Integer coordinatorId) {
try {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- if (coordinatorId != null) {
-
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
- authoritative);
- TransactionMetadataStore transactionMetadataStore =
-
pulsar().getTransactionMetadataStoreService().getStores()
-
.get(TransactionCoordinatorID.get(coordinatorId));
- if (transactionMetadataStore == null) {
- asyncResponse.resume(new RestException(NOT_FOUND,
- "Transaction coordinator not found!
coordinator id : " + coordinatorId));
+ if (coordinatorId != null) {
+
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
Review Comment:
It looks like we should handle the exception this method can throw, or make the call async.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -472,74 +349,75 @@ protected void internalGetSlowTransactions(AsyncResponse
asyncResponse,
protected void internalGetCoordinatorInternalStats(AsyncResponse
asyncResponse, boolean authoritative,
boolean metadata, int
coordinatorId) {
try {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- TopicName topicName =
TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId);
- validateTopicOwnership(topicName, authoritative);
- TransactionMetadataStore metadataStore =
pulsar().getTransactionMetadataStoreService()
-
.getStores().get(TransactionCoordinatorID.get(coordinatorId));
- if (metadataStore == null) {
- asyncResponse.resume(new RestException(NOT_FOUND,
- "Transaction coordinator not found! coordinator id
: " + coordinatorId));
- return;
- }
- if (metadataStore instanceof MLTransactionMetadataStore) {
- ManagedLedger managedLedger =
((MLTransactionMetadataStore) metadataStore).getManagedLedger();
- TransactionCoordinatorInternalStats
transactionCoordinatorInternalStats =
- new TransactionCoordinatorInternalStats();
- TransactionLogStats transactionLogStats = new
TransactionLogStats();
- transactionLogStats.managedLedgerName =
managedLedger.getName();
- transactionLogStats.managedLedgerInternalStats =
-
managedLedger.getManagedLedgerInternalStats(metadata).get();
- transactionCoordinatorInternalStats.transactionLogStats =
transactionLogStats;
- asyncResponse.resume(transactionCoordinatorInternalStats);
- } else {
- asyncResponse.resume(new RestException(METHOD_NOT_ALLOWED,
- "Broker don't use MLTransactionMetadataStore!"));
- }
+ TopicName topicName =
TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId);
+ validateTopicOwnership(topicName, authoritative);
+ TransactionMetadataStore metadataStore =
pulsar().getTransactionMetadataStoreService()
+
.getStores().get(TransactionCoordinatorID.get(coordinatorId));
+ if (metadataStore == null) {
+ asyncResponse.resume(new RestException(NOT_FOUND,
+ "Transaction coordinator not found! coordinator id : "
+ coordinatorId));
+ return;
+ }
+ if (metadataStore instanceof MLTransactionMetadataStore) {
+ ManagedLedger managedLedger = ((MLTransactionMetadataStore)
metadataStore).getManagedLedger();
+ TransactionCoordinatorInternalStats
transactionCoordinatorInternalStats =
+ new TransactionCoordinatorInternalStats();
+ TransactionLogStats transactionLogStats = new
TransactionLogStats();
+ transactionLogStats.managedLedgerName =
managedLedger.getName();
+ transactionLogStats.managedLedgerInternalStats =
+
managedLedger.getManagedLedgerInternalStats(metadata).get();
Review Comment:
It looks like the `get()` call here has no timeout and could wait indefinitely?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -472,74 +349,75 @@ protected void internalGetSlowTransactions(AsyncResponse
asyncResponse,
protected void internalGetCoordinatorInternalStats(AsyncResponse
asyncResponse, boolean authoritative,
boolean metadata, int
coordinatorId) {
try {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- TopicName topicName =
TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId);
- validateTopicOwnership(topicName, authoritative);
- TransactionMetadataStore metadataStore =
pulsar().getTransactionMetadataStoreService()
-
.getStores().get(TransactionCoordinatorID.get(coordinatorId));
- if (metadataStore == null) {
- asyncResponse.resume(new RestException(NOT_FOUND,
- "Transaction coordinator not found! coordinator id
: " + coordinatorId));
- return;
- }
- if (metadataStore instanceof MLTransactionMetadataStore) {
- ManagedLedger managedLedger =
((MLTransactionMetadataStore) metadataStore).getManagedLedger();
- TransactionCoordinatorInternalStats
transactionCoordinatorInternalStats =
- new TransactionCoordinatorInternalStats();
- TransactionLogStats transactionLogStats = new
TransactionLogStats();
- transactionLogStats.managedLedgerName =
managedLedger.getName();
- transactionLogStats.managedLedgerInternalStats =
-
managedLedger.getManagedLedgerInternalStats(metadata).get();
- transactionCoordinatorInternalStats.transactionLogStats =
transactionLogStats;
- asyncResponse.resume(transactionCoordinatorInternalStats);
- } else {
- asyncResponse.resume(new RestException(METHOD_NOT_ALLOWED,
- "Broker don't use MLTransactionMetadataStore!"));
- }
+ TopicName topicName =
TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId);
+ validateTopicOwnership(topicName, authoritative);
Review Comment:
It looks like we should handle the exception this method can throw, or make the call async.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -168,6 +209,7 @@ public void getTransactionMetadata(@Suspended final
AsyncResponse asyncResponse,
@DefaultValue("false") boolean
authoritative,
@PathParam("mostSigBits") String
mostSigBits,
@PathParam("leastSigBits") String
leastSigBits) {
+ checkTransactionCoordinatorEnabled();
Review Comment:
It looks like we should catch the exception thrown by this method.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -67,216 +67,97 @@
protected void internalGetCoordinatorStats(AsyncResponse asyncResponse,
boolean authoritative,
Integer coordinatorId) {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- if (coordinatorId != null) {
-
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
- authoritative);
- TransactionMetadataStore transactionMetadataStore =
-
pulsar().getTransactionMetadataStoreService().getStores()
-
.get(TransactionCoordinatorID.get(coordinatorId));
- if (transactionMetadataStore == null) {
- asyncResponse.resume(new RestException(NOT_FOUND,
- "Transaction coordinator not found! coordinator id
: " + coordinatorId));
+ if (coordinatorId != null) {
+
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+ authoritative);
+ TransactionMetadataStore transactionMetadataStore =
+ pulsar().getTransactionMetadataStoreService().getStores()
+ .get(TransactionCoordinatorID.get(coordinatorId));
+ if (transactionMetadataStore == null) {
+ asyncResponse.resume(new RestException(NOT_FOUND,
+ "Transaction coordinator not found! coordinator id : "
+ coordinatorId));
+ return;
+ }
+
asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
+ } else {
+
getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+ false, false).thenAccept(partitionMetadata -> {
+ if (partitionMetadata.partitions == 0) {
+ asyncResponse.resume(new
RestException(Response.Status.NOT_FOUND,
+ "Transaction coordinator not found"));
return;
}
-
asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
- } else {
-
getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
- false, false).thenAccept(partitionMetadata -> {
- if (partitionMetadata.partitions == 0) {
- asyncResponse.resume(new
RestException(Response.Status.NOT_FOUND,
- "Transaction coordinator not found"));
+ List<CompletableFuture<TransactionCoordinatorStats>>
transactionMetadataStoreInfoFutures =
+ Lists.newArrayList();
+ for (int i = 0; i < partitionMetadata.partitions; i++) {
+ try {
+ transactionMetadataStoreInfoFutures
+
.add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
+ } catch (PulsarServerException e) {
+ asyncResponse.resume(new RestException(e));
return;
}
- List<CompletableFuture<TransactionCoordinatorStats>>
transactionMetadataStoreInfoFutures =
- Lists.newArrayList();
- for (int i = 0; i < partitionMetadata.partitions; i++) {
+ }
+ Map<Integer, TransactionCoordinatorStats> stats = new
HashMap<>();
+
FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result,
e) -> {
+ if (e != null) {
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+
+ for (int i = 0; i <
transactionMetadataStoreInfoFutures.size(); i++) {
try {
- transactionMetadataStoreInfoFutures
-
.add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
- } catch (PulsarServerException e) {
- asyncResponse.resume(new RestException(e));
+ stats.put(i,
transactionMetadataStoreInfoFutures.get(i).get());
+ } catch (Exception exception) {
+ asyncResponse.resume(new
RestException(exception.getCause()));
return;
}
}
- Map<Integer, TransactionCoordinatorStats> stats = new
HashMap<>();
-
FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result,
e) -> {
- if (e != null) {
- asyncResponse.resume(new RestException(e));
- return;
- }
-
- for (int i = 0; i <
transactionMetadataStoreInfoFutures.size(); i++) {
- try {
- stats.put(i,
transactionMetadataStoreInfoFutures.get(i).get());
- } catch (Exception exception) {
- asyncResponse.resume(new
RestException(exception.getCause()));
- return;
- }
- }
- asyncResponse.resume(stats);
- });
- }).exceptionally(ex -> {
- log.error("[{}] Failed to get transaction coordinator
state.", clientAppId(), ex);
- resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
+ asyncResponse.resume(stats);
});
- }
- } else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
- "This Broker is not configured with
transactionCoordinatorEnabled=true."));
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to get transaction coordinator state.",
clientAppId(), ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
}
- protected void internalGetTransactionInPendingAckStats(AsyncResponse
asyncResponse, boolean authoritative,
- long mostSigBits,
long leastSigBits, String subName) {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- validateTopicOwnership(topicName, authoritative);
- CompletableFuture<Optional<Topic>> topicFuture =
pulsar().getBrokerService()
- .getTopics().get(topicName.toString());
- if (topicFuture != null) {
- topicFuture.whenComplete((optionalTopic, e) -> {
- if (e != null) {
- asyncResponse.resume(new RestException(e));
- return;
- }
- if (!optionalTopic.isPresent()) {
- asyncResponse.resume(new
RestException(TEMPORARY_REDIRECT,
- "Topic is not owned by this broker!"));
- return;
- }
- Topic topicObject = optionalTopic.get();
- if (topicObject instanceof PersistentTopic) {
- asyncResponse.resume(((PersistentTopic) topicObject)
- .getTransactionInPendingAckStats(new
TxnID(mostSigBits, leastSigBits), subName));
- } else {
- asyncResponse.resume(new RestException(BAD_REQUEST,
"Topic is not a persistent topic!"));
- }
- });
- } else {
- asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
"Topic is not owned by this broker!"));
- }
- } else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
- "This Broker is not configured with
transactionCoordinatorEnabled=true."));
- }
+ protected CompletableFuture<TransactionInPendingAckStats>
internalGetTransactionInPendingAckStats(
+ boolean authoritative, long mostSigBits, long leastSigBits, String
subName) {
+ return getExistingPersistentTopicAsync(authoritative)
+ .thenApply(topic -> topic.getTransactionInPendingAckStats(new
TxnID(mostSigBits, leastSigBits),
+ subName));
}
- protected void internalGetTransactionInBufferStats(AsyncResponse
asyncResponse, boolean authoritative,
- long mostSigBits, long
leastSigBits) {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- validateTopicOwnership(topicName, authoritative);
- CompletableFuture<Optional<Topic>> topicFuture =
pulsar().getBrokerService()
- .getTopics().get(topicName.toString());
- if (topicFuture != null) {
- topicFuture.whenComplete((optionalTopic, e) -> {
- if (e != null) {
- asyncResponse.resume(new RestException(e));
- return;
- }
- if (!optionalTopic.isPresent()) {
- asyncResponse.resume(new
RestException(TEMPORARY_REDIRECT,
- "Topic is not owned by this broker!"));
- return;
- }
- Topic topicObject = optionalTopic.get();
- if (topicObject instanceof PersistentTopic) {
- TransactionInBufferStats transactionInBufferStats =
((PersistentTopic) topicObject)
- .getTransactionInBufferStats(new
TxnID(mostSigBits, leastSigBits));
- asyncResponse.resume(transactionInBufferStats);
- } else {
- asyncResponse.resume(new RestException(BAD_REQUEST,
"Topic is not a persistent topic!"));
- }
- });
- } else {
- asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
"Topic is not owned by this broker!"));
- }
- } else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
- "This Broker is not configured with
transactionCoordinatorEnabled=true."));
- }
+ protected CompletableFuture<TransactionInBufferStats>
internalGetTransactionInBufferStats(
+ boolean authoritative, long mostSigBits, long leastSigBits) {
+ return getExistingPersistentTopicAsync(authoritative)
+ .thenApply(topic -> topic.getTransactionInBufferStats(new
TxnID(mostSigBits, leastSigBits)));
}
- protected void internalGetTransactionBufferStats(AsyncResponse
asyncResponse, boolean authoritative) {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- validateTopicOwnership(topicName, authoritative);
- CompletableFuture<Optional<Topic>> topicFuture =
pulsar().getBrokerService()
- .getTopics().get(topicName.toString());
- if (topicFuture != null) {
- topicFuture.whenComplete((optionalTopic, e) -> {
- if (e != null) {
- asyncResponse.resume(new RestException(e));
- return;
- }
-
- if (!optionalTopic.isPresent()) {
- asyncResponse.resume(new
RestException(TEMPORARY_REDIRECT,
- "Topic is not owned by this broker!"));
- return;
- }
- Topic topicObject = optionalTopic.get();
- if (topicObject instanceof PersistentTopic) {
- asyncResponse.resume(((PersistentTopic)
topicObject).getTransactionBufferStats());
- } else {
- asyncResponse.resume(new RestException(BAD_REQUEST,
"Topic is not a persistent topic!"));
- }
- });
- } else {
- asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
"Topic is not owned by this broker!"));
- }
- } else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
"Broker don't support transaction!"));
- }
+ protected CompletableFuture<TransactionBufferStats>
internalGetTransactionBufferStats(boolean authoritative) {
+ return getExistingPersistentTopicAsync(authoritative)
+ .thenApply(topic -> topic.getTransactionBufferStats());
}
- protected void internalGetPendingAckStats(AsyncResponse asyncResponse,
boolean authoritative, String subName) {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- validateTopicOwnership(topicName, authoritative);
- CompletableFuture<Optional<Topic>> topicFuture =
pulsar().getBrokerService()
- .getTopics().get(topicName.toString());
- if (topicFuture != null) {
- topicFuture.whenComplete((optionalTopic, e) -> {
- if (e != null) {
- asyncResponse.resume(new RestException(e));
- return;
- }
-
- if (!optionalTopic.isPresent()) {
- asyncResponse.resume(new
RestException(TEMPORARY_REDIRECT,
- "Topic is not owned by this broker!"));
- return;
- }
- Topic topicObject = optionalTopic.get();
- if (topicObject instanceof PersistentTopic) {
- asyncResponse.resume(((PersistentTopic)
topicObject).getTransactionPendingAckStats(subName));
- } else {
- asyncResponse.resume(new RestException(BAD_REQUEST,
"Topic is not a persistent topic!"));
- }
- });
- } else {
- asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
"Topic is not owned by this broker!"));
- }
- } else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
"Broker don't support transaction!"));
- }
+ protected CompletableFuture<TransactionPendingAckStats>
internalGetPendingAckStats(
+ boolean authoritative, String subName) {
+ return getExistingPersistentTopicAsync(authoritative)
+ .thenApply(topic ->
topic.getTransactionPendingAckStats(subName));
}
protected void internalGetTransactionMetadata(AsyncResponse asyncResponse,
boolean authoritative, int
mostSigBits, long leastSigBits) {
try {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits),
- authoritative);
- CompletableFuture<TransactionMetadata>
transactionMetadataFuture = new CompletableFuture<>();
- TxnMeta txnMeta = pulsar().getTransactionMetadataStoreService()
- .getTxnMeta(new TxnID(mostSigBits,
leastSigBits)).get();
- getTransactionMetadata(txnMeta, transactionMetadataFuture);
- asyncResponse.resume(transactionMetadataFuture.get(10,
TimeUnit.SECONDS));
- } else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
- "This Broker is not configured with
transactionCoordinatorEnabled=true."));
- }
+
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits),
Review Comment:
Looks like we should handle the exception this method can throw, or make it async.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -61,6 +61,7 @@ public void getCoordinatorStats(@Suspended final
AsyncResponse asyncResponse,
@QueryParam("authoritative")
@DefaultValue("false") boolean
authoritative,
@QueryParam("coordinatorId") Integer
coordinatorId) {
+ checkTransactionCoordinatorEnabled();
Review Comment:
Looks like we should catch the exception thrown by this method.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -205,6 +248,7 @@ public void getCoordinatorInternalStats(@Suspended final
AsyncResponse asyncResp
@DefaultValue("false") boolean
authoritative,
@PathParam("coordinatorId") String
coordinatorId,
@QueryParam("metadata")
@DefaultValue("false") boolean metadata) {
+ checkTransactionCoordinatorEnabled();
Review Comment:
Looks like we should catch the exception thrown by this method.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -188,6 +230,7 @@ public void getSlowTransactions(@Suspended final
AsyncResponse asyncResponse,
@DefaultValue("false") boolean
authoritative,
@PathParam("timeout") String timeout,
@QueryParam("coordinatorId") Integer
coordinatorId) {
+ checkTransactionCoordinatorEnabled();
Review Comment:
Looks like we should catch the exception thrown by this method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]