This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch fix_message_parse_err
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit 2d5db1d8f9c2611d7e79bd768e06e5c8aa64d50f
Author: coderzc <[email protected]>
AuthorDate: Mon Nov 3 12:29:21 2025 +0800

    Fix failure to parse non-batch messages that have non-empty properties and empty payloads.
---
 pulsar/internal/commands.go |  9 +++++++++
 pulsar/reader_test.go       | 46 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 55 insertions(+)

diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index b9f46234..25b19123 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -75,6 +75,8 @@ type MessageReader struct {
        buffer Buffer
        // true if we are parsing a batched message - set after parsing the message metadata
        batched bool
+       // true if the message has properties - set after parsing the message metadata
+       hasProperties bool
 }
 
 // ReadChecksum
@@ -118,6 +120,10 @@ func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error) {
                r.batched = true
        }
 
+       if meta.Properties != nil && len(meta.Properties) > 0 {
+               r.hasProperties = true
+       }
+
        return &meta, nil
 }
 
@@ -137,6 +143,9 @@ func (r *MessageReader) ReadBrokerMetadata() (*pb.BrokerEntryMetadata, error) {
 
 func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, error) {
        if r.buffer.ReadableBytes() == 0 && r.buffer.Capacity() > 0 {
+               if r.hasProperties {
+                       return nil, []byte{}, nil
+               }
                return nil, nil, ErrEOM
        }
        if !r.batched {
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index 58c72525..e9d01ee1 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -1298,3 +1298,49 @@ func TestReaderReadFromLatest(t *testing.T) {
        require.Error(t, err)
        require.Nil(t, msg)
 }
+
+func TestReaderEmptyPayloadNonemptyPropsNonBatch(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+       ctx := context.Background()
+
+       // create reader
+       reader, err := client.CreateReader(ReaderOptions{
+               Topic:          topic,
+               StartMessageID: EarliestMessageID(),
+       })
+       assert.Nil(t, err)
+       defer reader.Close()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: true,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       // send 10 messages
+       for i := 0; i < 10; i++ {
+               _, err := producer.Send(ctx, &ProducerMessage{
+                       Properties: map[string]string{"key": "value"},
+                       Payload:    []byte{},
+               })
+               assert.NoError(t, err)
+       }
+
+       // receive 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := reader.Next(ctx)
+               assert.NoError(t, err)
+
+               assert.Equal(t, map[string]string{"key": "value"}, msg.Properties())
+               assert.Equal(t, []byte{}, msg.Payload())
+       }
+}

Reply via email to