This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 34a8f81 Use internal.Buffer() for message metadata parsing (#89)
34a8f81 is described below
commit 34a8f815302351f6bd6e3ffda875574a250581f8
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Nov 11 14:11:11 2019 -0800
Use internal.Buffer() for message metadata parsing (#89)
* Use internal.Buffer() for message metadata parsing
* Fixed test
---
pulsar/consumer_partition.go | 2 +-
pulsar/internal/buffer.go | 8 ++++++++
pulsar/internal/commands.go | 34 ++++++++++++++++++----------------
pulsar/internal/commands_test.go | 12 ++++++------
4 files changed, 33 insertions(+), 23 deletions(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index ae4889b..aaf6ade 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -199,7 +199,7 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) {
func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage,
headersAndPayload internal.Buffer) error {
pbMsgID := response.GetMessageId()
- reader := internal.NewMessageReader(headersAndPayload.ReadableSlice())
+ reader := internal.NewMessageReader(headersAndPayload)
msgMeta, err := reader.ReadMessageMetadata()
if err != nil {
// TODO send discardCorruptedMessage
diff --git a/pulsar/internal/buffer.go b/pulsar/internal/buffer.go
index 98d7116..e4abac9 100644
--- a/pulsar/internal/buffer.go
+++ b/pulsar/internal/buffer.go
@@ -84,6 +84,14 @@ func NewBuffer(size int) Buffer {
}
}
+func NewBufferWrapper(buf []byte) Buffer {
+ return &buffer{
+ data: buf,
+ readerIdx: 0,
+ writerIdx: uint32(len(buf)),
+ }
+}
+
func (b *buffer) ReadableBytes() uint32 {
return b.writerIdx - b.readerIdx
}
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 7ed8e68..4e691f0 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -18,8 +18,6 @@
package internal
import (
- "bytes"
- "encoding/binary"
"errors"
"fmt"
@@ -44,12 +42,16 @@ var ErrCorruptedMessage = errors.New("corrupted message")
// ErrEOM is the error returned by ReadMessage when no more input is available.
var ErrEOM = errors.New("EOF")
-func NewMessageReader(headersAndPayload []byte) *MessageReader {
+func NewMessageReader(headersAndPayload Buffer) *MessageReader {
return &MessageReader{
- buffer: bytes.NewBuffer(headersAndPayload),
+ buffer: headersAndPayload,
}
}
+func NewMessageReaderFromArray(headersAndPayload []byte) *MessageReader {
+ return NewMessageReader(NewBufferWrapper(headersAndPayload))
+}
+
// MessageReader provides helper methods to parse
// the metadata and messages from the binary format
// Wire format for a messages
@@ -61,7 +63,7 @@ func NewMessageReader(headersAndPayload []byte) *MessageReader {
// [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [METADATA_SIZE][METADATA][PAYLOAD] [METADATA_SIZE][METADATA][PAYLOAD]
//
type MessageReader struct {
- buffer *bytes.Buffer
+ buffer Buffer
// true if we are parsing a batched message - set after parsing the message metadata
batched bool
}
@@ -69,15 +71,15 @@ type MessageReader struct {
// ReadChecksum
func (r *MessageReader) readChecksum() (uint32, error) {
- if r.buffer.Len() < 6 {
+ if r.buffer.ReadableBytes() < 6 {
return 0, errors.New("missing message header")
}
// reader magic number
- magicNumber := binary.BigEndian.Uint16(r.buffer.Next(2))
+ magicNumber := r.buffer.ReadUint16()
if magicNumber != magicCrc32c {
return 0, ErrCorruptedMessage
}
- checksum := binary.BigEndian.Uint32(r.buffer.Next(4))
+ checksum := r.buffer.ReadUint32()
return checksum, nil
}
@@ -93,13 +95,13 @@ func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error) {
}
// validate checksum
- computedChecksum := Crc32cCheckSum(r.buffer.Bytes())
+ computedChecksum := Crc32cCheckSum(r.buffer.ReadableSlice())
if checksum != computedChecksum {
return nil, fmt.Errorf("checksum mismatch received: 0x%x computed: 0x%x", checksum, computedChecksum)
}
- size := int(binary.BigEndian.Uint32(r.buffer.Next(4)))
- data := r.buffer.Next(size)
+ size := r.buffer.ReadUint32()
+ data := r.buffer.Read(size)
var meta pb.MessageMetadata
if err := proto.Unmarshal(data, &meta); err != nil {
return nil, ErrCorruptedMessage
@@ -113,7 +115,7 @@ func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error) {
}
func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, error) {
- if r.buffer.Len() == 0 {
+ if r.buffer.ReadableBytes() == 0 {
return nil, nil, ErrEOM
}
if !r.batched {
@@ -127,20 +129,20 @@ func (r *MessageReader) readMessage() (*pb.SingleMessageMetadata, []byte, error)
// Wire format
// [PAYLOAD]
- return nil, r.buffer.Next(r.buffer.Len()), nil
+ return nil, r.buffer.Read(r.buffer.ReadableBytes()), nil
}
func (r *MessageReader) readSingleMessage() (*pb.SingleMessageMetadata, []byte, error) {
// Wire format
// [METADATA_SIZE][METADATA][PAYLOAD]
- size := int(binary.BigEndian.Uint32(r.buffer.Next(4)))
+ size := r.buffer.ReadUint32()
var meta pb.SingleMessageMetadata
- if err := proto.Unmarshal(r.buffer.Next(size), &meta); err != nil {
+ if err := proto.Unmarshal(r.buffer.Read(size), &meta); err != nil {
return nil, nil, err
}
- return &meta, r.buffer.Next(int(meta.GetPayloadSize())), nil
+ return &meta, r.buffer.Read(uint32(meta.GetPayloadSize())), nil
}
diff --git a/pulsar/internal/commands_test.go b/pulsar/internal/commands_test.go
index dad658e..5c2e19a 100644
--- a/pulsar/internal/commands_test.go
+++ b/pulsar/internal/commands_test.go
@@ -41,7 +41,7 @@ func TestConvertStringMap(t *testing.T) {
func TestReadMessageMetadata(t *testing.T) {
// read old style message (not batched)
- reader := NewMessageReader(rawCompatSingleMessage)
+ reader := NewMessageReaderFromArray(rawCompatSingleMessage)
meta, err := reader.ReadMessageMetadata()
if err != nil {
t.Fatal(err)
@@ -55,7 +55,7 @@ func TestReadMessageMetadata(t *testing.T) {
assert.Equal(t, "2", props[1].GetValue())
// read message with batch of 1
- reader = NewMessageReader(rawBatchMessage1)
+ reader = NewMessageReaderFromArray(rawBatchMessage1)
meta, err = reader.ReadMessageMetadata()
if err != nil {
t.Fatal(err)
@@ -63,7 +63,7 @@ func TestReadMessageMetadata(t *testing.T) {
assert.Equal(t, 1, int(meta.GetNumMessagesInBatch()))
// read message with batch of 10
- reader = NewMessageReader(rawBatchMessage10)
+ reader = NewMessageReaderFromArray(rawBatchMessage10)
meta, err = reader.ReadMessageMetadata()
if err != nil {
t.Fatal(err)
@@ -73,7 +73,7 @@ func TestReadMessageMetadata(t *testing.T) {
func TestReadMessageOldFormat(t *testing.T) {
- reader := NewMessageReader(rawCompatSingleMessage)
+ reader := NewMessageReaderFromArray(rawCompatSingleMessage)
_, err := reader.ReadMessageMetadata()
if err != nil {
t.Fatal(err)
@@ -93,7 +93,7 @@ func TestReadMessageOldFormat(t *testing.T) {
func TestReadMessagesBatchSize1(t *testing.T) {
- reader := NewMessageReader(rawBatchMessage1)
+ reader := NewMessageReaderFromArray(rawBatchMessage1)
meta, err := reader.ReadMessageMetadata()
if err != nil {
t.Fatal(err)
@@ -115,7 +115,7 @@ func TestReadMessagesBatchSize1(t *testing.T) {
func TestReadMessagesBatchSize10(t *testing.T) {
- reader := NewMessageReader(rawBatchMessage10)
+ reader := NewMessageReaderFromArray(rawBatchMessage10)
meta, err := reader.ReadMessageMetadata()
if err != nil {
t.Fatal(err)