RobertIndie commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r972711264
##########
pulsar/producer.go:
##########
@@ -172,6 +172,15 @@ type ProducerOptions struct {
// Encryption specifies the fields required to encrypt a message
Encryption *ProducerEncryptionInfo
+
+ // EnableChunking controls whether automatic chunking of messages is enabled for the producer. By default, chunking
+ // is disabled.
+ // Chunking cannot be enabled when batching is enabled.
+ EnableChunking bool
+
+ // MaxChunkSize is the max size of a single chunk payload.
+ // It will actually only take effect if it is smaller than broker.MaxMessageSize
+ MaxChunkSize uint
Review Comment:
It's better to keep it consistent with the Java client: `ChunkMaxMessageSize`.
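
For illustration, a minimal sketch of how the renamed option could look — only the name `ChunkMaxMessageSize` is taken from the Java client's `chunkMaxMessageSize`; the doc comment wording here is assumed:

```go
	// ChunkMaxMessageSize is the max size of a single chunk payload,
	// named consistently with the Java client's chunkMaxMessageSize.
	// It only takes effect if it is smaller than the maxMessageSize from the broker.
	ChunkMaxMessageSize uint
```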
##########
pulsar/producer_partition.go:
##########
@@ -519,29 +523,193 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
}
}
- // if msg is too large
- if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+ uncompressedSize := len(uncompressedPayload)
+
+ deliverAt := msg.DeliverAt
+ if msg.DeliverAfter.Nanoseconds() > 0 {
+ deliverAt = time.Now().Add(msg.DeliverAfter)
+ }
+
+ mm := p.genMetadata(msg, uncompressedSize, deliverAt)
+
+ // set default ReplicationClusters when DisableReplication
+ if msg.DisableReplication {
+ msg.ReplicationClusters = []string{"__local__"}
+ }
+
+ sendAsBatch := !p.options.DisableBatching &&
+ msg.ReplicationClusters == nil &&
+ deliverAt.UnixNano() < 0
+
+ if !sendAsBatch {
+ // update sequence id for metadata, make the size of msgMetadata more accurate
+ // batch sending will update sequence ID in the BatchBuilder
+ p.updateMetadataSeqID(mm, msg)
+ }
+
+ maxMessageSize := int(p._getConn().GetMaxMessageSize())
+
+ // compress payload if not batching
+ var compressedPayload []byte
+ var compressedSize int
+ var checkSize int
+ if !sendAsBatch {
+ compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
+ compressedSize = len(compressedPayload)
+ checkSize = compressedSize
+ } else {
+ // final check for batching message is in serializeMessage
+ // this is a double check
+ checkSize = uncompressedSize
+ }
+
+ // if msg is too large and chunking is disabled
+ if checkSize > maxMessageSize && !p.options.EnableChunking {
p.publishSemaphore.Release()
request.callback(nil, request.msg, errMessageTooLarge)
p.log.WithError(errMessageTooLarge).
- WithField("size", len(payload)).
+ WithField("size", checkSize).
WithField("properties", msg.Properties).
- Errorf("MaxMessageSize %d",
int(p._getConn().GetMaxMessageSize()))
+ Errorf("MaxMessageSize %d", maxMessageSize)
p.metrics.PublishErrorsMsgTooLarge.Inc()
return
}
- deliverAt := msg.DeliverAt
- if msg.DeliverAfter.Nanoseconds() > 0 {
- deliverAt = time.Now().Add(msg.DeliverAfter)
+ var totalChunks int
+ // max chunk payload size
+ var payloadChunkSize int
+ if sendAsBatch || !p.options.EnableChunking {
+ totalChunks = 1
+ payloadChunkSize = int(p._getConn().GetMaxMessageSize())
+ } else {
+ payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - mm.Size()
+ if payloadChunkSize <= 0 {
+ request.callback(nil, msg, errMetaTooLarge)
Review Comment:
It seems we need to release the publishSemaphore here.
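
A sketch of the suggested fix, mirroring the errMessageTooLarge branch above (assuming the same semaphore and metrics handling applies to this error path):

```go
		if payloadChunkSize <= 0 {
			// release the publish permit before failing the send
			p.publishSemaphore.Release()
			request.callback(nil, msg, errMetaTooLarge)
			p.log.WithError(errMetaTooLarge).
				WithField("metadata size", mm.Size()).
				WithField("properties", msg.Properties).
				Errorf("MaxMessageSize %d", maxMessageSize)
			p.metrics.PublishErrorsMsgTooLarge.Inc()
			return
		}
```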
##########
pulsar/producer_partition.go:
##########
@@ -519,29 +523,193 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
}
}
- // if msg is too large
- if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+ uncompressedSize := len(uncompressedPayload)
+
+ deliverAt := msg.DeliverAt
+ if msg.DeliverAfter.Nanoseconds() > 0 {
+ deliverAt = time.Now().Add(msg.DeliverAfter)
+ }
+
+ mm := p.genMetadata(msg, uncompressedSize, deliverAt)
+
+ // set default ReplicationClusters when DisableReplication
+ if msg.DisableReplication {
+ msg.ReplicationClusters = []string{"__local__"}
+ }
+
+ sendAsBatch := !p.options.DisableBatching &&
+ msg.ReplicationClusters == nil &&
+ deliverAt.UnixNano() < 0
+
+ if !sendAsBatch {
+ // update sequence id for metadata, make the size of msgMetadata more accurate
+ // batch sending will update sequence ID in the BatchBuilder
+ p.updateMetadataSeqID(mm, msg)
+ }
+
+ maxMessageSize := int(p._getConn().GetMaxMessageSize())
+
+ // compress payload if not batching
+ var compressedPayload []byte
+ var compressedSize int
+ var checkSize int
+ if !sendAsBatch {
+ compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
+ compressedSize = len(compressedPayload)
+ checkSize = compressedSize
+ } else {
+ // final check for batching message is in serializeMessage
+ // this is a double check
+ checkSize = uncompressedSize
+ }
+
+ // if msg is too large and chunking is disabled
+ if checkSize > maxMessageSize && !p.options.EnableChunking {
p.publishSemaphore.Release()
request.callback(nil, request.msg, errMessageTooLarge)
p.log.WithError(errMessageTooLarge).
- WithField("size", len(payload)).
+ WithField("size", checkSize).
WithField("properties", msg.Properties).
- Errorf("MaxMessageSize %d",
int(p._getConn().GetMaxMessageSize()))
+ Errorf("MaxMessageSize %d", maxMessageSize)
p.metrics.PublishErrorsMsgTooLarge.Inc()
return
}
- deliverAt := msg.DeliverAt
- if msg.DeliverAfter.Nanoseconds() > 0 {
- deliverAt = time.Now().Add(msg.DeliverAfter)
+ var totalChunks int
+ // max chunk payload size
+ var payloadChunkSize int
+ if sendAsBatch || !p.options.EnableChunking {
+ totalChunks = 1
+ payloadChunkSize = int(p._getConn().GetMaxMessageSize())
+ } else {
+ payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - mm.Size()
+ if payloadChunkSize <= 0 {
+ request.callback(nil, msg, errMetaTooLarge)
+ p.log.WithError(errMetaTooLarge).
+ WithField("metadata size", mm.Size()).
+ WithField("properties", msg.Properties).
+ Errorf("MaxMessageSize %d",
int(p._getConn().GetMaxMessageSize()))
+ p.metrics.PublishErrorsMsgTooLarge.Inc()
+ return
+ }
+ // set MaxChunkSize
+ if p.options.MaxChunkSize != 0 {
+ payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(p.options.MaxChunkSize)))
+ }
+ totalChunks = int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
}
- sendAsBatch := !p.options.DisableBatching &&
- msg.ReplicationClusters == nil &&
- deliverAt.UnixNano() < 0
+ // correct limit queue when chunked
+ for i := 0; i < totalChunks-1; i++ {
+ if !p.canAddToQueue(request) {
Review Comment:
It seems DisableBlockIfQueueFull will not work here.
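
A hedged sketch of how the permit loop could honor that option — failing fast with a non-blocking acquire and rolling back permits already taken. This assumes the internal Semaphore exposes a TryAcquire and that errSendQueueIsFull is the error used elsewhere for this case:

```go
	// correct limit queue when chunked
	for i := 0; i < totalChunks-1; i++ {
		if p.options.DisableBlockIfQueueFull {
			// fail fast instead of blocking when the queue is full
			if !p.publishSemaphore.TryAcquire() {
				// roll back the permits acquired for earlier chunks
				for j := 0; j < i; j++ {
					p.publishSemaphore.Release()
				}
				request.callback(nil, request.msg, errSendQueueIsFull)
				return
			}
		} else {
			p.publishSemaphore.Acquire()
		}
	}
```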
##########
pulsar/producer_partition.go:
##########
@@ -560,49 +728,60 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
smm.Properties = internal.ConvertFromStringMap(msg.Properties)
}
+ var sequenceID uint64
if msg.SequenceID != nil {
- sequenceID := uint64(*msg.SequenceID)
- smm.SequenceId = proto.Uint64(sequenceID)
+ sequenceID = uint64(*msg.SequenceID)
+ } else {
+ sequenceID = internal.GetAndAdd(p.sequenceIDGenerator, 1)
}
- if !sendAsBatch {
- p.internalFlushCurrentBatch()
- }
+ smm.SequenceId = proto.Uint64(sequenceID)
- if msg.DisableReplication {
- msg.ReplicationClusters = []string{"__local__"}
- }
- multiSchemaEnabled := !p.options.DisableMultiSchema
- added := p.batchBuilder.Add(smm, p.sequenceIDGenerator, payload, request,
- msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled)
- if !added {
- // The current batch is full.. flush it and retry
+ return
+}
- p.internalFlushCurrentBatch()
+func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,
+ compressedPayload []byte,
+ request *sendRequest,
+ maxMessageSize uint32) {
+ msg := request.msg
- // after flushing try again to add the current payload
- if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, payload, request,
- msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled); !ok {
- p.publishSemaphore.Release()
- request.callback(nil, request.msg, errFailAddToBatch)
- p.log.WithField("size", len(payload)).
- WithField("properties", msg.Properties).
- Error("unable to add message to batch")
- return
- }
- }
+ payloadBuf := internal.NewBuffer(len(compressedPayload))
+ payloadBuf.Write(compressedPayload)
- if !sendAsBatch || request.flushImmediately {
+ buffer := p.GetBuffer()
+ if buffer == nil {
buffer = internal.NewBuffer(int(payloadBuf.ReadableBytes() * 3 / 2))
+ }
- p.internalFlushCurrentBatch()
+ sid := *mm.SequenceId
+ if err := internal.SingleSend(
+ buffer,
+ p.producerID,
+ sid,
+ mm,
+ payloadBuf,
+ p.encryptor,
+ maxMessageSize,
+ ); err != nil {
+ request.callback(nil, request.msg, err)
Review Comment:
It seems we need to release the publishSemaphore here.
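
As with the other error paths, a sketch of the fix — release the permit (acquired earlier in the send path) before failing the request:

```go
	); err != nil {
		// release the publish permit before surfacing the serialization error
		p.publishSemaphore.Release()
		request.callback(nil, request.msg, err)
		return
	}
```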
##########
pulsar/producer.go:
##########
@@ -172,6 +172,15 @@ type ProducerOptions struct {
// Encryption specifies the fields required to encrypt a message
Encryption *ProducerEncryptionInfo
+
+ // EnableChunking controls whether automatic chunking of messages is enabled for the producer. By default, chunking
+ // is disabled.
+ // Chunking cannot be enabled when batching is enabled.
+ EnableChunking bool
+
+ // MaxChunkSize is the max size of a single chunk payload.
+ // It will actually only take effect if it is smaller than broker.MaxMessageSize
Review Comment:
```suggestion
// It will actually only take effect if it is smaller than the maxMessageSize from the broker.
```
##########
pulsar/producer_partition.go:
##########
@@ -745,28 +933,34 @@ func (p *partitionProducer) failTimeoutMessages() {
}
func (p *partitionProducer) internalFlushCurrentBatches() {
- batchesData, sequenceIDs, callbacks, errors := p.batchBuilder.FlushBatches()
+ batchesData, sequenceIDs, callbacks, errs := p.batchBuilder.FlushBatches()
Review Comment:
Why rename it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]