zzzming commented on code in PR #717:
URL: https://github.com/apache/pulsar-client-go/pull/717#discussion_r905393855


##########
pulsar/producer_partition.go:
##########
@@ -496,6 +507,77 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
        }
 }
 
+func (p *partitionProducer) internalSendWithTrunks(request *sendRequest, 
payload []byte) {
+       chunkSize := int(p._getConn().GetMaxMessageSize())
+       totalChunks := (len(payload)+1)/chunkSize + 1
+       uuid := uuidGen.New().String()
+
+       for chunkId := 0; chunkId < chunkSize; chunkId++ {
+               left := chunkId * chunkSize
+               right := left + chunkSize
+               if right > len(payload)-1 {
+                       right = len(payload) - 1
+               }
+               // [left, right)
+               p.internalSendSingleChunk(request, payload[left:right], uuid, 
totalChunks, len(payload), chunkId)
+       }
+}
+
+func (p *partitionProducer) internalSendSingleChunk(request *sendRequest, 
payload []byte,
+       uuid string, totalChunks int, totalSize int, chunkId int) {
+
+       msg := request.msg
+       mm := &pb.MessageMetadata{}
+
+       deliverAt := msg.DeliverAt
+       if msg.DeliverAfter.Nanoseconds() > 0 {
+               deliverAt = time.Now().Add(msg.DeliverAfter)
+               mm.DeliverAtTime = 
proto.Int64(int64(internal.TimestampMillis(deliverAt)))
+       }
+
+       if msg.EventTime.UnixNano() != 0 {
+               mm.EventTime = 
proto.Uint64(internal.TimestampMillis(msg.EventTime))
+       }
+
+       if msg.Key != "" {
+               mm.PartitionKey = proto.String(msg.Key)
+       }
+
+       if len(msg.OrderingKey) != 0 {
+               mm.OrderingKey = []byte(msg.OrderingKey)
+       }
+
+       if msg.Properties != nil {
+               mm.Properties = internal.ConvertFromStringMap(msg.Properties)
+       }
+
+       if msg.SequenceID != nil {
+               sequenceID := uint64(*msg.SequenceID)
+               mm.SequenceId = proto.Uint64(sequenceID)
+       }
+
+       // Fields required for chunked data
+       mm.Uuid = proto.String(uuid)
+       mm.NumChunksFromMsg = proto.Int(totalChunks)
+       mm.TotalChunkMsgSize = proto.Int(totalSize)
+       mm.ChunkId = proto.Int(chunkId)
+
+       // Directly construct a buffer and put it to the pending queue
+       newBuffer := p.GetBuffer()
+       internal.ConstructBufferFromMessage(newBuffer, mm, payload)
+
+       callbacks := make([]interface{}, 1)
+       callbacks[0] = request.callback
+
+       p.pendingQueue.Put(&pendingItem{
+               sentAt:       time.Now(),
+               batchData:    newBuffer,
+               sequenceID:   uint64(*msg.SequenceID),
+               sendRequests: callbacks,
+       })
+       p._getConn().WriteData(newBuffer)

Review Comment:
   When batching is enabled, how does this flush interact with it? 
   Consider one scenario: a message is already sitting in a batch that has not 
been flushed yet because the batch requirements are not fulfilled, and then a 
large message arrives that requires chunking. Will this logic send the chunks 
ahead of the previously batched message? 
   If so, should the batch be flushed first, before the individual chunks are 
written? Perhaps add logic like this before flushing a chunk:
   ```
   batchData, sequenceID, callbacks, err := p.batchBuilder.Flush()
   if batchData != nil {
           p.pendingQueue.Put(&pendingItem{
                   sentAt:       time.Now(),
                   batchData:    batchData,
                   sequenceID:   sequenceID,
                   sendRequests: callbacks,
           })
           p._getConn().WriteData(batchData)
   }
   ```
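
To make the ordering concern concrete, here is a minimal, self-contained sketch. `fakeProducer`, `flushBatch` and `sendChunked` are hypothetical stand-ins, not the real `partitionProducer` API; the sketch only assumes that draining the pending batch before writing any chunk keeps the write order equal to the send order.

```go
package main

import (
	"fmt"
	"strings"
)

// fakeProducer is a hypothetical stand-in for partitionProducer.
// It only records the order in which frames would reach the connection.
type fakeProducer struct {
	batch []string // messages waiting in the batch builder
	wire  []string // frames in the order they were written
}

// flushBatch writes the pending batch, if any, as a single frame.
func (p *fakeProducer) flushBatch() {
	if len(p.batch) == 0 {
		return
	}
	p.wire = append(p.wire, "batch("+strings.Join(p.batch, ",")+")")
	p.batch = nil
}

// sendChunked drains the batch first, then writes every chunk in order,
// so previously batched messages are never overtaken by the chunks.
func (p *fakeProducer) sendChunked(name string, totalChunks int) {
	p.flushBatch()
	for i := 0; i < totalChunks; i++ {
		p.wire = append(p.wire, fmt.Sprintf("%s#%d", name, i))
	}
}
```

With `batch` holding `a` and `b`, calling `sendChunked("big", 2)` records the batch frame first and the two chunk frames after it.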



##########
pulsar/producer_partition.go:
##########
@@ -496,6 +507,77 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
        }
 }
 
+func (p *partitionProducer) internalSendWithTrunks(request *sendRequest, 
payload []byte) {
+       chunkSize := int(p._getConn().GetMaxMessageSize())
+       totalChunks := (len(payload)+1)/chunkSize + 1

Review Comment:
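   The trailing `+1` is not always accurate: for example, a payload whose 
length is an exact multiple of the chunk size gets one chunk too many. The 
Java implementation calculates the number of chunks more precisely: 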
   ```
    int totalChunks = canAddToBatch(msg) ? 1
                   : Math.max(1, compressedPayload.readableBytes()) / ClientCnx.getMaxMessageSize()
                           + (Math.max(1, compressedPayload.readableBytes()) % ClientCnx.getMaxMessageSize() == 0 ? 0 : 1);
   ```
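
A possible Go equivalent of the Java calculation, as a sketch only. `numChunks` is a hypothetical helper name, and the guard for a non-positive chunk size is an assumption added for illustration. The quotient plus the remainder check in the Java code is a ceiling division, written here as `(n + size - 1) / size`.

```go
package main

// numChunks returns how many chunks are needed to carry payloadLen bytes
// when each chunk holds at most chunkSize bytes.
// An empty payload still counts as one chunk, mirroring Math.max(1, ...)
// in the Java snippet.
func numChunks(payloadLen, chunkSize int) int {
	if chunkSize <= 0 {
		return 0 // hypothetical guard: no valid chunk size, nothing to send
	}
	if payloadLen < 1 {
		payloadLen = 1
	}
	// Ceiling division: round up whenever there is a remainder.
	return (payloadLen + chunkSize - 1) / chunkSize
}
```

For a 10-byte payload and a chunk size of 5 this yields 2, whereas `(len(payload)+1)/chunkSize + 1` yields 3.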



##########
pulsar/producer_partition.go:
##########
@@ -496,6 +507,77 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
        }
 }
 
+func (p *partitionProducer) internalSendWithTrunks(request *sendRequest, 
payload []byte) {
+       chunkSize := int(p._getConn().GetMaxMessageSize())
+       totalChunks := (len(payload)+1)/chunkSize + 1
+       uuid := uuidGen.New().String()
+
+       for chunkId := 0; chunkId < chunkSize; chunkId++ {
+               left := chunkId * chunkSize
+               right := left + chunkSize
+               if right > len(payload)-1 {
+                       right = len(payload) - 1
+               }
+               // [left, right)

Review Comment:
   Can we move this into a separate function that returns a slice of chunk 
boundaries, e.g. [ {chunk 0 left,right}, {chunk 1 left,right} ... ]? Then we 
can write a unit test to verify that no byte is missed by the split. WDYT?
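
One way such a helper could look, as a sketch rather than the PR's implementation. `chunkRange`, `splitIntoChunks` and `coversExactly` are hypothetical names; the sketch assumes half-open `[Left, Right)` intervals, matching the `// [left, right)` comment in the diff, and assumes an empty payload maps to a single empty range.

```go
package main

// chunkRange is a half-open byte interval [Left, Right) of the payload.
type chunkRange struct {
	Left, Right int
}

// splitIntoChunks returns the boundaries of every chunk for a payload of
// payloadLen bytes, with at most chunkSize bytes per chunk.
func splitIntoChunks(payloadLen, chunkSize int) []chunkRange {
	if chunkSize <= 0 {
		return nil // hypothetical guard against an invalid chunk size
	}
	if payloadLen <= 0 {
		return []chunkRange{{0, 0}} // assumption: empty payload is one empty chunk
	}
	n := (payloadLen + chunkSize - 1) / chunkSize
	ranges := make([]chunkRange, 0, n)
	for left := 0; left < payloadLen; left += chunkSize {
		right := left + chunkSize
		if right > payloadLen {
			right = payloadLen // clamp to the length, not length-1
		}
		ranges = append(ranges, chunkRange{Left: left, Right: right})
	}
	return ranges
}

// coversExactly reports whether ranges tile [0, payloadLen) in order,
// with no gaps and no overlaps. A unit test can assert this property.
func coversExactly(ranges []chunkRange, payloadLen int) bool {
	next := 0
	for _, r := range ranges {
		if r.Left != next || r.Right < r.Left {
			return false
		}
		next = r.Right
	}
	return next == payloadLen
}
```

A table-driven test over payload lengths around multiples of the chunk size would catch, for instance, a clamp to `len(payload)-1`, which with a half-open slice leaves out the final byte.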



##########
pulsar/internal/batch_builder.go:
##########
@@ -207,6 +215,57 @@ func (bc *batchContainer) Add(
        return true
 }
 
+func (bc *batchContainer) AddMessageMetaData(

Review Comment:
   I could not find any code that calls this function. Who is supposed to 
call it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
