This is an automated email from the ASF dual-hosted git repository.

nodece pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new d664fcfb fix: add batchIdx to messageID.String() when batched message (#1491)
d664fcfb is described below

commit d664fcfb2284f2f41abace18e2d4163e8ab61c93
Author: adrianiacobghiula <[email protected]>
AuthorDate: Sun May 10 14:10:34 2026 +0200

    fix: add batchIdx to messageID.String() when batched message (#1491)
    
    * fix: add batchIdx to messageID.String() when batched message
    
    * fix: add batchIdx to messageID.String() when batched message
---
 pulsar/impl_message.go                     |  3 +++
 pulsar/impl_message_test.go                | 12 ++++++++++++
 pulsaradmin/pkg/admin/subscription_test.go | 16 ++++++++++------
 3 files changed, 25 insertions(+), 6 deletions(-)

diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index de338516..7b68e190 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -194,6 +194,9 @@ func (id *messageID) BatchSize() int32 {
 }
 
 func (id *messageID) String() string {
+       if id.batchIdx > -1 {
+               return fmt.Sprintf("%d:%d:%d:%d", id.ledgerID, id.entryID, id.partitionIdx, id.batchIdx)
+       }
        return fmt.Sprintf("%d:%d:%d", id.ledgerID, id.entryID, id.partitionIdx)
 }
 
diff --git a/pulsar/impl_message_test.go b/pulsar/impl_message_test.go
index 6a21171c..8f419517 100644
--- a/pulsar/impl_message_test.go
+++ b/pulsar/impl_message_test.go
@@ -129,3 +129,15 @@ func TestAckingMessageIDBatchTwo(t *testing.T) {
        assert.Equal(t, true, ids[0].ack())
        assert.Equal(t, true, tracker.completed())
 }
+
+func TestMessageStringOnMessage(t *testing.T) {
+       id := newMessageID(1, 2, -1, 4, 0)
+
+       assert.Equal(t, "1:2:4", id.String())
+}
+
+func TestMessageStringOnBatchMessage(t *testing.T) {
+       id := newMessageID(1, 2, 3, 4, 5)
+
+       assert.Equal(t, "1:2:4:3", id.String())
+}
diff --git a/pulsaradmin/pkg/admin/subscription_test.go b/pulsaradmin/pkg/admin/subscription_test.go
index 08d2cf22..2d937a6e 100644
--- a/pulsaradmin/pkg/admin/subscription_test.go
+++ b/pulsaradmin/pkg/admin/subscription_test.go
@@ -64,24 +64,28 @@ func TestGetMessagesByID(t *testing.T) {
 
        var wg sync.WaitGroup
        wg.Add(numberMessages)
-       messageIDMap := make(map[string]int32)
+       // Group by ledger:entry (ignoring batchIdx) to count batch sizes
+       type ledgerEntry struct {
+               LedgerID int64
+               EntryID  int64
+       }
+       messageIDMap := make(map[ledgerEntry]int32)
        for i := 0; i <= numberMessages; i++ {
                producer.SendAsync(ctx, &pulsar.ProducerMessage{
                        Payload: []byte(fmt.Sprintf("hello-%d", i)),
                }, func(id pulsar.MessageID, _ *pulsar.ProducerMessage, err error) {
                        assert.Nil(t, err)
-                       messageIDMap[id.String()]++
+                       key := ledgerEntry{LedgerID: id.LedgerID(), EntryID: id.EntryID()}
+                       messageIDMap[key]++
                        wg.Done()
                })
        }
        wg.Wait()
        topicName, err := utils.GetTopicName(topic)
        assert.NoError(t, err)
-       for id, i := range messageIDMap {
+       for key, i := range messageIDMap {
                assert.Equal(t, i, int32(batchingMaxMessages))
-               messageID, err := utils.ParseMessageID(id)
-               assert.Nil(t, err)
-               messages, err := admin.Subscriptions().GetMessagesByID(*topicName, messageID.LedgerID, messageID.EntryID)
+               messages, err := admin.Subscriptions().GetMessagesByID(*topicName, key.LedgerID, key.EntryID)
                assert.Nil(t, err)
                assert.Equal(t, batchingMaxMessages, len(messages))
        }

Reply via email to