zzzming commented on code in PR #717:
URL: https://github.com/apache/pulsar-client-go/pull/717#discussion_r905393855
##########
pulsar/producer_partition.go:
##########
@@ -496,6 +507,77 @@ func (p *partitionProducer) internalSend(request
*sendRequest) {
}
}
+func (p *partitionProducer) internalSendWithTrunks(request *sendRequest,
payload []byte) {
+ chunkSize := int(p._getConn().GetMaxMessageSize())
+ totalChunks := (len(payload)+1)/chunkSize + 1
+ uuid := uuidGen.New().String()
+
+ for chunkId := 0; chunkId < chunkSize; chunkId++ {
+ left := chunkId * chunkSize
+ right := left + chunkSize
+ if right > len(payload)-1 {
+ right = len(payload) - 1
+ }
+ // [left, right)
+ p.internalSendSingleChunk(request, payload[left:right], uuid,
totalChunks, len(payload), chunkId)
+ }
+}
+
+func (p *partitionProducer) internalSendSingleChunk(request *sendRequest,
payload []byte,
+ uuid string, totalChunks int, totalSize int, chunkId int) {
+
+ msg := request.msg
+ mm := &pb.MessageMetadata{}
+
+ deliverAt := msg.DeliverAt
+ if msg.DeliverAfter.Nanoseconds() > 0 {
+ deliverAt = time.Now().Add(msg.DeliverAfter)
+ mm.DeliverAtTime =
proto.Int64(int64(internal.TimestampMillis(deliverAt)))
+ }
+
+ if msg.EventTime.UnixNano() != 0 {
+ mm.EventTime =
proto.Uint64(internal.TimestampMillis(msg.EventTime))
+ }
+
+ if msg.Key != "" {
+ mm.PartitionKey = proto.String(msg.Key)
+ }
+
+ if len(msg.OrderingKey) != 0 {
+ mm.OrderingKey = []byte(msg.OrderingKey)
+ }
+
+ if msg.Properties != nil {
+ mm.Properties = internal.ConvertFromStringMap(msg.Properties)
+ }
+
+ if msg.SequenceID != nil {
+ sequenceID := uint64(*msg.SequenceID)
+ mm.SequenceId = proto.Uint64(sequenceID)
+ }
+
+ // Fields required for chunked data
+ mm.Uuid = proto.String(uuid)
+ mm.NumChunksFromMsg = proto.Int(totalChunks)
+ mm.TotalChunkMsgSize = proto.Int(totalSize)
+ mm.ChunkId = proto.Int(chunkId)
+
+ // Directly construct a buffer and put it to the pending queue
+ newBuffer := p.GetBuffer()
+ internal.ConstructBufferFromMessage(newBuffer, mm, payload)
+
+ callbacks := make([]interface{}, 1)
+ callbacks[0] = request.callback
+
+ p.pendingQueue.Put(&pendingItem{
+ sentAt: time.Now(),
+ batchData: newBuffer,
+ sequenceID: uint64(*msg.SequenceID),
+ sendRequests: callbacks,
+ })
+ p._getConn().WriteData(newBuffer)
Review Comment:
When a batching is enabled, how does this flush work with batching?
One scenario is there is message already in a batch yet to be flushed upon
batch requirements are fulfilled. There is a large message added requires
chunking. Is this logic going to flush the chunk message ahead of previously
batched message?
So do you need to flush the batch first before call individual chunking
flushing? Probably add a logic like this before flush a chunk?
```
batchData, sequenceID, callbacks, err := p.batchBuilder.Flush()
if batchData != nil {
p.pendingQueue.Put(&pendingItem{
sentAt: time.Now(),
batchData: batchData,
sequenceID: sequenceID,
sendRequests: callbacks,
})
p._getConn().WriteData(batchData)}
```
##########
pulsar/producer_partition.go:
##########
@@ -496,6 +507,77 @@ func (p *partitionProducer) internalSend(request
*sendRequest) {
}
}
+func (p *partitionProducer) internalSendWithTrunks(request *sendRequest,
payload []byte) {
+ chunkSize := int(p._getConn().GetMaxMessageSize())
+ totalChunks := (len(payload)+1)/chunkSize + 1
Review Comment:
The last `+1` may not be accurate. Java implementation has a more precise
way to calculate the number of chunks
```
int totalChunks = canAddToBatch(msg) ? 1
: Math.max(1, compressedPayload.readableBytes()) /
ClientCnx.getMaxMessageSize()
+ (Math.max(1, compressedPayload.readableBytes()) %
ClientCnx.getMaxMessageSize() == 0 ? 0 : 1);
```
##########
pulsar/producer_partition.go:
##########
@@ -496,6 +507,77 @@ func (p *partitionProducer) internalSend(request
*sendRequest) {
}
}
+func (p *partitionProducer) internalSendWithTrunks(request *sendRequest,
payload []byte) {
+ chunkSize := int(p._getConn().GetMaxMessageSize())
+ totalChunks := (len(payload)+1)/chunkSize + 1
+ uuid := uuidGen.New().String()
+
+ for chunkId := 0; chunkId < chunkSize; chunkId++ {
+ left := chunkId * chunkSize
+ right := left + chunkSize
+ if right > len(payload)-1 {
+ right = len(payload) - 1
+ }
+ // [left, right)
Review Comment:
Can we make this as a separate function to return a slice of [ {trunk 0
left,right}, {trunk 1 left,right} ... ]. So that we can write a unit test to
verify any number is missing from the splitting. WDYT?
##########
pulsar/internal/batch_builder.go:
##########
@@ -207,6 +215,57 @@ func (bc *batchContainer) Add(
return true
}
+func (bc *batchContainer) AddMessageMetaData(
Review Comment:
I could not find any code calls this function. Who's supposed to call this
function?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]