Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r973556288
##########
pulsar/producer_partition.go:
##########
@@ -519,29 +523,193 @@ func (p *partitionProducer) internalSend(request
*sendRequest) {
}
}
- // if msg is too large
- if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+ uncompressedSize := len(uncompressedPayload)
+
+ deliverAt := msg.DeliverAt
+ if msg.DeliverAfter.Nanoseconds() > 0 {
+ deliverAt = time.Now().Add(msg.DeliverAfter)
+ }
+
+ mm := p.genMetadata(msg, uncompressedSize, deliverAt)
+
+ // set default ReplicationClusters when DisableReplication
+ if msg.DisableReplication {
+ msg.ReplicationClusters = []string{"__local__"}
+ }
+
+ sendAsBatch := !p.options.DisableBatching &&
+ msg.ReplicationClusters == nil &&
+ deliverAt.UnixNano() < 0
+
+ if !sendAsBatch {
+ // update sequence id for metadata, make the size of
msgMetadata more accurate
+ // batch sending will update sequence ID in the BatchBuilder
+ p.updateMetadataSeqID(mm, msg)
+ }
+
+ maxMessageSize := int(p._getConn().GetMaxMessageSize())
+
+ // compress payload if not batching
+ var compressedPayload []byte
+ var compressedSize int
+ var checkSize int
+ if !sendAsBatch {
+ compressedPayload = p.compressionProvider.Compress(nil,
uncompressedPayload)
+ compressedSize = len(compressedPayload)
+ checkSize = compressedSize
+ } else {
+ // final check for batching message is in serializeMessage
+ // this is a double check
+ checkSize = uncompressedSize
+ }
+
+ // if msg is too large and chunking is disabled
+ if checkSize > maxMessageSize && !p.options.EnableChunking {
p.publishSemaphore.Release()
request.callback(nil, request.msg, errMessageTooLarge)
p.log.WithError(errMessageTooLarge).
- WithField("size", len(payload)).
+ WithField("size", checkSize).
WithField("properties", msg.Properties).
- Errorf("MaxMessageSize %d",
int(p._getConn().GetMaxMessageSize()))
+ Errorf("MaxMessageSize %d", maxMessageSize)
p.metrics.PublishErrorsMsgTooLarge.Inc()
return
}
- deliverAt := msg.DeliverAt
- if msg.DeliverAfter.Nanoseconds() > 0 {
- deliverAt = time.Now().Add(msg.DeliverAfter)
+ var totalChunks int
+ // max chunk payload size
+ var payloadChunkSize int
+ if sendAsBatch || !p.options.EnableChunking {
+ totalChunks = 1
+ payloadChunkSize = int(p._getConn().GetMaxMessageSize())
+ } else {
+ payloadChunkSize = int(p._getConn().GetMaxMessageSize()) -
mm.Size()
+ if payloadChunkSize <= 0 {
+ request.callback(nil, msg, errMetaTooLarge)
Review Comment:
You're right. And it seems that there are some original code that did not
release publishSemaphore too.
https://github.com/apache/pulsar-client-go/blob/edd5c71651b79bd35358a51ae3925905ed9f17e1/pulsar/producer_partition.go#L484-L490
I have fixed all the publishSemaphore which should be released in the new
commit.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]