This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 59a1360c06f68390f5e65f6d0708a852e3523d23 Author: Jiwei Guo <techno...@apache.org> AuthorDate: Sun Apr 3 03:27:20 2022 +0800 [fix][broker] Fix getPendingAckInternalStats redirect issue. (#14876) (cherry picked from commit 417d1e50957bee1b0f88b8172d7831af496ed001) --- .../pulsar/broker/admin/impl/TransactionsBase.java | 93 ++++++++-------------- .../pulsar/broker/admin/v3/Transactions.java | 33 +++++++- .../broker/admin/v3/AdminApiTransactionTest.java | 22 +++++ 3 files changed, 82 insertions(+), 66 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java index 504ce92de45..308f18e8bc3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java @@ -38,9 +38,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.admin.AdminResource; -import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; -import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; -import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.web.RestException; @@ -508,69 +505,41 @@ public abstract class TransactionsBase extends AdminResource { } } - protected void internalGetPendingAckInternalStats(AsyncResponse asyncResponse, boolean authoritative, - TopicName topicName, String subName, boolean metadata) { - try { - if (pulsar().getConfig().isTransactionCoordinatorEnabled()) { - validateTopicOwnership(topicName, authoritative); - CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService() - .getTopics().get(topicName.toString()); - if (topicFuture != null) { - topicFuture.whenComplete((optionalTopic, e) -> { - - if (e != null) { - asyncResponse.resume(new RestException(e)); - return; - } + protected CompletableFuture<TransactionPendingAckInternalStats> internalGetPendingAckInternalStats( + boolean authoritative, TopicName topicName, String subName, boolean metadata) { + if (!pulsar().getConfig().isTransactionCoordinatorEnabled()) { + return FutureUtil.failedFuture(new RestException(SERVICE_UNAVAILABLE, + "This Broker is not configured with transactionCoordinatorEnabled=true.")); + } + return validateTopicOwnershipAsync(topicName, authoritative) + .thenCompose(__ -> { + CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService() + .getTopics().get(topicName.toString()); + if (topicFuture == null) { + return FutureUtil.failedFuture(new RestException(NOT_FOUND, "Topic not found")); + } + return topicFuture.thenCompose(optionalTopic -> { if (!optionalTopic.isPresent()) { - asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, - "Topic is not owned by this broker!")); - return; - } - Topic topicObject = optionalTopic.get(); - if (topicObject instanceof PersistentTopic) { - try { - ManagedLedger managedLedger = - ((PersistentTopic) topicObject).getPendingAckManagedLedger(subName).get(); - TransactionPendingAckInternalStats stats = - new TransactionPendingAckInternalStats(); - TransactionLogStats pendingAckLogStats = new TransactionLogStats(); - pendingAckLogStats.managedLedgerName = managedLedger.getName(); - pendingAckLogStats.managedLedgerInternalStats = - managedLedger.getManagedLedgerInternalStats(metadata).get(); - stats.pendingAckLogStats = pendingAckLogStats; - asyncResponse.resume(stats); - } catch (Exception exception) { - if (exception instanceof ExecutionException) { - if (exception.getCause() instanceof ServiceUnitNotReadyException) { - asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, - exception.getCause())); - return; - } else if (exception.getCause() instanceof NotAllowedException) { - asyncResponse.resume(new RestException(METHOD_NOT_ALLOWED, - exception.getCause())); - return; - } else if (exception.getCause() instanceof SubscriptionNotFoundException) { - asyncResponse.resume(new RestException(NOT_FOUND, exception.getCause())); - return; - } - } - asyncResponse.resume(new RestException(exception)); - } + return FutureUtil.failedFuture(new RestException(NOT_FOUND, "Topic not found")); } else { - asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!")); + Topic topicObject = optionalTopic.get(); + return ((PersistentTopic) topicObject).getPendingAckManagedLedger(subName) + .thenCompose(managedLedger -> managedLedger.getManagedLedgerInternalStats(metadata) + .thenApply(internalStats -> { + TransactionLogStats pendingAckLogStats = new TransactionLogStats(); + pendingAckLogStats.managedLedgerName = managedLedger.getName(); + pendingAckLogStats.managedLedgerInternalStats = internalStats; + return pendingAckLogStats; + }) + .thenApply(pendingAckLogStats -> { + TransactionPendingAckInternalStats stats = + new TransactionPendingAckInternalStats(); + stats.pendingAckLogStats = pendingAckLogStats; + return stats; + })); } }); - } else { - asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!")); - } - } else { - asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, - "This Broker is not configured with transactionCoordinatorEnabled=true.")); - } - } catch (Exception e) { - asyncResponse.resume(new RestException(e.getCause())); - } + }); } protected void validateTopicName(String property, String namespace, String encodedTopic) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java index 94411f6d16d..bbd79036ecf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java @@ -18,6 +18,9 @@ */ package org.apache.pulsar.broker.admin.v3; +import static javax.ws.rs.core.Response.Status.METHOD_NOT_ALLOWED; +import static javax.ws.rs.core.Response.Status.NOT_FOUND; +import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; @@ -33,14 +36,17 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; import javax.ws.rs.core.MediaType; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.admin.impl.TransactionsBase; -import org.apache.pulsar.common.naming.TopicDomain; -import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.common.util.FutureUtil; @Path("/transactions") @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) @Api(value = "/transactions", description = "Transactions admin apis", tags = "transactions") +@Slf4j public class Transactions extends TransactionsBase { @GET @@ -222,7 +228,26 @@ public class Transactions extends TransactionsBase { @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String subName, @QueryParam("metadata") @DefaultValue("false") boolean metadata) { - internalGetPendingAckInternalStats(asyncResponse, authoritative, - TopicName.get(TopicDomain.persistent.value(), tenant, namespace, encodedTopic), subName, metadata); + try { + validateTopicName(tenant, namespace, encodedTopic); + internalGetPendingAckInternalStats(authoritative, topicName, subName, metadata) + .thenAccept(stats -> asyncResponse.resume(stats)) + .exceptionally(ex -> { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + log.error("[{}] Failed to get pending ack internal stats {}", clientAppId(), topicName, cause); + if (cause instanceof BrokerServiceException.ServiceUnitNotReadyException) { + asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, cause)); + } else if (cause instanceof BrokerServiceException.NotAllowedException) { + asyncResponse.resume(new RestException(METHOD_NOT_ALLOWED, cause)); + } else if (cause instanceof BrokerServiceException.SubscriptionNotFoundException) { + asyncResponse.resume(new RestException(NOT_FOUND, cause)); + } else { + asyncResponse.resume(new RestException(cause)); + } + return null; + }); + } catch (Exception ex) { + resumeAsyncResponseExceptionally(asyncResponse, ex); + } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java index 93a2d0e188e..1f796553bdc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.Sets; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; @@ -56,12 +57,14 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest { @@ -374,6 +377,25 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest { TransactionImpl transaction = (TransactionImpl) getTransaction(); final String topic = "persistent://public/default/testGetPendingAckInternalStats"; final String subName = "test"; + try { + admin.transactions() + .getPendingAckInternalStatsAsync(topic, subName, true).get(); + fail("Should failed here"); + } catch (ExecutionException ex) { + assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException); + PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause(); + assertEquals(cause.getMessage(), "Topic not found"); + } + try { + pulsar.getBrokerService().getTopic(topic, false); + admin.transactions() + .getPendingAckInternalStatsAsync(topic, subName, true).get(); + fail("Should failed here"); + } catch (ExecutionException ex) { + assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException); + PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause(); + assertEquals(cause.getMessage(), "Topic not found"); + } admin.topics().createNonPartitionedTopic(topic); Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).create(); Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES).topic(topic)