RobertIndie commented on code in PR #805:
URL: https://github.com/apache/pulsar-client-go/pull/805#discussion_r961383344


##########
pulsar/producer_partition.go:
##########
@@ -519,29 +526,180 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
                }
        }
 
-       // if msg is too large
-       if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+       uncompressedSize := len(uncompressedPayload)
+
+       mm := p.genMetadata(msg, uncompressedSize)
+
+       // set default ReplicationClusters when DisableReplication
+       if msg.DisableReplication {
+               msg.ReplicationClusters = []string{"__local__"}
+       }
+
+       // todo: deliverAt has calculated in genMetadata but it's not a good 
idea to make genMetadata() return it.
+       deliverAt := msg.DeliverAt
+       if msg.DeliverAfter.Nanoseconds() > 0 {
+               deliverAt = time.Now().Add(msg.DeliverAfter)
+       }
+
+       sendAsBatch := !p.options.DisableBatching &&
+               msg.ReplicationClusters == nil &&
+               deliverAt.UnixNano() < 0
+
+       maxMessageSize := int(p._getConn().GetMaxMessageSize())
+
+       // compress payload if not batching
+       var compressedPayload []byte
+       var compressedSize int
+       var checkSize int
+       if !sendAsBatch {
+               compressedPayload = p.compressionProvider.Compress(nil, 
uncompressedPayload)
+               compressedSize = len(compressedPayload)
+               checkSize = compressedSize
+       } else {
+               // final check for batching message is in serializeMessage
+               // this is a double check
+               checkSize = uncompressedSize
+       }
+
+       // if msg is too large and chunking is disabled
+       if checkSize > maxMessageSize && !p.options.EnableChunking {
                p.publishSemaphore.Release()
                request.callback(nil, request.msg, errMessageTooLarge)
                p.log.WithError(errMessageTooLarge).
-                       WithField("size", len(payload)).
+                       WithField("size", checkSize).
                        WithField("properties", msg.Properties).
-                       Errorf("MaxMessageSize %d", 
int(p._getConn().GetMaxMessageSize()))
+                       Errorf("MaxMessageSize %d", maxMessageSize)
                p.metrics.PublishErrorsMsgTooLarge.Inc()
                return
        }
 
