This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c676b9593e2e45ae0b60a97ed7fa5ed53ccd8cb4 Author: Yunze Xu <[email protected]> AuthorDate: Wed Nov 6 17:01:33 2024 +0800 [fix][broker] Fix currently client retries until operation timeout if the topic does not exist (#23530) (cherry picked from commit 8eeb0e2e89f8938348d29044d9e1d843a6251067) --- .../apache/pulsar/broker/service/ServerCnx.java | 2 +- .../broker/admin/GetPartitionMetadataTest.java | 3 +- .../TokenAuthenticatedProducerConsumerTest.java | 71 +++++++++++++++++++--- .../pulsar/client/impl/PulsarClientImpl.java | 4 +- 4 files changed, 67 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index beef7819589..a1dcd7af6b0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -668,7 +668,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { Throwable actEx = FutureUtil.unwrapCompletionException(ex); if (actEx instanceof WebApplicationException restException) { if (restException.getResponse().getStatus() == Response.Status.NOT_FOUND.getStatusCode()) { - writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.MetadataError, + writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.TopicNotFound, "Tenant or namespace or topic does not exist: " + topicName.getNamespace() , requestId)); lookupSemaphore.release(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java index d06f6ea1c56..f3237676c86 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java @@ -599,8 +599,7 @@ public class GetPartitionMetadataTest extends TestRetrySupport { fail("Expected a not found ex"); } catch (Exception ex) { Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex); - assertTrue(unwrapEx instanceof PulsarClientException.BrokerMetadataException || - unwrapEx instanceof PulsarClientException.TopicDoesNotExistException); + assertTrue(unwrapEx instanceof PulsarClientException.TopicDoesNotExistException); } } // Verify: lookup semaphore has been releases. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java index 4d5e7deaf7d..8f56d8b24ce 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java @@ -35,6 +35,7 @@ import java.util.HashSet; import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; +import lombok.Cleanup; import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.impl.auth.AuthenticationToken; @@ -42,8 +43,9 @@ import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -56,31 +58,35 @@ import org.testng.annotations.Test; public class TokenAuthenticatedProducerConsumerTest extends ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(TokenAuthenticatedProducerConsumerTest.class); + private final static String ADMIN_ROLE = "admin"; private final String ADMIN_TOKEN; + private final String USER_TOKEN; private final String TOKEN_PUBLIC_KEY; + private final KeyPair kp; TokenAuthenticatedProducerConsumerTest() throws NoSuchAlgorithmException { KeyPairGenerator kpg = KeyPairGenerator.getInstance("RSA"); - KeyPair kp = kpg.generateKeyPair(); + kp = kpg.generateKeyPair(); byte[] encodedPublicKey = kp.getPublic().getEncoded(); TOKEN_PUBLIC_KEY = "data:;base64," + Base64.getEncoder().encodeToString(encodedPublicKey); - ADMIN_TOKEN = generateToken(kp); + ADMIN_TOKEN = generateToken(ADMIN_ROLE); + USER_TOKEN = generateToken("user"); } - private String generateToken(KeyPair kp) { + private String generateToken(String subject) { PrivateKey pkey = kp.getPrivate(); long expMillis = System.currentTimeMillis() + Duration.ofHours(1).toMillis(); Date exp = new Date(expMillis); return Jwts.builder() - .setSubject("admin") + .setSubject(subject) .setExpiration(exp) .signWith(pkey, SignatureAlgorithm.forSigningKey(pkey)) .compact(); } - @BeforeMethod + @BeforeClass @Override protected void setup() throws Exception { conf.setAuthenticationEnabled(true); @@ -117,7 +123,7 @@ public class TokenAuthenticatedProducerConsumerTest extends ProducerConsumerBase .authentication(AuthenticationFactory.token(ADMIN_TOKEN))); } - @AfterMethod(alwaysRun = true) + @AfterClass(alwaysRun = true) @Override protected void cleanup() throws Exception { super.internalCleanup(); @@ -171,4 +177,53 @@ public class TokenAuthenticatedProducerConsumerTest extends ProducerConsumerBase log.info("-- Exiting {} test --", methodName); } + @DataProvider + public static Object[][] provider() { + // The 1st element specifies whether to use TCP service URL + // The 2nd element specifies whether to use a token with correct permission + return new Object[][] { + { true, true }, + { true, false }, + { false, true }, + { false, false }, + }; + } + + @Test(dataProvider = "provider") + public void testTopicNotFound(boolean useTcpServiceUrl, boolean useCorrectToken) throws Exception { + final var operationTimeoutMs = 10000; + final var url = useTcpServiceUrl ? pulsar.getBrokerServiceUrl() : pulsar.getWebServiceAddress(); + final var token = useCorrectToken ? ADMIN_TOKEN : USER_TOKEN; + @Cleanup final var client = PulsarClient.builder().serviceUrl(url) + .operationTimeout(operationTimeoutMs, TimeUnit.MILLISECONDS) + .authentication(AuthenticationFactory.token(token)).build(); + final var topic = "my-property/not-exist/tp"; // the namespace does not exist + var start = System.currentTimeMillis(); + try { + client.newProducer().topic(topic).create(); + Assert.fail(); + } catch (PulsarClientException e) { + final var elapsedMs = System.currentTimeMillis() - start; + log.info("Failed to create producer after {} ms: {} {}", elapsedMs, e.getClass().getName(), e.getMessage()); + Assert.assertTrue(elapsedMs < operationTimeoutMs); + if (useTcpServiceUrl) { + Assert.assertTrue(e instanceof PulsarClientException.TopicDoesNotExistException); + } else { + Assert.assertTrue(e instanceof PulsarClientException.NotFoundException); + } + } + start = System.currentTimeMillis(); + try { + client.newConsumer().topic(topic).subscriptionName("sub").subscribe(); + } catch (PulsarClientException e) { + final var elapsedMs = System.currentTimeMillis() - start; + log.info("Failed to subscribe after {} ms: {} {}", elapsedMs, e.getClass().getName(), e.getMessage()); + Assert.assertTrue(elapsedMs < operationTimeoutMs); + if (useTcpServiceUrl) { + Assert.assertTrue(e instanceof PulsarClientException.TopicDoesNotExistException); + } else { + Assert.assertTrue(e instanceof PulsarClientException.NotFoundException); + } + } + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 82e7c828104..52583ce0afe 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -409,9 +409,9 @@ public class PulsarClientImpl implements PulsarClient { } }).exceptionally(ex -> { Throwable actEx = FutureUtil.unwrapCompletionException(ex); - if (forceNoPartitioned && actEx instanceof PulsarClientException.NotFoundException + if (forceNoPartitioned && (actEx instanceof PulsarClientException.NotFoundException || actEx instanceof PulsarClientException.TopicDoesNotExistException - || actEx instanceof PulsarAdminException.NotFoundException) { + || actEx instanceof PulsarAdminException.NotFoundException)) { checkPartitions.complete(0); } else { checkPartitions.completeExceptionally(ex);
