This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new ab042ae7 [fix] peek message will return -1 for partitionIndex (#1267)
ab042ae7 is described below
commit ab042ae714d14ff8a07ba0d55ba4d887879e1e00
Author: Baodi Shi <[email protected]>
AuthorDate: Tue Aug 13 18:58:49 2024 +0800
[fix] peek message will return -1 for partitionIndex (#1267)
### Motivation
When peeking messages from a partitioned topic, the returned message ID looks
like `7316:0:-1:-1`; the partitionIndex should not be -1.
```
pulsarctl subscription peek --count 10
persistent://public/default/my-topic-partition-0 test-sub
Message ID : 7316:0:-1:-1
Properties :
{
"publish-time": "2024-08-08T17:50:39.476+08:00"
}
Message :
00000000 68 65 6c 6c 6f 2d 31 |hello-1|
```
### Modifications
- Set the partition index on peeked messages, taken from the topic name.
---
pulsaradmin/pkg/admin/subscription.go | 3 +-
pulsaradmin/pkg/admin/subscription_test.go | 54 ++++++++++++++++++++++++++++++
pulsaradmin/pkg/utils/message_id.go | 9 +++++
pulsaradmin/pkg/utils/topic_name.go | 4 +++
4 files changed, 69 insertions(+), 1 deletion(-)
diff --git a/pulsaradmin/pkg/admin/subscription.go
b/pulsaradmin/pkg/admin/subscription.go
index 8ddb5845..996ebb4e 100644
--- a/pulsaradmin/pkg/admin/subscription.go
+++ b/pulsaradmin/pkg/admin/subscription.go
@@ -234,8 +234,9 @@ const (
)
func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message,
error) {
+
msgID := resp.Header.Get("X-Pulsar-Message-ID")
- ID, err := utils.ParseMessageID(msgID)
+ ID, err := utils.ParseMessageIDWithPartitionIndex(msgID,
topic.GetPartitionIndex())
if err != nil {
return nil, err
}
diff --git a/pulsaradmin/pkg/admin/subscription_test.go
b/pulsaradmin/pkg/admin/subscription_test.go
index c4ba717d..92c79c1f 100644
--- a/pulsaradmin/pkg/admin/subscription_test.go
+++ b/pulsaradmin/pkg/admin/subscription_test.go
@@ -90,6 +90,60 @@ func TestGetMessagesByID(t *testing.T) {
}
+func TestPeekMessageForPartitionedTopic(t *testing.T) {
+ ctx := context.Background()
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+ topicName, _ := utils.GetTopicName(topic)
+ subName := "test-sub"
+
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+
+ err = admin.Topics().Create(*topicName, 2)
+ assert.NoError(t, err)
+
+ err = admin.Subscriptions().Create(*topicName, subName, utils.Earliest)
+ assert.NoError(t, err)
+
+ client, err := pulsar.NewClient(pulsar.ClientOptions{
+ URL: lookupURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ producer, err := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ })
+ assert.NoError(t, err)
+ defer producer.Close()
+
+ for i := 0; i < 100; i++ {
+ producer.SendAsync(ctx, &pulsar.ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ }, nil)
+ }
+ err = producer.Flush()
+ if err != nil {
+ return
+ }
+
+ for i := 0; i < 2; i++ {
+ topicWithPartition := fmt.Sprintf("%s-partition-%d", topic, i)
+ topicName, err := utils.GetTopicName(topicWithPartition)
+ assert.NoError(t, err)
+ messages, err := admin.Subscriptions().PeekMessages(*topicName,
subName, 10)
+ assert.NoError(t, err)
+ assert.NotNil(t, messages)
+ for _, msg := range messages {
+ assert.Equal(t, msg.GetMessageID().PartitionIndex, i)
+ }
+ }
+}
+
func TestGetMessageByID(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
diff --git a/pulsaradmin/pkg/utils/message_id.go
b/pulsaradmin/pkg/utils/message_id.go
index d75b613e..f65c031e 100644
--- a/pulsaradmin/pkg/utils/message_id.go
+++ b/pulsaradmin/pkg/utils/message_id.go
@@ -34,6 +34,15 @@ type MessageID struct {
var Latest = MessageID{0x7fffffffffffffff, 0x7fffffffffffffff, -1, -1}
var Earliest = MessageID{-1, -1, -1, -1}
+func ParseMessageIDWithPartitionIndex(str string, index int) (*MessageID,
error) {
+ id, err := ParseMessageID(str)
+ if err != nil {
+ return nil, err
+ }
+ id.PartitionIndex = index
+ return id, nil
+}
+
func ParseMessageID(str string) (*MessageID, error) {
s := strings.Split(str, ":")
diff --git a/pulsaradmin/pkg/utils/topic_name.go
b/pulsaradmin/pkg/utils/topic_name.go
index 268abd73..78719837 100644
--- a/pulsaradmin/pkg/utils/topic_name.go
+++ b/pulsaradmin/pkg/utils/topic_name.go
@@ -143,6 +143,10 @@ func (t *TopicName) GetPartition(index int) (*TopicName,
error) {
return GetTopicName(topicNameWithPartition)
}
+func (t *TopicName) GetPartitionIndex() int {
+ return t.partitionIndex
+}
+
func getPartitionIndex(topic string) int {
if strings.Contains(topic, PARTITIONEDTOPICSUFFIX) {
parts := strings.Split(topic, "-")