+       var totalChunks int
+       // max chunk payload size
+       var payloadChunkSize int
+       if sendAsBatch || !p.options.EnableChunking {
+               totalChunks = 1
+               payloadChunkSize = int(p._getConn().GetMaxMessageSize())
+       } else {
+               payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - 
mm.Size()
+               if payloadChunkSize <= 0 {
+                       request.callback(nil, msg, errMetaTooLarge)
+                       p.log.WithError(errMetaTooLarge).
+                               WithField("metadata size", mm.Size()).
+                               WithField("properties", msg.Properties).
+                               Errorf("MaxMessageSize %d", 
int(p._getConn().GetMaxMessageSize()))
+                       p.metrics.PublishErrorsMsgTooLarge.Inc()
+                       return
+               }
+               // set ChunkMaxMessageSize
+               if p.options.ChunkMaxMessageSize != 0 {
+                       payloadChunkSize = 
int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
+               }
+               totalChunks = int(math.Max(1, 
math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
+       }
+
+       // correct limit queue when chunked
+       for i := 0; i < totalChunks-1; i++ {
+               if !p.canAddToQueue(request) {
+                       return
+               }
+       }
+
+       // set total chunks to send request
+       request.totalChunks = totalChunks
+
+       if !sendAsBatch {
+               if msg.SequenceID != nil {
+                       mm.SequenceId = proto.Uint64(uint64(*msg.SequenceID))
+               } else {
+                       mm.SequenceId = 
proto.Uint64(internal.GetAndAdd(p.sequenceIDGenerator, 1))
+               }
+               if totalChunks > 1 {
+                       var lhs, rhs int
+                       uuid := fmt.Sprintf("%s-%s", p.producerName, 
strconv.FormatUint(*mm.SequenceId, 10))
+                       mm.Uuid = proto.String(uuid)
+                       mm.NumChunksFromMsg = proto.Int(totalChunks)
+                       mm.TotalChunkMsgSize = proto.Int(compressedSize)
+                       for chunkID := 0; chunkID < totalChunks; chunkID++ {
+                               lhs = chunkID * payloadChunkSize
+                               if rhs = lhs + payloadChunkSize; rhs > 
compressedSize {
+                                       rhs = compressedSize
+                               }
+                               // update chunk id
+                               mm.ChunkId = proto.Int(chunkID)
+                               nsr := &sendRequest{
+                                       ctx:         request.ctx,
+                                       msg:         request.msg,
+                                       callback:    request.callback,
+                                       publishTime: request.publishTime,
+                                       totalChunks: totalChunks,
+                                       chunkID:     chunkID,
+                                       uuid:        uuid,
+                               }
+                               p.internalSingleSend(mm, 
compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
+                       }
+               } else {
+                       p.internalSingleSend(mm, compressedPayload, request, 
uint32(maxMessageSize))
+               }
+       } else {
+               smm := p.genSingleMetaMessage(msg, uncompressedSize)
+               multiSchemaEnabled := !p.options.DisableMultiSchema
+               added := p.batchBuilder.Add(smm, p.sequenceIDGenerator, 
uncompressedPayload, request,
+                       msg.ReplicationClusters, deliverAt, schemaVersion, 
multiSchemaEnabled)
+               if !added {
+                       // The current batch is full.. flush it and retry
+
+                       p.internalFlushCurrentBatch()
+
+                       // after flushing try again to add the current payload
+                       if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, 
uncompressedPayload, request,
+                               msg.ReplicationClusters, deliverAt, 
schemaVersion, multiSchemaEnabled); !ok {
+                               p.publishSemaphore.Release()
+                               request.callback(nil, request.msg, 
errFailAddToBatch)
+                               p.log.WithField("size", uncompressedSize).
+                                       WithField("properties", msg.Properties).
+                                       Error("unable to add message to batch")
+                               return
+                       }
+               }
+               if request.flushImmediately {
+
+                       p.internalFlushCurrentBatch()
+
+               }
+       }
+}
+
+func (p *partitionProducer) genMetadata(msg *ProducerMessage, uncompressedSize 
int) (mm *pb.MessageMetadata) {

Review Comment:
   Could we add some unit tests to test this method?



##########
pulsar/producer_partition.go:
##########
@@ -463,12 +465,12 @@ func (p *partitionProducer) Name() string {
 }
 
 func (p *partitionProducer) internalSend(request *sendRequest) {
-       p.log.Debug("Received send request: ", *request)
+       p.log.Debug("Received send request: ", *request.msg)

Review Comment:
   Why was this changed?



##########
pulsar/producer_partition.go:
##########
@@ -843,33 +1018,24 @@ func (p *partitionProducer) internalSendAsync(ctx 
context.Context, msg *Producer
                return
        }
 
+       // bc only works when DisableBlockIfQueueFull is false
+       bc := make(chan struct{})
        sr := &sendRequest{
                ctx:              ctx,
                msg:              msg,
                callback:         callback,
                flushImmediately: flushImmediately,
                publishTime:      time.Now(),
+               blockCh:          bc,
        }
        p.options.Interceptors.BeforeSend(p, msg)
 
-       if p.options.DisableBlockIfQueueFull {

Review Comment:
   If we remove this code, it seems the send will block at `p.eventsChan <- sr` 
once the queue reaches `maxPendingMessages`, even if `DisableBlockIfQueueFull` 
is true, because `eventsChan` is initialized with `make(chan interface{}, 
maxPendingMessages)`.



##########
pulsar/producer.go:
##########
@@ -172,6 +172,15 @@ type ProducerOptions struct {
 
        // Encryption specifies the fields required to encrypt a message
        Encryption *ProducerEncryptionInfo
+
+       // EnableChunking controls whether automatic chunking of messages is 
enabled for the producer. By default, chunking
+       // is disabled.
+       // Chunking can not be enabled when batching is not closed.

Review Comment:
   ```suggestion
        // Chunking can not be enabled when batching is enabled.
   ```



##########
pulsar/producer_partition.go:
##########
@@ -246,42 +256,14 @@ func (p *partitionProducer) grabCnx() error {
 
        p.producerName = res.Response.ProducerSuccess.GetProducerName()
 
-       var encryptor internalcrypto.Encryptor
+       //var encryptor internalcrypto.Encryptor

Review Comment:
   Please remove this comment.



##########
pulsar/producer_partition.go:
##########
@@ -717,6 +886,7 @@ func (p *partitionProducer) failTimeoutMessages() {
                        for _, i := range pi.sendRequests {
                                sr := i.(*sendRequest)
                                if sr.msg != nil {
+                                       // todo: it's not correct. the size 
should be schemaed uncompressed payload size

Review Comment:
   Let's create an issue to track it and remove this comment.



##########
pulsar/producer_partition.go:
##########
@@ -519,29 +526,180 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
                }
        }
 
-       // if msg is too large
-       if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+       uncompressedSize := len(uncompressedPayload)
+
+       mm := p.genMetadata(msg, uncompressedSize)
+
+       // set default ReplicationClusters when DisableReplication
+       if msg.DisableReplication {
+               msg.ReplicationClusters = []string{"__local__"}
+       }
+
+       // todo: deliverAt has calculated in genMetadata but it's not a good 
idea to make genMetadata() return it.
+       deliverAt := msg.DeliverAt
+       if msg.DeliverAfter.Nanoseconds() > 0 {
+               deliverAt = time.Now().Add(msg.DeliverAfter)
+       }
+
+       sendAsBatch := !p.options.DisableBatching &&
+               msg.ReplicationClusters == nil &&
+               deliverAt.UnixNano() < 0
+
+       maxMessageSize := int(p._getConn().GetMaxMessageSize())
+
+       // compress payload if not batching
+       var compressedPayload []byte
+       var compressedSize int
+       var checkSize int
+       if !sendAsBatch {
+               compressedPayload = p.compressionProvider.Compress(nil, 
uncompressedPayload)
+               compressedSize = len(compressedPayload)
+               checkSize = compressedSize
+       } else {
+               // final check for batching message is in serializeMessage
+               // this is a double check
+               checkSize = uncompressedSize
+       }
+
+       // if msg is too large and chunking is disabled
+       if checkSize > maxMessageSize && !p.options.EnableChunking {
                p.publishSemaphore.Release()
                request.callback(nil, request.msg, errMessageTooLarge)
                p.log.WithError(errMessageTooLarge).
-                       WithField("size", len(payload)).
+                       WithField("size", checkSize).
                        WithField("properties", msg.Properties).
-                       Errorf("MaxMessageSize %d", 
int(p._getConn().GetMaxMessageSize()))
+                       Errorf("MaxMessageSize %d", maxMessageSize)
                p.metrics.PublishErrorsMsgTooLarge.Inc()
                return
        }
 
+       var totalChunks int
+       // max chunk payload size
+       var payloadChunkSize int
+       if sendAsBatch || !p.options.EnableChunking {
+               totalChunks = 1
+               payloadChunkSize = int(p._getConn().GetMaxMessageSize())
+       } else {
+               payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - 
mm.Size()
+               if payloadChunkSize <= 0 {
+                       request.callback(nil, msg, errMetaTooLarge)
+                       p.log.WithError(errMetaTooLarge).
+                               WithField("metadata size", mm.Size()).
+                               WithField("properties", msg.Properties).
+                               Errorf("MaxMessageSize %d", 
int(p._getConn().GetMaxMessageSize()))
+                       p.metrics.PublishErrorsMsgTooLarge.Inc()
+                       return
+               }
+               // set ChunkMaxMessageSize
+               if p.options.ChunkMaxMessageSize != 0 {
+                       payloadChunkSize = 
int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
+               }
+               totalChunks = int(math.Max(1, 
math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
+       }
+
+       // correct limit queue when chunked
+       for i := 0; i < totalChunks-1; i++ {
+               if !p.canAddToQueue(request) {
+                       return
+               }
+       }
+
+       // set total chunks to send request
+       request.totalChunks = totalChunks
+
+       if !sendAsBatch {
+               if msg.SequenceID != nil {
+                       mm.SequenceId = proto.Uint64(uint64(*msg.SequenceID))
+               } else {
+                       mm.SequenceId = 
proto.Uint64(internal.GetAndAdd(p.sequenceIDGenerator, 1))
+               }
+               if totalChunks > 1 {
+                       var lhs, rhs int
+                       uuid := fmt.Sprintf("%s-%s", p.producerName, 
strconv.FormatUint(*mm.SequenceId, 10))
+                       mm.Uuid = proto.String(uuid)
+                       mm.NumChunksFromMsg = proto.Int(totalChunks)
+                       mm.TotalChunkMsgSize = proto.Int(compressedSize)
+                       for chunkID := 0; chunkID < totalChunks; chunkID++ {
+                               lhs = chunkID * payloadChunkSize
+                               if rhs = lhs + payloadChunkSize; rhs > 
compressedSize {
+                                       rhs = compressedSize
+                               }
+                               // update chunk id
+                               mm.ChunkId = proto.Int(chunkID)
+                               nsr := &sendRequest{
+                                       ctx:         request.ctx,
+                                       msg:         request.msg,
+                                       callback:    request.callback,
+                                       publishTime: request.publishTime,
+                                       totalChunks: totalChunks,
+                                       chunkID:     chunkID,
+                                       uuid:        uuid,
+                               }
+                               p.internalSingleSend(mm, 
compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
+                       }
+               } else {
+                       p.internalSingleSend(mm, compressedPayload, request, 
uint32(maxMessageSize))
+               }
+       } else {
+               smm := p.genSingleMetaMessage(msg, uncompressedSize)
+               multiSchemaEnabled := !p.options.DisableMultiSchema
+               added := p.batchBuilder.Add(smm, p.sequenceIDGenerator, 
uncompressedPayload, request,
+                       msg.ReplicationClusters, deliverAt, schemaVersion, 
multiSchemaEnabled)
+               if !added {
+                       // The current batch is full.. flush it and retry
+
+                       p.internalFlushCurrentBatch()
+
+                       // after flushing try again to add the current payload
+                       if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, 
uncompressedPayload, request,
+                               msg.ReplicationClusters, deliverAt, 
schemaVersion, multiSchemaEnabled); !ok {
+                               p.publishSemaphore.Release()
+                               request.callback(nil, request.msg, 
errFailAddToBatch)
+                               p.log.WithField("size", uncompressedSize).
+                                       WithField("properties", msg.Properties).
+                                       Error("unable to add message to batch")
+                               return
+                       }
+               }
+               if request.flushImmediately {
+
+                       p.internalFlushCurrentBatch()
+
+               }
+       }
+}
+
+func (p *partitionProducer) genMetadata(msg *ProducerMessage, uncompressedSize 
int) (mm *pb.MessageMetadata) {
+       mm = &pb.MessageMetadata{
+               ProducerName:     &p.producerName,
+               PublishTime:      
proto.Uint64(internal.TimestampMillis(time.Now())),
+               ReplicateTo:      msg.ReplicationClusters,
+               UncompressedSize: proto.Uint32(uint32(uncompressedSize)),
+       }
+
+       if msg.Key != "" {
+               mm.PartitionKey = proto.String(msg.Key)
+       }
+
+       if msg.Properties != nil {
+               mm.Properties = internal.ConvertFromStringMap(msg.Properties)
+       }
+
        deliverAt := msg.DeliverAt
        if msg.DeliverAfter.Nanoseconds() > 0 {
                deliverAt = time.Now().Add(msg.DeliverAfter)
        }
+       if deliverAt.UnixNano() > 0 {
+               mm.DeliverAtTime = 
proto.Int64(int64(internal.TimestampMillis(deliverAt)))
+       }
 
-       sendAsBatch := !p.options.DisableBatching &&
-               msg.ReplicationClusters == nil &&
-               deliverAt.UnixNano() < 0
+       return
+}
 
-       smm := &pb.SingleMessageMetadata{
-               PayloadSize: proto.Int(len(payload)),
+func (p *partitionProducer) genSingleMetaMessage(msg *ProducerMessage,

Review Comment:
   Is this method only used when sending messages in a batch, and not for 
normal messages (neither chunked nor batched)?



##########
pulsar/producer_partition.go:
##########
@@ -827,6 +998,10 @@ func (p *partitionProducer) Send(ctx context.Context, msg 
*ProducerMessage) (Mes
 
        // wait for send request to finish
        <-doneCh
+
+       // handle internal error
+       p.internalErrHandle(err)

Review Comment:
   Why do we need to introduce the internal error handler here? Can we handle 
this error inside the sending operation?



##########
pulsar/producer_partition.go:
##########
@@ -911,6 +1077,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response 
*pb.CommandSendReceipt)
                                
p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
                                p.metrics.MessagesPublished.Inc()
                                p.metrics.MessagesPending.Dec()
+                               // todo: it's not correct. the size should be 
schemaed uncompressed payload size

Review Comment:
   Let's create an issue to track it and remove this comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to