This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c676b9593e2e45ae0b60a97ed7fa5ed53ccd8cb4
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Nov 6 17:01:33 2024 +0800

    [fix][broker] Fix currently client retries until operation timeout if the 
topic does not exist (#23530)
    
    (cherry picked from commit 8eeb0e2e89f8938348d29044d9e1d843a6251067)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  2 +-
 .../broker/admin/GetPartitionMetadataTest.java     |  3 +-
 .../TokenAuthenticatedProducerConsumerTest.java    | 71 +++++++++++++++++++---
 .../pulsar/client/impl/PulsarClientImpl.java       |  4 +-
 4 files changed, 67 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index beef7819589..a1dcd7af6b0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -668,7 +668,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 Throwable actEx = FutureUtil.unwrapCompletionException(ex);
                 if (actEx instanceof WebApplicationException restException) {
                     if (restException.getResponse().getStatus() == 
Response.Status.NOT_FOUND.getStatusCode()) {
-                        
writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.MetadataError,
+                        
writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.TopicNotFound,
                         "Tenant or namespace or topic does not exist: " + 
topicName.getNamespace() ,
                                 requestId));
                         lookupSemaphore.release();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
index d06f6ea1c56..f3237676c86 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
@@ -599,8 +599,7 @@ public class GetPartitionMetadataTest extends 
TestRetrySupport {
                 fail("Expected a not found ex");
             } catch (Exception ex) {
                 Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
-                assertTrue(unwrapEx instanceof 
PulsarClientException.BrokerMetadataException ||
-                        unwrapEx instanceof 
PulsarClientException.TopicDoesNotExistException);
+                assertTrue(unwrapEx instanceof 
PulsarClientException.TopicDoesNotExistException);
             }
         }
         // Verify: lookup semaphore has been releases.
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
index 4d5e7deaf7d..8f56d8b24ce 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
@@ -35,6 +35,7 @@ import java.util.HashSet;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
@@ -42,8 +43,9 @@ import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -56,31 +58,35 @@ import org.testng.annotations.Test;
 public class TokenAuthenticatedProducerConsumerTest extends 
ProducerConsumerBase {
     private static final Logger log = 
LoggerFactory.getLogger(TokenAuthenticatedProducerConsumerTest.class);
 
+    private final static String ADMIN_ROLE = "admin";
     private final String ADMIN_TOKEN;
+    private final String USER_TOKEN;
     private final String TOKEN_PUBLIC_KEY;
+    private final KeyPair kp;
 
     TokenAuthenticatedProducerConsumerTest() throws NoSuchAlgorithmException {
         KeyPairGenerator kpg = KeyPairGenerator.getInstance("RSA");
-        KeyPair kp = kpg.generateKeyPair();
+        kp = kpg.generateKeyPair();
 
         byte[] encodedPublicKey = kp.getPublic().getEncoded();
         TOKEN_PUBLIC_KEY = "data:;base64," + 
Base64.getEncoder().encodeToString(encodedPublicKey);
-        ADMIN_TOKEN = generateToken(kp);
+        ADMIN_TOKEN = generateToken(ADMIN_ROLE);
+        USER_TOKEN = generateToken("user");
     }
 
-    private String generateToken(KeyPair kp) {
+    private String generateToken(String subject) {
         PrivateKey pkey = kp.getPrivate();
         long expMillis = System.currentTimeMillis() + 
Duration.ofHours(1).toMillis();
         Date exp = new Date(expMillis);
 
         return Jwts.builder()
-            .setSubject("admin")
+            .setSubject(subject)
             .setExpiration(exp)
             .signWith(pkey, SignatureAlgorithm.forSigningKey(pkey))
             .compact();
     }
 
-    @BeforeMethod
+    @BeforeClass
     @Override
     protected void setup() throws Exception {
         conf.setAuthenticationEnabled(true);
@@ -117,7 +123,7 @@ public class TokenAuthenticatedProducerConsumerTest extends 
ProducerConsumerBase
                 .authentication(AuthenticationFactory.token(ADMIN_TOKEN)));
     }
 
-    @AfterMethod(alwaysRun = true)
+    @AfterClass(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
         super.internalCleanup();
@@ -171,4 +177,53 @@ public class TokenAuthenticatedProducerConsumerTest 
extends ProducerConsumerBase
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @DataProvider
+    public static Object[][] provider() {
+        // The 1st element specifies whether to use TCP service URL
+        // The 2nd element specifies whether to use a token with correct 
permission
+        return new Object[][] {
+                { true, true },
+                { true, false },
+                { false, true },
+                { false, false },
+        };
+    }
+
+    @Test(dataProvider = "provider")
+    public void testTopicNotFound(boolean useTcpServiceUrl, boolean 
useCorrectToken) throws Exception {
+        final var operationTimeoutMs = 10000;
+        final var url = useTcpServiceUrl ? pulsar.getBrokerServiceUrl() : 
pulsar.getWebServiceAddress();
+        final var token = useCorrectToken ? ADMIN_TOKEN : USER_TOKEN;
+        @Cleanup final var client = PulsarClient.builder().serviceUrl(url)
+                .operationTimeout(operationTimeoutMs, TimeUnit.MILLISECONDS)
+                .authentication(AuthenticationFactory.token(token)).build();
+        final var topic = "my-property/not-exist/tp"; // the namespace does 
not exist
+        var start = System.currentTimeMillis();
+        try {
+            client.newProducer().topic(topic).create();
+            Assert.fail();
+        } catch (PulsarClientException e) {
+            final var elapsedMs = System.currentTimeMillis() - start;
+            log.info("Failed to create producer after {} ms: {} {}", 
elapsedMs, e.getClass().getName(), e.getMessage());
+            Assert.assertTrue(elapsedMs < operationTimeoutMs);
+            if (useTcpServiceUrl) {
+                Assert.assertTrue(e instanceof 
PulsarClientException.TopicDoesNotExistException);
+            } else {
+                Assert.assertTrue(e instanceof 
PulsarClientException.NotFoundException);
+            }
+        }
+        start = System.currentTimeMillis();
+        try {
+            
client.newConsumer().topic(topic).subscriptionName("sub").subscribe();
+        } catch (PulsarClientException e) {
+            final var elapsedMs = System.currentTimeMillis() - start;
+            log.info("Failed to subscribe after {} ms: {} {}", elapsedMs, 
e.getClass().getName(), e.getMessage());
+            Assert.assertTrue(elapsedMs < operationTimeoutMs);
+            if (useTcpServiceUrl) {
+                Assert.assertTrue(e instanceof 
PulsarClientException.TopicDoesNotExistException);
+            } else {
+                Assert.assertTrue(e instanceof 
PulsarClientException.NotFoundException);
+            }
+        }
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 82e7c828104..52583ce0afe 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -409,9 +409,9 @@ public class PulsarClientImpl implements PulsarClient {
             }
         }).exceptionally(ex -> {
             Throwable actEx = FutureUtil.unwrapCompletionException(ex);
-            if (forceNoPartitioned && actEx instanceof 
PulsarClientException.NotFoundException
+            if (forceNoPartitioned && (actEx instanceof 
PulsarClientException.NotFoundException
                     || actEx instanceof 
PulsarClientException.TopicDoesNotExistException
-                    || actEx instanceof 
PulsarAdminException.NotFoundException) {
+                    || actEx instanceof 
PulsarAdminException.NotFoundException)) {
                 checkPartitions.complete(0);
             } else {
                 checkPartitions.completeExceptionally(ex);

Reply via email to