This is an automated email from the ASF dual-hosted git repository. zhaocong pushed a commit to branch fix_message_parse_err in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit 2d5db1d8f9c2611d7e79bd768e06e5c8aa64d50f Author: coderzc <[email protected]> AuthorDate: Mon Nov 3 12:29:21 2025 +0800 Fix the issue of unable to parse non-batch messages that with non-empty properties and empty payloads. --- pulsar/internal/commands.go | 9 +++++++++ pulsar/reader_test.go | 46 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go index b9f46234..25b19123 100644 --- a/pulsar/internal/commands.go +++ b/pulsar/internal/commands.go @@ -75,6 +75,8 @@ type MessageReader struct { buffer Buffer // true if we are parsing a batched message - set after parsing the message metadata batched bool + // true if the message has properties - set after parsing the message metadata + hasProperties bool } // ReadChecksum @@ -118,6 +120,10 @@ func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error) { r.batched = true } + if meta.Properties != nil && len(meta.Properties) > 0 { + r.hasProperties = true + } + return &meta, nil } @@ -137,6 +143,9 @@ func (r *MessageReader) ReadBrokerMetadata() (*pb.BrokerEntryMetadata, error) { func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, error) { if r.buffer.ReadableBytes() == 0 && r.buffer.Capacity() > 0 { + if r.hasProperties { + return nil, []byte{}, nil + } return nil, nil, ErrEOM } if !r.batched { diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index 58c72525..e9d01ee1 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -1298,3 +1298,49 @@ func TestReaderReadFromLatest(t *testing.T) { require.Error(t, err) require.Nil(t, msg) } + +func TestReaderEmptyPayloadNonemptyPropsNonBatch(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + ctx := context.Background() + + // create reader + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + }) + assert.Nil(t, err) + defer reader.Close() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + _, err := producer.Send(ctx, &ProducerMessage{ + Properties: map[string]string{"key": "value"}, + Payload: []byte{}, + }) + assert.NoError(t, err) + } + + // receive 10 messages + for i := 0; i < 10; i++ { + msg, err := reader.Next(ctx) + assert.NoError(t, err) + + assert.Equal(t, map[string]string{"key": "value"}, msg.Properties()) + assert.Equal(t, []byte{}, msg.Payload()) + } +}
