This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 793008e7 refactor: prepare sendrequest and move to internalSendAsync (#1120)
793008e7 is described below
commit 793008e7f3b233bfa64e8c22b7cabe10aa334bdb
Author: tison <[email protected]>
AuthorDate: Wed Oct 25 15:39:23 2023 +0800
refactor: prepare sendrequest and move to internalSendAsync (#1120)
Signed-off-by: tison <[email protected]>
Co-authored-by: gunli <[email protected]>
---
pulsar/message_chunking_test.go | 31 ++-
pulsar/producer_partition.go | 457 +++++++++++++++++++++-------------------
2 files changed, 259 insertions(+), 229 deletions(-)
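At a glance, this patch moves all per-message preparation (schema resolution, payload encoding, metadata generation, chunking math) out of the consumer goroutine in internalSend and into internalSendAsync, so a sendRequest arriving on dataChan is already fully populated. A minimal, self-contained Go sketch of that producer/consumer pattern follows; every name in it is a hypothetical stand-in, not the library's API:

    package main

    import "fmt"

    // request is a hypothetical stand-in for sendRequest: after this patch,
    // every field the consumer needs is computed before the request is
    // enqueued, so the consumer goroutine only dispatches.
    type request struct {
        payload     []byte
        totalChunks int
    }

    func prepare(payload []byte) *request {
        // analogous to updateSchema / updateUncompressedPayload /
        // updateMetaData running inside internalSendAsync
        return &request{payload: payload, totalChunks: 1}
    }

    func main() {
        dataChan := make(chan *request, 1)
        dataChan <- prepare([]byte("hello")) // producer side
        r := <-dataChan                      // consumer side (like internalSend)
        fmt.Printf("dispatching %d chunk(s), %d bytes\n", r.totalChunks, len(r.payload))
    }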
diff --git a/pulsar/message_chunking_test.go b/pulsar/message_chunking_test.go
index ee3ab177..fbdcaa0c 100644
--- a/pulsar/message_chunking_test.go
+++ b/pulsar/message_chunking_test.go
@@ -552,26 +552,41 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) {
msg := &ProducerMessage{
Payload: []byte(fmt.Sprintf("chunk-%s-%d|", uuid, chunkID)),
}
+ wholePayload := msg.Payload
producerImpl := p.(*producer).producers[0].(*partitionProducer)
- mm := producerImpl.genMetadata(msg, len(msg.Payload), time.Now())
+ mm := producerImpl.genMetadata(msg, len(wholePayload), time.Now())
mm.Uuid = proto.String(uuid)
mm.NumChunksFromMsg = proto.Int32(int32(totalChunks))
- mm.TotalChunkMsgSize = proto.Int32(int32(len(msg.Payload)))
+ mm.TotalChunkMsgSize = proto.Int32(int32(len(wholePayload)))
mm.ChunkId = proto.Int32(int32(chunkID))
producerImpl.updateMetadataSeqID(mm, msg)
-
- doneCh := make(chan struct{})
producerImpl.internalSingleSend(
mm,
msg.Payload,
&sendRequest{
callback: func(id MessageID, producerMessage *ProducerMessage, err error) {
- close(doneCh)
},
- msg: msg,
+ ctx: context.Background(),
+ msg: msg,
+ flushImmediately: true,
+ totalChunks: totalChunks,
+ chunkID: chunkID,
+ uuid: uuid,
+ chunkRecorder: newChunkRecorder(),
+ transaction: nil,
+ reservedMem: 0,
+ sendAsBatch: false,
+ schema: nil,
+ schemaVersion: nil,
+ uncompressedPayload: wholePayload,
+ uncompressedSize: int64(len(wholePayload)),
+ compressedPayload: wholePayload,
+ compressedSize: len(wholePayload),
+ payloadChunkSize: internal.MaxMessageSize - proto.Size(mm),
+ mm: mm,
+ deliverAt: time.Now(),
+ maxMessageSize: internal.MaxMessageSize,
},
uint32(internal.MaxMessageSize),
)
-
- <-doneCh
}
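The chunk size used by the test above is internal.MaxMessageSize - proto.Size(mm): each chunk must leave room for the serialized message metadata. A quick, self-contained illustration of that arithmetic with made-up numbers (in the real code maxMessageSize comes from the connection and metadataSize from proto.Size(mm)):

    package main

    import (
        "fmt"
        "math"
    )

    func main() {
        // Hypothetical values for illustration only.
        maxMessageSize := 5 * 1024 * 1024 // e.g. a 5 MiB broker limit
        metadataSize := 200               // serialized MessageMetadata bytes
        compressedSize := 12 * 1024 * 1024

        // Mirrors updateChunkInfo in the patch below.
        payloadChunkSize := maxMessageSize - metadataSize
        totalChunks := int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
        fmt.Println(payloadChunkSize, totalChunks) // 5242680 3
    }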
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index ac07341d..b00ed6b5 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -478,201 +478,31 @@ func runCallback(cb func(MessageID, *ProducerMessage, error), id MessageID, msg
cb(id, msg, err)
}
-func (p *partitionProducer) internalSend(request *sendRequest) {
- p.log.Debug("Received send request: ", *request.msg)
+func (p *partitionProducer) internalSend(sr *sendRequest) {
+ p.log.Debug("Received send request: ", *sr.msg)
- msg := request.msg
-
- // read payload from message
- uncompressedPayload := msg.Payload
+ msg := sr.msg
- var schemaPayload []byte
- var err error
-
- // The block chan must be closed when returned with exception
- defer request.stopBlock()
- if !p.canAddToQueue(request) {
+ if !p.canAddToQueue(sr) {
return
}
- var schema Schema
- var schemaVersion []byte
- if msg.Schema != nil {
- schema = msg.Schema
- } else if p.options.Schema != nil {
- schema = p.options.Schema
- }
- if msg.Value != nil {
- // payload and schema are mutually exclusive
- // try to get payload from schema value only if payload is not set
- if uncompressedPayload == nil && schema != nil {
- schemaPayload, err = schema.Encode(msg.Value)
- if err != nil {
- runCallback(request.callback, nil, request.msg, newError(SchemaFailure, err.Error()))
- p.log.WithError(err).Errorf("Schema encode
message failed %s", msg.Value)
- return
- }
- }
- }
- if uncompressedPayload == nil {
- uncompressedPayload = schemaPayload
- }
-
- if schema != nil {
- schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo())
- if schemaVersion == nil {
- schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
- if err != nil {
- p.log.WithError(err).Error("get schema version
fail")
- runCallback(request.callback, nil, request.msg, fmt.Errorf("get schema version fail, err: %w", err))
- return
- }
- p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
- }
- }
-
- uncompressedSize := len(uncompressedPayload)
-
// try to reserve memory for uncompressedPayload
- if !p.canReserveMem(request, int64(uncompressedSize)) {
+ if !p.canReserveMem(sr, sr.uncompressedSize) {
return
}
- deliverAt := msg.DeliverAt
- if msg.DeliverAfter.Nanoseconds() > 0 {
- deliverAt = time.Now().Add(msg.DeliverAfter)
- }
-
- // set default ReplicationClusters when DisableReplication
- if msg.DisableReplication {
- msg.ReplicationClusters = []string{"__local__"}
- }
-
- mm := p.genMetadata(msg, uncompressedSize, deliverAt)
-
- sendAsBatch := !p.options.DisableBatching &&
- msg.ReplicationClusters == nil &&
- deliverAt.UnixNano() < 0
-
- // Once the batching is enabled, it can close blockCh early to make block finish
- if sendAsBatch {
- request.stopBlock()
- } else {
- // update sequence id for metadata, make the size of msgMetadata more accurate
- // batch sending will update sequence ID in the BatchBuilder
- p.updateMetadataSeqID(mm, msg)
- }
-
- maxMessageSize := int(p._getConn().GetMaxMessageSize())
-
- // compress payload if not batching
- var compressedPayload []byte
- var compressedSize int
- var checkSize int
- if !sendAsBatch {
- compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
- compressedSize = len(compressedPayload)
- checkSize = compressedSize
-
- // set the compress type in msgMetaData
- compressionType := pb.CompressionType(p.options.CompressionType)
- if compressionType != pb.CompressionType_NONE {
- mm.Compression = &compressionType
- }
- } else {
- // final check for batching message is in serializeMessage
- // this is a double check
- checkSize = uncompressedSize
- }
-
- // if msg is too large and chunking is disabled
- if checkSize > maxMessageSize && !p.options.EnableChunking {
- p.releaseSemaphoreAndMem(int64(uncompressedSize))
- runCallback(request.callback, nil, request.msg, errMessageTooLarge)
- p.log.WithError(errMessageTooLarge).
- WithField("size", checkSize).
- WithField("properties", msg.Properties).
- Errorf("MaxMessageSize %d", maxMessageSize)
+ if err := p.updateChunkInfo(sr); err != nil {
+ p.releaseSemaphoreAndMem(sr.uncompressedSize)
+ runCallback(sr.callback, nil, sr.msg, err)
p.metrics.PublishErrorsMsgTooLarge.Inc()
return
}
- var totalChunks int
- // max chunk payload size
- var payloadChunkSize int
- if sendAsBatch || !p.options.EnableChunking {
- totalChunks = 1
- payloadChunkSize = int(p._getConn().GetMaxMessageSize())
- } else {
- payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - proto.Size(mm)
- if payloadChunkSize <= 0 {
- p.releaseSemaphoreAndMem(int64(uncompressedSize))
- runCallback(request.callback, nil, msg, errMetaTooLarge)
- p.log.WithError(errMetaTooLarge).
- WithField("metadata size", proto.Size(mm)).
- WithField("properties", msg.Properties).
- Errorf("MaxMessageSize %d",
int(p._getConn().GetMaxMessageSize()))
- p.metrics.PublishErrorsMsgTooLarge.Inc()
- return
- }
- // set ChunkMaxMessageSize
- if p.options.ChunkMaxMessageSize != 0 {
- payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
- }
- totalChunks = int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
- }
-
- // set total chunks to send request
- request.totalChunks = totalChunks
-
- if !sendAsBatch {
- if totalChunks > 1 {
- var lhs, rhs int
- uuid := fmt.Sprintf("%s-%s", p.producerName,
strconv.FormatUint(*mm.SequenceId, 10))
- mm.Uuid = proto.String(uuid)
- mm.NumChunksFromMsg = proto.Int32(int32(totalChunks))
- mm.TotalChunkMsgSize = proto.Int32(int32(compressedSize))
- cr := newChunkRecorder()
- for chunkID := 0; chunkID < totalChunks; chunkID++ {
- lhs = chunkID * payloadChunkSize
- if rhs = lhs + payloadChunkSize; rhs > compressedSize {
- rhs = compressedSize
- }
- // update chunk id
- mm.ChunkId = proto.Int32(int32(chunkID))
- nsr := &sendRequest{
- ctx: request.ctx,
- msg: request.msg,
- callback: request.callback,
- callbackOnce: request.callbackOnce,
- publishTime: request.publishTime,
- blockCh: request.blockCh,
- closeBlockChOnce: request.closeBlockChOnce,
- totalChunks: totalChunks,
- chunkID: chunkID,
- uuid: uuid,
- chunkRecorder: cr,
- transaction: request.transaction,
- reservedMem: int64(rhs - lhs),
- }
- // the permit of first chunk has acquired
- if chunkID != 0 && !p.canAddToQueue(nsr) {
- p.releaseSemaphoreAndMem(int64(uncompressedSize - lhs))
- return
- }
- p.internalSingleSend(mm, compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
- }
- // close the blockCh when all the chunks acquired permits
- request.stopBlock()
- } else {
- // close the blockCh when totalChunks is 1 (it has acquired permits)
- request.stopBlock()
- p.internalSingleSend(mm, compressedPayload, request, uint32(maxMessageSize))
- }
- } else {
- smm := p.genSingleMessageMetadataInBatch(msg, uncompressedSize)
+ if sr.sendAsBatch {
+ smm := p.genSingleMessageMetadataInBatch(msg, int(sr.uncompressedSize))
multiSchemaEnabled := !p.options.DisableMultiSchema
- added := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion,
+ added := addRequestToBatch(smm, p, sr.uncompressedPayload, sr, msg, sr.deliverAt, sr.schemaVersion,
multiSchemaEnabled)
if !added {
// The current batch is full. flush it and retry
@@ -680,21 +510,71 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
p.internalFlushCurrentBatch()
// after flushing try again to add the current payload
- if ok := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion,
+ if ok := addRequestToBatch(smm, p, sr.uncompressedPayload, sr, msg, sr.deliverAt, sr.schemaVersion,
multiSchemaEnabled); !ok {
- p.releaseSemaphoreAndMem(int64(uncompressedSize))
- runCallback(request.callback, nil, request.msg, errFailAddToBatch)
- p.log.WithField("size", uncompressedSize).
+ p.releaseSemaphoreAndMem(sr.uncompressedSize)
+ runCallback(sr.callback, nil, sr.msg, errFailAddToBatch)
+ p.log.WithField("size", sr.uncompressedSize).
WithField("properties", msg.Properties).
Error("unable to add message to batch")
return
}
}
- if request.flushImmediately {
-
+ if sr.flushImmediately {
p.internalFlushCurrentBatch()
+ }
+ return
+ }
+
+ if sr.totalChunks <= 1 {
+ p.internalSingleSend(sr.mm, sr.compressedPayload, sr, uint32(sr.maxMessageSize))
+ return
+ }
+ var lhs, rhs int
+ uuid := fmt.Sprintf("%s-%s", p.producerName,
strconv.FormatUint(*sr.mm.SequenceId, 10))
+ sr.mm.Uuid = proto.String(uuid)
+ sr.mm.NumChunksFromMsg = proto.Int32(int32(sr.totalChunks))
+ sr.mm.TotalChunkMsgSize = proto.Int32(int32(sr.compressedSize))
+ cr := newChunkRecorder()
+ for chunkID := 0; chunkID < sr.totalChunks; chunkID++ {
+ lhs = chunkID * sr.payloadChunkSize
+ if rhs = lhs + sr.payloadChunkSize; rhs > sr.compressedSize {
+ rhs = sr.compressedSize
}
+ // update chunk id
+ sr.mm.ChunkId = proto.Int32(int32(chunkID))
+ nsr := &sendRequest{
+ ctx: sr.ctx,
+ msg: sr.msg,
+ callback: sr.callback,
+ callbackOnce: sr.callbackOnce,
+ publishTime: sr.publishTime,
+ flushImmediately: sr.flushImmediately,
+ totalChunks: sr.totalChunks,
+ chunkID: chunkID,
+ uuid: uuid,
+ chunkRecorder: cr,
+ transaction: sr.transaction,
+ reservedMem: int64(rhs - lhs),
+ sendAsBatch: sr.sendAsBatch,
+ schema: sr.schema,
+ schemaVersion: sr.schemaVersion,
+ uncompressedPayload: sr.uncompressedPayload,
+ uncompressedSize: sr.uncompressedSize,
+ compressedPayload: sr.compressedPayload,
+ compressedSize: sr.compressedSize,
+ payloadChunkSize: sr.payloadChunkSize,
+ mm: sr.mm,
+ deliverAt: sr.deliverAt,
+ maxMessageSize: sr.maxMessageSize,
+ }
+ // the permit of first chunk has acquired
+ if chunkID != 0 && !p.canAddToQueue(nsr) {
+ p.releaseSemaphoreAndMem(sr.uncompressedSize - int64(lhs))
+ return
+ }
+ p.internalSingleSend(sr.mm, sr.compressedPayload[lhs:rhs], nsr, uint32(sr.maxMessageSize))
}
}
@@ -766,8 +646,10 @@ func (p *partitionProducer) updateSingleMessageMetadataSeqID(smm *pb.SingleMessa
}
}
-func (p *partitionProducer) genSingleMessageMetadataInBatch(msg *ProducerMessage,
- uncompressedSize int) (smm *pb.SingleMessageMetadata) {
+func (p *partitionProducer) genSingleMessageMetadataInBatch(
+ msg *ProducerMessage,
+ uncompressedSize int,
+) (smm *pb.SingleMessageMetadata) {
smm = &pb.SingleMessageMetadata{
PayloadSize: proto.Int32(int32(uncompressedSize)),
}
@@ -1163,6 +1045,134 @@ func (p *partitionProducer) prepareTransaction(sr *sendRequest) {
return nil
}
+func (p *partitionProducer) updateSchema(sr *sendRequest) error {
+ var schema Schema
+ var schemaVersion []byte
+ var err error
+
+ if sr.msg.Schema != nil {
+ schema = sr.msg.Schema
+ } else if p.options.Schema != nil {
+ schema = p.options.Schema
+ }
+
+ if schema == nil {
+ return nil
+ }
+
+ schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo())
+ if schemaVersion == nil {
+ schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
+ if err != nil {
+ return fmt.Errorf("get schema version fail, err: %w",
err)
+ }
+ p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
+ }
+
+ sr.schema = schema
+ sr.schemaVersion = schemaVersion
+ return nil
+}
+
+func (p *partitionProducer) updateUncompressedPayload(sr *sendRequest) error {
+ // read payload from message
+ sr.uncompressedPayload = sr.msg.Payload
+
+ if sr.msg.Value != nil {
+ if sr.schema == nil {
+ p.log.Errorf("Schema encode message failed %s",
sr.msg.Value)
+ return newError(SchemaFailure, "set schema value
without setting schema")
+ }
+
+ // payload and schema are mutually exclusive
+ // try to get payload from schema value only if payload is not set
+ schemaPayload, err := sr.schema.Encode(sr.msg.Value)
+ if err != nil {
+ p.log.WithError(err).Errorf("Schema encode message
failed %s", sr.msg.Value)
+ return newError(SchemaFailure, err.Error())
+ }
+
+ sr.uncompressedPayload = schemaPayload
+ }
+
+ sr.uncompressedSize = int64(len(sr.uncompressedPayload))
+ return nil
+}
+
+func (p *partitionProducer) updateMetaData(sr *sendRequest) {
+ deliverAt := sr.msg.DeliverAt
+ if sr.msg.DeliverAfter.Nanoseconds() > 0 {
+ deliverAt = time.Now().Add(sr.msg.DeliverAfter)
+ }
+
+ // set default ReplicationClusters when DisableReplication
+ if sr.msg.DisableReplication {
+ sr.msg.ReplicationClusters = []string{"__local__"}
+ }
+
+ sr.mm = p.genMetadata(sr.msg, int(sr.uncompressedSize), deliverAt)
+
+ sr.sendAsBatch = !p.options.DisableBatching &&
+ sr.msg.ReplicationClusters == nil &&
+ deliverAt.UnixNano() < 0
+
+ if !sr.sendAsBatch {
+ // update sequence id for metadata, make the size of msgMetadata more accurate
+ // batch sending will update sequence ID in the BatchBuilder
+ p.updateMetadataSeqID(sr.mm, sr.msg)
+ }
+
+ sr.deliverAt = deliverAt
+}
+
+func (p *partitionProducer) updateChunkInfo(sr *sendRequest) error {
+ checkSize := sr.uncompressedSize
+ if !sr.sendAsBatch {
+ sr.compressedPayload = p.compressionProvider.Compress(nil, sr.uncompressedPayload)
+ sr.compressedSize = len(sr.compressedPayload)
+
+ // set the compress type in msgMetaData
+ compressionType := pb.CompressionType(p.options.CompressionType)
+ if compressionType != pb.CompressionType_NONE {
+ sr.mm.Compression = &compressionType
+ }
+
+ checkSize = int64(sr.compressedSize)
+ }
+
+ sr.maxMessageSize = p._getConn().GetMaxMessageSize()
+
+ // if msg is too large and chunking is disabled
+ if checkSize > int64(sr.maxMessageSize) && !p.options.EnableChunking {
+ p.log.WithError(errMessageTooLarge).
+ WithField("size", checkSize).
+ WithField("properties", sr.msg.Properties).
+ Errorf("MaxMessageSize %d", sr.maxMessageSize)
+ return errMessageTooLarge
+ }
+
+ if sr.sendAsBatch || !p.options.EnableChunking {
+ sr.totalChunks = 1
+ sr.payloadChunkSize = int(sr.maxMessageSize)
+ } else {
+ sr.payloadChunkSize = int(sr.maxMessageSize) - proto.Size(sr.mm)
+ if sr.payloadChunkSize <= 0 {
+ p.log.WithError(errMetaTooLarge).
+ WithField("metadata size", proto.Size(sr.mm)).
+ WithField("properties", sr.msg.Properties).
+ Errorf("MaxMessageSize %d",
int(p._getConn().GetMaxMessageSize()))
+ return errMetaTooLarge
+ }
+ // set ChunkMaxMessageSize
+ if p.options.ChunkMaxMessageSize != 0 {
+ sr.payloadChunkSize = int(math.Min(float64(sr.payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
+ }
+ sr.totalChunks = int(math.Max(1, math.Ceil(float64(sr.compressedSize)/float64(sr.payloadChunkSize))))
+ }
+
+ return nil
+}
+
func (p *partitionProducer) internalSendAsync(
ctx context.Context,
msg *ProducerMessage,
@@ -1175,19 +1185,13 @@ func (p *partitionProducer) internalSendAsync(
return
}
- // bc only works when DisableBlockIfQueueFull is false
- bc := make(chan struct{})
- // callbackOnce make sure the callback is only invoked once in chunking
- callbackOnce := &sync.Once{}
sr := &sendRequest{
ctx: ctx,
msg: msg,
callback: callback,
- callbackOnce: callbackOnce,
+ callbackOnce: &sync.Once{},
flushImmediately: flushImmediately,
publishTime: time.Now(),
- blockCh: bc,
- closeBlockChOnce: &sync.Once{},
}
if err := p.prepareTransaction(sr); err != nil {
runCallback(sr.callback, nil, msg, err)
@@ -1202,12 +1206,21 @@ func (p *partitionProducer) internalSendAsync(
p.options.Interceptors.BeforeSend(p, msg)
- p.dataChan <- sr
+ if err := p.updateSchema(sr); err != nil {
+ p.log.Error(err)
+ runCallback(sr.callback, nil, msg, err)
+ return
+ }
- if !p.options.DisableBlockIfQueueFull {
- // block if queue full
- <-bc
+ if err := p.updateUncompressedPayload(sr); err != nil {
+ p.log.Error(err)
+ runCallback(sr.callback, nil, msg, err)
+ return
}
+
+ p.updateMetaData(sr)
+
+ p.dataChan <- sr
}
func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
@@ -1435,27 +1448,29 @@ func (p *partitionProducer) Close() {
}
type sendRequest struct {
- ctx context.Context
- msg *ProducerMessage
- callback func(MessageID, *ProducerMessage, error)
- callbackOnce *sync.Once
- publishTime time.Time
- flushImmediately bool
- blockCh chan struct{}
- closeBlockChOnce *sync.Once
- totalChunks int
- chunkID int
- uuid string
- chunkRecorder *chunkRecorder
- transaction *transaction
- reservedMem int64
-}
-
-// stopBlock can be invoked multiple times safety
-func (sr *sendRequest) stopBlock() {
- sr.closeBlockChOnce.Do(func() {
- close(sr.blockCh)
- })
+ ctx context.Context
+ msg *ProducerMessage
+ callback func(MessageID, *ProducerMessage, error)
+ callbackOnce *sync.Once
+ publishTime time.Time
+ flushImmediately bool
+ totalChunks int
+ chunkID int
+ uuid string
+ chunkRecorder *chunkRecorder
+ transaction *transaction
+ reservedMem int64
+ sendAsBatch bool
+ schema Schema
+ schemaVersion []byte
+ uncompressedPayload []byte
+ uncompressedSize int64
+ compressedPayload []byte
+ compressedSize int
+ payloadChunkSize int
+ mm *pb.MessageMetadata
+ deliverAt time.Time
+ maxMessageSize int32
}
type closeProducer struct {
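For completeness, the chunked path in the new internalSend walks the compressed payload in payloadChunkSize windows, clamping the last slice and charging rhs-lhs bytes of reserved memory to each chunk's sendRequest. A self-contained sketch of just that slicing, with the sendRequest plumbing omitted and hypothetical values standing in for sr.compressedPayload and sr.payloadChunkSize:

    package main

    import "fmt"

    func main() {
        compressedPayload := []byte("abcdefghij") // stand-in for sr.compressedPayload
        payloadChunkSize := 4                     // stand-in for sr.payloadChunkSize
        compressedSize := len(compressedPayload)
        totalChunks := (compressedSize + payloadChunkSize - 1) / payloadChunkSize

        var lhs, rhs int
        for chunkID := 0; chunkID < totalChunks; chunkID++ {
            lhs = chunkID * payloadChunkSize
            if rhs = lhs + payloadChunkSize; rhs > compressedSize {
                rhs = compressedSize
            }
            // each window becomes one internalSingleSend call in the patch
            fmt.Printf("chunk %d: %q (reservedMem=%d)\n", chunkID, compressedPayload[lhs:rhs], rhs-lhs)
        }
    }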