This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e098ff1954d [fix][broker] Reformat property in generateResponseWithEntry (#20481)
e098ff1954d is described below

commit e098ff1954d7cd7f9dfa1c7cfc18eaea81b45522
Author: StevenLuMT <[email protected]>
AuthorDate: Fri Jun 30 12:49:43 2023 +0800

    [fix][broker] Reformat property in generateResponseWithEntry (#20481)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  9 +++++---
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 25 ++++++++++++++++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   |  8 ++++---
 3 files changed, 36 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 952a775c4af..426c05fca38 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.ObjectReader;
 import com.github.zafarkhaja.semver.Version;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Sets;
+import com.google.gson.Gson;
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -3087,9 +3088,11 @@ public class PersistentTopicsBase extends AdminResource {
 
         ResponseBuilder responseBuilder = Response.ok();
         responseBuilder.header("X-Pulsar-Message-ID", pos.toString());
-        for (KeyValue keyValue : metadata.getPropertiesList()) {
-            responseBuilder.header("X-Pulsar-PROPERTY-" + keyValue.getKey(), 
keyValue.getValue());
-        }
+
+        Map<String, String> properties = metadata.getPropertiesList().stream()
+                .collect(Collectors.toMap(KeyValue::getKey, 
KeyValue::getValue, (v1, v2) -> v2));
+        responseBuilder.header("X-Pulsar-PROPERTY", new 
Gson().toJson(properties));
+
         if (brokerEntryMetadata != null) {
             if (brokerEntryMetadata.hasBrokerTimestamp()) {
                 
responseBuilder.header("X-Pulsar-Broker-Entry-METADATA-timestamp",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 284e50c8302..a5de86b08df 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -1386,6 +1386,31 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test
+    public void testGetMessageById4SpecialPropsInMsg() throws Exception {
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", 
"role2"), Set.of("test"));
+        admin.tenants().createTenant("tenant-xyz", tenantInfo);
+        admin.namespaces().createNamespace("tenant-xyz/ns-abc", 
Set.of("test"));
+        final String topicName1 = 
"persistent://tenant-xyz/ns-abc/testGetMessageById1";
+        admin.topics().createNonPartitionedTopic(topicName1);
+        Map<String, String> inSpecialProps = new HashMap<>();
+        inSpecialProps.put("city=shanghai", "tag");
+        inSpecialProps.put("city,beijing", "haidian");
+        @Cleanup
+        ProducerBase<byte[]> producer1 = (ProducerBase<byte[]>) 
pulsarClient.newProducer().topic(topicName1)
+                .enableBatching(false).create();
+        String data1 = "test1";
+        MessageIdImpl id1 = (MessageIdImpl) 
producer1.newMessage().value(data1.getBytes()).properties(inSpecialProps)
+                .send();
+
+        Message<byte[]> message1 = admin.topics().getMessageById(topicName1, 
id1.getLedgerId(), id1.getEntryId());
+        Assert.assertEquals(message1.getData(), data1.getBytes());
+        Map<String, String> outSpecialProps = message1.getProperties();
+        for (String k : inSpecialProps.keySet()) {
+            Assert.assertEquals(inSpecialProps.get(k), outSpecialProps.get(k));
+        }
+    }
+
     @Test
     public void testGetMessageIdByTimestamp() throws Exception {
         TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", 
"role2"), Set.of("test"));
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 33d1cd17858..e0c64319ea2 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.admin.internal;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import com.google.gson.Gson;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.io.InputStream;
@@ -1391,9 +1392,10 @@ public class TopicsImpl extends BaseResource implements Topics {
 
             for (Entry<String, List<Object>> entry : headers.entrySet()) {
                 String header = entry.getKey();
-                if (header.contains("X-Pulsar-PROPERTY-")) {
-                    String keyName = 
header.substring("X-Pulsar-PROPERTY-".length());
-                    properties.put(keyName, (String) entry.getValue().get(0));
+                if ("X-Pulsar-PROPERTY".equals(header)) {
+                    Map<String, String> msgPropsTmp = new 
Gson().fromJson((String) entry.getValue().get(0), Map.class);
+                    properties.putAll(msgPropsTmp);
+                    break;
                 }
             }
 

Reply via email to