gunli commented on code in PR #1071:
URL: https://github.com/apache/pulsar-client-go/pull/1071#discussion_r1276239137
##########
pulsar/producer_partition.go:
##########
@@ -1121,78 +958,301 @@ func (p *partitionProducer) SendAsync(ctx
context.Context, msg *ProducerMessage,
p.internalSendAsync(ctx, msg, callback, false)
}
-func (p *partitionProducer) internalSendAsync(ctx context.Context, msg
*ProducerMessage,
- callback func(MessageID, *ProducerMessage, error), flushImmediately
bool) {
+func (p *partitionProducer) validateMsg(msg *ProducerMessage) error {
if msg == nil {
- p.log.Error("Message is nil")
- runCallback(callback, nil, msg, newError(InvalidMessage,
"Message is nil"))
- return
+ return newError(InvalidMessage, "Message is nil")
}
if msg.Value != nil && msg.Payload != nil {
- p.log.Error("Can not set Value and Payload both")
- runCallback(callback, nil, msg, newError(InvalidMessage, "Can
not set Value and Payload both"))
- return
+ return newError(InvalidMessage, "Can not set Value and Payload
both")
}
- // Register transaction operation to transaction and the transaction
coordinator.
- var newCallback func(MessageID, *ProducerMessage, error)
- var txn *transaction
- if msg.Transaction != nil {
- transactionImpl := (msg.Transaction).(*transaction)
- txn = transactionImpl
- if transactionImpl.state != TxnOpen {
- p.log.WithField("state",
transactionImpl.state).Error("Failed to send message" +
- " by a non-open transaction.")
- runCallback(callback, nil, msg, newError(InvalidStatus,
"Failed to send message by a non-open transaction."))
- return
+ if p.options.DisableMultiSchema {
+ if msg.Schema != nil && p.options.Schema != nil &&
+ msg.Schema.GetSchemaInfo().hash() !=
p.options.Schema.GetSchemaInfo().hash() {
+ p.log.Errorf("The producer %s of the topic %s is
disabled the `MultiSchema`", p.producerName, p.topic)
+ return fmt.Errorf("msg schema can not match with
producer schema")
}
+ }
- if err := transactionImpl.registerProducerTopic(p.topic); err
!= nil {
- runCallback(callback, nil, msg, err)
- return
+ return nil
+}
+
+func (p *partitionProducer) updateSchema(sr *sendRequest) error {
+ var schema Schema
+ var schemaVersion []byte
+ var err error
+
+ if sr.msg.Schema != nil {
+ schema = sr.msg.Schema
+ } else if p.options.Schema != nil {
+ schema = p.options.Schema
+ }
+
+ if schema == nil {
+ return nil
+ }
+
+ schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo())
+ if schemaVersion == nil {
+ schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
+ if err != nil {
+ return fmt.Errorf("get schema version fail, err: %w",
err)
}
- if err := transactionImpl.registerSendOrAckOp(); err != nil {
- runCallback(callback, nil, msg, err)
- return
+ p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
+ }
+
+ sr.schema = schema
+ sr.schemaVersion = schemaVersion
+ return nil
+}
+
+func (p *partitionProducer) updateUncompressPayload(sr *sendRequest) error {
+ // read payload from message
+ sr.uncompressedPayload = sr.msg.Payload
+
+ if sr.msg.Value != nil {
+ if sr.schema == nil {
+ p.log.Errorf("Schema encode message failed %s",
sr.msg.Value)
+ return newError(SchemaFailure, "set schema value
without setting schema")
}
- newCallback = func(id MessageID, producerMessage
*ProducerMessage, err error) {
- runCallback(callback, id, producerMessage, err)
- transactionImpl.endSendOrAckOp(err)
+
+ // payload and schema are mutually exclusive
+ // try to get payload from schema value only if payload is not
set
+ schemaPayload, err := sr.schema.Encode(sr.msg.Value)
+ if err != nil {
+ p.log.WithError(err).Errorf("Schema encode message
failed %s", sr.msg.Value)
+ return newError(SchemaFailure, err.Error())
}
+
+ sr.uncompressedPayload = schemaPayload
+ }
+
+ sr.uncompressedSize = int64(len(sr.uncompressedPayload))
+ return nil
+}
+
+func (p *partitionProducer) updateMetaData(sr *sendRequest) {
+ deliverAt := sr.msg.DeliverAt
+ if sr.msg.DeliverAfter.Nanoseconds() > 0 {
+ deliverAt = time.Now().Add(sr.msg.DeliverAfter)
+ }
+
+ sr.mm = p.genMetadata(sr.msg, int(sr.uncompressedSize), deliverAt)
+
+ // set default ReplicationClusters when DisableReplication
+ if sr.msg.DisableReplication {
+ sr.msg.ReplicationClusters = []string{"__local__"}
+ }
+
+ sr.sendAsBatch = !p.options.DisableBatching &&
+ sr.msg.ReplicationClusters == nil &&
+ deliverAt.UnixNano() < 0
+
+ if !sr.sendAsBatch {
+ // update sequence id for metadata, make the size of
msgMetadata more accurate
+ // batch sending will update sequence ID in the BatchBuilder
+ p.updateMetadataSeqID(sr.mm, sr.msg)
+ }
+
+ sr.deliverAt = deliverAt
+}
+
+func (p *partitionProducer) updateChunkInfo(sr *sendRequest) error {
+ checkSize := sr.uncompressedSize
+ if !sr.sendAsBatch {
+ sr.compressedPayload = p.compressionProvider.Compress(nil,
sr.uncompressedPayload)
+ sr.compressedSize = len(sr.compressedPayload)
+
+ // set the compress type in msgMetaData
+ compressionType := pb.CompressionType(p.options.CompressionType)
+ if compressionType != pb.CompressionType_NONE {
+ sr.mm.Compression = &compressionType
+ }
+
+ checkSize = int64(sr.compressedSize)
+ }
+
+ sr.maxMessageSize = int32(int64(p._getConn().GetMaxMessageSize()))
+
+ // if msg is too large and chunking is disabled
+ if checkSize > int64(sr.maxMessageSize) && !p.options.EnableChunking {
+ p.log.WithError(errMessageTooLarge).
+ WithField("size", checkSize).
+ WithField("properties", sr.msg.Properties).
+ Errorf("MaxMessageSize %d", sr.maxMessageSize)
+
+ return errMessageTooLarge
+ }
+
+ if sr.sendAsBatch || !p.options.EnableChunking {
+ sr.totalChunks = 1
+ sr.payloadChunkSize = int(sr.maxMessageSize)
+ return nil
+ }
+
+ sr.payloadChunkSize = int(sr.maxMessageSize) - proto.Size(sr.mm)
+ if sr.payloadChunkSize <= 0 {
+ p.log.WithError(errMetaTooLarge).
+ WithField("metadata size", proto.Size(sr.mm)).
+ WithField("properties", sr.msg.Properties).
+ Errorf("MaxMessageSize %d", sr.maxMessageSize)
+
+ return errMetaTooLarge
+ }
+
+ // set ChunkMaxMessageSize
+ if p.options.ChunkMaxMessageSize != 0 {
+ sr.payloadChunkSize =
int(math.Min(float64(sr.payloadChunkSize),
float64(p.options.ChunkMaxMessageSize)))
+ }
+
+ sr.totalChunks = int(math.Max(1,
math.Ceil(float64(sr.compressedSize)/float64(sr.payloadChunkSize))))
+ return nil
+}
+
+func (p *partitionProducer) prepareTransaction(sr *sendRequest) error {
+ if sr.msg.Transaction == nil {
+ return nil
+ }
+
+ txn := (sr.msg.Transaction).(*transaction)
+ if txn.state != TxnOpen {
+ p.log.WithField("state", txn.state).Error("Failed to send
message" +
+ " by a non-open transaction.")
+ return newError(InvalidStatus, "Failed to send message by a
non-open transaction.")
+ }
+
+ if err := txn.registerProducerTopic(p.topic); err != nil {
+ return err
+ }
+
+ if err := txn.registerSendOrAckOp(); err != nil {
+ return err
+ }
+
+ sr.transaction = txn
+ return nil
+}
+
+func (p *partitionProducer) reserveSemaphore(sr *sendRequest) error {
+ for i := 0; i < sr.totalChunks; i++ {
+ if p.options.DisableBlockIfQueueFull {
+ if !p.publishSemaphore.TryAcquire() {
+ return errSendQueueIsFull
+ }
+ } else {
+ if !p.publishSemaphore.Acquire(sr.ctx) {
+ return errContextExpired
+ }
+ }
+ }
+
+ p.metrics.MessagesPending.Add(float64(sr.totalChunks))
+ sr.semaphore = p.publishSemaphore
+ sr.reservedSemaphore = sr.totalChunks
+ return nil
+}
+
+func (p *partitionProducer) reserveMem(sr *sendRequest) error {
+ requiredMem := sr.uncompressedSize
+ if !sr.sendAsBatch {
+ requiredMem = int64(sr.compressedSize)
+ }
+
+ if p.options.DisableBlockIfQueueFull {
+ if !p.client.memLimit.TryReserveMemory(requiredMem) {
+ return errMemoryBufferIsFull
+ }
+
} else {
- newCallback = callback
+ if !p.client.memLimit.ReserveMemory(sr.ctx, requiredMem) {
+ return errContextExpired
+ }
}
+
+ sr.memLimit = p.client.memLimit
+ sr.reservedMem += requiredMem
+ p.metrics.BytesPending.Add(float64(requiredMem))
+ return nil
+}
+
+func (p *partitionProducer) reserveResources(sr *sendRequest) error {
+ if err := p.reserveSemaphore(sr); err != nil {
+ return err
+ }
+ if err := p.reserveMem(sr); err != nil {
+ return err
+ }
+ return nil
+}
+
+func (p *partitionProducer) internalSendAsync(ctx context.Context, msg
*ProducerMessage,
+ callback func(MessageID, *ProducerMessage, error), flushImmediately
bool) {
+ err := p.validateMsg(msg)
Review Comment:
Inlining will make internalSendAsync a BIG func, about 200 lines, which is hard
to read; splitting it into small funcs will be clearer.
##########
pulsar/producer_partition.go:
##########
@@ -1121,78 +958,301 @@ func (p *partitionProducer) SendAsync(ctx
context.Context, msg *ProducerMessage,
p.internalSendAsync(ctx, msg, callback, false)
}
-func (p *partitionProducer) internalSendAsync(ctx context.Context, msg
*ProducerMessage,
- callback func(MessageID, *ProducerMessage, error), flushImmediately
bool) {
+func (p *partitionProducer) validateMsg(msg *ProducerMessage) error {
if msg == nil {
- p.log.Error("Message is nil")
- runCallback(callback, nil, msg, newError(InvalidMessage,
"Message is nil"))
- return
+ return newError(InvalidMessage, "Message is nil")
}
if msg.Value != nil && msg.Payload != nil {
- p.log.Error("Can not set Value and Payload both")
- runCallback(callback, nil, msg, newError(InvalidMessage, "Can
not set Value and Payload both"))
- return
+ return newError(InvalidMessage, "Can not set Value and Payload
both")
}
- // Register transaction operation to transaction and the transaction
coordinator.
- var newCallback func(MessageID, *ProducerMessage, error)
- var txn *transaction
- if msg.Transaction != nil {
- transactionImpl := (msg.Transaction).(*transaction)
- txn = transactionImpl
- if transactionImpl.state != TxnOpen {
- p.log.WithField("state",
transactionImpl.state).Error("Failed to send message" +
- " by a non-open transaction.")
- runCallback(callback, nil, msg, newError(InvalidStatus,
"Failed to send message by a non-open transaction."))
- return
+ if p.options.DisableMultiSchema {
+ if msg.Schema != nil && p.options.Schema != nil &&
+ msg.Schema.GetSchemaInfo().hash() !=
p.options.Schema.GetSchemaInfo().hash() {
+ p.log.Errorf("The producer %s of the topic %s is
disabled the `MultiSchema`", p.producerName, p.topic)
+ return fmt.Errorf("msg schema can not match with
producer schema")
}
+ }
- if err := transactionImpl.registerProducerTopic(p.topic); err
!= nil {
- runCallback(callback, nil, msg, err)
- return
+ return nil
+}
+
+func (p *partitionProducer) updateSchema(sr *sendRequest) error {
+ var schema Schema
+ var schemaVersion []byte
+ var err error
+
+ if sr.msg.Schema != nil {
+ schema = sr.msg.Schema
+ } else if p.options.Schema != nil {
+ schema = p.options.Schema
+ }
+
+ if schema == nil {
+ return nil
+ }
+
+ schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo())
+ if schemaVersion == nil {
+ schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
+ if err != nil {
+ return fmt.Errorf("get schema version fail, err: %w",
err)
}
- if err := transactionImpl.registerSendOrAckOp(); err != nil {
- runCallback(callback, nil, msg, err)
- return
+ p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
+ }
+
+ sr.schema = schema
+ sr.schemaVersion = schemaVersion
+ return nil
+}
+
+func (p *partitionProducer) updateUncompressPayload(sr *sendRequest) error {
+ // read payload from message
+ sr.uncompressedPayload = sr.msg.Payload
+
+ if sr.msg.Value != nil {
+ if sr.schema == nil {
+ p.log.Errorf("Schema encode message failed %s",
sr.msg.Value)
+ return newError(SchemaFailure, "set schema value
without setting schema")
}
- newCallback = func(id MessageID, producerMessage
*ProducerMessage, err error) {
- runCallback(callback, id, producerMessage, err)
- transactionImpl.endSendOrAckOp(err)
+
+ // payload and schema are mutually exclusive
+ // try to get payload from schema value only if payload is not
set
+ schemaPayload, err := sr.schema.Encode(sr.msg.Value)
+ if err != nil {
+ p.log.WithError(err).Errorf("Schema encode message
failed %s", sr.msg.Value)
+ return newError(SchemaFailure, err.Error())
}
+
+ sr.uncompressedPayload = schemaPayload
+ }
+
+ sr.uncompressedSize = int64(len(sr.uncompressedPayload))
+ return nil
+}
+
+func (p *partitionProducer) updateMetaData(sr *sendRequest) {
+ deliverAt := sr.msg.DeliverAt
+ if sr.msg.DeliverAfter.Nanoseconds() > 0 {
+ deliverAt = time.Now().Add(sr.msg.DeliverAfter)
+ }
+
+ sr.mm = p.genMetadata(sr.msg, int(sr.uncompressedSize), deliverAt)
+
+ // set default ReplicationClusters when DisableReplication
+ if sr.msg.DisableReplication {
+ sr.msg.ReplicationClusters = []string{"__local__"}
+ }
+
+ sr.sendAsBatch = !p.options.DisableBatching &&
+ sr.msg.ReplicationClusters == nil &&
+ deliverAt.UnixNano() < 0
+
+ if !sr.sendAsBatch {
+ // update sequence id for metadata, make the size of
msgMetadata more accurate
+ // batch sending will update sequence ID in the BatchBuilder
+ p.updateMetadataSeqID(sr.mm, sr.msg)
+ }
+
+ sr.deliverAt = deliverAt
+}
+
+func (p *partitionProducer) updateChunkInfo(sr *sendRequest) error {
+ checkSize := sr.uncompressedSize
+ if !sr.sendAsBatch {
+ sr.compressedPayload = p.compressionProvider.Compress(nil,
sr.uncompressedPayload)
+ sr.compressedSize = len(sr.compressedPayload)
+
+ // set the compress type in msgMetaData
+ compressionType := pb.CompressionType(p.options.CompressionType)
+ if compressionType != pb.CompressionType_NONE {
+ sr.mm.Compression = &compressionType
+ }
+
+ checkSize = int64(sr.compressedSize)
+ }
+
+ sr.maxMessageSize = int32(int64(p._getConn().GetMaxMessageSize()))
+
+ // if msg is too large and chunking is disabled
+ if checkSize > int64(sr.maxMessageSize) && !p.options.EnableChunking {
+ p.log.WithError(errMessageTooLarge).
+ WithField("size", checkSize).
+ WithField("properties", sr.msg.Properties).
+ Errorf("MaxMessageSize %d", sr.maxMessageSize)
+
+ return errMessageTooLarge
+ }
+
+ if sr.sendAsBatch || !p.options.EnableChunking {
+ sr.totalChunks = 1
+ sr.payloadChunkSize = int(sr.maxMessageSize)
+ return nil
+ }
+
+ sr.payloadChunkSize = int(sr.maxMessageSize) - proto.Size(sr.mm)
+ if sr.payloadChunkSize <= 0 {
+ p.log.WithError(errMetaTooLarge).
+ WithField("metadata size", proto.Size(sr.mm)).
+ WithField("properties", sr.msg.Properties).
+ Errorf("MaxMessageSize %d", sr.maxMessageSize)
+
+ return errMetaTooLarge
+ }
+
+ // set ChunkMaxMessageSize
+ if p.options.ChunkMaxMessageSize != 0 {
+ sr.payloadChunkSize =
int(math.Min(float64(sr.payloadChunkSize),
float64(p.options.ChunkMaxMessageSize)))
+ }
+
+ sr.totalChunks = int(math.Max(1,
math.Ceil(float64(sr.compressedSize)/float64(sr.payloadChunkSize))))
+ return nil
+}
+
+func (p *partitionProducer) prepareTransaction(sr *sendRequest) error {
+ if sr.msg.Transaction == nil {
+ return nil
+ }
+
+ txn := (sr.msg.Transaction).(*transaction)
+ if txn.state != TxnOpen {
+ p.log.WithField("state", txn.state).Error("Failed to send
message" +
+ " by a non-open transaction.")
+ return newError(InvalidStatus, "Failed to send message by a
non-open transaction.")
+ }
+
+ if err := txn.registerProducerTopic(p.topic); err != nil {
+ return err
+ }
+
+ if err := txn.registerSendOrAckOp(); err != nil {
+ return err
+ }
+
+ sr.transaction = txn
+ return nil
+}
+
+func (p *partitionProducer) reserveSemaphore(sr *sendRequest) error {
+ for i := 0; i < sr.totalChunks; i++ {
+ if p.options.DisableBlockIfQueueFull {
+ if !p.publishSemaphore.TryAcquire() {
+ return errSendQueueIsFull
+ }
+ } else {
+ if !p.publishSemaphore.Acquire(sr.ctx) {
+ return errContextExpired
+ }
+ }
+ }
+
+ p.metrics.MessagesPending.Add(float64(sr.totalChunks))
+ sr.semaphore = p.publishSemaphore
+ sr.reservedSemaphore = sr.totalChunks
+ return nil
+}
+
+func (p *partitionProducer) reserveMem(sr *sendRequest) error {
+ requiredMem := sr.uncompressedSize
+ if !sr.sendAsBatch {
+ requiredMem = int64(sr.compressedSize)
+ }
+
+ if p.options.DisableBlockIfQueueFull {
+ if !p.client.memLimit.TryReserveMemory(requiredMem) {
+ return errMemoryBufferIsFull
+ }
+
} else {
- newCallback = callback
+ if !p.client.memLimit.ReserveMemory(sr.ctx, requiredMem) {
+ return errContextExpired
+ }
}
+
+ sr.memLimit = p.client.memLimit
+ sr.reservedMem += requiredMem
+ p.metrics.BytesPending.Add(float64(requiredMem))
+ return nil
+}
+
+func (p *partitionProducer) reserveResources(sr *sendRequest) error {
+ if err := p.reserveSemaphore(sr); err != nil {
+ return err
+ }
+ if err := p.reserveMem(sr); err != nil {
+ return err
+ }
+ return nil
+}
+
+func (p *partitionProducer) internalSendAsync(ctx context.Context, msg
*ProducerMessage,
+ callback func(MessageID, *ProducerMessage, error), flushImmediately
bool) {
+ err := p.validateMsg(msg)
+ if err != nil {
+ p.log.Error(err)
+ runCallback(callback, nil, msg, err)
+ return
+ }
+
if p.getProducerState() != producerReady {
// Producer is closing
- runCallback(newCallback, nil, msg, errProducerClosed)
+ runCallback(callback, nil, msg, errProducerClosed)
return
}
- // bc only works when DisableBlockIfQueueFull is false
- bc := make(chan struct{})
+ // run interceptors before encoding/compressing
+ p.options.Interceptors.BeforeSend(p, msg)
- // callbackOnce make sure the callback is only invoked once in chunking
- callbackOnce := &sync.Once{}
sr := &sendRequest{
+ producer: p,
ctx: ctx,
msg: msg,
- callback: newCallback,
- callbackOnce: callbackOnce,
- flushImmediately: flushImmediately,
+ callback: callback,
+ callbackOnce: &sync.Once{},
publishTime: time.Now(),
- blockCh: bc,
- closeBlockChOnce: &sync.Once{},
- transaction: txn,
+ flushImmediately: flushImmediately,
}
- p.options.Interceptors.BeforeSend(p, msg)
- p.dataChan <- sr
+ err = p.updateSchema(sr)
Review Comment:
Inlining will make internalSendAsync a BIG func, about 200 lines, which is hard
to read; splitting it into small funcs will be clearer.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]