Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r974259601
##########
pulsar/producer_partition.go:
##########
@@ -519,29 +523,193 @@ func (p *partitionProducer) internalSend(request
*sendRequest) {
}
}
- // if msg is too large
- if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+ uncompressedSize := len(uncompressedPayload)
+
+ deliverAt := msg.DeliverAt
+ if msg.DeliverAfter.Nanoseconds() > 0 {
+ deliverAt = time.Now().Add(msg.DeliverAfter)
+ }
+
+ mm := p.genMetadata(msg, uncompressedSize, deliverAt)
+
+ // set default ReplicationClusters when DisableReplication
+ if msg.DisableReplication {
+ msg.ReplicationClusters = []string{"__local__"}
+ }
+
+ sendAsBatch := !p.options.DisableBatching &&
+ msg.ReplicationClusters == nil &&
+ deliverAt.UnixNano() < 0
+
+ if !sendAsBatch {
+ // update sequence id for metadata, make the size of
msgMetadata more accurate
+ // batch sending will update sequence ID in the BatchBuilder
+ p.updateMetadataSeqID(mm, msg)
+ }
+
+ maxMessageSize := int(p._getConn().GetMaxMessageSize())
+
+ // compress payload if not batching
+ var compressedPayload []byte
+ var compressedSize int
+ var checkSize int
+ if !sendAsBatch {
+ compressedPayload = p.compressionProvider.Compress(nil,
uncompressedPayload)
+ compressedSize = len(compressedPayload)
+ checkSize = compressedSize
+ } else {
+ // final check for batching message is in serializeMessage
+ // this is a double check
+ checkSize = uncompressedSize
+ }
+
+ // if msg is too large and chunking is disabled
+ if checkSize > maxMessageSize && !p.options.EnableChunking {
p.publishSemaphore.Release()
request.callback(nil, request.msg, errMessageTooLarge)
p.log.WithError(errMessageTooLarge).
- WithField("size", len(payload)).
+ WithField("size", checkSize).
WithField("properties", msg.Properties).
- Errorf("MaxMessageSize %d",
int(p._getConn().GetMaxMessageSize()))
+ Errorf("MaxMessageSize %d", maxMessageSize)
p.metrics.PublishErrorsMsgTooLarge.Inc()
return
}
- deliverAt := msg.DeliverAt
- if msg.DeliverAfter.Nanoseconds() > 0 {
- deliverAt = time.Now().Add(msg.DeliverAfter)
+ var totalChunks int
+ // max chunk payload size
+ var payloadChunkSize int
+ if sendAsBatch || !p.options.EnableChunking {
+ totalChunks = 1
+ payloadChunkSize = int(p._getConn().GetMaxMessageSize())
+ } else {
+ payloadChunkSize = int(p._getConn().GetMaxMessageSize()) -
mm.Size()
+ if payloadChunkSize <= 0 {
+ request.callback(nil, msg, errMetaTooLarge)
+ p.log.WithError(errMetaTooLarge).
+ WithField("metadata size", mm.Size()).
+ WithField("properties", msg.Properties).
+ Errorf("MaxMessageSize %d",
int(p._getConn().GetMaxMessageSize()))
+ p.metrics.PublishErrorsMsgTooLarge.Inc()
+ return
+ }
+ // set MaxChunkSize
+ if p.options.MaxChunkSize != 0 {
+ payloadChunkSize =
int(math.Min(float64(payloadChunkSize), float64(p.options.MaxChunkSize)))
+ }
+ totalChunks = int(math.Max(1,
math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
}
- sendAsBatch := !p.options.DisableBatching &&
- msg.ReplicationClusters == nil &&
- deliverAt.UnixNano() < 0
+ // correct limit queue when chunked
+ for i := 0; i < totalChunks-1; i++ {
+ if !p.canAddToQueue(request) {
Review Comment:
I think BlockIfQueueFull should not be enabled when `chunking` is used. When
`chunking` is enabled, the sending process will acquire more than one permit
(the number equals the number of chunks). It may never be able to release the
permits it has acquired. I have created a PR here: https://github.com/apache/pulsar/pull/17447.
##########
pulsar/producer_partition.go:
##########
@@ -519,29 +523,193 @@ func (p *partitionProducer) internalSend(request
*sendRequest) {
}
}
- // if msg is too large
- if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+ uncompressedSize := len(uncompressedPayload)
+
+ deliverAt := msg.DeliverAt
+ if msg.DeliverAfter.Nanoseconds() > 0 {
+ deliverAt = time.Now().Add(msg.DeliverAfter)
+ }
+
+ mm := p.genMetadata(msg, uncompressedSize, deliverAt)
+
+ // set default ReplicationClusters when DisableReplication
+ if msg.DisableReplication {
+ msg.ReplicationClusters = []string{"__local__"}
+ }
+
+ sendAsBatch := !p.options.DisableBatching &&
+ msg.ReplicationClusters == nil &&
+ deliverAt.UnixNano() < 0
+
+ if !sendAsBatch {
+ // update sequence id for metadata, make the size of
msgMetadata more accurate
+ // batch sending will update sequence ID in the BatchBuilder
+ p.updateMetadataSeqID(mm, msg)
+ }
+
+ maxMessageSize := int(p._getConn().GetMaxMessageSize())
+
+ // compress payload if not batching
+ var compressedPayload []byte
+ var compressedSize int
+ var checkSize int
+ if !sendAsBatch {
+ compressedPayload = p.compressionProvider.Compress(nil,
uncompressedPayload)
+ compressedSize = len(compressedPayload)
+ checkSize = compressedSize
+ } else {
+ // final check for batching message is in serializeMessage
+ // this is a double check
+ checkSize = uncompressedSize
+ }
+
+ // if msg is too large and chunking is disabled
+ if checkSize > maxMessageSize && !p.options.EnableChunking {
p.publishSemaphore.Release()
request.callback(nil, request.msg, errMessageTooLarge)
p.log.WithError(errMessageTooLarge).
- WithField("size", len(payload)).
+ WithField("size", checkSize).
WithField("properties", msg.Properties).
- Errorf("MaxMessageSize %d",
int(p._getConn().GetMaxMessageSize()))
+ Errorf("MaxMessageSize %d", maxMessageSize)
p.metrics.PublishErrorsMsgTooLarge.Inc()
return
}
- deliverAt := msg.DeliverAt
- if msg.DeliverAfter.Nanoseconds() > 0 {
- deliverAt = time.Now().Add(msg.DeliverAfter)
+ var totalChunks int
+ // max chunk payload size
+ var payloadChunkSize int
+ if sendAsBatch || !p.options.EnableChunking {
+ totalChunks = 1
+ payloadChunkSize = int(p._getConn().GetMaxMessageSize())
+ } else {
+ payloadChunkSize = int(p._getConn().GetMaxMessageSize()) -
mm.Size()
+ if payloadChunkSize <= 0 {
+ request.callback(nil, msg, errMetaTooLarge)
+ p.log.WithError(errMetaTooLarge).
+ WithField("metadata size", mm.Size()).
+ WithField("properties", msg.Properties).
+ Errorf("MaxMessageSize %d",
int(p._getConn().GetMaxMessageSize()))
+ p.metrics.PublishErrorsMsgTooLarge.Inc()
+ return
+ }
+ // set MaxChunkSize
+ if p.options.MaxChunkSize != 0 {
+ payloadChunkSize =
int(math.Min(float64(payloadChunkSize), float64(p.options.MaxChunkSize)))
+ }
+ totalChunks = int(math.Max(1,
math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
}
- sendAsBatch := !p.options.DisableBatching &&
- msg.ReplicationClusters == nil &&
- deliverAt.UnixNano() < 0
+ // correct limit queue when chunked
+ for i := 0; i < totalChunks-1; i++ {
+ if !p.canAddToQueue(request) {
Review Comment:
I think BlockIfQueueFull should not be enabled when `chunking` is used. When
`chunking` is enabled, the sending process will acquire more than one permit
(the number equals the number of chunks). It may never be able to release the
permits it has acquired. I have created a PR here:
https://github.com/apache/pulsar/pull/17447.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]