This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e098ff1954d [fix][broker] Reformat property in generateResponseWithEntry (#20481)
e098ff1954d is described below
commit e098ff1954d7cd7f9dfa1c7cfc18eaea81b45522
Author: StevenLuMT <[email protected]>
AuthorDate: Fri Jun 30 12:49:43 2023 +0800
[fix][broker] Reformat property in generateResponseWithEntry (#20481)
---
.../broker/admin/impl/PersistentTopicsBase.java | 9 +++++---
.../pulsar/broker/admin/PersistentTopicsTest.java | 25 ++++++++++++++++++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 8 ++++---
3 files changed, 36 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 952a775c4af..426c05fca38 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.ObjectReader;
import com.github.zafarkhaja.semver.Version;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
+import com.google.gson.Gson;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -3087,9 +3088,11 @@ public class PersistentTopicsBase extends AdminResource {
ResponseBuilder responseBuilder = Response.ok();
responseBuilder.header("X-Pulsar-Message-ID", pos.toString());
- for (KeyValue keyValue : metadata.getPropertiesList()) {
- responseBuilder.header("X-Pulsar-PROPERTY-" + keyValue.getKey(), keyValue.getValue());
- }
+
+ Map<String, String> properties = metadata.getPropertiesList().stream()
+ .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue, (v1, v2) -> v2));
+ responseBuilder.header("X-Pulsar-PROPERTY", new Gson().toJson(properties));
+
if (brokerEntryMetadata != null) {
if (brokerEntryMetadata.hasBrokerTimestamp()) {
responseBuilder.header("X-Pulsar-Broker-Entry-METADATA-timestamp",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 284e50c8302..a5de86b08df 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -1386,6 +1386,31 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
}
}
+ @Test
+ public void testGetMessageById4SpecialPropsInMsg() throws Exception {
+ TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
+ admin.tenants().createTenant("tenant-xyz", tenantInfo);
+ admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test"));
+ final String topicName1 = "persistent://tenant-xyz/ns-abc/testGetMessageById1";
+ admin.topics().createNonPartitionedTopic(topicName1);
+ Map<String, String> inSpecialProps = new HashMap<>();
+ inSpecialProps.put("city=shanghai", "tag");
+ inSpecialProps.put("city,beijing", "haidian");
+ @Cleanup
+ ProducerBase<byte[]> producer1 = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName1)
+ .enableBatching(false).create();
+ String data1 = "test1";
+ MessageIdImpl id1 = (MessageIdImpl) producer1.newMessage().value(data1.getBytes()).properties(inSpecialProps)
+ .send();
+
+ Message<byte[]> message1 = admin.topics().getMessageById(topicName1, id1.getLedgerId(), id1.getEntryId());
+ Assert.assertEquals(message1.getData(), data1.getBytes());
+ Map<String, String> outSpecialProps = message1.getProperties();
+ for (String k : inSpecialProps.keySet()) {
+ Assert.assertEquals(inSpecialProps.get(k), outSpecialProps.get(k));
+ }
+ }
+
@Test
public void testGetMessageIdByTimestamp() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 33d1cd17858..e0c64319ea2 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.admin.internal;
import static com.google.common.base.Preconditions.checkArgument;
+import com.google.gson.Gson;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.InputStream;
@@ -1391,9 +1392,10 @@ public class TopicsImpl extends BaseResource implements Topics {
for (Entry<String, List<Object>> entry : headers.entrySet()) {
String header = entry.getKey();
- if (header.contains("X-Pulsar-PROPERTY-")) {
- String keyName = header.substring("X-Pulsar-PROPERTY-".length());
- properties.put(keyName, (String) entry.getValue().get(0));
+ if ("X-Pulsar-PROPERTY".equals(header)) {
+ Map<String, String> msgPropsTmp = new Gson().fromJson((String) entry.getValue().get(0), Map.class);
+ properties.putAll(msgPropsTmp);
+ break;
}
}