This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8eeb0e2e89f [fix][broker] Fix currently client retries until operation
timeout if the topic does not exist (#23530)
8eeb0e2e89f is described below
commit 8eeb0e2e89f8938348d29044d9e1d843a6251067
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Nov 6 17:01:33 2024 +0800
[fix][broker] Fix currently client retries until operation timeout if the
topic does not exist (#23530)
---
.../apache/pulsar/broker/service/ServerCnx.java | 2 +-
.../broker/admin/GetPartitionMetadataTest.java | 3 +-
.../TokenAuthenticatedProducerConsumerTest.java | 71 +++++++++++++++++++---
.../pulsar/client/impl/PulsarClientImpl.java | 4 +-
4 files changed, 67 insertions(+), 13 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index c14602bfca5..f9e593345d8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -713,7 +713,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
Throwable actEx = FutureUtil.unwrapCompletionException(ex);
if (actEx instanceof WebApplicationException restException) {
if (restException.getResponse().getStatus() ==
Response.Status.NOT_FOUND.getStatusCode()) {
- writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.MetadataError,
+ writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.TopicNotFound,
"Tenant or namespace or topic does not exist: " +
topicName.getNamespace() ,
requestId));
lookupSemaphore.release();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
index d06f6ea1c56..f3237676c86 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
@@ -599,8 +599,7 @@ public class GetPartitionMetadataTest extends TestRetrySupport {
fail("Expected a not found ex");
} catch (Exception ex) {
Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
- assertTrue(unwrapEx instanceof
PulsarClientException.BrokerMetadataException ||
- unwrapEx instanceof
PulsarClientException.TopicDoesNotExistException);
+ assertTrue(unwrapEx instanceof
PulsarClientException.TopicDoesNotExistException);
}
}
// Verify: lookup semaphore has been releases.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
index f8ae0279e08..9da3fcbd0ed 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
@@ -35,6 +35,7 @@ import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
@@ -42,8 +43,9 @@ import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -56,31 +58,35 @@ import org.testng.annotations.Test;
public class TokenAuthenticatedProducerConsumerTest extends
ProducerConsumerBase {
private static final Logger log =
LoggerFactory.getLogger(TokenAuthenticatedProducerConsumerTest.class);
+ private final static String ADMIN_ROLE = "admin";
private final String ADMIN_TOKEN;
+ private final String USER_TOKEN;
private final String TOKEN_PUBLIC_KEY;
+ private final KeyPair kp;
TokenAuthenticatedProducerConsumerTest() throws NoSuchAlgorithmException {
KeyPairGenerator kpg = KeyPairGenerator.getInstance("RSA");
- KeyPair kp = kpg.generateKeyPair();
+ kp = kpg.generateKeyPair();
byte[] encodedPublicKey = kp.getPublic().getEncoded();
TOKEN_PUBLIC_KEY = "data:;base64," +
Base64.getEncoder().encodeToString(encodedPublicKey);
- ADMIN_TOKEN = generateToken(kp);
+ ADMIN_TOKEN = generateToken(ADMIN_ROLE);
+ USER_TOKEN = generateToken("user");
}
- private String generateToken(KeyPair kp) {
+ private String generateToken(String subject) {
PrivateKey pkey = kp.getPrivate();
long expMillis = System.currentTimeMillis() +
Duration.ofHours(1).toMillis();
Date exp = new Date(expMillis);
return Jwts.builder()
- .setSubject("admin")
+ .setSubject(subject)
.setExpiration(exp)
.signWith(pkey, SignatureAlgorithm.forSigningKey(pkey))
.compact();
}
- @BeforeMethod
+ @BeforeClass
@Override
protected void setup() throws Exception {
conf.setAuthenticationEnabled(true);
@@ -118,7 +124,7 @@ public class TokenAuthenticatedProducerConsumerTest extends ProducerConsumerBase
.authentication(AuthenticationFactory.token(ADMIN_TOKEN)));
}
- @AfterMethod(alwaysRun = true)
+ @AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
@@ -172,4 +178,53 @@ public class TokenAuthenticatedProducerConsumerTest extends ProducerConsumerBase
log.info("-- Exiting {} test --", methodName);
}
+ @DataProvider
+ public static Object[][] provider() {
+ // The 1st element specifies whether to use TCP service URL
+ // The 2nd element specifies whether to use a token with correct permission
+ return new Object[][] {
+ { true, true },
+ { true, false },
+ { false, true },
+ { false, false },
+ };
+ }
+
+ @Test(dataProvider = "provider")
+ public void testTopicNotFound(boolean useTcpServiceUrl, boolean
useCorrectToken) throws Exception {
+ final var operationTimeoutMs = 10000;
+ final var url = useTcpServiceUrl ? pulsar.getBrokerServiceUrl() :
pulsar.getWebServiceAddress();
+ final var token = useCorrectToken ? ADMIN_TOKEN : USER_TOKEN;
+ @Cleanup final var client = PulsarClient.builder().serviceUrl(url)
+ .operationTimeout(operationTimeoutMs, TimeUnit.MILLISECONDS)
+ .authentication(AuthenticationFactory.token(token)).build();
+ final var topic = "my-property/not-exist/tp"; // the namespace does not exist
+ var start = System.currentTimeMillis();
+ try {
+ client.newProducer().topic(topic).create();
+ Assert.fail();
+ } catch (PulsarClientException e) {
+ final var elapsedMs = System.currentTimeMillis() - start;
+ log.info("Failed to create producer after {} ms: {} {}",
elapsedMs, e.getClass().getName(), e.getMessage());
+ Assert.assertTrue(elapsedMs < operationTimeoutMs);
+ if (useTcpServiceUrl) {
+ Assert.assertTrue(e instanceof
PulsarClientException.TopicDoesNotExistException);
+ } else {
+ Assert.assertTrue(e instanceof
PulsarClientException.NotFoundException);
+ }
+ }
+ start = System.currentTimeMillis();
+ try {
+ client.newConsumer().topic(topic).subscriptionName("sub").subscribe();
+ } catch (PulsarClientException e) {
+ final var elapsedMs = System.currentTimeMillis() - start;
+ log.info("Failed to subscribe after {} ms: {} {}", elapsedMs,
e.getClass().getName(), e.getMessage());
+ Assert.assertTrue(elapsedMs < operationTimeoutMs);
+ if (useTcpServiceUrl) {
+ Assert.assertTrue(e instanceof
PulsarClientException.TopicDoesNotExistException);
+ } else {
+ Assert.assertTrue(e instanceof
PulsarClientException.NotFoundException);
+ }
+ }
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 603844eeb78..871666620b7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -418,9 +418,9 @@ public class PulsarClientImpl implements PulsarClient {
}
}).exceptionally(ex -> {
Throwable actEx = FutureUtil.unwrapCompletionException(ex);
- if (forceNoPartitioned && actEx instanceof
PulsarClientException.NotFoundException
+ if (forceNoPartitioned && (actEx instanceof
PulsarClientException.NotFoundException
|| actEx instanceof
PulsarClientException.TopicDoesNotExistException
- || actEx instanceof
PulsarAdminException.NotFoundException) {
+ || actEx instanceof
PulsarAdminException.NotFoundException)) {
checkPartitions.complete(0);
} else {
checkPartitions.completeExceptionally(ex);