This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 998bd90fba5 [fix][broker] Can't connecte to non-persist topic when
enable broker client tls (#22991)
998bd90fba5 is described below
commit 998bd90fba5bbc46286e486f601f34875ee8e528
Author: Baodi Shi <[email protected]>
AuthorDate: Thu Jul 4 07:02:26 2024 +0800
[fix][broker] Can't connecte to non-persist topic when enable broker client
tls (#22991)
(cherry picked from commit deb26f7662268def7f838f722de4a677b3d546ed)
---
.../pulsar/broker/namespace/NamespaceService.java | 10 ++++-
.../api/TokenExpirationProduceConsumerTest.java | 45 +++++++++++++++++-----
2 files changed, 45 insertions(+), 10 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 7bf05857095..056679bc89c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1421,7 +1421,15 @@ public class NamespaceService implements AutoCloseable {
return FutureUtil.failedFuture(new
ServiceUnitNotReadyException(
"No broker was available to own " +
topicName));
}
- return
pulsarClient.getLookup(lookupResult.get().getLookupData().getBrokerUrl())
+ LookupData lookupData = lookupResult.get().getLookupData();
+ String brokerUrl;
+ if (pulsar.getConfiguration().isBrokerClientTlsEnabled()
+ &&
StringUtils.isNotEmpty(lookupData.getBrokerUrlTls())) {
+ brokerUrl = lookupData.getBrokerUrlTls();
+ } else {
+ brokerUrl = lookupData.getBrokerUrl();
+ }
+ return pulsarClient.getLookup(brokerUrl)
.getPartitionedTopicMetadata(topicName, false)
.thenApply(metadata -> true)
.exceptionallyCompose(ex -> {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenExpirationProduceConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenExpirationProduceConsumerTest.java
index 4fc0d315d22..eb3056307d3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenExpirationProduceConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenExpirationProduceConsumerTest.java
@@ -18,11 +18,23 @@
*/
package org.apache.pulsar.client.api;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
+import java.time.Duration;
+import java.util.Base64;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.crypto.SecretKey;
+import java.nio.charset.StandardCharsets;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
@@ -40,15 +52,6 @@ import org.mockito.internal.util.MockUtil;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import javax.crypto.SecretKey;
-import java.time.Duration;
-import java.util.Base64;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
@Test(groups = "broker-api")
@Slf4j
@@ -114,6 +117,7 @@ public class TokenExpirationProduceConsumerTest extends
TlsProducerConsumerBase
conf.setAuthenticationProviders(Sets.newHashSet(AuthenticationProviderToken.class.getName()));
conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN);
+ conf.setBrokerClientTlsEnabled(true);
conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
+ Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
}
@@ -139,6 +143,29 @@ public class TokenExpirationProduceConsumerTest extends
TlsProducerConsumerBase
return clientBuilder.build();
}
+ @Test
+ public void testNonPersistentTopic() throws Exception {
+
+ @Cleanup
+ PulsarClient pulsarClient = getClient(ADMIN_TOKEN);
+
+ String topic = "non-persistent://" + namespaceName +
"/test-token-non-persistent";
+
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionName("test").subscribe();
+
+ @Cleanup
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).create();
+ byte[] msg = "Hello".getBytes(StandardCharsets.UTF_8);
+ producer.send(msg);
+
+ Message<byte[]> receive = consumer.receive(3, TimeUnit.SECONDS);
+ assertNotNull(receive);
+ assertEquals(receive.getData(), msg);
+ }
+
@Test
public void testTokenExpirationProduceConsumer() throws Exception {
Calendar calendar = Calendar.getInstance();