Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r963695252


##########
pulsar/producer_partition.go:
##########
@@ -519,29 +526,180 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
                }
        }
 
-       // if msg is too large
-       if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+       uncompressedSize := len(uncompressedPayload)
+
+       mm := p.genMetadata(msg, uncompressedSize)
+
+       // set default ReplicationClusters when DisableReplication
+       if msg.DisableReplication {
+               msg.ReplicationClusters = []string{"__local__"}
+       }
+
+       // todo: deliverAt has calculated in genMetadata but it's not a good 
idea to make genMetadata() return it.
+       deliverAt := msg.DeliverAt
+       if msg.DeliverAfter.Nanoseconds() > 0 {
+               deliverAt = time.Now().Add(msg.DeliverAfter)
+       }
+
+       sendAsBatch := !p.options.DisableBatching &&
+               msg.ReplicationClusters == nil &&
+               deliverAt.UnixNano() < 0
+
+       maxMessageSize := int(p._getConn().GetMaxMessageSize())
+
+       // compress payload if not batching
+       var compressedPayload []byte
+       var compressedSize int
+       var checkSize int
+       if !sendAsBatch {
+               compressedPayload = p.compressionProvider.Compress(nil, 
uncompressedPayload)
+               compressedSize = len(compressedPayload)
+               checkSize = compressedSize
+       } else {
+               // final check for batching message is in serializeMessage
+               // this is a double check
+               checkSize = uncompressedSize
+       }
+
+       // if msg is too large and chunking is disabled
+       if checkSize > maxMessageSize && !p.options.EnableChunking {
                p.publishSemaphore.Release()
                request.callback(nil, request.msg, errMessageTooLarge)
                p.log.WithError(errMessageTooLarge).
-                       WithField("size", len(payload)).
+                       WithField("size", checkSize).
                        WithField("properties", msg.Properties).
-                       Errorf("MaxMessageSize %d", 
int(p._getConn().GetMaxMessageSize()))
+                       Errorf("MaxMessageSize %d", maxMessageSize)
                p.metrics.PublishErrorsMsgTooLarge.Inc()
                return
        }
 
+       var totalChunks int
+       // max chunk payload size
+       var payloadChunkSize int
+       if sendAsBatch || !p.options.EnableChunking {
+               totalChunks = 1
+               payloadChunkSize = int(p._getConn().GetMaxMessageSize())
+       } else {
+               payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - 
mm.Size()
+               if payloadChunkSize <= 0 {
+                       request.callback(nil, msg, errMetaTooLarge)
+                       p.log.WithError(errMetaTooLarge).
+                               WithField("metadata size", mm.Size()).
+                               WithField("properties", msg.Properties).
+                               Errorf("MaxMessageSize %d", 
int(p._getConn().GetMaxMessageSize()))
+                       p.metrics.PublishErrorsMsgTooLarge.Inc()
+                       return
+               }
+               // set ChunkMaxMessageSize
+               if p.options.ChunkMaxMessageSize != 0 {
+                       payloadChunkSize = 
int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
+               }
+               totalChunks = int(math.Max(1, 
math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
+       }
+
+       // correct limit queue when chunked
+       for i := 0; i < totalChunks-1; i++ {
+               if !p.canAddToQueue(request) {
+                       return
+               }
+       }
+
+       // set total chunks to send request
+       request.totalChunks = totalChunks
+
+       if !sendAsBatch {
+               if msg.SequenceID != nil {
+                       mm.SequenceId = proto.Uint64(uint64(*msg.SequenceID))
+               } else {
+                       mm.SequenceId = 
proto.Uint64(internal.GetAndAdd(p.sequenceIDGenerator, 1))
+               }
+               if totalChunks > 1 {
+                       var lhs, rhs int
+                       uuid := fmt.Sprintf("%s-%s", p.producerName, 
strconv.FormatUint(*mm.SequenceId, 10))
+                       mm.Uuid = proto.String(uuid)
+                       mm.NumChunksFromMsg = proto.Int(totalChunks)
+                       mm.TotalChunkMsgSize = proto.Int(compressedSize)
+                       for chunkID := 0; chunkID < totalChunks; chunkID++ {
+                               lhs = chunkID * payloadChunkSize
+                               if rhs = lhs + payloadChunkSize; rhs > 
compressedSize {
+                                       rhs = compressedSize
+                               }
+                               // update chunk id
+                               mm.ChunkId = proto.Int(chunkID)
+                               nsr := &sendRequest{
+                                       ctx:         request.ctx,
+                                       msg:         request.msg,
+                                       callback:    request.callback,
+                                       publishTime: request.publishTime,
+                                       totalChunks: totalChunks,
+                                       chunkID:     chunkID,
+                                       uuid:        uuid,
+                               }
+                               p.internalSingleSend(mm, 
compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
+                       }
+               } else {
+                       p.internalSingleSend(mm, compressedPayload, request, 
uint32(maxMessageSize))
+               }
+       } else {
+               smm := p.genSingleMetaMessage(msg, uncompressedSize)
+               multiSchemaEnabled := !p.options.DisableMultiSchema
+               added := p.batchBuilder.Add(smm, p.sequenceIDGenerator, 
uncompressedPayload, request,
+                       msg.ReplicationClusters, deliverAt, schemaVersion, 
multiSchemaEnabled)
+               if !added {
+                       // The current batch is full.. flush it and retry
+
+                       p.internalFlushCurrentBatch()
+
+                       // after flushing try again to add the current payload
+                       if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, 
uncompressedPayload, request,
+                               msg.ReplicationClusters, deliverAt, 
schemaVersion, multiSchemaEnabled); !ok {
+                               p.publishSemaphore.Release()
+                               request.callback(nil, request.msg, 
errFailAddToBatch)
+                               p.log.WithField("size", uncompressedSize).
+                                       WithField("properties", msg.Properties).
+                                       Error("unable to add message to batch")
+                               return
+                       }
+               }
+               if request.flushImmediately {
+
+                       p.internalFlushCurrentBatch()
+
+               }
+       }
+}
+
+func (p *partitionProducer) genMetadata(msg *ProducerMessage, uncompressedSize 
int) (mm *pb.MessageMetadata) {

Review Comment:
   Related issue: https://github.com/apache/pulsar/pull/16196/files#
   In the Java client, the payload chunk size may be calculated incorrectly for
   two reasons:
   1. The MessageMetadata is updated after the payload chunk size has been
   computed, i.e. the actual metadata size is greater than the size used in the
   calculation.
   2. OpSendMsg#getMessageHeaderAndPayloadSize doesn't exclude all bytes other
   than the metadata and payload, e.g. the 4-byte checksum field.
   
   First, this PR takes care of the order in which the `messageMetadata` fields
   are set, so that `totalChunks` is calculated correctly.
   Second, since the Go client places the data directly into the write buffer
   instead of creating a data buffer for `OpSendMsg` in advance, it doesn't
   have the problem mentioned in reason 2.
   
   By the way, a unit test for `chunkSize` has been added to
   `message_chunking_test.go`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to