This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 94184ea  Improve and fix message parsing. (#80)
94184ea is described below

commit 94184eab06c111a27f57aeefdfca7e5cffa9539f
Author: cckellogg <[email protected]>
AuthorDate: Sun Nov 3 07:41:24 2019 -0800

    Improve and fix message parsing. (#80)
---
 pulsar/impl_partition_consumer.go | 111 ++++++++------
 pulsar/internal/commands.go       | 294 +++++++++++++++-----------------------
 pulsar/internal/commands_test.go  | 179 +++++++++++++++++++++--
 3 files changed, 351 insertions(+), 233 deletions(-)

diff --git a/pulsar/impl_partition_consumer.go 
b/pulsar/impl_partition_consumer.go
index 2d8800d..58b3ffe 100644
--- a/pulsar/impl_partition_consumer.go
+++ b/pulsar/impl_partition_consumer.go
@@ -24,12 +24,13 @@ import (
        "sync"
        "time"
 
-       "github.com/apache/pulsar-client-go/pkg/pb"
-       "github.com/apache/pulsar-client-go/pulsar/internal"
-       "github.com/apache/pulsar-client-go/util"
        "github.com/golang/protobuf/proto"
 
        log "github.com/sirupsen/logrus"
+
+       "github.com/apache/pulsar-client-go/pkg/pb"
+       "github.com/apache/pulsar-client-go/pulsar/internal"
+       "github.com/apache/pulsar-client-go/util"
 )
 
 const maxRedeliverUnacknowledged = 1000
@@ -600,59 +601,85 @@ func (pc *partitionConsumer) internalFlow(permits uint32) 
error {
 }
 
 func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, 
headersAndPayload []byte) error {
-       msgID := response.GetMessageId()
+       pbMsgID := response.GetMessageId()
 
-       id := newMessageID(int64(msgID.GetLedgerId()), 
int64(msgID.GetEntryId()),
-               int(msgID.GetBatchIndex()), pc.partitionIdx)
+       reader := internal.NewMessageReader(headersAndPayload)
 
-       msgMeta, payloadList, err := internal.ParseMessage(headersAndPayload)
+       msgMeta, err := reader.ReadMessageMetadata()
        if err != nil {
-               return fmt.Errorf("parse message error:%s", err)
-       }
-
-       for _, payload := range payloadList {
-               msg := &message{
-                       publishTime: 
timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
-                       eventTime:   
timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
-                       key:         msgMeta.GetPartitionKey(),
-                       properties:  
internal.ConvertToStringMap(msgMeta.GetProperties()),
-                       topic:       pc.topic,
-                       msgID:       id,
-                       payLoad:     payload,
-               }
+               // TODO send discardCorruptedMessage
+               return err
+       }
 
-               consumerMsg := ConsumerMessage{
-                       Message:  msg,
-                       Consumer: pc,
+       numMsgs := 1
+       if msgMeta.NumMessagesInBatch != nil {
+               numMsgs = int(msgMeta.GetNumMessagesInBatch())
+       }
+       for i := 0; i < numMsgs; i++ {
+               ssm, payload, err := reader.ReadMessage()
+               if err != nil {
+                       // TODO send
+                       return err
                }
 
-               select {
-               case pc.subQueue <- consumerMsg:
-                       //Add messageId to redeliverMessages buffer, avoiding 
duplicates.
-                       newMid := response.GetMessageId()
-                       var dup bool
-
-                       pc.omu.Lock()
-                       for _, mid := range pc.redeliverMessages {
-                               if proto.Equal(mid, newMid) {
-                                       dup = true
-                                       break
-                               }
+               msgID := newMessageID(int64(pbMsgID.GetLedgerId()), 
int64(pbMsgID.GetEntryId()), i, pc.partitionIdx)
+               var msg Message
+               if ssm == nil {
+                       msg = &message{
+                               publishTime: 
timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+                               eventTime:   
timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
+                               key:         msgMeta.GetPartitionKey(),
+                               properties:  
internal.ConvertToStringMap(msgMeta.GetProperties()),
+                               topic:       pc.topic,
+                               msgID:       msgID,
+                               payLoad:     payload,
                        }
-
-                       if !dup {
-                               pc.redeliverMessages = 
append(pc.redeliverMessages, newMid)
+               } else {
+                       msg = &message{
+                               publishTime: 
timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+                               eventTime:   
timeFromUnixTimestampMillis(ssm.GetEventTime()),
+                               key:         ssm.GetPartitionKey(),
+                               properties:  
internal.ConvertToStringMap(ssm.GetProperties()),
+                               topic:       pc.topic,
+                               msgID:       msgID,
+                               payLoad:     payload,
                        }
-                       pc.omu.Unlock()
-                       continue
-               default:
-                       return fmt.Errorf("consumer message channel on topic %s 
is full (capacity = %d)", pc.Topic(), cap(pc.options.MessageChannel))
+               }
+
+               if err := pc.dispatchMessage(msg, pbMsgID); err != nil {
+                       // TODO handle error
+                       return err
                }
        }
 
        return nil
 }
 
+
+func (pc *partitionConsumer) dispatchMessage(msg Message, msgID 
*pb.MessageIdData) error {
+       select {
+       case pc.subQueue <- ConsumerMessage{Consumer:pc, Message:msg}:
+               //Add messageId to redeliverMessages buffer, avoiding 
duplicates.
+               var dup bool
+
+               pc.omu.Lock()
+               for _, mid := range pc.redeliverMessages {
+                       if proto.Equal(mid, msgID) {
+                               dup = true
+                               break
+                       }
+               }
+
+               if !dup {
+                       pc.redeliverMessages = append(pc.redeliverMessages, 
msgID)
+               }
+               pc.omu.Unlock()
+       default:
+               return fmt.Errorf("consumer message channel on topic %s is full 
(capacity = %d)", pc.Topic(), cap(pc.options.MessageChannel))
+       }
+       return nil
+}
+
 type handleAck struct {
        msgID     MessageID
        waitGroup *sync.WaitGroup
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 2800b92..7ed8e68 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -20,12 +20,14 @@ package internal
 import (
        "bytes"
        "encoding/binary"
+       "errors"
        "fmt"
-       "io"
 
-       "github.com/apache/pulsar-client-go/pkg/pb"
        "github.com/golang/protobuf/proto"
+
        log "github.com/sirupsen/logrus"
+
+       "github.com/apache/pulsar-client-go/pkg/pb"
 )
 
 const (
@@ -34,6 +36,114 @@ const (
        magicCrc32c  uint16 = 0x0e01
 )
 
+// ErrCorruptedMessage is the error returned by ReadMessageMetadata when it has 
detected corrupted data.
+// The data is considered corrupted if it's missing a header, has a checksum 
mismatch, or there
+// was an error when unmarshalling the message metadata.
+var ErrCorruptedMessage = errors.New("corrupted message")
+
+// ErrEOM is the error returned by ReadMessage when no more input is available.
+var ErrEOM = errors.New("EOF")
+
+func NewMessageReader(headersAndPayload []byte) *MessageReader {
+       return &MessageReader{
+               buffer: bytes.NewBuffer(headersAndPayload),
+       }
+}
+
+// MessageReader provides helper methods to parse
+// the metadata and messages from the binary format.
+// Wire format for a message:
+//
+// Old format (single message)
+// [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
+//
+// Batch format
+// [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] 
[METADATA_SIZE][METADATA][PAYLOAD] [METADATA_SIZE][METADATA][PAYLOAD]
+//
+type MessageReader struct {
+       buffer *bytes.Buffer
+       // true if we are parsing a batched message - set after parsing the 
message metadata
+       batched bool
+}
+
+
+// readChecksum reads the magic number and the checksum from the message header.
+func (r *MessageReader) readChecksum() (uint32, error) {
+       if r.buffer.Len() < 6 {
+               return 0, errors.New("missing message header")
+       }
+       // read magic number
+       magicNumber := binary.BigEndian.Uint16(r.buffer.Next(2))
+       if magicNumber != magicCrc32c {
+               return 0, ErrCorruptedMessage
+       }
+       checksum := binary.BigEndian.Uint32(r.buffer.Next(4))
+       return checksum, nil
+}
+
+
+func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error) {
+       // Wire format
+       // [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA]
+
+       // read checksum
+       checksum, err := r.readChecksum()
+       if err != nil {
+               return nil, err
+       }
+
+       // validate checksum
+       computedChecksum := Crc32cCheckSum(r.buffer.Bytes())
+       if checksum != computedChecksum {
+               return nil, fmt.Errorf("checksum mismatch received: 0x%x 
computed: 0x%x", checksum, computedChecksum)
+       }
+
+       size := int(binary.BigEndian.Uint32(r.buffer.Next(4)))
+       data := r.buffer.Next(size)
+       var meta pb.MessageMetadata
+       if err := proto.Unmarshal(data, &meta); err != nil {
+               return nil, ErrCorruptedMessage
+       }
+
+       if meta.NumMessagesInBatch != nil {
+               r.batched = true
+       }
+
+       return &meta, nil
+}
+
+func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, 
error) {
+       if r.buffer.Len() == 0 {
+               return nil, nil, ErrEOM
+       }
+       if !r.batched {
+               return r.readMessage()
+       }
+
+       return r.readSingleMessage()
+}
+
+func (r *MessageReader) readMessage() (*pb.SingleMessageMetadata, []byte, 
error) {
+       // Wire format
+       // [PAYLOAD]
+
+       return nil, r.buffer.Next(r.buffer.Len()), nil
+}
+
+func (r *MessageReader) readSingleMessage() (*pb.SingleMessageMetadata, 
[]byte, error) {
+       // Wire format
+       // [METADATA_SIZE][METADATA][PAYLOAD]
+
+       size := int(binary.BigEndian.Uint32(r.buffer.Next(4)))
+       var meta pb.SingleMessageMetadata
+       if err := proto.Unmarshal(r.buffer.Next(size), &meta); err != nil {
+               return nil, nil, err
+       }
+
+       return &meta, r.buffer.Next(int(meta.GetPayloadSize())), nil
+}
+
+
 func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) 
*pb.BaseCommand {
        cmd := &pb.BaseCommand{
                Type: &cmdType,
@@ -87,142 +197,6 @@ func addSingleMessageToBatch(wb Buffer, smm 
*pb.SingleMessageMetadata, payload [
        wb.Write(payload)
 }
 
-func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, 
payloadList [][]byte, err error) {
-       // reusable buffer for 4-byte uint32s
-       buf32 := make([]byte, 4)
-       r := bytes.NewReader(headersAndPayload)
-       // Wrap our reader so that we can only read
-       // bytes from our frame
-       lr := &io.LimitedReader{
-               N: int64(len(headersAndPayload)),
-               R: r,
-       }
-       // There are 3 possibilities for the following fields:
-       //  - EOF: If so, this is a "simple" command. No more parsing required.
-       //  - 2-byte magic number: Indicates the following 4 bytes are a 
checksum
-       //  - 4-byte metadata size
-
-       // The message may optionally stop here. If so,
-       // this is a "simple" command.
-       if lr.N <= 0 {
-               return nil, nil, nil
-       }
-
-       // Optionally, the next 2 bytes may be the magicNumber. If
-       // so, it indicates that the following 4 bytes are a checksum.
-       // If not, the following 2 bytes (plus the 2 bytes already read),
-       // are the metadataSize, which is why a 4 byte buffer is used.
-       if _, err = io.ReadFull(lr, buf32); err != nil {
-               return nil, nil, err
-       }
-
-       // Check for magicNumber which indicates a checksum
-       var chksum CheckSum
-       var expectedChksum []byte
-
-       magicNumber := make([]byte, 2)
-       binary.BigEndian.PutUint16(magicNumber, magicCrc32c)
-       if magicNumber[0] == buf32[0] && magicNumber[1] == buf32[1] {
-               expectedChksum = make([]byte, 4)
-
-               // We already read the 2-byte magicNumber and the
-               // initial 2 bytes of the checksum
-               expectedChksum[0] = buf32[2]
-               expectedChksum[1] = buf32[3]
-
-               // Read the remaining 2 bytes of the checksum
-               if _, err = io.ReadFull(lr, expectedChksum[2:]); err != nil {
-                       return nil, nil, err
-               }
-
-               // Use a tee reader to compute the checksum
-               // of everything consumed after this point
-               lr.R = io.TeeReader(lr.R, &chksum)
-
-               // Fill buffer with metadata size, which is what it
-               // would already contain if there were no magic number / 
checksum
-               if _, err = io.ReadFull(lr, buf32); err != nil {
-                       return nil, nil, err
-               }
-       }
-
-       // Read metadataSize
-       metadataSize := binary.BigEndian.Uint32(buf32)
-       // guard against allocating large buffer
-       if metadataSize > MaxFrameSize {
-               return nil, nil, fmt.Errorf("frame metadata size (%d) "+
-                       "cannot b greater than max frame size (%d)", 
metadataSize, MaxFrameSize)
-       }
-
-       // Read protobuf encoded metadata
-       metaBuf := make([]byte, metadataSize)
-       if _, err = io.ReadFull(lr, metaBuf); err != nil {
-               return nil, nil, err
-       }
-       msgMeta = new(pb.MessageMetadata)
-       if err = proto.Unmarshal(metaBuf, msgMeta); err != nil {
-               return nil, nil, err
-       }
-
-       numMsg := msgMeta.GetNumMessagesInBatch()
-
-       if numMsg > 0 && msgMeta.NumMessagesInBatch != nil {
-               payloads := make([]byte, lr.N)
-               if _, err = io.ReadFull(lr, payloads); err != nil {
-                       return nil, nil, err
-               }
-
-               singleMessages, e := decodeBatchPayload(payloads, numMsg)
-               if e != nil {
-                       return nil, nil, e
-               }
-
-               payloadList = make([][]byte, 0, numMsg)
-               for _, singleMsg := range singleMessages {
-                       msgMeta.PartitionKey = singleMsg.SingleMeta.PartitionKey
-                       msgMeta.Properties = singleMsg.SingleMeta.Properties
-                       msgMeta.EventTime = singleMsg.SingleMeta.EventTime
-                       payloadList = append(payloadList, 
singleMsg.SinglePayload)
-               }
-
-               if err = computeChecksum(chksum, expectedChksum); err != nil {
-                       return nil, nil, err
-               }
-               return msgMeta, payloadList, nil
-       }
-       // Anything left in the frame is considered
-       // the payload and can be any sequence of bytes.
-       payloadList = make([][]byte, 0, 10)
-       if lr.N > 0 {
-               // guard against allocating large buffer
-               if lr.N > MaxFrameSize {
-                       return nil, nil, fmt.Errorf("frame payload size (%d) 
cannot be greater than max frame size (%d)", lr.N, MaxFrameSize)
-               }
-
-               payload := make([]byte, lr.N)
-               if _, err = io.ReadFull(lr, payload); err != nil {
-                       return nil, nil, err
-               }
-
-               payloadList = append(payloadList, payload)
-       }
-
-       if err = computeChecksum(chksum, expectedChksum); err != nil {
-               return nil, nil, err
-       }
-
-       return msgMeta, payloadList, nil
-}
-
-func computeChecksum(chksum CheckSum, expectedChksum []byte) error {
-       computed := chksum.compute()
-       if !bytes.Equal(computed, expectedChksum) {
-               return fmt.Errorf("checksum mismatch: computed (0x%X) does "+
-                       "not match given checksum (0x%X)", computed, 
expectedChksum)
-       }
-       return nil
-}
-
 func serializeBatch(wb Buffer, cmdSend *pb.BaseCommand, msgMetadata 
*pb.MessageMetadata, payload []byte) {
        // Wire format
        // [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] 
[METADATA_SIZE][METADATA] [PAYLOAD]
@@ -270,50 +244,6 @@ func serializeBatch(wb Buffer, cmdSend *pb.BaseCommand, 
msgMetadata *pb.MessageM
        wb.PutUint32(checksum, checksumIdx)
 }
 
-// singleMessage represents one of the elements of the batch type payload
-type singleMessage struct {
-       SingleMetaSize uint32
-       SingleMeta     *pb.SingleMessageMetadata
-       SinglePayload  []byte
-}
-
-// decodeBatchPayload parses the payload of the batch type
-// If the producer uses the batch function, msg.Payload will be a 
singleMessage array structure.
-func decodeBatchPayload(bp []byte, batchNum int32) ([]*singleMessage, error) {
-       buf32 := make([]byte, 4)
-       rdBuf := bytes.NewReader(bp)
-       singleMsgList := make([]*singleMessage, 0, batchNum)
-       for i := int32(0); i < batchNum; i++ {
-               // singleMetaSize
-               if _, err := io.ReadFull(rdBuf, buf32); err != nil {
-                       return nil, err
-               }
-               singleMetaSize := binary.BigEndian.Uint32(buf32)
-
-               // singleMeta
-               singleMetaBuf := make([]byte, singleMetaSize)
-               if _, err := io.ReadFull(rdBuf, singleMetaBuf); err != nil {
-                       return nil, err
-               }
-               singleMeta := new(pb.SingleMessageMetadata)
-               if err := proto.Unmarshal(singleMetaBuf, singleMeta); err != 
nil {
-                       return nil, err
-               }
-               // payload
-               singlePayload := make([]byte, singleMeta.GetPayloadSize())
-               if _, err := io.ReadFull(rdBuf, singlePayload); err != nil {
-                       return nil, err
-               }
-               singleMsg := &singleMessage{
-                       SingleMetaSize: singleMetaSize,
-                       SingleMeta:     singleMeta,
-                       SinglePayload:  singlePayload,
-               }
-
-               singleMsgList = append(singleMsgList, singleMsg)
-       }
-       return singleMsgList, nil
-}
 
 // ConvertFromStringMap convert a string map to a KeyValue []byte
 func ConvertFromStringMap(m map[string]string) []*pb.KeyValue {
diff --git a/pulsar/internal/commands_test.go b/pulsar/internal/commands_test.go
index 0102956..dad658e 100644
--- a/pulsar/internal/commands_test.go
+++ b/pulsar/internal/commands_test.go
@@ -38,19 +38,180 @@ func TestConvertStringMap(t *testing.T) {
        assert.Equal(t, "2", m2["b"])
 }
 
-func TestDecodeBatchPayload(t *testing.T) {
-       // singleMsg = singleMetaSize(4  bytes) + singleMeta(var length) + 
payload
-       singleMsg := []byte{0, 0, 0, 2, 24, 12, 104, 101, 108, 108, 111, 45, 
112, 117, 108, 115, 97, 114}
-       list, err := decodeBatchPayload(singleMsg, 1)
+
+func TestReadMessageMetadata(t *testing.T) {
+       // read old style message (not batched)
+       reader := NewMessageReader(rawCompatSingleMessage)
+       meta, err := reader.ReadMessageMetadata()
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       props := meta.GetProperties()
+       assert.Equal(t, len(props), 2)
+       assert.Equal(t, "a", props[0].GetKey())
+       assert.Equal(t, "1", props[0].GetValue())
+       assert.Equal(t, "b", props[1].GetKey())
+       assert.Equal(t, "2", props[1].GetValue())
+
+       // read message with batch of 1
+       reader = NewMessageReader(rawBatchMessage1)
+       meta, err = reader.ReadMessageMetadata()
        if err != nil {
                t.Fatal(err)
        }
-       if get, want := len(list), 1; get != want {
-               t.Errorf("want %v, but get %v", get, want)
+       assert.Equal(t, 1, int(meta.GetNumMessagesInBatch()))
+
+       // read message with batch of 10
+       reader = NewMessageReader(rawBatchMessage10)
+       meta, err = reader.ReadMessageMetadata()
+       if err != nil {
+               t.Fatal(err)
+       }
+       assert.Equal(t, 10, int(meta.GetNumMessagesInBatch()))
+}
+
+
+func TestReadMessageOldFormat(t *testing.T) {
+       reader := NewMessageReader(rawCompatSingleMessage)
+       _, err := reader.ReadMessageMetadata()
+       if err != nil {
+               t.Fatal(err)
        }
 
-       m := list[0]
-       if get, want := string(m.SinglePayload), "hello-pulsar"; get != want {
-               t.Errorf("want %v, but get %v", get, want)
+       ssm, payload, err := reader.ReadMessage()
+       if err != nil {
+               t.Fatal(err)
        }
+       // old message format does not have a single message metadata
+       assert.Equal(t, true, ssm == nil)
+       assert.Equal(t, "hello", string(payload))
+
+       _ , _, err = reader.ReadMessage()
+       assert.Equal(t, ErrEOM, err)
+}
+
+
+func TestReadMessagesBatchSize1(t *testing.T) {
+       reader := NewMessageReader(rawBatchMessage1)
+       meta, err := reader.ReadMessageMetadata()
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       assert.Equal(t, 1, int(meta.GetNumMessagesInBatch()))
+       for i := 0; i < int(meta.GetNumMessagesInBatch()); i++ {
+               ssm, payload, err := reader.ReadMessage()
+               if err != nil {
+                       t.Fatal(err)
+               }
+               assert.Equal(t, true, ssm != nil)
+               assert.Equal(t, "hello", string(payload))
+       }
+
+       _ , _, err = reader.ReadMessage()
+       assert.Equal(t, ErrEOM, err)
+}
+
+
+func TestReadMessagesBatchSize10(t *testing.T) {
+       reader := NewMessageReader(rawBatchMessage10)
+       meta, err := reader.ReadMessageMetadata()
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       assert.Equal(t, 10, int(meta.GetNumMessagesInBatch()))
+       for i := 0; i < int(meta.GetNumMessagesInBatch()); i++ {
+               ssm, payload, err := reader.ReadMessage()
+               if err != nil {
+                       t.Fatal(err)
+               }
+               assert.Equal(t, true, ssm != nil)
+               assert.Equal(t, "hello", string(payload))
+       }
+
+       _ , _, err = reader.ReadMessage()
+       assert.Equal(t, ErrEOM, err)
+}
+
+
+// Raw single message in old format
+// metadata properties:<key:"a" value:"1" > properties:<key:"b" value:"2" >
+// payload = "hello"
+var rawCompatSingleMessage = []byte{
+       0x0e, 0x01, 0x08, 0x36, 0xb4, 0x66, 0x00, 0x00,
+       0x00, 0x31, 0x0a, 0x0f, 0x73, 0x74, 0x61, 0x6e,
+       0x64, 0x61, 0x6c, 0x6f, 0x6e, 0x65, 0x2d, 0x37,
+       0x34, 0x2d, 0x30, 0x10, 0x00, 0x18, 0xac, 0xef,
+       0xe8, 0xa0, 0xe2, 0x2d, 0x22, 0x06, 0x0a, 0x01,
+       0x61, 0x12, 0x01, 0x31, 0x22, 0x06, 0x0a, 0x01,
+       0x62, 0x12, 0x01, 0x32, 0x48, 0x05, 0x60, 0x05,
+       0x82, 0x01, 0x00, 0x68, 0x65, 0x6c, 0x6c, 0x6f,
+}
+
+// Message with batch of 1
+// single message metadata properties:<key:"a" value:"1" > properties:<key:"b" 
value:"2" >
+// payload = "hello"
+var rawBatchMessage1 = []byte{
+       0x0e, 0x01, 0x1f, 0x80, 0x09, 0x68, 0x00, 0x00,
+       0x00, 0x1f, 0x0a, 0x0f, 0x73, 0x74, 0x61, 0x6e,
+       0x64, 0x61, 0x6c, 0x6f, 0x6e, 0x65, 0x2d, 0x37,
+       0x34, 0x2d, 0x31, 0x10, 0x00, 0x18, 0xdb, 0x80,
+       0xf4, 0xa0, 0xe2, 0x2d, 0x58, 0x01, 0x82, 0x01,
+       0x00, 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06, 0x0a,
+       0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06, 0x0a,
+       0x01, 0x62, 0x12, 0x01, 0x32, 0x18, 0x05, 0x28,
+       0x05, 0x40, 0x00, 0x68, 0x65, 0x6c, 0x6c, 0x6f,
+}
+
+// Message with batch of 10
+// single message metadata properties:<key:"a" value:"1" > properties:<key:"b" 
value:"2" >
+// payload = "hello"
+var rawBatchMessage10 = []byte{
+       0x0e, 0x01, 0x7b, 0x28, 0x8c, 0x08,
+       0x00, 0x00, 0x00, 0x1f, 0x0a, 0x0f, 0x73, 0x74,
+       0x61, 0x6e, 0x64, 0x61, 0x6c, 0x6f, 0x6e, 0x65,
+       0x2d, 0x37, 0x34, 0x2d, 0x32, 0x10, 0x00, 0x18,
+       0xd0, 0xc2, 0xfa, 0xa0, 0xe2, 0x2d, 0x58, 0x0a,
+       0x82, 0x01, 0x00, 0x00, 0x00, 0x00, 0x16, 0x0a,
+       0x06, 0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a,
+       0x06, 0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, 0x18,
+       0x05, 0x28, 0x05, 0x40, 0x00, 0x68, 0x65, 0x6c,
+       0x6c, 0x6f, 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06,
+       0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06,
+       0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, 0x18, 0x05,
+       0x28, 0x05, 0x40, 0x01, 0x68, 0x65, 0x6c, 0x6c,
+       0x6f, 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06, 0x0a,
+       0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06, 0x0a,
+       0x01, 0x62, 0x12, 0x01, 0x32, 0x18, 0x05, 0x28,
+       0x05, 0x40, 0x02, 0x68, 0x65, 0x6c, 0x6c, 0x6f,
+       0x00, 0x00, 0x00, 0x16, 0x0a, 0x06, 0x0a, 0x01,
+       0x61, 0x12, 0x01, 0x31, 0x0a, 0x06, 0x0a, 0x01,
+       0x62, 0x12, 0x01, 0x32, 0x18, 0x05, 0x28, 0x05,
+       0x40, 0x03, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00,
+       0x00, 0x00, 0x16, 0x0a, 0x06, 0x0a, 0x01, 0x61,
+       0x12, 0x01, 0x31, 0x0a, 0x06, 0x0a, 0x01, 0x62,
+       0x12, 0x01, 0x32, 0x18, 0x05, 0x28, 0x05, 0x40,
+       0x04, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00, 0x00,
+       0x00, 0x16, 0x0a, 0x06, 0x0a, 0x01, 0x61, 0x12,
+       0x01, 0x31, 0x0a, 0x06, 0x0a, 0x01, 0x62, 0x12,
+       0x01, 0x32, 0x18, 0x05, 0x28, 0x05, 0x40, 0x05,
+       0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00, 0x00, 0x00,
+       0x16, 0x0a, 0x06, 0x0a, 0x01, 0x61, 0x12, 0x01,
+       0x31, 0x0a, 0x06, 0x0a, 0x01, 0x62, 0x12, 0x01,
+       0x32, 0x18, 0x05, 0x28, 0x05, 0x40, 0x06, 0x68,
+       0x65, 0x6c, 0x6c, 0x6f, 0x00, 0x00, 0x00, 0x16,
+       0x0a, 0x06, 0x0a, 0x01, 0x61, 0x12, 0x01, 0x31,
+       0x0a, 0x06, 0x0a, 0x01, 0x62, 0x12, 0x01, 0x32,
+       0x18, 0x05, 0x28, 0x05, 0x40, 0x07, 0x68, 0x65,
+       0x6c, 0x6c, 0x6f, 0x00, 0x00, 0x00, 0x16, 0x0a,
+       0x06, 0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a,
+       0x06, 0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, 0x18,
+       0x05, 0x28, 0x05, 0x40, 0x08, 0x68, 0x65, 0x6c,
+       0x6c, 0x6f, 0x00, 0x00, 0x00, 0x16, 0x0a, 0x06,
+       0x0a, 0x01, 0x61, 0x12, 0x01, 0x31, 0x0a, 0x06,
+       0x0a, 0x01, 0x62, 0x12, 0x01, 0x32, 0x18, 0x05,
+       0x28, 0x05, 0x40, 0x09, 0x68, 0x65, 0x6c, 0x6c,
+       0x6f,
 }

Reply via email to