This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8b8bf154aec [fix][transaction] Fix transaction REST API redirect
issue. (#15017)
8b8bf154aec is described below
commit 8b8bf154aecf07c45fede2679445ef0563b49c5b
Author: Jiwei Guo <[email protected]>
AuthorDate: Thu Apr 7 10:20:28 2022 +0800
[fix][transaction] Fix transaction REST API redirect issue. (#15017)
---
.../pulsar/broker/admin/impl/TransactionsBase.java | 510 ++++++++-------------
.../pulsar/broker/admin/v3/Transactions.java | 67 ++-
.../broker/admin/v3/AdminApiTransactionTest.java | 92 +++-
3 files changed, 341 insertions(+), 328 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
index 308f18e8bc3..b225cd2e266 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
@@ -18,11 +18,9 @@
*/
package org.apache.pulsar.broker.admin.impl;
-import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static javax.ws.rs.core.Response.Status.METHOD_NOT_ALLOWED;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
-import static javax.ws.rs.core.Response.Status.TEMPORARY_REDIRECT;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.HashMap;
@@ -46,6 +44,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import
org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
@@ -53,6 +52,7 @@ import
org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionLogStats;
import org.apache.pulsar.common.policies.data.TransactionMetadata;
import
org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats;
+import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
@@ -67,216 +67,97 @@ public abstract class TransactionsBase extends
AdminResource {
protected void internalGetCoordinatorStats(AsyncResponse asyncResponse,
boolean authoritative,
Integer coordinatorId) {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- if (coordinatorId != null) {
-
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
- authoritative);
- TransactionMetadataStore transactionMetadataStore =
-
pulsar().getTransactionMetadataStoreService().getStores()
-
.get(TransactionCoordinatorID.get(coordinatorId));
- if (transactionMetadataStore == null) {
- asyncResponse.resume(new RestException(NOT_FOUND,
- "Transaction coordinator not found! coordinator id
: " + coordinatorId));
+ if (coordinatorId != null) {
+
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+ authoritative);
+ TransactionMetadataStore transactionMetadataStore =
+ pulsar().getTransactionMetadataStoreService().getStores()
+ .get(TransactionCoordinatorID.get(coordinatorId));
+ if (transactionMetadataStore == null) {
+ asyncResponse.resume(new RestException(NOT_FOUND,
+ "Transaction coordinator not found! coordinator id : "
+ coordinatorId));
+ return;
+ }
+
asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
+ } else {
+
getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+ false, false).thenAccept(partitionMetadata -> {
+ if (partitionMetadata.partitions == 0) {
+ asyncResponse.resume(new
RestException(Response.Status.NOT_FOUND,
+ "Transaction coordinator not found"));
return;
}
-
asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
- } else {
-
getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
- false, false).thenAccept(partitionMetadata -> {
- if (partitionMetadata.partitions == 0) {
- asyncResponse.resume(new
RestException(Response.Status.NOT_FOUND,
- "Transaction coordinator not found"));
+ List<CompletableFuture<TransactionCoordinatorStats>>
transactionMetadataStoreInfoFutures =
+ Lists.newArrayList();
+ for (int i = 0; i < partitionMetadata.partitions; i++) {
+ try {
+ transactionMetadataStoreInfoFutures
+
.add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
+ } catch (PulsarServerException e) {
+ asyncResponse.resume(new RestException(e));
return;
}
- List<CompletableFuture<TransactionCoordinatorStats>>
transactionMetadataStoreInfoFutures =
- Lists.newArrayList();
- for (int i = 0; i < partitionMetadata.partitions; i++) {
+ }
+ Map<Integer, TransactionCoordinatorStats> stats = new
HashMap<>();
+
FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result,
e) -> {
+ if (e != null) {
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+
+ for (int i = 0; i <
transactionMetadataStoreInfoFutures.size(); i++) {
try {
- transactionMetadataStoreInfoFutures
-
.add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
- } catch (PulsarServerException e) {
- asyncResponse.resume(new RestException(e));
+ stats.put(i,
transactionMetadataStoreInfoFutures.get(i).get());
+ } catch (Exception exception) {
+ asyncResponse.resume(new
RestException(exception.getCause()));
return;
}
}
- Map<Integer, TransactionCoordinatorStats> stats = new
HashMap<>();
-
FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result,
e) -> {
- if (e != null) {
- asyncResponse.resume(new RestException(e));
- return;
- }
-
- for (int i = 0; i <
transactionMetadataStoreInfoFutures.size(); i++) {
- try {
- stats.put(i,
transactionMetadataStoreInfoFutures.get(i).get());
- } catch (Exception exception) {
- asyncResponse.resume(new
RestException(exception.getCause()));
- return;
- }
- }
- asyncResponse.resume(stats);
- });
- }).exceptionally(ex -> {
- log.error("[{}] Failed to get transaction coordinator
state.", clientAppId(), ex);
- resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
+ asyncResponse.resume(stats);
});
- }
- } else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
- "This Broker is not configured with
transactionCoordinatorEnabled=true."));
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to get transaction coordinator state.",
clientAppId(), ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
}
- protected void internalGetTransactionInPendingAckStats(AsyncResponse
asyncResponse, boolean authoritative,
- long mostSigBits,
long leastSigBits, String subName) {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- validateTopicOwnership(topicName, authoritative);
- CompletableFuture<Optional<Topic>> topicFuture =
pulsar().getBrokerService()
- .getTopics().get(topicName.toString());
- if (topicFuture != null) {
- topicFuture.whenComplete((optionalTopic, e) -> {
- if (e != null) {
- asyncResponse.resume(new RestException(e));
- return;
- }
- if (!optionalTopic.isPresent()) {
- asyncResponse.resume(new
RestException(TEMPORARY_REDIRECT,
- "Topic is not owned by this broker!"));
- return;
- }
- Topic topicObject = optionalTopic.get();
- if (topicObject instanceof PersistentTopic) {
- asyncResponse.resume(((PersistentTopic) topicObject)
- .getTransactionInPendingAckStats(new
TxnID(mostSigBits, leastSigBits), subName));
- } else {
- asyncResponse.resume(new RestException(BAD_REQUEST,
"Topic is not a persistent topic!"));
- }
- });
- } else {
- asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
"Topic is not owned by this broker!"));
- }
- } else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
- "This Broker is not configured with
transactionCoordinatorEnabled=true."));
- }
+ protected CompletableFuture<TransactionInPendingAckStats>
internalGetTransactionInPendingAckStats(
+ boolean authoritative, long mostSigBits, long leastSigBits, String
subName) {
+ return getExistingPersistentTopicAsync(authoritative)
+ .thenApply(topic -> topic.getTransactionInPendingAckStats(new
TxnID(mostSigBits, leastSigBits),
+ subName));
}
- protected void internalGetTransactionInBufferStats(AsyncResponse
asyncResponse, boolean authoritative,
- long mostSigBits, long
leastSigBits) {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- validateTopicOwnership(topicName, authoritative);
- CompletableFuture<Optional<Topic>> topicFuture =
pulsar().getBrokerService()
- .getTopics().get(topicName.toString());
- if (topicFuture != null) {
- topicFuture.whenComplete((optionalTopic, e) -> {
- if (e != null) {
- asyncResponse.resume(new RestException(e));
- return;
- }
- if (!optionalTopic.isPresent()) {
- asyncResponse.resume(new
RestException(TEMPORARY_REDIRECT,
- "Topic is not owned by this broker!"));
- return;
- }
- Topic topicObject = optionalTopic.get();
- if (topicObject instanceof PersistentTopic) {
- TransactionInBufferStats transactionInBufferStats =
((PersistentTopic) topicObject)
- .getTransactionInBufferStats(new
TxnID(mostSigBits, leastSigBits));
- asyncResponse.resume(transactionInBufferStats);
- } else {
- asyncResponse.resume(new RestException(BAD_REQUEST,
"Topic is not a persistent topic!"));
- }
- });
- } else {
- asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
"Topic is not owned by this broker!"));
- }
- } else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
- "This Broker is not configured with
transactionCoordinatorEnabled=true."));
- }
+ protected CompletableFuture<TransactionInBufferStats>
internalGetTransactionInBufferStats(
+ boolean authoritative, long mostSigBits, long leastSigBits) {
+ return getExistingPersistentTopicAsync(authoritative)
+ .thenApply(topic -> topic.getTransactionInBufferStats(new
TxnID(mostSigBits, leastSigBits)));
}
- protected void internalGetTransactionBufferStats(AsyncResponse
asyncResponse, boolean authoritative) {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- validateTopicOwnership(topicName, authoritative);
- CompletableFuture<Optional<Topic>> topicFuture =
pulsar().getBrokerService()
- .getTopics().get(topicName.toString());
- if (topicFuture != null) {
- topicFuture.whenComplete((optionalTopic, e) -> {
- if (e != null) {
- asyncResponse.resume(new RestException(e));
- return;
- }
-
- if (!optionalTopic.isPresent()) {
- asyncResponse.resume(new
RestException(TEMPORARY_REDIRECT,
- "Topic is not owned by this broker!"));
- return;
- }
- Topic topicObject = optionalTopic.get();
- if (topicObject instanceof PersistentTopic) {
- asyncResponse.resume(((PersistentTopic)
topicObject).getTransactionBufferStats());
- } else {
- asyncResponse.resume(new RestException(BAD_REQUEST,
"Topic is not a persistent topic!"));
- }
- });
- } else {
- asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
"Topic is not owned by this broker!"));
- }
- } else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
"Broker don't support transaction!"));
- }
+ protected CompletableFuture<TransactionBufferStats>
internalGetTransactionBufferStats(boolean authoritative) {
+ return getExistingPersistentTopicAsync(authoritative)
+ .thenApply(topic -> topic.getTransactionBufferStats());
}
- protected void internalGetPendingAckStats(AsyncResponse asyncResponse,
boolean authoritative, String subName) {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- validateTopicOwnership(topicName, authoritative);
- CompletableFuture<Optional<Topic>> topicFuture =
pulsar().getBrokerService()
- .getTopics().get(topicName.toString());
- if (topicFuture != null) {
- topicFuture.whenComplete((optionalTopic, e) -> {
- if (e != null) {
- asyncResponse.resume(new RestException(e));
- return;
- }
-
- if (!optionalTopic.isPresent()) {
- asyncResponse.resume(new
RestException(TEMPORARY_REDIRECT,
- "Topic is not owned by this broker!"));
- return;
- }
- Topic topicObject = optionalTopic.get();
- if (topicObject instanceof PersistentTopic) {
- asyncResponse.resume(((PersistentTopic)
topicObject).getTransactionPendingAckStats(subName));
- } else {
- asyncResponse.resume(new RestException(BAD_REQUEST,
"Topic is not a persistent topic!"));
- }
- });
- } else {
- asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
"Topic is not owned by this broker!"));
- }
- } else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
"Broker don't support transaction!"));
- }
+ protected CompletableFuture<TransactionPendingAckStats>
internalGetPendingAckStats(
+ boolean authoritative, String subName) {
+ return getExistingPersistentTopicAsync(authoritative)
+ .thenApply(topic ->
topic.getTransactionPendingAckStats(subName));
}
protected void internalGetTransactionMetadata(AsyncResponse asyncResponse,
boolean authoritative, int
mostSigBits, long leastSigBits) {
try {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits),
- authoritative);
- CompletableFuture<TransactionMetadata>
transactionMetadataFuture = new CompletableFuture<>();
- TxnMeta txnMeta = pulsar().getTransactionMetadataStoreService()
- .getTxnMeta(new TxnID(mostSigBits,
leastSigBits)).get();
- getTransactionMetadata(txnMeta, transactionMetadataFuture);
- asyncResponse.resume(transactionMetadataFuture.get(10,
TimeUnit.SECONDS));
- } else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
- "This Broker is not configured with
transactionCoordinatorEnabled=true."));
- }
+
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits),
+ authoritative);
+ CompletableFuture<TransactionMetadata> transactionMetadataFuture =
new CompletableFuture<>();
+ TxnMeta txnMeta = pulsar().getTransactionMetadataStoreService()
+ .getTxnMeta(new TxnID(mostSigBits, leastSigBits)).get();
+ getTransactionMetadata(txnMeta, transactionMetadataFuture);
+ asyncResponse.resume(transactionMetadataFuture.get(10,
TimeUnit.SECONDS));
} catch (Exception e) {
if (e instanceof ExecutionException) {
if (e.getCause() instanceof CoordinatorNotFoundException
@@ -378,91 +259,87 @@ public abstract class TransactionsBase extends
AdminResource {
protected void internalGetSlowTransactions(AsyncResponse asyncResponse,
boolean authoritative, long
timeout, Integer coordinatorId) {
try {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- if (coordinatorId != null) {
-
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
- authoritative);
- TransactionMetadataStore transactionMetadataStore =
-
pulsar().getTransactionMetadataStoreService().getStores()
-
.get(TransactionCoordinatorID.get(coordinatorId));
- if (transactionMetadataStore == null) {
- asyncResponse.resume(new RestException(NOT_FOUND,
- "Transaction coordinator not found!
coordinator id : " + coordinatorId));
+ if (coordinatorId != null) {
+
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+ authoritative);
+ TransactionMetadataStore transactionMetadataStore =
+
pulsar().getTransactionMetadataStoreService().getStores()
+
.get(TransactionCoordinatorID.get(coordinatorId));
+ if (transactionMetadataStore == null) {
+ asyncResponse.resume(new RestException(NOT_FOUND,
+ "Transaction coordinator not found! coordinator id
: " + coordinatorId));
+ return;
+ }
+ List<TxnMeta> transactions =
transactionMetadataStore.getSlowTransactions(timeout);
+ List<CompletableFuture<TransactionMetadata>>
completableFutures = new ArrayList<>();
+ for (TxnMeta txnMeta : transactions) {
+ CompletableFuture<TransactionMetadata> completableFuture =
new CompletableFuture<>();
+ getTransactionMetadata(txnMeta, completableFuture);
+ completableFutures.add(completableFuture);
+ }
+
+ FutureUtil.waitForAll(completableFutures).whenComplete((v, e)
-> {
+ if (e != null) {
+ asyncResponse.resume(new RestException(e.getCause()));
return;
}
- List<TxnMeta> transactions =
transactionMetadataStore.getSlowTransactions(timeout);
- List<CompletableFuture<TransactionMetadata>>
completableFutures = new ArrayList<>();
- for (TxnMeta txnMeta : transactions) {
- CompletableFuture<TransactionMetadata>
completableFuture = new CompletableFuture<>();
- getTransactionMetadata(txnMeta, completableFuture);
- completableFutures.add(completableFuture);
- }
- FutureUtil.waitForAll(completableFutures).whenComplete((v,
e) -> {
+ Map<String, TransactionMetadata> transactionMetadata = new
HashMap<>();
+ for (CompletableFuture<TransactionMetadata> future :
completableFutures) {
+ try {
+ transactionMetadata.put(future.get().txnId,
future.get());
+ } catch (Exception exception) {
+ asyncResponse.resume(new
RestException(exception.getCause()));
+ return;
+ }
+ }
+ asyncResponse.resume(transactionMetadata);
+ });
+ } else {
+
getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+ false, false).thenAccept(partitionMetadata -> {
+ if (partitionMetadata.partitions == 0) {
+ asyncResponse.resume(new
RestException(Response.Status.NOT_FOUND,
+ "Transaction coordinator not found"));
+ return;
+ }
+ List<CompletableFuture<Map<String, TransactionMetadata>>>
completableFutures =
+ Lists.newArrayList();
+ for (int i = 0; i < partitionMetadata.partitions; i++) {
+ try {
+ completableFutures
+
.add(pulsar().getAdminClient().transactions()
+
.getSlowTransactionsByCoordinatorIdAsync(i, timeout,
+ TimeUnit.MILLISECONDS));
+ } catch (PulsarServerException e) {
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+ }
+ Map<String, TransactionMetadata> transactionMetadataMaps =
new HashMap<>();
+
FutureUtil.waitForAll(completableFutures).whenComplete((result, e) -> {
if (e != null) {
- asyncResponse.resume(new
RestException(e.getCause()));
+ asyncResponse.resume(new RestException(e));
return;
}
- Map<String, TransactionMetadata> transactionMetadata =
new HashMap<>();
- for (CompletableFuture<TransactionMetadata> future :
completableFutures) {
+ for (CompletableFuture<Map<String,
TransactionMetadata>> transactionMetadataMap
+ : completableFutures) {
try {
- transactionMetadata.put(future.get().txnId,
future.get());
+
transactionMetadataMaps.putAll(transactionMetadataMap.get());
} catch (Exception exception) {
asyncResponse.resume(new
RestException(exception.getCause()));
return;
}
}
- asyncResponse.resume(transactionMetadata);
- });
- } else {
-
getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
- false, false).thenAccept(partitionMetadata -> {
- if (partitionMetadata.partitions == 0) {
- asyncResponse.resume(new
RestException(Response.Status.NOT_FOUND,
- "Transaction coordinator not found"));
- return;
- }
- List<CompletableFuture<Map<String,
TransactionMetadata>>> completableFutures =
- Lists.newArrayList();
- for (int i = 0; i < partitionMetadata.partitions; i++)
{
- try {
- completableFutures
-
.add(pulsar().getAdminClient().transactions()
-
.getSlowTransactionsByCoordinatorIdAsync(i, timeout,
-
TimeUnit.MILLISECONDS));
- } catch (PulsarServerException e) {
- asyncResponse.resume(new RestException(e));
- return;
- }
- }
- Map<String, TransactionMetadata>
transactionMetadataMaps = new HashMap<>();
-
FutureUtil.waitForAll(completableFutures).whenComplete((result, e) -> {
- if (e != null) {
- asyncResponse.resume(new RestException(e));
- return;
- }
-
- for (CompletableFuture<Map<String,
TransactionMetadata>> transactionMetadataMap
- : completableFutures) {
- try {
-
transactionMetadataMaps.putAll(transactionMetadataMap.get());
- } catch (Exception exception) {
- asyncResponse.resume(new
RestException(exception.getCause()));
- return;
- }
- }
- asyncResponse.resume(transactionMetadataMaps);
- });
- }).exceptionally(ex -> {
- log.error("[{}] Failed to get transaction coordinator
state.", clientAppId(), ex);
- resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
+ asyncResponse.resume(transactionMetadataMaps);
});
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to get transaction coordinator
state.", clientAppId(), ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
- }
- } else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
"Broker don't support transaction!"));
}
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
@@ -472,33 +349,28 @@ public abstract class TransactionsBase extends
AdminResource {
protected void internalGetCoordinatorInternalStats(AsyncResponse
asyncResponse, boolean authoritative,
boolean metadata, int
coordinatorId) {
try {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- TopicName topicName =
TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId);
- validateTopicOwnership(topicName, authoritative);
- TransactionMetadataStore metadataStore =
pulsar().getTransactionMetadataStoreService()
-
.getStores().get(TransactionCoordinatorID.get(coordinatorId));
- if (metadataStore == null) {
- asyncResponse.resume(new RestException(NOT_FOUND,
- "Transaction coordinator not found! coordinator id
: " + coordinatorId));
- return;
- }
- if (metadataStore instanceof MLTransactionMetadataStore) {
- ManagedLedger managedLedger =
((MLTransactionMetadataStore) metadataStore).getManagedLedger();
- TransactionCoordinatorInternalStats
transactionCoordinatorInternalStats =
- new TransactionCoordinatorInternalStats();
- TransactionLogStats transactionLogStats = new
TransactionLogStats();
- transactionLogStats.managedLedgerName =
managedLedger.getName();
- transactionLogStats.managedLedgerInternalStats =
-
managedLedger.getManagedLedgerInternalStats(metadata).get();
- transactionCoordinatorInternalStats.transactionLogStats =
transactionLogStats;
- asyncResponse.resume(transactionCoordinatorInternalStats);
- } else {
- asyncResponse.resume(new RestException(METHOD_NOT_ALLOWED,
- "Broker don't use MLTransactionMetadataStore!"));
- }
+ TopicName topicName =
TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId);
+ validateTopicOwnership(topicName, authoritative);
+ TransactionMetadataStore metadataStore =
pulsar().getTransactionMetadataStoreService()
+
.getStores().get(TransactionCoordinatorID.get(coordinatorId));
+ if (metadataStore == null) {
+ asyncResponse.resume(new RestException(NOT_FOUND,
+ "Transaction coordinator not found! coordinator id : "
+ coordinatorId));
+ return;
+ }
+ if (metadataStore instanceof MLTransactionMetadataStore) {
+ ManagedLedger managedLedger = ((MLTransactionMetadataStore)
metadataStore).getManagedLedger();
+ TransactionCoordinatorInternalStats
transactionCoordinatorInternalStats =
+ new TransactionCoordinatorInternalStats();
+ TransactionLogStats transactionLogStats = new
TransactionLogStats();
+ transactionLogStats.managedLedgerName =
managedLedger.getName();
+ transactionLogStats.managedLedgerInternalStats =
+
managedLedger.getManagedLedgerInternalStats(metadata).get();
+ transactionCoordinatorInternalStats.transactionLogStats =
transactionLogStats;
+ asyncResponse.resume(transactionCoordinatorInternalStats);
} else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
- "This Broker is not configured with
transactionCoordinatorEnabled=true."));
+ asyncResponse.resume(new RestException(METHOD_NOT_ALLOWED,
+ "Broker don't use MLTransactionMetadataStore!"));
}
} catch (Exception e) {
asyncResponse.resume(new RestException(e.getCause()));
@@ -506,40 +378,46 @@ public abstract class TransactionsBase extends
AdminResource {
}
protected CompletableFuture<TransactionPendingAckInternalStats>
internalGetPendingAckInternalStats(
- boolean authoritative, TopicName topicName, String subName,
boolean metadata) {
+ boolean authoritative, String subName, boolean metadata) {
+ return getExistingPersistentTopicAsync(authoritative)
+ .thenCompose(topic ->
topic.getPendingAckManagedLedger(subName))
+ .thenCompose(managedLedger ->
+ managedLedger.getManagedLedgerInternalStats(metadata)
+ .thenApply(internalStats -> {
+ TransactionLogStats pendingAckLogStats = new
TransactionLogStats();
+ pendingAckLogStats.managedLedgerName =
managedLedger.getName();
+ pendingAckLogStats.managedLedgerInternalStats
= internalStats;
+ return pendingAckLogStats;
+ })
+ .thenApply(pendingAckLogStats -> {
+ TransactionPendingAckInternalStats stats = new
TransactionPendingAckInternalStats();
+ stats.pendingAckLogStats = pendingAckLogStats;
+ return stats;
+ })
+ );
+ }
+
+ protected CompletableFuture<PersistentTopic>
getExistingPersistentTopicAsync(boolean authoritative) {
+ return validateTopicOwnershipAsync(topicName,
authoritative).thenCompose(__ -> {
+ CompletableFuture<Optional<Topic>> topicFuture =
pulsar().getBrokerService()
+ .getTopics().get(topicName.toString());
+ if (topicFuture == null) {
+ return FutureUtil.failedFuture(new RestException(NOT_FOUND,
"Topic not found"));
+ }
+ return topicFuture.thenCompose(optionalTopic -> {
+ if (!optionalTopic.isPresent()) {
+ return FutureUtil.failedFuture(new
RestException(NOT_FOUND, "Topic not found"));
+ }
+ return CompletableFuture.completedFuture((PersistentTopic)
optionalTopic.get());
+ });
+ });
+ }
+
+ protected void checkTransactionCoordinatorEnabled() {
if (!pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- return FutureUtil.failedFuture(new
RestException(SERVICE_UNAVAILABLE,
- "This Broker is not configured with
transactionCoordinatorEnabled=true."));
+ throw new RestException(SERVICE_UNAVAILABLE,
+ "This Broker is not configured with
transactionCoordinatorEnabled=true.");
}
- return validateTopicOwnershipAsync(topicName, authoritative)
- .thenCompose(__ -> {
- CompletableFuture<Optional<Topic>> topicFuture =
pulsar().getBrokerService()
- .getTopics().get(topicName.toString());
- if (topicFuture == null) {
- return FutureUtil.failedFuture(new
RestException(NOT_FOUND, "Topic not found"));
- }
- return topicFuture.thenCompose(optionalTopic -> {
- if (!optionalTopic.isPresent()) {
- return FutureUtil.failedFuture(new
RestException(NOT_FOUND, "Topic not found"));
- } else {
- Topic topicObject = optionalTopic.get();
- return ((PersistentTopic)
topicObject).getPendingAckManagedLedger(subName)
- .thenCompose(managedLedger ->
managedLedger.getManagedLedgerInternalStats(metadata)
- .thenApply(internalStats -> {
- TransactionLogStats
pendingAckLogStats = new TransactionLogStats();
-
pendingAckLogStats.managedLedgerName = managedLedger.getName();
-
pendingAckLogStats.managedLedgerInternalStats = internalStats;
- return pendingAckLogStats;
- })
- .thenApply(pendingAckLogStats -> {
-
TransactionPendingAckInternalStats stats =
- new
TransactionPendingAckInternalStats();
- stats.pendingAckLogStats =
pendingAckLogStats;
- return stats;
- }));
- }
- });
- });
}
protected void validateTopicName(String property, String namespace, String
encodedTopic) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
index bbd79036ecf..9cb825b9f8e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
@@ -61,6 +61,7 @@ public class Transactions extends TransactionsBase {
@QueryParam("authoritative")
@DefaultValue("false") boolean
authoritative,
@QueryParam("coordinatorId") Integer
coordinatorId) {
+ checkTransactionCoordinatorEnabled();
internalGetCoordinatorStats(asyncResponse, authoritative,
coordinatorId);
}
@@ -82,9 +83,19 @@ public class Transactions extends TransactionsBase {
@PathParam("topic") @Encoded
String encodedTopic,
@PathParam("mostSigBits") String
mostSigBits,
@PathParam("leastSigBits") String
leastSigBits) {
- validateTopicName(tenant, namespace, encodedTopic);
- internalGetTransactionInBufferStats(asyncResponse, authoritative,
- Long.parseLong(mostSigBits), Long.parseLong(leastSigBits));
+ try {
+ checkTransactionCoordinatorEnabled();
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalGetTransactionInBufferStats(authoritative,
Long.parseLong(mostSigBits),
+ Long.parseLong(leastSigBits))
+ .thenAccept(stat -> asyncResponse.resume(stat))
+ .exceptionally(ex -> {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ } catch (Exception ex) {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
}
@GET
@@ -106,9 +117,19 @@ public class Transactions extends TransactionsBase {
@PathParam("mostSigBits")
String mostSigBits,
@PathParam("leastSigBits")
String leastSigBits,
@PathParam("subName") String
subName) {
- validateTopicName(tenant, namespace, encodedTopic);
- internalGetTransactionInPendingAckStats(asyncResponse, authoritative,
Long.parseLong(mostSigBits),
- Long.parseLong(leastSigBits), subName);
+ try {
+ checkTransactionCoordinatorEnabled();
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalGetTransactionInPendingAckStats(authoritative,
Long.parseLong(mostSigBits),
+ Long.parseLong(leastSigBits), subName)
+ .thenAccept(stat -> asyncResponse.resume(stat))
+ .exceptionally(ex -> {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ } catch (Exception ex) {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
}
@GET
@@ -127,8 +148,18 @@ public class Transactions extends TransactionsBase {
@PathParam("tenant") String tenant,
@PathParam("namespace") String
namespace,
@PathParam("topic") @Encoded String
encodedTopic) {
- validateTopicName(tenant, namespace, encodedTopic);
- internalGetTransactionBufferStats(asyncResponse, authoritative);
+ try {
+ checkTransactionCoordinatorEnabled();
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalGetTransactionBufferStats(authoritative)
+ .thenAccept(stat -> asyncResponse.resume(stat))
+ .exceptionally(ex -> {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ } catch (Exception ex) {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
}
@GET
@@ -148,8 +179,18 @@ public class Transactions extends TransactionsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String
encodedTopic,
@PathParam("subName") String subName) {
- validateTopicName(tenant, namespace, encodedTopic);
- internalGetPendingAckStats(asyncResponse, authoritative, subName);
+ try {
+ checkTransactionCoordinatorEnabled();
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalGetPendingAckStats(authoritative, subName)
+ .thenAccept(stats -> asyncResponse.resume(stats))
+ .exceptionally(ex -> {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ } catch (Exception ex) {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
}
@GET
@@ -168,6 +209,7 @@ public class Transactions extends TransactionsBase {
@DefaultValue("false") boolean
authoritative,
@PathParam("mostSigBits") String
mostSigBits,
@PathParam("leastSigBits") String
leastSigBits) {
+ checkTransactionCoordinatorEnabled();
internalGetTransactionMetadata(asyncResponse, authoritative,
Integer.parseInt(mostSigBits),
Long.parseLong(leastSigBits));
}
@@ -188,6 +230,7 @@ public class Transactions extends TransactionsBase {
@DefaultValue("false") boolean
authoritative,
@PathParam("timeout") String timeout,
@QueryParam("coordinatorId") Integer
coordinatorId) {
+ checkTransactionCoordinatorEnabled();
internalGetSlowTransactions(asyncResponse, authoritative,
Long.parseLong(timeout), coordinatorId);
}
@@ -205,6 +248,7 @@ public class Transactions extends TransactionsBase {
@DefaultValue("false") boolean
authoritative,
@PathParam("coordinatorId") String
coordinatorId,
@QueryParam("metadata")
@DefaultValue("false") boolean metadata) {
+ checkTransactionCoordinatorEnabled();
internalGetCoordinatorInternalStats(asyncResponse, authoritative,
metadata, Integer.parseInt(coordinatorId));
}
@@ -229,8 +273,9 @@ public class Transactions extends TransactionsBase {
@PathParam("subName") String
subName,
@QueryParam("metadata")
@DefaultValue("false") boolean metadata) {
try {
+ checkTransactionCoordinatorEnabled();
validateTopicName(tenant, namespace, encodedTopic);
- internalGetPendingAckInternalStats(authoritative, topicName,
subName, metadata)
+ internalGetPendingAckInternalStats(authoritative, subName,
metadata)
.thenAccept(stats -> asyncResponse.resume(stats))
.exceptionally(ex -> {
Throwable cause =
FutureUtil.unwrapCompletionException(ex);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index 920ad61be4e..fa6e56724fe 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.admin.v3;
import com.google.common.collect.Sets;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.http.HttpStatus;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -121,6 +122,25 @@ public class AdminApiTransactionTest extends
MockedPulsarServiceBaseTest {
initTransaction(2);
TransactionImpl transaction = (TransactionImpl) getTransaction();
final String topic =
"persistent://public/default/testGetTransactionInBufferStats";
+ try {
+ admin.transactions()
+ .getTransactionInBufferStatsAsync(new TxnID(1, 1),
topic).get();
+ fail("Should failed here");
+ } catch (ExecutionException ex) {
+ assertTrue(ex.getCause() instanceof
PulsarAdminException.NotFoundException);
+ PulsarAdminException.NotFoundException cause =
(PulsarAdminException.NotFoundException)ex.getCause();
+ assertEquals(cause.getMessage(), "Topic not found");
+ }
+ try {
+ pulsar.getBrokerService().getTopic(topic, false);
+ admin.transactions()
+ .getTransactionInBufferStatsAsync(new TxnID(1, 1),
topic).get();
+ fail("Should failed here");
+ } catch (ExecutionException ex) {
+ assertTrue(ex.getCause() instanceof
PulsarAdminException.NotFoundException);
+ PulsarAdminException.NotFoundException cause =
(PulsarAdminException.NotFoundException)ex.getCause();
+ assertEquals(cause.getMessage(), "Topic not found");
+ }
admin.topics().createNonPartitionedTopic(topic);
Producer<byte[]> producer =
pulsarClient.newProducer(Schema.BYTES).topic(topic).sendTimeout(0,
TimeUnit.SECONDS).create();
MessageId messageId = producer.newMessage(transaction).value("Hello
pulsar!".getBytes()).send();
@@ -146,6 +166,27 @@ public class AdminApiTransactionTest extends
MockedPulsarServiceBaseTest {
initTransaction(2);
final String topic =
"persistent://public/default/testGetTransactionInBufferStats";
final String subName = "test";
+ try {
+ admin.transactions()
+ .getTransactionInPendingAckStatsAsync(new TxnID(1,
+ 2), topic, subName).get();
+ fail("Should failed here");
+ } catch (ExecutionException ex) {
+ assertTrue(ex.getCause() instanceof
PulsarAdminException.NotFoundException);
+ PulsarAdminException.NotFoundException cause =
(PulsarAdminException.NotFoundException)ex.getCause();
+ assertEquals(cause.getMessage(), "Topic not found");
+ }
+ try {
+ pulsar.getBrokerService().getTopic(topic, false);
+ admin.transactions()
+ .getTransactionInPendingAckStatsAsync(new TxnID(1,
+ 2), topic, subName).get();
+ fail("Should failed here");
+ } catch (ExecutionException ex) {
+ assertTrue(ex.getCause() instanceof
PulsarAdminException.NotFoundException);
+ PulsarAdminException.NotFoundException cause =
(PulsarAdminException.NotFoundException)ex.getCause();
+ assertEquals(cause.getMessage(), "Topic not found");
+ }
admin.topics().createNonPartitionedTopic(topic);
Producer<byte[]> producer =
pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
Consumer<byte[]> consumer =
pulsarClient.newConsumer(Schema.BYTES).topic(topic)
@@ -252,8 +293,26 @@ public class AdminApiTransactionTest extends
MockedPulsarServiceBaseTest {
final String topic =
"persistent://public/default/testGetTransactionBufferStats";
final String subName1 = "test1";
final String subName2 = "test2";
+ try {
+ admin.transactions()
+ .getTransactionBufferStatsAsync(topic).get();
+ fail("Should failed here");
+ } catch (ExecutionException ex) {
+ assertTrue(ex.getCause() instanceof
PulsarAdminException.NotFoundException);
+ PulsarAdminException.NotFoundException cause =
(PulsarAdminException.NotFoundException)ex.getCause();
+ assertEquals(cause.getMessage(), "Topic not found");
+ }
+ try {
+ pulsar.getBrokerService().getTopic(topic, false);
+ admin.transactions()
+ .getTransactionBufferStatsAsync(topic).get();
+ fail("Should failed here");
+ } catch (ExecutionException ex) {
+ assertTrue(ex.getCause() instanceof
PulsarAdminException.NotFoundException);
+ PulsarAdminException.NotFoundException cause =
(PulsarAdminException.NotFoundException)ex.getCause();
+ assertEquals(cause.getMessage(), "Topic not found");
+ }
admin.topics().createNonPartitionedTopic(topic);
-
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.sendTimeout(0, TimeUnit.SECONDS).topic(topic).create();
Consumer<byte[]> consumer1 =
pulsarClient.newConsumer(Schema.BYTES).topic(topic)
@@ -289,6 +348,25 @@ public class AdminApiTransactionTest extends
MockedPulsarServiceBaseTest {
initTransaction(2);
final String topic =
"persistent://public/default/testGetPendingAckStats";
final String subName = "test1";
+ try {
+ admin.transactions()
+ .getPendingAckStatsAsync(topic, subName).get();
+ fail("Should failed here");
+ } catch (ExecutionException ex) {
+ assertTrue(ex.getCause() instanceof
PulsarAdminException.NotFoundException);
+ PulsarAdminException.NotFoundException cause =
(PulsarAdminException.NotFoundException)ex.getCause();
+ assertEquals(cause.getMessage(), "Topic not found");
+ }
+ try {
+ pulsar.getBrokerService().getTopic(topic, false);
+ admin.transactions()
+ .getPendingAckStatsAsync(topic, subName).get();
+ fail("Should failed here");
+ } catch (ExecutionException ex) {
+ assertTrue(ex.getCause() instanceof
PulsarAdminException.NotFoundException);
+ PulsarAdminException.NotFoundException cause =
(PulsarAdminException.NotFoundException)ex.getCause();
+ assertEquals(cause.getMessage(), "Topic not found");
+ }
admin.topics().createNonPartitionedTopic(topic);
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
@@ -429,6 +507,18 @@ public class AdminApiTransactionTest extends
MockedPulsarServiceBaseTest {
assertNull(managedLedgerInternalStats.ledgers.get(0).metadata);
}
+ @Test(timeOut = 20000)
+ public void testTransactionNotEnabled() throws Exception {
+ stopBroker();
+ conf.setTransactionCoordinatorEnabled(false);
+ super.internalSetup();
+ try {
+ admin.transactions().getCoordinatorInternalStats(1, false);
+ } catch (PulsarAdminException ex) {
+ assertEquals(ex.getStatusCode(),
HttpStatus.SC_SERVICE_UNAVAILABLE);
+ }
+ }
+
private static void verifyCoordinatorStats(String state,
long sequenceId, long
lowWaterMark) {
assertEquals(state, "Ready");