This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3bbd5a314ed [fix][broker] Fix web tls url null cause NPE (#21137)
3bbd5a314ed is described below
commit 3bbd5a314ed73411b7026db189b3ad7ca17979ba
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed Sep 6 20:27:23 2023 -0500
[fix][broker] Fix web tls url null cause NPE (#21137)
---
.../org/apache/pulsar/broker/rest/TopicsBase.java | 13 ++-
.../pulsar/broker/admin/TopicsWithoutTlsTest.java | 128 +++++++++++++++++++++
2 files changed, 135 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
index 5ab81fcbbff..20c35b4f776 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
@@ -54,6 +54,7 @@ import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.authentication.AuthenticationParameters;
@@ -433,8 +434,10 @@ public class TopicsBase extends PersistentTopicsBase {
}
LookupResult result = optionalResult.get();
- if (result.getLookupData().getHttpUrl().equals(pulsar().getWebServiceAddress())
- || result.getLookupData().getHttpUrlTls().equals(pulsar().getWebServiceAddressTls())) {
+ String httpUrl = result.getLookupData().getHttpUrl();
+ String httpUrlTls = result.getLookupData().getHttpUrlTls();
+ if ((StringUtils.isNotBlank(httpUrl) && httpUrl.equals(pulsar().getWebServiceAddress()))
+ || (StringUtils.isNotBlank(httpUrlTls) && httpUrlTls.equals(pulsar().getWebServiceAddressTls()))) {
// Current broker owns the topic, add to owning topic.
if (log.isDebugEnabled()) {
log.debug("Complete topic look up for rest produce message request for topic {}, "
@@ -455,12 +458,10 @@ public class TopicsBase extends PersistentTopicsBase {
}
if (result.isRedirect()) {
// Redirect lookup.
- completeLookup(Pair.of(Arrays.asList(result.getLookupData().getHttpUrl(),
- result.getLookupData().getHttpUrlTls()), false), redirectAddresses, future);
+ completeLookup(Pair.of(Arrays.asList(httpUrl, httpUrlTls), false), redirectAddresses, future);
} else {
// Found owner for topic.
- completeLookup(Pair.of(Arrays.asList(result.getLookupData().getHttpUrl(),
- result.getLookupData().getHttpUrlTls()), true), redirectAddresses, future);
+ completeLookup(Pair.of(Arrays.asList(httpUrl, httpUrlTls), true), redirectAddresses, future);
}
}
}).exceptionally(exception -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsWithoutTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsWithoutTlsTest.java
new file mode 100644
index 00000000000..88bf2f8f421
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsWithoutTlsTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import lombok.Cleanup;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
+import org.apache.pulsar.broker.rest.Topics;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.policies.data.ClusterDataImpl;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.websocket.data.ProducerMessage;
+import org.apache.pulsar.websocket.data.ProducerMessages;
+import org.mockito.ArgumentCaptor;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriInfo;
+import java.net.URI;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+public class TopicsWithoutTlsTest extends MockedPulsarServiceBaseTest {
+
+ private Topics topics;
+ private final String testLocalCluster = "test";
+ private final String testTenant = "my-tenant";
+ private final String testNamespace = "my-namespace";
+ private final String testTopicName = "my-topic";
+
+ @Override
+ @BeforeMethod
+ protected void setup() throws Exception {
+ super.internalSetup();
+ topics = spy(new Topics());
+ topics.setPulsar(pulsar);
+ doReturn(TopicDomain.persistent.value()).when(topics).domain();
+ doReturn("test-app").when(topics).clientAppId();
+ doReturn(mock(AuthenticationDataHttps.class)).when(topics).clientAuthData();
+ admin.clusters().createCluster(testLocalCluster, new ClusterDataImpl());
+ admin.tenants().createTenant(testTenant, new TenantInfoImpl(Set.of("role1", "role2"), Set.of(testLocalCluster)));
+ admin.namespaces().createNamespace(testTenant + "/" + testNamespace,
+ Set.of(testLocalCluster));
+ }
+
+ @Override
+ protected void doInitConf() throws Exception {
+ super.doInitConf();
+ this.conf.setBrokerServicePortTls(Optional.empty());
+ this.conf.setWebServicePortTls(Optional.empty());
+ }
+
+ @Override
+ @AfterMethod
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testLookUpWithRedirect() throws Exception {
+ String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName;
+ URI requestPath = URI.create(pulsar.getWebServiceAddress() + "/topics/my-tenant/my-namespace/my-topic");
+ //create topic on one broker
+ admin.topics().createNonPartitionedTopic(topicName);
+ conf.setBrokerServicePort(Optional.of(0));
+ conf.setWebServicePort(Optional.of(0));
+ @Cleanup
+ PulsarTestContext pulsarTestContext2 = createAdditionalPulsarTestContext(conf);
+ PulsarService pulsar2 = pulsarTestContext2.getPulsarService();
+ doReturn(false).when(topics).isRequestHttps();
+ UriInfo uriInfo = mock(UriInfo.class);
+ doReturn(requestPath).when(uriInfo).getRequestUri();
+ FieldUtils.writeField(topics, "uri", uriInfo, true);
+ //do produce on another broker
+ topics.setPulsar(pulsar2);
+ AsyncResponse asyncResponse = mock(AsyncResponse.class);
+ ProducerMessages producerMessages = new ProducerMessages();
+ producerMessages.setValueSchema(ObjectMapperFactory.getMapper().getObjectMapper().
+ writeValueAsString(Schema.INT64.getSchemaInfo()));
+ String message = "[]";
+ producerMessages.setMessages(createMessages(message));
+ topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName, false, producerMessages);
+ ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+ verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
+ // Verify got redirect response
+ Assert.assertEquals(responseCaptor.getValue().getStatusInfo(), Response.Status.TEMPORARY_REDIRECT);
+ // Verify URI point to address of broker the topic was created on
+ Assert.assertEquals(responseCaptor.getValue().getLocation().toString(), requestPath.toString());
+ }
+
+ private static List<ProducerMessage> createMessages(String message) throws JsonProcessingException {
+ return ObjectMapperFactory.getMapper().reader()
+ .forType(new TypeReference<List<ProducerMessage>>() {
+ }).readValue(message);
+ }
+}