This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 6f0af8b6b01 Fix http produce msg redirect issue. (#15551)
6f0af8b6b01 is described below
commit 6f0af8b6b0122789a9c5bf1101206867e87c2d42
Author: Jiwei Guo <[email protected]>
AuthorDate: Fri May 13 10:36:33 2022 +0800
Fix http produce msg redirect issue. (#15551)
Master Issue: #15546
### Motivation
When lookup the topic ownership using REST produce, the redirect URI is
incorrect, because :
```
uri.getPath(false); //Get the path of the current request relative to the
base URI as a string.
```
So the redirect URI does not contain the base path:
```
URI redirectURI = new URI(String.format("%s%s", redirectAddresses.get(0),
uri.getPath(false)))
```
(cherry picked from commit 7f976da1b51cd868ec49b5ab43259fea4d48c8e9)
---
.../main/java/org/apache/pulsar/broker/rest/TopicsBase.java | 11 ++++++++---
.../test/java/org/apache/pulsar/broker/admin/TopicsTest.java | 8 ++++----
2 files changed, 12 insertions(+), 7 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
index 770d77794d5..86e8956d950 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.broker.rest;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.net.URI;
-import java.net.URISyntaxException;
+import java.net.URL;
import java.nio.ByteBuffer;
import java.sql.Time;
import java.sql.Timestamp;
@@ -41,6 +41,7 @@ import java.util.stream.Collectors;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.UriBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
@@ -378,10 +379,14 @@ public class TopicsBase extends PersistentTopicsBase {
log.debug("Redirect rest produce request for topic {}
from {} to {}.",
topicName, pulsar().getWebServiceAddress(),
redirectAddresses.get(0));
}
- URI redirectURI = new URI(String.format("%s%s",
redirectAddresses.get(0), uri.getPath(false)));
+ URL redirectAddress = new URL(redirectAddresses.get(0));
+ URI redirectURI = UriBuilder.fromUri(uri.getRequestUri())
+ .host(redirectAddress.getHost())
+ .port(redirectAddress.getPort())
+ .build();
asyncResponse.resume(Response.temporaryRedirect(redirectURI).build());
future.complete(true);
- } catch (URISyntaxException | NullPointerException e) {
+ } catch (Exception e) {
if (log.isDebugEnabled()) {
log.error("Error in preparing redirect url with rest
produce message request for topic {}: {}",
topicName, e.getMessage(), e);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
index 7b77b1a74f2..6466c9495c1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
@@ -77,6 +77,7 @@ import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import java.io.ByteArrayOutputStream;
+import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
@@ -313,13 +314,13 @@ public class TopicsTest extends
MockedPulsarServiceBaseTest {
@Test
public void testLookUpWithRedirect() throws Exception {
String topicName = "persistent://" + testTenant + "/" + testNamespace
+ "/" + testTopicName;
- String requestPath =
"/admin/v3/topics/my-tenant/my-namespace/my-topic";
+ URI requestPath = URI.create(pulsar.getWebServiceAddress() +
"/topics/my-tenant/my-namespace/my-topic");
//create topic on one broker
admin.topics().createNonPartitionedTopic(topicName);
PulsarService pulsar2 = startBroker(getDefaultConf());
doReturn(false).when(topics).isRequestHttps();
UriInfo uriInfo = mock(UriInfo.class);
- doReturn(requestPath).when(uriInfo).getPath(anyBoolean());
+ doReturn(requestPath).when(uriInfo).getRequestUri();
Whitebox.setInternalState(topics, "uri", uriInfo);
//do produce on another broker
topics.setPulsar(pulsar2);
@@ -336,8 +337,7 @@ public class TopicsTest extends MockedPulsarServiceBaseTest
{
// Verify got redirect response
Assert.assertEquals(responseCaptor.getValue().getStatusInfo(),
Response.Status.TEMPORARY_REDIRECT);
// Verify URI point to address of broker the topic was created on
- Assert.assertEquals(responseCaptor.getValue().getLocation().toString(),
- pulsar.getWebServiceAddress() + requestPath);
+
Assert.assertEquals(responseCaptor.getValue().getLocation().toString(),
requestPath.toString());
}
@Test