This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 34a8f81  Use internal.Buffer() for message metadata parsing (#89)
34a8f81 is described below

commit 34a8f815302351f6bd6e3ffda875574a250581f8
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Nov 11 14:11:11 2019 -0800

    Use internal.Buffer() for message metadata parsing (#89)
    
    * Use internal.Buffer() for message metadata parsing
    
    * Fixed test
---
 pulsar/consumer_partition.go     |  2 +-
 pulsar/internal/buffer.go        |  8 ++++++++
 pulsar/internal/commands.go      | 34 ++++++++++++++++++----------------
 pulsar/internal/commands_test.go | 12 ++++++------
 4 files changed, 33 insertions(+), 23 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index ae4889b..aaf6ade 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -199,7 +199,7 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) {
 func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
        pbMsgID := response.GetMessageId()
 
-       reader := internal.NewMessageReader(headersAndPayload.ReadableSlice())
+       reader := internal.NewMessageReader(headersAndPayload)
        msgMeta, err := reader.ReadMessageMetadata()
        if err != nil {
                // TODO send discardCorruptedMessage
diff --git a/pulsar/internal/buffer.go b/pulsar/internal/buffer.go
index 98d7116..e4abac9 100644
--- a/pulsar/internal/buffer.go
+++ b/pulsar/internal/buffer.go
@@ -84,6 +84,14 @@ func NewBuffer(size int) Buffer {
        }
 }
 
+func NewBufferWrapper(buf []byte) Buffer {
+       return &buffer{
+               data:      buf,
+               readerIdx: 0,
+               writerIdx: uint32(len(buf)),
+       }
+}
+
 func (b *buffer) ReadableBytes() uint32 {
        return b.writerIdx - b.readerIdx
 }
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 7ed8e68..4e691f0 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -18,8 +18,6 @@
 package internal
 
 import (
-       "bytes"
-       "encoding/binary"
        "errors"
        "fmt"
 
@@ -44,12 +42,16 @@ var ErrCorruptedMessage = errors.New("corrupted message")
 // ErrEOM is the error returned by ReadMessage when no more input is available.
 var ErrEOM = errors.New("EOF")
 
-func NewMessageReader(headersAndPayload []byte) *MessageReader {
+func NewMessageReader(headersAndPayload Buffer) *MessageReader {
        return &MessageReader{
-               buffer: bytes.NewBuffer(headersAndPayload),
+               buffer: headersAndPayload,
        }
 }
 
+func NewMessageReaderFromArray(headersAndPayload []byte) *MessageReader {
+       return NewMessageReader(NewBufferWrapper(headersAndPayload))
+}
+
 // MessageReader provides helper methods to parse
 // the metadata and messages from the binary format
 // Wire format for a messages
@@ -61,7 +63,7 @@ func NewMessageReader(headersAndPayload []byte) *MessageReader {
 // [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [METADATA_SIZE][METADATA][PAYLOAD] [METADATA_SIZE][METADATA][PAYLOAD]
 //
 type MessageReader struct {
-       buffer *bytes.Buffer
+       buffer Buffer
        // true if we are parsing a batched message - set after parsing the message metadata
        batched bool
 }
@@ -69,15 +71,15 @@ type MessageReader struct {
 
 // ReadChecksum
 func (r *MessageReader) readChecksum() (uint32, error) {
-       if r.buffer.Len() < 6 {
+       if r.buffer.ReadableBytes() < 6 {
                return 0, errors.New("missing message header")
        }
        // reader magic number
-       magicNumber := binary.BigEndian.Uint16(r.buffer.Next(2))
+       magicNumber := r.buffer.ReadUint16()
        if magicNumber != magicCrc32c {
                return 0, ErrCorruptedMessage
        }
-       checksum := binary.BigEndian.Uint32(r.buffer.Next(4))
+       checksum := r.buffer.ReadUint32()
        return checksum, nil
 }
 
@@ -93,13 +95,13 @@ func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error) {
        }
 
        // validate checksum
-       computedChecksum := Crc32cCheckSum(r.buffer.Bytes())
+       computedChecksum := Crc32cCheckSum(r.buffer.ReadableSlice())
        if checksum != computedChecksum {
                return nil, fmt.Errorf("checksum mismatch received: 0x%x computed: 0x%x", checksum, computedChecksum)
        }
 
-       size := int(binary.BigEndian.Uint32(r.buffer.Next(4)))
-       data := r.buffer.Next(size)
+       size := r.buffer.ReadUint32()
+       data := r.buffer.Read(size)
        var meta pb.MessageMetadata
        if err := proto.Unmarshal(data, &meta); err != nil {
                return nil, ErrCorruptedMessage
@@ -113,7 +115,7 @@ func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error) {
 }
 
 func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, error) {
-       if r.buffer.Len() == 0 {
+       if r.buffer.ReadableBytes() == 0 {
                return nil, nil, ErrEOM
        }
        if !r.batched {
@@ -127,20 +129,20 @@ func (r *MessageReader) readMessage() (*pb.SingleMessageMetadata, []byte, error)
        // Wire format
        // [PAYLOAD]
 
-       return nil, r.buffer.Next(r.buffer.Len()), nil
+       return nil, r.buffer.Read(r.buffer.ReadableBytes()), nil
 }
 
 func (r *MessageReader) readSingleMessage() (*pb.SingleMessageMetadata, []byte, error) {
        // Wire format
        // [METADATA_SIZE][METADATA][PAYLOAD]
 
-       size := int(binary.BigEndian.Uint32(r.buffer.Next(4)))
+       size := r.buffer.ReadUint32()
        var meta pb.SingleMessageMetadata
-       if err := proto.Unmarshal(r.buffer.Next(size), &meta); err != nil {
+       if err := proto.Unmarshal(r.buffer.Read(size), &meta); err != nil {
                return nil, nil, err
        }
 
-       return &meta, r.buffer.Next(int(meta.GetPayloadSize())), nil
+       return &meta, r.buffer.Read(uint32(meta.GetPayloadSize())), nil
 }
 
 
diff --git a/pulsar/internal/commands_test.go b/pulsar/internal/commands_test.go
index dad658e..5c2e19a 100644
--- a/pulsar/internal/commands_test.go
+++ b/pulsar/internal/commands_test.go
@@ -41,7 +41,7 @@ func TestConvertStringMap(t *testing.T) {
 
 func TestReadMessageMetadata(t *testing.T) {
        // read old style message (not batched)
-       reader := NewMessageReader(rawCompatSingleMessage)
+       reader := NewMessageReaderFromArray(rawCompatSingleMessage)
        meta, err := reader.ReadMessageMetadata()
        if err != nil {
                t.Fatal(err)
@@ -55,7 +55,7 @@ func TestReadMessageMetadata(t *testing.T) {
        assert.Equal(t, "2", props[1].GetValue())
 
        // read message with batch of 1
-       reader = NewMessageReader(rawBatchMessage1)
+       reader = NewMessageReaderFromArray(rawBatchMessage1)
        meta, err = reader.ReadMessageMetadata()
        if err != nil {
                t.Fatal(err)
@@ -63,7 +63,7 @@ func TestReadMessageMetadata(t *testing.T) {
        assert.Equal(t, 1, int(meta.GetNumMessagesInBatch()))
 
        // read message with batch of 10
-       reader = NewMessageReader(rawBatchMessage10)
+       reader = NewMessageReaderFromArray(rawBatchMessage10)
        meta, err = reader.ReadMessageMetadata()
        if err != nil {
                t.Fatal(err)
@@ -73,7 +73,7 @@ func TestReadMessageMetadata(t *testing.T) {
 
 
 func TestReadMessageOldFormat(t *testing.T) {
-       reader := NewMessageReader(rawCompatSingleMessage)
+       reader := NewMessageReaderFromArray(rawCompatSingleMessage)
        _, err := reader.ReadMessageMetadata()
        if err != nil {
                t.Fatal(err)
@@ -93,7 +93,7 @@ func TestReadMessageOldFormat(t *testing.T) {
 
 
 func TestReadMessagesBatchSize1(t *testing.T) {
-       reader := NewMessageReader(rawBatchMessage1)
+       reader := NewMessageReaderFromArray(rawBatchMessage1)
        meta, err := reader.ReadMessageMetadata()
        if err != nil {
                t.Fatal(err)
@@ -115,7 +115,7 @@ func TestReadMessagesBatchSize1(t *testing.T) {
 
 
 func TestReadMessagesBatchSize10(t *testing.T) {
-       reader := NewMessageReader(rawBatchMessage10)
+       reader := NewMessageReaderFromArray(rawBatchMessage10)
        meta, err := reader.ReadMessageMetadata()
        if err != nil {
                t.Fatal(err)

Reply via email to