This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new a7ef9ede Compatible the HTTP header properties with PIP-279 (#1299)
a7ef9ede is described below

commit a7ef9edebe2234213ce4c45bda844ded1fa9e2ff
Author: Baodi Shi <[email protected]>
AuthorDate: Thu Oct 24 20:15:00 2024 +0800

    Compatible the HTTP header properties with PIP-279 (#1299)
    
    ### Motivation
    
    After [pip-279](https://github.com/apache/pulsar/pull/20627), all 
properties keys and values use json string save to header: `X-Pulsar-Property`
    
    This PR to compatible with this change when using subscription admin API.
    
    Also, Using `pip-279` also avoids the issue where the Go HTTP client 
automatically formats HTTP headers: https://github.com/golang/go/issues/37834, 
This will impact the peek command, the previous method might retrieve 
`properties` with inconsistent casing compared to the user-defined.
    
    ### Modifications
    - Compatible the HTTP header properties with PIP-279
---
 pulsaradmin/pkg/admin/subscription.go      | 15 ++++++++-
 pulsaradmin/pkg/admin/subscription_test.go | 52 ++++++++++++++++++++++++++++++
 2 files changed, 66 insertions(+), 1 deletion(-)

diff --git a/pulsaradmin/pkg/admin/subscription.go 
b/pulsaradmin/pkg/admin/subscription.go
index 996ebb4e..a1a13619 100644
--- a/pulsaradmin/pkg/admin/subscription.go
+++ b/pulsaradmin/pkg/admin/subscription.go
@@ -20,6 +20,7 @@ package admin
 import (
        "bytes"
        "encoding/binary"
+       "encoding/json"
        "io"
        "net/http"
        "net/url"
@@ -230,7 +231,14 @@ func safeRespClose(resp *http.Response) {
 const (
        PublishTimeHeader = "X-Pulsar-Publish-Time"
        BatchHeader       = "X-Pulsar-Num-Batch-Message"
-       PropertyPrefix    = "X-Pulsar-Property-"
+
+       // PropertyPrefix is part of the old protocol for message properties.
+       PropertyPrefix = "X-Pulsar-Property-"
+
+       // PropertyHeader is part of the new protocol introduced in SNIP-279
+       // https://github.com/apache/pulsar/pull/20627
+       // The value is a JSON string representing the properties.
+       PropertyHeader = "X-Pulsar-Property"
 )
 
 func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message, 
error) {
@@ -261,6 +269,11 @@ func handleResp(topic utils.TopicName, resp 
*http.Response) ([]*utils.Message, e
                                properties[BatchHeader] = h
                        }
                        return getIndividualMsgsFromBatch(topic, ID, payload, 
properties)
+               case k == PropertyHeader:
+                       propJSON := resp.Header.Get(k)
+                       if err := json.Unmarshal([]byte(propJSON), 
&properties); err != nil {
+                               return nil, err
+                       }
                case strings.Contains(k, PropertyPrefix):
                        key := strings.TrimPrefix(k, PropertyPrefix)
                        properties[key] = resp.Header.Get(k)
diff --git a/pulsaradmin/pkg/admin/subscription_test.go 
b/pulsaradmin/pkg/admin/subscription_test.go
index 92c79c1f..98db6961 100644
--- a/pulsaradmin/pkg/admin/subscription_test.go
+++ b/pulsaradmin/pkg/admin/subscription_test.go
@@ -144,6 +144,58 @@ func TestPeekMessageForPartitionedTopic(t *testing.T) {
        }
 }
 
+func TestPeekMessageWithProperties(t *testing.T) {
+       randomName := newTopicName()
+       topic := "persistent://public/default/" + randomName
+       topicName, _ := utils.GetTopicName(topic)
+       subName := "test-sub"
+
+       cfg := &config.Config{}
+       admin, err := New(cfg)
+       assert.NoError(t, err)
+       assert.NotNil(t, admin)
+
+       client, err := pulsar.NewClient(pulsar.ClientOptions{
+               URL: lookupURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       // Create a producer for non-batch messages
+       producer, err := client.CreateProducer(pulsar.ProducerOptions{
+               Topic:           topic,
+               DisableBatching: true,
+       })
+       assert.NoError(t, err)
+       defer producer.Close()
+
+       props := map[string]string{
+               "key1":        "value1",
+               "KEY2":        "VALUE2",
+               "KeY3":        "VaLuE3",
+               "details=man": "good at playing basketball",
+       }
+
+       _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
+               Payload:    []byte("test-message"),
+               Properties: props,
+       })
+       assert.NoError(t, err)
+
+       // Peek messages
+       messages, err := admin.Subscriptions().PeekMessages(*topicName, 
subName, 1)
+       assert.NoError(t, err)
+       assert.NotNil(t, messages)
+
+       // Verify properties of messages
+       for _, msg := range messages {
+               assert.Equal(t, "value1", msg.Properties["key1"])
+               assert.Equal(t, "VALUE2", msg.Properties["KEY2"])
+               assert.Equal(t, "VaLuE3", msg.Properties["KeY3"])
+               assert.Equal(t, "good at playing basketball", 
msg.Properties["details=man"])
+       }
+}
+
 func TestGetMessageByID(t *testing.T) {
        randomName := newTopicName()
        topic := "persistent://public/default/" + randomName

Reply via email to