This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new deb26f76622 [fix][broker] Can't connecte to non-persist topic when
enable broker client tls (#22991)
deb26f76622 is described below
commit deb26f7662268def7f838f722de4a677b3d546ed
Author: Baodi Shi <[email protected]>
AuthorDate: Thu Jul 4 07:02:26 2024 +0800
[fix][broker] Can't connecte to non-persist topic when enable broker client
tls (#22991)
---
.../pulsar/broker/namespace/NamespaceService.java | 10 +++++++-
.../api/TokenExpirationProduceConsumerTest.java | 27 ++++++++++++++++++++++
2 files changed, 36 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index dfd03dfbc6e..2a1584df961 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1471,7 +1471,15 @@ public class NamespaceService implements AutoCloseable {
return FutureUtil.failedFuture(new
ServiceUnitNotReadyException(
"No broker was available to own " +
topicName));
}
- return
pulsarClient.getLookup(lookupResult.get().getLookupData().getBrokerUrl())
+ LookupData lookupData = lookupResult.get().getLookupData();
+ String brokerUrl;
+ if (pulsar.getConfiguration().isBrokerClientTlsEnabled()
+ &&
StringUtils.isNotEmpty(lookupData.getBrokerUrlTls())) {
+ brokerUrl = lookupData.getBrokerUrlTls();
+ } else {
+ brokerUrl = lookupData.getBrokerUrl();
+ }
+ return pulsarClient.getLookup(brokerUrl)
.getPartitionedTopicMetadata(topicName, false)
.thenApply(metadata -> true)
.exceptionallyCompose(ex -> {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenExpirationProduceConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenExpirationProduceConsumerTest.java
index fa9099f3d2f..d8ed1055720 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenExpirationProduceConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenExpirationProduceConsumerTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.client.api;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
@@ -32,6 +34,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.crypto.SecretKey;
+import java.nio.charset.StandardCharsets;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
@@ -107,6 +110,7 @@ public class TokenExpirationProduceConsumerTest extends
TlsProducerConsumerBase
conf.setAuthenticationProviders(Sets.newHashSet(AuthenticationProviderToken.class.getName()));
conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN);
+ conf.setBrokerClientTlsEnabled(true);
conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
+ Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
}
@@ -132,6 +136,29 @@ public class TokenExpirationProduceConsumerTest extends
TlsProducerConsumerBase
return clientBuilder.build();
}
+ @Test
+ public void testNonPersistentTopic() throws Exception {
+
+ @Cleanup
+ PulsarClient pulsarClient = getClient(ADMIN_TOKEN);
+
+ String topic = "non-persistent://" + namespaceName +
"/test-token-non-persistent";
+
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionName("test").subscribe();
+
+ @Cleanup
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).create();
+ byte[] msg = "Hello".getBytes(StandardCharsets.UTF_8);
+ producer.send(msg);
+
+ Message<byte[]> receive = consumer.receive(3, TimeUnit.SECONDS);
+ assertNotNull(receive);
+ assertEquals(receive.getData(), msg);
+ }
+
@Test
public void testTokenExpirationProduceConsumer() throws Exception {
Calendar calendar = Calendar.getInstance();