This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 417d1e5 [fix][broker] Fix getPendingAckInternalStats redirect issue. (#14876)
417d1e5 is described below
commit 417d1e50957bee1b0f88b8172d7831af496ed001
Author: Jiwei Guo <[email protected]>
AuthorDate: Sun Apr 3 03:27:20 2022 +0800
[fix][broker] Fix getPendingAckInternalStats redirect issue. (#14876)
---
.../pulsar/broker/admin/impl/TransactionsBase.java | 93 ++++++++--------------
.../pulsar/broker/admin/v3/Transactions.java | 33 +++++++-
.../broker/admin/v3/AdminApiTransactionTest.java | 22 +++++
3 files changed, 82 insertions(+), 66 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
index 504ce92..308f18e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
@@ -38,9 +38,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
-import
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
-import
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
-import
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
@@ -508,69 +505,41 @@ public abstract class TransactionsBase extends AdminResource {
}
}
- protected void internalGetPendingAckInternalStats(AsyncResponse
asyncResponse, boolean authoritative,
- TopicName topicName,
String subName, boolean metadata) {
- try {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- validateTopicOwnership(topicName, authoritative);
- CompletableFuture<Optional<Topic>> topicFuture =
pulsar().getBrokerService()
- .getTopics().get(topicName.toString());
- if (topicFuture != null) {
- topicFuture.whenComplete((optionalTopic, e) -> {
-
- if (e != null) {
- asyncResponse.resume(new RestException(e));
- return;
- }
+ protected CompletableFuture<TransactionPendingAckInternalStats>
internalGetPendingAckInternalStats(
+ boolean authoritative, TopicName topicName, String subName,
boolean metadata) {
+ if (!pulsar().getConfig().isTransactionCoordinatorEnabled()) {
+ return FutureUtil.failedFuture(new
RestException(SERVICE_UNAVAILABLE,
+ "This Broker is not configured with
transactionCoordinatorEnabled=true."));
+ }
+ return validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose(__ -> {
+ CompletableFuture<Optional<Topic>> topicFuture =
pulsar().getBrokerService()
+ .getTopics().get(topicName.toString());
+ if (topicFuture == null) {
+ return FutureUtil.failedFuture(new
RestException(NOT_FOUND, "Topic not found"));
+ }
+ return topicFuture.thenCompose(optionalTopic -> {
if (!optionalTopic.isPresent()) {
- asyncResponse.resume(new
RestException(TEMPORARY_REDIRECT,
- "Topic is not owned by this broker!"));
- return;
- }
- Topic topicObject = optionalTopic.get();
- if (topicObject instanceof PersistentTopic) {
- try {
- ManagedLedger managedLedger =
- ((PersistentTopic)
topicObject).getPendingAckManagedLedger(subName).get();
- TransactionPendingAckInternalStats stats =
- new
TransactionPendingAckInternalStats();
- TransactionLogStats pendingAckLogStats = new
TransactionLogStats();
- pendingAckLogStats.managedLedgerName =
managedLedger.getName();
- pendingAckLogStats.managedLedgerInternalStats =
-
managedLedger.getManagedLedgerInternalStats(metadata).get();
- stats.pendingAckLogStats = pendingAckLogStats;
- asyncResponse.resume(stats);
- } catch (Exception exception) {
- if (exception instanceof ExecutionException) {
- if (exception.getCause() instanceof
ServiceUnitNotReadyException) {
- asyncResponse.resume(new
RestException(SERVICE_UNAVAILABLE,
- exception.getCause()));
- return;
- } else if (exception.getCause() instanceof
NotAllowedException) {
- asyncResponse.resume(new
RestException(METHOD_NOT_ALLOWED,
- exception.getCause()));
- return;
- } else if (exception.getCause() instanceof
SubscriptionNotFoundException) {
- asyncResponse.resume(new
RestException(NOT_FOUND, exception.getCause()));
- return;
- }
- }
- asyncResponse.resume(new
RestException(exception));
- }
+ return FutureUtil.failedFuture(new
RestException(NOT_FOUND, "Topic not found"));
} else {
- asyncResponse.resume(new
RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
+ Topic topicObject = optionalTopic.get();
+ return ((PersistentTopic)
topicObject).getPendingAckManagedLedger(subName)
+ .thenCompose(managedLedger ->
managedLedger.getManagedLedgerInternalStats(metadata)
+ .thenApply(internalStats -> {
+ TransactionLogStats
pendingAckLogStats = new TransactionLogStats();
+
pendingAckLogStats.managedLedgerName = managedLedger.getName();
+
pendingAckLogStats.managedLedgerInternalStats = internalStats;
+ return pendingAckLogStats;
+ })
+ .thenApply(pendingAckLogStats -> {
+
TransactionPendingAckInternalStats stats =
+ new
TransactionPendingAckInternalStats();
+ stats.pendingAckLogStats =
pendingAckLogStats;
+ return stats;
+ }));
}
});
- } else {
- asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
"Topic is not owned by this broker!"));
- }
- } else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
- "This Broker is not configured with
transactionCoordinatorEnabled=true."));
- }
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e.getCause()));
- }
+ });
}
protected void validateTopicName(String property, String namespace, String
encodedTopic) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
index 94411f6..bbd7903 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.broker.admin.v3;
+import static javax.ws.rs.core.Response.Status.METHOD_NOT_ALLOWED;
+import static javax.ws.rs.core.Response.Status.NOT_FOUND;
+import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
@@ -33,14 +36,17 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.admin.impl.TransactionsBase;
-import org.apache.pulsar.common.naming.TopicDomain;
-import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.common.util.FutureUtil;
@Path("/transactions")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Api(value = "/transactions", description = "Transactions admin apis", tags =
"transactions")
+@Slf4j
public class Transactions extends TransactionsBase {
@GET
@@ -222,7 +228,26 @@ public class Transactions extends TransactionsBase {
@PathParam("topic") @Encoded String
encodedTopic,
@PathParam("subName") String
subName,
@QueryParam("metadata")
@DefaultValue("false") boolean metadata) {
- internalGetPendingAckInternalStats(asyncResponse, authoritative,
- TopicName.get(TopicDomain.persistent.value(), tenant,
namespace, encodedTopic), subName, metadata);
+ try {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalGetPendingAckInternalStats(authoritative, topicName,
subName, metadata)
+ .thenAccept(stats -> asyncResponse.resume(stats))
+ .exceptionally(ex -> {
+ Throwable cause =
FutureUtil.unwrapCompletionException(ex);
+ log.error("[{}] Failed to get pending ack internal
stats {}", clientAppId(), topicName, cause);
+ if (cause instanceof
BrokerServiceException.ServiceUnitNotReadyException) {
+ asyncResponse.resume(new
RestException(SERVICE_UNAVAILABLE, cause));
+ } else if (cause instanceof
BrokerServiceException.NotAllowedException) {
+ asyncResponse.resume(new
RestException(METHOD_NOT_ALLOWED, cause));
+ } else if (cause instanceof
BrokerServiceException.SubscriptionNotFoundException) {
+ asyncResponse.resume(new RestException(NOT_FOUND,
cause));
+ } else {
+ asyncResponse.resume(new RestException(cause));
+ }
+ return null;
+ });
+ } catch (Exception ex) {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index 5ba369e..920ad61 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Sets;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -55,12 +56,14 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
@@ -373,6 +376,25 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
TransactionImpl transaction = (TransactionImpl) getTransaction();
final String topic =
"persistent://public/default/testGetPendingAckInternalStats";
final String subName = "test";
+ try {
+ admin.transactions()
+ .getPendingAckInternalStatsAsync(topic, subName,
true).get();
+ fail("Should failed here");
+ } catch (ExecutionException ex) {
+ assertTrue(ex.getCause() instanceof
PulsarAdminException.NotFoundException);
+ PulsarAdminException.NotFoundException cause =
(PulsarAdminException.NotFoundException)ex.getCause();
+ assertEquals(cause.getMessage(), "Topic not found");
+ }
+ try {
+ pulsar.getBrokerService().getTopic(topic, false);
+ admin.transactions()
+ .getPendingAckInternalStatsAsync(topic, subName,
true).get();
+ fail("Should failed here");
+ } catch (ExecutionException ex) {
+ assertTrue(ex.getCause() instanceof
PulsarAdminException.NotFoundException);
+ PulsarAdminException.NotFoundException cause =
(PulsarAdminException.NotFoundException)ex.getCause();
+ assertEquals(cause.getMessage(), "Topic not found");
+ }
admin.topics().createNonPartitionedTopic(topic);
Producer<byte[]> producer =
pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
Consumer<byte[]> consumer =
pulsarClient.newConsumer(Schema.BYTES).topic(topic)