This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new a7ef9ede Compatible the HTTP header properties with PIP-279 (#1299)
a7ef9ede is described below
commit a7ef9edebe2234213ce4c45bda844ded1fa9e2ff
Author: Baodi Shi <[email protected]>
AuthorDate: Thu Oct 24 20:15:00 2024 +0800
Compatible the HTTP header properties with PIP-279 (#1299)
### Motivation
After [PIP-279](https://github.com/apache/pulsar/pull/20627), all
property keys and values are saved as a single JSON string in the `X-Pulsar-Property` header.
This PR makes the subscription admin API compatible with that change.
Using the PIP-279 format also avoids the issue where the Go HTTP client
automatically reformats HTTP header names: https://github.com/golang/go/issues/37834.
That issue affects the peek command: with the previous method, the retrieved
`properties` keys could differ in casing from the keys the user defined.
### Modifications
- Make the handling of HTTP header properties compatible with PIP-279
---
pulsaradmin/pkg/admin/subscription.go | 15 ++++++++-
pulsaradmin/pkg/admin/subscription_test.go | 52 ++++++++++++++++++++++++++++++
2 files changed, 66 insertions(+), 1 deletion(-)
diff --git a/pulsaradmin/pkg/admin/subscription.go
b/pulsaradmin/pkg/admin/subscription.go
index 996ebb4e..a1a13619 100644
--- a/pulsaradmin/pkg/admin/subscription.go
+++ b/pulsaradmin/pkg/admin/subscription.go
@@ -20,6 +20,7 @@ package admin
import (
"bytes"
"encoding/binary"
+ "encoding/json"
"io"
"net/http"
"net/url"
@@ -230,7 +231,14 @@ func safeRespClose(resp *http.Response) {
const (
PublishTimeHeader = "X-Pulsar-Publish-Time"
BatchHeader = "X-Pulsar-Num-Batch-Message"
- PropertyPrefix = "X-Pulsar-Property-"
+
+ // PropertyPrefix is part of the old protocol for message properties.
+ PropertyPrefix = "X-Pulsar-Property-"
+
+ // PropertyHeader is part of the new protocol introduced in PIP-279
+ // https://github.com/apache/pulsar/pull/20627
+ // The value is a JSON string representing the properties.
+ PropertyHeader = "X-Pulsar-Property"
)
func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message,
error) {
@@ -261,6 +269,11 @@ func handleResp(topic utils.TopicName, resp
*http.Response) ([]*utils.Message, e
properties[BatchHeader] = h
}
return getIndividualMsgsFromBatch(topic, ID, payload,
properties)
+ case k == PropertyHeader:
+ propJSON := resp.Header.Get(k)
+ if err := json.Unmarshal([]byte(propJSON),
&properties); err != nil {
+ return nil, err
+ }
case strings.Contains(k, PropertyPrefix):
key := strings.TrimPrefix(k, PropertyPrefix)
properties[key] = resp.Header.Get(k)
diff --git a/pulsaradmin/pkg/admin/subscription_test.go
b/pulsaradmin/pkg/admin/subscription_test.go
index 92c79c1f..98db6961 100644
--- a/pulsaradmin/pkg/admin/subscription_test.go
+++ b/pulsaradmin/pkg/admin/subscription_test.go
@@ -144,6 +144,58 @@ func TestPeekMessageForPartitionedTopic(t *testing.T) {
}
}
+func TestPeekMessageWithProperties(t *testing.T) {
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+ topicName, _ := utils.GetTopicName(topic)
+ subName := "test-sub"
+
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+
+ client, err := pulsar.NewClient(pulsar.ClientOptions{
+ URL: lookupURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ // Create a producer for non-batch messages
+ producer, err := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ })
+ assert.NoError(t, err)
+ defer producer.Close()
+
+ props := map[string]string{
+ "key1": "value1",
+ "KEY2": "VALUE2",
+ "KeY3": "VaLuE3",
+ "details=man": "good at playing basketball",
+ }
+
+ _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
+ Payload: []byte("test-message"),
+ Properties: props,
+ })
+ assert.NoError(t, err)
+
+ // Peek messages
+ messages, err := admin.Subscriptions().PeekMessages(*topicName,
subName, 1)
+ assert.NoError(t, err)
+ assert.NotNil(t, messages)
+
+ // Verify properties of messages
+ for _, msg := range messages {
+ assert.Equal(t, "value1", msg.Properties["key1"])
+ assert.Equal(t, "VALUE2", msg.Properties["KEY2"])
+ assert.Equal(t, "VaLuE3", msg.Properties["KeY3"])
+ assert.Equal(t, "good at playing basketball",
msg.Properties["details=man"])
+ }
+}
+
func TestGetMessageByID(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName