Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r979344780


##########
pulsar/producer_partition.go:
##########
@@ -519,29 +523,193 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
                }
        }
 
-       // if msg is too large
-       if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+       uncompressedSize := len(uncompressedPayload)
+
+       deliverAt := msg.DeliverAt
+       if msg.DeliverAfter.Nanoseconds() > 0 {
+               deliverAt = time.Now().Add(msg.DeliverAfter)
+       }
+
+       mm := p.genMetadata(msg, uncompressedSize, deliverAt)
+
+       // set default ReplicationClusters when DisableReplication
+       if msg.DisableReplication {
+               msg.ReplicationClusters = []string{"__local__"}
+       }
+
+       sendAsBatch := !p.options.DisableBatching &&
+               msg.ReplicationClusters == nil &&
+               deliverAt.UnixNano() < 0
+
+       if !sendAsBatch {
+               // update sequence id for metadata, make the size of 
msgMetadata more accurate
+               // batch sending will update sequence ID in the BatchBuilder
+               p.updateMetadataSeqID(mm, msg)
+       }
+
+       maxMessageSize := int(p._getConn().GetMaxMessageSize())
+
+       // compress payload if not batching
+       var compressedPayload []byte
+       var compressedSize int
+       var checkSize int
+       if !sendAsBatch {
+               compressedPayload = p.compressionProvider.Compress(nil, 
uncompressedPayload)
+               compressedSize = len(compressedPayload)
+               checkSize = compressedSize
+       } else {
+               // final check for batching message is in serializeMessage
+               // this is a double check
+               checkSize = uncompressedSize
+       }
+
+       // if msg is too large and chunking is disabled
+       if checkSize > maxMessageSize && !p.options.EnableChunking {
                p.publishSemaphore.Release()
                request.callback(nil, request.msg, errMessageTooLarge)
                p.log.WithError(errMessageTooLarge).
-                       WithField("size", len(payload)).
+                       WithField("size", checkSize).
                        WithField("properties", msg.Properties).
-                       Errorf("MaxMessageSize %d", 
int(p._getConn().GetMaxMessageSize()))
+                       Errorf("MaxMessageSize %d", maxMessageSize)
                p.metrics.PublishErrorsMsgTooLarge.Inc()
                return
        }
 
-       deliverAt := msg.DeliverAt
-       if msg.DeliverAfter.Nanoseconds() > 0 {
-               deliverAt = time.Now().Add(msg.DeliverAfter)
+       var totalChunks int
+       // max chunk payload size
+       var payloadChunkSize int
+       if sendAsBatch || !p.options.EnableChunking {
+               totalChunks = 1
+               payloadChunkSize = int(p._getConn().GetMaxMessageSize())
+       } else {
+               payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - 
mm.Size()
+               if payloadChunkSize <= 0 {
+                       request.callback(nil, msg, errMetaTooLarge)
+                       p.log.WithError(errMetaTooLarge).
+                               WithField("metadata size", mm.Size()).
+                               WithField("properties", msg.Properties).
+                               Errorf("MaxMessageSize %d", 
int(p._getConn().GetMaxMessageSize()))
+                       p.metrics.PublishErrorsMsgTooLarge.Inc()
+                       return
+               }
+               // set MaxChunkSize
+               if p.options.MaxChunkSize != 0 {
+                       payloadChunkSize = 
int(math.Min(float64(payloadChunkSize), float64(p.options.MaxChunkSize)))
+               }
+               totalChunks = int(math.Max(1, 
math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
        }
 
-       sendAsBatch := !p.options.DisableBatching &&
-               msg.ReplicationClusters == nil &&
-               deliverAt.UnixNano() < 0
+       // correct limit queue when chunked
+       for i := 0; i < totalChunks-1; i++ {
+               if !p.canAddToQueue(request) {

Review Comment:
   Enable `DisableBlockIfQueueFull` may not a good idea. I fix this problem by 
moving the `canAddToQueue` before the real send. This solution can ensure that 
the semaphore acquired for by chunking can be released in time. But it will 
also cause the sending of a single message (no batch) to waste more CPU 
resources (e.g. the single message must wait for permits or be discarded after 
compression).
   What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to