This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new ab042ae7 [fix] peek message will return -1 for partitionIndex (#1267)
ab042ae7 is described below

commit ab042ae714d14ff8a07ba0d55ba4d887879e1e00
Author: Baodi Shi <[email protected]>
AuthorDate: Tue Aug 13 18:58:49 2024 +0800

    [fix] peek message will return -1 for partitionIndex (#1267)
    
    ### Motivation
    If peek a partitioned topic, will see a message id: `7316:0:-1:-1`, the 
parititonIndex should not be -1.
    
    ```
    pulsarctl subscription peek --count 10 
persistent://public/default/my-topic-partition-0 test-sub
    Message ID : 7316:0:-1:-1
    Properties :
    {
        "publish-time": "2024-08-08T17:50:39.476+08:00"
    }
    Message :
    00000000  68 65 6c 6c 6f 2d 31                              |hello-1|
    ```
    
    ### Modifications
    - Set partition index on peek message.
---
 pulsaradmin/pkg/admin/subscription.go      |  3 +-
 pulsaradmin/pkg/admin/subscription_test.go | 54 ++++++++++++++++++++++++++++++
 pulsaradmin/pkg/utils/message_id.go        |  9 +++++
 pulsaradmin/pkg/utils/topic_name.go        |  4 +++
 4 files changed, 69 insertions(+), 1 deletion(-)

diff --git a/pulsaradmin/pkg/admin/subscription.go 
b/pulsaradmin/pkg/admin/subscription.go
index 8ddb5845..996ebb4e 100644
--- a/pulsaradmin/pkg/admin/subscription.go
+++ b/pulsaradmin/pkg/admin/subscription.go
@@ -234,8 +234,9 @@ const (
 )
 
 func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message, 
error) {
+
        msgID := resp.Header.Get("X-Pulsar-Message-ID")
-       ID, err := utils.ParseMessageID(msgID)
+       ID, err := utils.ParseMessageIDWithPartitionIndex(msgID, 
topic.GetPartitionIndex())
        if err != nil {
                return nil, err
        }
diff --git a/pulsaradmin/pkg/admin/subscription_test.go 
b/pulsaradmin/pkg/admin/subscription_test.go
index c4ba717d..92c79c1f 100644
--- a/pulsaradmin/pkg/admin/subscription_test.go
+++ b/pulsaradmin/pkg/admin/subscription_test.go
@@ -90,6 +90,60 @@ func TestGetMessagesByID(t *testing.T) {
 
 }
 
+func TestPeekMessageForPartitionedTopic(t *testing.T) {
+       ctx := context.Background()
+       randomName := newTopicName()
+       topic := "persistent://public/default/" + randomName
+       topicName, _ := utils.GetTopicName(topic)
+       subName := "test-sub"
+
+       cfg := &config.Config{}
+       admin, err := New(cfg)
+       assert.NoError(t, err)
+       assert.NotNil(t, admin)
+
+       err = admin.Topics().Create(*topicName, 2)
+       assert.NoError(t, err)
+
+       err = admin.Subscriptions().Create(*topicName, subName, utils.Earliest)
+       assert.NoError(t, err)
+
+       client, err := pulsar.NewClient(pulsar.ClientOptions{
+               URL: lookupURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       producer, err := client.CreateProducer(pulsar.ProducerOptions{
+               Topic:           topic,
+               DisableBatching: true,
+       })
+       assert.NoError(t, err)
+       defer producer.Close()
+
+       for i := 0; i < 100; i++ {
+               producer.SendAsync(ctx, &pulsar.ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               }, nil)
+       }
+       err = producer.Flush()
+       if err != nil {
+               return
+       }
+
+       for i := 0; i < 2; i++ {
+               topicWithPartition := fmt.Sprintf("%s-partition-%d", topic, i)
+               topicName, err := utils.GetTopicName(topicWithPartition)
+               assert.NoError(t, err)
+               messages, err := admin.Subscriptions().PeekMessages(*topicName, 
subName, 10)
+               assert.NoError(t, err)
+               assert.NotNil(t, messages)
+               for _, msg := range messages {
+                       assert.Equal(t, msg.GetMessageID().PartitionIndex, i)
+               }
+       }
+}
+
 func TestGetMessageByID(t *testing.T) {
        randomName := newTopicName()
        topic := "persistent://public/default/" + randomName
diff --git a/pulsaradmin/pkg/utils/message_id.go 
b/pulsaradmin/pkg/utils/message_id.go
index d75b613e..f65c031e 100644
--- a/pulsaradmin/pkg/utils/message_id.go
+++ b/pulsaradmin/pkg/utils/message_id.go
@@ -34,6 +34,15 @@ type MessageID struct {
 var Latest = MessageID{0x7fffffffffffffff, 0x7fffffffffffffff, -1, -1}
 var Earliest = MessageID{-1, -1, -1, -1}
 
+func ParseMessageIDWithPartitionIndex(str string, index int) (*MessageID, 
error) {
+       id, err := ParseMessageID(str)
+       if err != nil {
+               return nil, err
+       }
+       id.PartitionIndex = index
+       return id, nil
+}
+
 func ParseMessageID(str string) (*MessageID, error) {
        s := strings.Split(str, ":")
 
diff --git a/pulsaradmin/pkg/utils/topic_name.go 
b/pulsaradmin/pkg/utils/topic_name.go
index 268abd73..78719837 100644
--- a/pulsaradmin/pkg/utils/topic_name.go
+++ b/pulsaradmin/pkg/utils/topic_name.go
@@ -143,6 +143,10 @@ func (t *TopicName) GetPartition(index int) (*TopicName, 
error) {
        return GetTopicName(topicNameWithPartition)
 }
 
+func (t *TopicName) GetPartitionIndex() int {
+       return t.partitionIndex
+}
+
 func getPartitionIndex(topic string) int {
        if strings.Contains(topic, PARTITIONEDTOPICSUFFIX) {
                parts := strings.Split(topic, "-")

Reply via email to