This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 793008e7 refactor: prepare sendrequest and move to internalSendAsync (#1120)
793008e7 is described below

commit 793008e7f3b233bfa64e8c22b7cabe10aa334bdb
Author: tison <[email protected]>
AuthorDate: Wed Oct 25 15:39:23 2023 +0800

    refactor: prepare sendrequest and move to internalSendAsync (#1120)
    
    Signed-off-by: tison <[email protected]>
    Co-authored-by: gunli <[email protected]>
---
 pulsar/message_chunking_test.go |  31 ++-
 pulsar/producer_partition.go    | 457 +++++++++++++++++++++-------------------
 2 files changed, 259 insertions(+), 229 deletions(-)

diff --git a/pulsar/message_chunking_test.go b/pulsar/message_chunking_test.go
index ee3ab177..fbdcaa0c 100644
--- a/pulsar/message_chunking_test.go
+++ b/pulsar/message_chunking_test.go
@@ -552,26 +552,41 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) {
        msg := &ProducerMessage{
                Payload: []byte(fmt.Sprintf("chunk-%s-%d|", uuid, chunkID)),
        }
+       wholePayload := msg.Payload
        producerImpl := p.(*producer).producers[0].(*partitionProducer)
-       mm := producerImpl.genMetadata(msg, len(msg.Payload), time.Now())
+       mm := producerImpl.genMetadata(msg, len(wholePayload), time.Now())
        mm.Uuid = proto.String(uuid)
        mm.NumChunksFromMsg = proto.Int32(int32(totalChunks))
-       mm.TotalChunkMsgSize = proto.Int32(int32(len(msg.Payload)))
+       mm.TotalChunkMsgSize = proto.Int32(int32(len(wholePayload)))
        mm.ChunkId = proto.Int32(int32(chunkID))
        producerImpl.updateMetadataSeqID(mm, msg)
-
-       doneCh := make(chan struct{})
        producerImpl.internalSingleSend(
                mm,
                msg.Payload,
                &sendRequest{
                        callback: func(id MessageID, producerMessage 
*ProducerMessage, err error) {
-                               close(doneCh)
                        },
-                       msg: msg,
+                       ctx:                 context.Background(),
+                       msg:                 msg,
+                       flushImmediately:    true,
+                       totalChunks:         totalChunks,
+                       chunkID:             chunkID,
+                       uuid:                uuid,
+                       chunkRecorder:       newChunkRecorder(),
+                       transaction:         nil,
+                       reservedMem:         0,
+                       sendAsBatch:         false,
+                       schema:              nil,
+                       schemaVersion:       nil,
+                       uncompressedPayload: wholePayload,
+                       uncompressedSize:    int64(len(wholePayload)),
+                       compressedPayload:   wholePayload,
+                       compressedSize:      len(wholePayload),
+                       payloadChunkSize:    internal.MaxMessageSize - 
proto.Size(mm),
+                       mm:                  mm,
+                       deliverAt:           time.Now(),
+                       maxMessageSize:      internal.MaxMessageSize,
                },
                uint32(internal.MaxMessageSize),
        )
-
-       <-doneCh
 }
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index ac07341d..b00ed6b5 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -478,201 +478,31 @@ func runCallback(cb func(MessageID, *ProducerMessage, error), id MessageID, msg
        cb(id, msg, err)
 }
 
-func (p *partitionProducer) internalSend(request *sendRequest) {
-       p.log.Debug("Received send request: ", *request.msg)
+func (p *partitionProducer) internalSend(sr *sendRequest) {
+       p.log.Debug("Received send request: ", *sr.msg)
 
-       msg := request.msg
-
-       // read payload from message
-       uncompressedPayload := msg.Payload
+       msg := sr.msg
 
-       var schemaPayload []byte
-       var err error
-
-       // The block chan must be closed when returned with exception
-       defer request.stopBlock()
-       if !p.canAddToQueue(request) {
+       if !p.canAddToQueue(sr) {
                return
        }
 
-       var schema Schema
-       var schemaVersion []byte
-       if msg.Schema != nil {
-               schema = msg.Schema
-       } else if p.options.Schema != nil {
-               schema = p.options.Schema
-       }
-       if msg.Value != nil {
-               // payload and schema are mutually exclusive
-               // try to get payload from schema value only if payload is not 
set
-               if uncompressedPayload == nil && schema != nil {
-                       schemaPayload, err = schema.Encode(msg.Value)
-                       if err != nil {
-                               runCallback(request.callback, nil, request.msg, 
newError(SchemaFailure, err.Error()))
-                               p.log.WithError(err).Errorf("Schema encode 
message failed %s", msg.Value)
-                               return
-                       }
-               }
-       }
-       if uncompressedPayload == nil {
-               uncompressedPayload = schemaPayload
-       }
-
-       if schema != nil {
-               schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo())
-               if schemaVersion == nil {
-                       schemaVersion, err = 
p.getOrCreateSchema(schema.GetSchemaInfo())
-                       if err != nil {
-                               p.log.WithError(err).Error("get schema version 
fail")
-                               runCallback(request.callback, nil, request.msg, 
fmt.Errorf("get schema version fail, err: %w", err))
-                               return
-                       }
-                       p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
-               }
-       }
-
-       uncompressedSize := len(uncompressedPayload)
-
        // try to reserve memory for uncompressedPayload
-       if !p.canReserveMem(request, int64(uncompressedSize)) {
+       if !p.canReserveMem(sr, sr.uncompressedSize) {
                return
        }
 
-       deliverAt := msg.DeliverAt
-       if msg.DeliverAfter.Nanoseconds() > 0 {
-               deliverAt = time.Now().Add(msg.DeliverAfter)
-       }
-
-       // set default ReplicationClusters when DisableReplication
-       if msg.DisableReplication {
-               msg.ReplicationClusters = []string{"__local__"}
-       }
-
-       mm := p.genMetadata(msg, uncompressedSize, deliverAt)
-
-       sendAsBatch := !p.options.DisableBatching &&
-               msg.ReplicationClusters == nil &&
-               deliverAt.UnixNano() < 0
-
-       // Once the batching is enabled, it can close blockCh early to make 
block finish
-       if sendAsBatch {
-               request.stopBlock()
-       } else {
-               // update sequence id for metadata, make the size of 
msgMetadata more accurate
-               // batch sending will update sequence ID in the BatchBuilder
-               p.updateMetadataSeqID(mm, msg)
-       }
-
-       maxMessageSize := int(p._getConn().GetMaxMessageSize())
-
-       // compress payload if not batching
-       var compressedPayload []byte
-       var compressedSize int
-       var checkSize int
-       if !sendAsBatch {
-               compressedPayload = p.compressionProvider.Compress(nil, 
uncompressedPayload)
-               compressedSize = len(compressedPayload)
-               checkSize = compressedSize
-
-               // set the compress type in msgMetaData
-               compressionType := pb.CompressionType(p.options.CompressionType)
-               if compressionType != pb.CompressionType_NONE {
-                       mm.Compression = &compressionType
-               }
-       } else {
-               // final check for batching message is in serializeMessage
-               // this is a double check
-               checkSize = uncompressedSize
-       }
-
-       // if msg is too large and chunking is disabled
-       if checkSize > maxMessageSize && !p.options.EnableChunking {
-               p.releaseSemaphoreAndMem(int64(uncompressedSize))
-               runCallback(request.callback, nil, request.msg, 
errMessageTooLarge)
-               p.log.WithError(errMessageTooLarge).
-                       WithField("size", checkSize).
-                       WithField("properties", msg.Properties).
-                       Errorf("MaxMessageSize %d", maxMessageSize)
+       if err := p.updateChunkInfo(sr); err != nil {
+               p.releaseSemaphoreAndMem(sr.uncompressedSize)
+               runCallback(sr.callback, nil, sr.msg, err)
                p.metrics.PublishErrorsMsgTooLarge.Inc()
                return
        }
 
-       var totalChunks int
-       // max chunk payload size
-       var payloadChunkSize int
-       if sendAsBatch || !p.options.EnableChunking {
-               totalChunks = 1
-               payloadChunkSize = int(p._getConn().GetMaxMessageSize())
-       } else {
-               payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - 
proto.Size(mm)
-               if payloadChunkSize <= 0 {
-                       p.releaseSemaphoreAndMem(int64(uncompressedSize))
-                       runCallback(request.callback, nil, msg, errMetaTooLarge)
-                       p.log.WithError(errMetaTooLarge).
-                               WithField("metadata size", proto.Size(mm)).
-                               WithField("properties", msg.Properties).
-                               Errorf("MaxMessageSize %d", 
int(p._getConn().GetMaxMessageSize()))
-                       p.metrics.PublishErrorsMsgTooLarge.Inc()
-                       return
-               }
-               // set ChunkMaxMessageSize
-               if p.options.ChunkMaxMessageSize != 0 {
-                       payloadChunkSize = 
int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
-               }
-               totalChunks = int(math.Max(1, 
math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
-       }
-
-       // set total chunks to send request
-       request.totalChunks = totalChunks
-
-       if !sendAsBatch {
-               if totalChunks > 1 {
-                       var lhs, rhs int
-                       uuid := fmt.Sprintf("%s-%s", p.producerName, 
strconv.FormatUint(*mm.SequenceId, 10))
-                       mm.Uuid = proto.String(uuid)
-                       mm.NumChunksFromMsg = proto.Int32(int32(totalChunks))
-                       mm.TotalChunkMsgSize = 
proto.Int32(int32(compressedSize))
-                       cr := newChunkRecorder()
-                       for chunkID := 0; chunkID < totalChunks; chunkID++ {
-                               lhs = chunkID * payloadChunkSize
-                               if rhs = lhs + payloadChunkSize; rhs > 
compressedSize {
-                                       rhs = compressedSize
-                               }
-                               // update chunk id
-                               mm.ChunkId = proto.Int32(int32(chunkID))
-                               nsr := &sendRequest{
-                                       ctx:              request.ctx,
-                                       msg:              request.msg,
-                                       callback:         request.callback,
-                                       callbackOnce:     request.callbackOnce,
-                                       publishTime:      request.publishTime,
-                                       blockCh:          request.blockCh,
-                                       closeBlockChOnce: 
request.closeBlockChOnce,
-                                       totalChunks:      totalChunks,
-                                       chunkID:          chunkID,
-                                       uuid:             uuid,
-                                       chunkRecorder:    cr,
-                                       transaction:      request.transaction,
-                                       reservedMem:      int64(rhs - lhs),
-                               }
-                               // the permit of first chunk has acquired
-                               if chunkID != 0 && !p.canAddToQueue(nsr) {
-                                       
p.releaseSemaphoreAndMem(int64(uncompressedSize - lhs))
-                                       return
-                               }
-                               p.internalSingleSend(mm, 
compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
-                       }
-                       // close the blockCh when all the chunks acquired 
permits
-                       request.stopBlock()
-               } else {
-                       // close the blockCh when totalChunks is 1 (it has 
acquired permits)
-                       request.stopBlock()
-                       p.internalSingleSend(mm, compressedPayload, request, 
uint32(maxMessageSize))
-               }
-       } else {
-               smm := p.genSingleMessageMetadataInBatch(msg, uncompressedSize)
+       if sr.sendAsBatch {
+               smm := p.genSingleMessageMetadataInBatch(msg, 
int(sr.uncompressedSize))
                multiSchemaEnabled := !p.options.DisableMultiSchema
-               added := addRequestToBatch(smm, p, uncompressedPayload, 
request, msg, deliverAt, schemaVersion,
+               added := addRequestToBatch(smm, p, sr.uncompressedPayload, sr, 
msg, sr.deliverAt, sr.schemaVersion,
                        multiSchemaEnabled)
                if !added {
                        // The current batch is full. flush it and retry
@@ -680,21 +510,71 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
                        p.internalFlushCurrentBatch()
 
                        // after flushing try again to add the current payload
-                       if ok := addRequestToBatch(smm, p, uncompressedPayload, 
request, msg, deliverAt, schemaVersion,
+                       if ok := addRequestToBatch(smm, p, 
sr.uncompressedPayload, sr, msg, sr.deliverAt, sr.schemaVersion,
                                multiSchemaEnabled); !ok {
-                               
p.releaseSemaphoreAndMem(int64(uncompressedSize))
-                               runCallback(request.callback, nil, request.msg, 
errFailAddToBatch)
-                               p.log.WithField("size", uncompressedSize).
+                               p.releaseSemaphoreAndMem(sr.uncompressedSize)
+                               runCallback(sr.callback, nil, sr.msg, 
errFailAddToBatch)
+                               p.log.WithField("size", sr.uncompressedSize).
                                        WithField("properties", msg.Properties).
                                        Error("unable to add message to batch")
                                return
                        }
                }
-               if request.flushImmediately {
-
+               if sr.flushImmediately {
                        p.internalFlushCurrentBatch()
+               }
+               return
+       }
+
+       if sr.totalChunks <= 1 {
+               p.internalSingleSend(sr.mm, sr.compressedPayload, sr, 
uint32(sr.maxMessageSize))
+               return
+       }
 
+       var lhs, rhs int
+       uuid := fmt.Sprintf("%s-%s", p.producerName, 
strconv.FormatUint(*sr.mm.SequenceId, 10))
+       sr.mm.Uuid = proto.String(uuid)
+       sr.mm.NumChunksFromMsg = proto.Int32(int32(sr.totalChunks))
+       sr.mm.TotalChunkMsgSize = proto.Int32(int32(sr.compressedSize))
+       cr := newChunkRecorder()
+       for chunkID := 0; chunkID < sr.totalChunks; chunkID++ {
+               lhs = chunkID * sr.payloadChunkSize
+               if rhs = lhs + sr.payloadChunkSize; rhs > sr.compressedSize {
+                       rhs = sr.compressedSize
                }
+               // update chunk id
+               sr.mm.ChunkId = proto.Int32(int32(chunkID))
+               nsr := &sendRequest{
+                       ctx:                 sr.ctx,
+                       msg:                 sr.msg,
+                       callback:            sr.callback,
+                       callbackOnce:        sr.callbackOnce,
+                       publishTime:         sr.publishTime,
+                       flushImmediately:    sr.flushImmediately,
+                       totalChunks:         sr.totalChunks,
+                       chunkID:             chunkID,
+                       uuid:                uuid,
+                       chunkRecorder:       cr,
+                       transaction:         sr.transaction,
+                       reservedMem:         int64(rhs - lhs),
+                       sendAsBatch:         sr.sendAsBatch,
+                       schema:              sr.schema,
+                       schemaVersion:       sr.schemaVersion,
+                       uncompressedPayload: sr.uncompressedPayload,
+                       uncompressedSize:    sr.uncompressedSize,
+                       compressedPayload:   sr.compressedPayload,
+                       compressedSize:      sr.compressedSize,
+                       payloadChunkSize:    sr.payloadChunkSize,
+                       mm:                  sr.mm,
+                       deliverAt:           sr.deliverAt,
+                       maxMessageSize:      sr.maxMessageSize,
+               }
+               // the permit of first chunk has acquired
+               if chunkID != 0 && !p.canAddToQueue(nsr) {
+                       p.releaseSemaphoreAndMem(sr.uncompressedSize - 
int64(lhs))
+                       return
+               }
+               p.internalSingleSend(sr.mm, sr.compressedPayload[lhs:rhs], nsr, 
uint32(sr.maxMessageSize))
        }
 }
 
@@ -766,8 +646,10 @@ func (p *partitionProducer) updateSingleMessageMetadataSeqID(smm *pb.SingleMessa
        }
 }
 
-func (p *partitionProducer) genSingleMessageMetadataInBatch(msg 
*ProducerMessage,
-       uncompressedSize int) (smm *pb.SingleMessageMetadata) {
+func (p *partitionProducer) genSingleMessageMetadataInBatch(
+       msg *ProducerMessage,
+       uncompressedSize int,
+) (smm *pb.SingleMessageMetadata) {
        smm = &pb.SingleMessageMetadata{
                PayloadSize: proto.Int32(int32(uncompressedSize)),
        }
@@ -1163,6 +1045,134 @@ func (p *partitionProducer) prepareTransaction(sr *sendRequest) error {
        return nil
 }
 
+func (p *partitionProducer) updateSchema(sr *sendRequest) error {
+       var schema Schema
+       var schemaVersion []byte
+       var err error
+
+       if sr.msg.Schema != nil {
+               schema = sr.msg.Schema
+       } else if p.options.Schema != nil {
+               schema = p.options.Schema
+       }
+
+       if schema == nil {
+               return nil
+       }
+
+       schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo())
+       if schemaVersion == nil {
+               schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
+               if err != nil {
+                       return fmt.Errorf("get schema version fail, err: %w", 
err)
+               }
+               p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
+       }
+
+       sr.schema = schema
+       sr.schemaVersion = schemaVersion
+       return nil
+}
+
+func (p *partitionProducer) updateUncompressedPayload(sr *sendRequest) error {
+       // read payload from message
+       sr.uncompressedPayload = sr.msg.Payload
+
+       if sr.msg.Value != nil {
+               if sr.schema == nil {
+                       p.log.Errorf("Schema encode message failed %s", 
sr.msg.Value)
+                       return newError(SchemaFailure, "set schema value 
without setting schema")
+               }
+
+               // payload and schema are mutually exclusive
+               // try to get payload from schema value only if payload is not set
+               schemaPayload, err := sr.schema.Encode(sr.msg.Value)
+               if err != nil {
+                       p.log.WithError(err).Errorf("Schema encode message 
failed %s", sr.msg.Value)
+                       return newError(SchemaFailure, err.Error())
+               }
+
+               sr.uncompressedPayload = schemaPayload
+       }
+
+       sr.uncompressedSize = int64(len(sr.uncompressedPayload))
+       return nil
+}
+
+func (p *partitionProducer) updateMetaData(sr *sendRequest) {
+       deliverAt := sr.msg.DeliverAt
+       if sr.msg.DeliverAfter.Nanoseconds() > 0 {
+               deliverAt = time.Now().Add(sr.msg.DeliverAfter)
+       }
+
+       // set default ReplicationClusters when DisableReplication
+       if sr.msg.DisableReplication {
+               sr.msg.ReplicationClusters = []string{"__local__"}
+       }
+
+       sr.mm = p.genMetadata(sr.msg, int(sr.uncompressedSize), deliverAt)
+
+       sr.sendAsBatch = !p.options.DisableBatching &&
+               sr.msg.ReplicationClusters == nil &&
+               deliverAt.UnixNano() < 0
+
+       if !sr.sendAsBatch {
+               // update sequence id for metadata, make the size of msgMetadata more accurate
+               // batch sending will update sequence ID in the BatchBuilder
+               p.updateMetadataSeqID(sr.mm, sr.msg)
+       }
+
+       sr.deliverAt = deliverAt
+}
+
+func (p *partitionProducer) updateChunkInfo(sr *sendRequest) error {
+       checkSize := sr.uncompressedSize
+       if !sr.sendAsBatch {
+               sr.compressedPayload = p.compressionProvider.Compress(nil, 
sr.uncompressedPayload)
+               sr.compressedSize = len(sr.compressedPayload)
+
+               // set the compress type in msgMetaData
+               compressionType := pb.CompressionType(p.options.CompressionType)
+               if compressionType != pb.CompressionType_NONE {
+                       sr.mm.Compression = &compressionType
+               }
+
+               checkSize = int64(sr.compressedSize)
+       }
+
+       sr.maxMessageSize = p._getConn().GetMaxMessageSize()
+
+       // if msg is too large and chunking is disabled
+       if checkSize > int64(sr.maxMessageSize) && !p.options.EnableChunking {
+               p.log.WithError(errMessageTooLarge).
+                       WithField("size", checkSize).
+                       WithField("properties", sr.msg.Properties).
+                       Errorf("MaxMessageSize %d", sr.maxMessageSize)
+               return errMessageTooLarge
+       }
+
+       if sr.sendAsBatch || !p.options.EnableChunking {
+               sr.totalChunks = 1
+               sr.payloadChunkSize = int(sr.maxMessageSize)
+       } else {
+               sr.payloadChunkSize = int(sr.maxMessageSize) - proto.Size(sr.mm)
+               if sr.payloadChunkSize <= 0 {
+                       p.log.WithError(errMetaTooLarge).
+                               WithField("metadata size", proto.Size(sr.mm)).
+                               WithField("properties", sr.msg.Properties).
+                               Errorf("MaxMessageSize %d", 
int(p._getConn().GetMaxMessageSize()))
+                       return errMetaTooLarge
+               }
+               // set ChunkMaxMessageSize
+               if p.options.ChunkMaxMessageSize != 0 {
+                       sr.payloadChunkSize = 
int(math.Min(float64(sr.payloadChunkSize), 
float64(p.options.ChunkMaxMessageSize)))
+               }
+               sr.totalChunks = int(math.Max(1, 
math.Ceil(float64(sr.compressedSize)/float64(sr.payloadChunkSize))))
+       }
+
+       return nil
+}
+
 func (p *partitionProducer) internalSendAsync(
        ctx context.Context,
        msg *ProducerMessage,
@@ -1175,19 +1185,13 @@ func (p *partitionProducer) internalSendAsync(
                return
        }
 
-       // bc only works when DisableBlockIfQueueFull is false
-       bc := make(chan struct{})
-       // callbackOnce make sure the callback is only invoked once in chunking
-       callbackOnce := &sync.Once{}
        sr := &sendRequest{
                ctx:              ctx,
                msg:              msg,
                callback:         callback,
-               callbackOnce:     callbackOnce,
+               callbackOnce:     &sync.Once{},
                flushImmediately: flushImmediately,
                publishTime:      time.Now(),
-               blockCh:          bc,
-               closeBlockChOnce: &sync.Once{},
        }
        if err := p.prepareTransaction(sr); err != nil {
                runCallback(sr.callback, nil, msg, err)
@@ -1202,12 +1206,21 @@ func (p *partitionProducer) internalSendAsync(
 
        p.options.Interceptors.BeforeSend(p, msg)
 
-       p.dataChan <- sr
+       if err := p.updateSchema(sr); err != nil {
+               p.log.Error(err)
+               runCallback(sr.callback, nil, msg, err)
+               return
+       }
 
-       if !p.options.DisableBlockIfQueueFull {
-               // block if queue full
-               <-bc
+       if err := p.updateUncompressedPayload(sr); err != nil {
+               p.log.Error(err)
+               runCallback(sr.callback, nil, msg, err)
+               return
        }
+
+       p.updateMetaData(sr)
+
+       p.dataChan <- sr
 }
 
 func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
@@ -1435,27 +1448,29 @@ func (p *partitionProducer) Close() {
 }
 
 type sendRequest struct {
-       ctx              context.Context
-       msg              *ProducerMessage
-       callback         func(MessageID, *ProducerMessage, error)
-       callbackOnce     *sync.Once
-       publishTime      time.Time
-       flushImmediately bool
-       blockCh          chan struct{}
-       closeBlockChOnce *sync.Once
-       totalChunks      int
-       chunkID          int
-       uuid             string
-       chunkRecorder    *chunkRecorder
-       transaction      *transaction
-       reservedMem      int64
-}
-
-// stopBlock can be invoked multiple times safety
-func (sr *sendRequest) stopBlock() {
-       sr.closeBlockChOnce.Do(func() {
-               close(sr.blockCh)
-       })
+       ctx                 context.Context
+       msg                 *ProducerMessage
+       callback            func(MessageID, *ProducerMessage, error)
+       callbackOnce        *sync.Once
+       publishTime         time.Time
+       flushImmediately    bool
+       totalChunks         int
+       chunkID             int
+       uuid                string
+       chunkRecorder       *chunkRecorder
+       transaction         *transaction
+       reservedMem         int64
+       sendAsBatch         bool
+       schema              Schema
+       schemaVersion       []byte
+       uncompressedPayload []byte
+       uncompressedSize    int64
+       compressedPayload   []byte
+       compressedSize      int
+       payloadChunkSize    int
+       mm                  *pb.MessageMetadata
+       deliverAt           time.Time
+       maxMessageSize      int32
 }
 
 type closeProducer struct {

Reply via email to