Gleiphir2769 commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r961495238


##########
pulsar/producer_partition.go:
##########
@@ -519,29 +526,180 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
                }
        }
 
-       // if msg is too large
-       if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+       uncompressedSize := len(uncompressedPayload)
+
+       mm := p.genMetadata(msg, uncompressedSize)
+
+       // set default ReplicationClusters when DisableReplication
+       if msg.DisableReplication {
+               msg.ReplicationClusters = []string{"__local__"}
+       }
+
+       // todo: deliverAt has calculated in genMetadata but it's not a good 
idea to make genMetadata() return it.
+       deliverAt := msg.DeliverAt
+       if msg.DeliverAfter.Nanoseconds() > 0 {
+               deliverAt = time.Now().Add(msg.DeliverAfter)
+       }
+
+       sendAsBatch := !p.options.DisableBatching &&
+               msg.ReplicationClusters == nil &&
+               deliverAt.UnixNano() < 0
+
+       maxMessageSize := int(p._getConn().GetMaxMessageSize())
+
+       // compress payload if not batching
+       var compressedPayload []byte
+       var compressedSize int
+       var checkSize int
+       if !sendAsBatch {
+               compressedPayload = p.compressionProvider.Compress(nil, 
uncompressedPayload)
+               compressedSize = len(compressedPayload)
+               checkSize = compressedSize
+       } else {
+               // final check for batching message is in serializeMessage
+               // this is a double check
+               checkSize = uncompressedSize
+       }
+
+       // if msg is too large and chunking is disabled
+       if checkSize > maxMessageSize && !p.options.EnableChunking {
                p.publishSemaphore.Release()
                request.callback(nil, request.msg, errMessageTooLarge)
                p.log.WithError(errMessageTooLarge).
-                       WithField("size", len(payload)).
+                       WithField("size", checkSize).
                        WithField("properties", msg.Properties).
-                       Errorf("MaxMessageSize %d", 
int(p._getConn().GetMaxMessageSize()))
+                       Errorf("MaxMessageSize %d", maxMessageSize)
                p.metrics.PublishErrorsMsgTooLarge.Inc()
                return
        }
 
+       var totalChunks int
+       // max chunk payload size
+       var payloadChunkSize int
+       if sendAsBatch || !p.options.EnableChunking {
+               totalChunks = 1
+               payloadChunkSize = int(p._getConn().GetMaxMessageSize())
+       } else {
+               payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - 
mm.Size()
+               if payloadChunkSize <= 0 {
+                       request.callback(nil, msg, errMetaTooLarge)
+                       p.log.WithError(errMetaTooLarge).
+                               WithField("metadata size", mm.Size()).
+                               WithField("properties", msg.Properties).
+                               Errorf("MaxMessageSize %d", 
int(p._getConn().GetMaxMessageSize()))
+                       p.metrics.PublishErrorsMsgTooLarge.Inc()
+                       return
+               }
+               // set ChunkMaxMessageSize
+               if p.options.ChunkMaxMessageSize != 0 {
+                       payloadChunkSize = 
int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
+               }
+               totalChunks = int(math.Max(1, 
math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
+       }
+
+       // correct limit queue when chunked
+       for i := 0; i < totalChunks-1; i++ {
+               if !p.canAddToQueue(request) {
+                       return
+               }
+       }
+
+       // set total chunks to send request
+       request.totalChunks = totalChunks
+
+       if !sendAsBatch {
+               if msg.SequenceID != nil {
+                       mm.SequenceId = proto.Uint64(uint64(*msg.SequenceID))
+               } else {
+                       mm.SequenceId = 
proto.Uint64(internal.GetAndAdd(p.sequenceIDGenerator, 1))
+               }
+               if totalChunks > 1 {
+                       var lhs, rhs int
+                       uuid := fmt.Sprintf("%s-%s", p.producerName, 
strconv.FormatUint(*mm.SequenceId, 10))
+                       mm.Uuid = proto.String(uuid)
+                       mm.NumChunksFromMsg = proto.Int(totalChunks)
+                       mm.TotalChunkMsgSize = proto.Int(compressedSize)
+                       for chunkID := 0; chunkID < totalChunks; chunkID++ {
+                               lhs = chunkID * payloadChunkSize
+                               if rhs = lhs + payloadChunkSize; rhs > 
compressedSize {
+                                       rhs = compressedSize
+                               }
+                               // update chunk id
+                               mm.ChunkId = proto.Int(chunkID)
+                               nsr := &sendRequest{
+                                       ctx:         request.ctx,
+                                       msg:         request.msg,
+                                       callback:    request.callback,
+                                       publishTime: request.publishTime,
+                                       totalChunks: totalChunks,
+                                       chunkID:     chunkID,
+                                       uuid:        uuid,
+                               }
+                               p.internalSingleSend(mm, 
compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
+                       }
+               } else {
+                       p.internalSingleSend(mm, compressedPayload, request, 
uint32(maxMessageSize))
+               }
+       } else {
+               smm := p.genSingleMetaMessage(msg, uncompressedSize)
+               multiSchemaEnabled := !p.options.DisableMultiSchema
+               added := p.batchBuilder.Add(smm, p.sequenceIDGenerator, 
uncompressedPayload, request,
+                       msg.ReplicationClusters, deliverAt, schemaVersion, 
multiSchemaEnabled)
+               if !added {
+                       // The current batch is full.. flush it and retry
+
+                       p.internalFlushCurrentBatch()
+
+                       // after flushing try again to add the current payload
+                       if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, 
uncompressedPayload, request,
+                               msg.ReplicationClusters, deliverAt, 
schemaVersion, multiSchemaEnabled); !ok {
+                               p.publishSemaphore.Release()
+                               request.callback(nil, request.msg, 
errFailAddToBatch)
+                               p.log.WithField("size", uncompressedSize).
+                                       WithField("properties", msg.Properties).
+                                       Error("unable to add message to batch")
+                               return
+                       }
+               }
+               if request.flushImmediately {
+
+                       p.internalFlushCurrentBatch()
+
+               }
+       }
+}
+
+func (p *partitionProducer) genMetadata(msg *ProducerMessage, uncompressedSize 
int) (mm *pb.MessageMetadata) {
+       mm = &pb.MessageMetadata{
+               ProducerName:     &p.producerName,
+               PublishTime:      
proto.Uint64(internal.TimestampMillis(time.Now())),
+               ReplicateTo:      msg.ReplicationClusters,
+               UncompressedSize: proto.Uint32(uint32(uncompressedSize)),
+       }
+
+       if msg.Key != "" {
+               mm.PartitionKey = proto.String(msg.Key)
+       }
+
+       if msg.Properties != nil {
+               mm.Properties = internal.ConvertFromStringMap(msg.Properties)
+       }
+
        deliverAt := msg.DeliverAt
        if msg.DeliverAfter.Nanoseconds() > 0 {
                deliverAt = time.Now().Add(msg.DeliverAfter)
        }
+       if deliverAt.UnixNano() > 0 {
+               mm.DeliverAtTime = 
proto.Int64(int64(internal.TimestampMillis(deliverAt)))
+       }
 
-       sendAsBatch := !p.options.DisableBatching &&
-               msg.ReplicationClusters == nil &&
-               deliverAt.UnixNano() < 0
+       return
+}
 
-       smm := &pb.SingleMessageMetadata{
-               PayloadSize: proto.Int(len(payload)),
+func (p *partitionProducer) genSingleMetaMessage(msg *ProducerMessage,

Review Comment:
   Yes, chunked messages and single (non-batched) messages do not need smm 
(SingleMessageMetadata); it is only used for batched messages.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to