AnonHxy commented on issue #16680:
URL: https://github.com/apache/pulsar/issues/16680#issuecomment-1202172999

   > I expected it at this stage to create a batch for region=us and add M2 to 
it.
   But I see written in your diagram that you "batched msgs in msgsContainer" 
(which container? for which properties? Which msgs are you referring to?) and 
you "and send them" (why? what is the trigger to send them? Size/time?) and 
then you clear and add M2 to the batch.
   
   Let me try to explain the current batch process first. @asafm 
   
   `msgsContainer` in the above diagram refers to 
`ProducerImpl#batchMessageContainer`, which holds a list named 
`BatchMessageContainerImpl#messages`. `msgList` in the diagram refers 
to that list. 
   
   `msgsContainer` also has a `BatchMessageContainerImpl#messageMetadata` 
field, which stores the metadata of the first message in `msgList`. Note that 
the properties field of `BatchMessageContainerImpl#messageMetadata` is 
currently always empty.
   
   A `ProducerImpl` instance has only one `msgsContainer`. When a producer 
publishes messages with batching enabled, the messages are first put into 
`msgList`. When the `msgsContainer` doesn't have enough space (size or number 
of messages threshold) or the scheduled flush task runs (time threshold), the 
`msgsContainer` batches the messages in `msgList` and sends them. The 
`msgList` is then cleared to prepare for the next batch.
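
As a rough illustration of the flow just described, here is a minimal sketch. 
It is not the Pulsar client code: the class name, the threshold values and the 
`sentBatches` list are all made up for the example, and the time threshold is 
only represented by the fact that a scheduled task could call `flush()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the batching flow; not Pulsar's real API.
class BatchFlowSketch {
    static final int MAX_MESSAGES = 3; // assumed number-of-messages threshold
    static final int MAX_BYTES = 16;   // assumed size threshold

    final List<String> msgList = new ArrayList<>();           // stands in for `msgList`
    final List<List<String>> sentBatches = new ArrayList<>(); // records what was "sent"
    int currentBytes = 0;

    // Size / number-of-messages check, done before the message is added.
    boolean hasSpaceFor(String msg) {
        return msgList.size() < MAX_MESSAGES
                && currentBytes + msg.length() <= MAX_BYTES;
    }

    void send(String msg) {
        if (!hasSpaceFor(msg)) {
            flush(); // batch and send what is in msgList, then clear it
        }
        msgList.add(msg);
        currentBytes += msg.length();
    }

    // A scheduled flush task (time threshold) would call this as well.
    void flush() {
        if (msgList.isEmpty()) {
            return;
        }
        sentBatches.add(new ArrayList<>(msgList));
        msgList.clear();
        currentBytes = 0;
    }

    public static void main(String[] args) {
        BatchFlowSketch producer = new BatchFlowSketch();
        for (String m : List.of("a", "b", "c", "d")) {
            producer.send(m);
        }
        producer.flush();
        System.out.println(producer.sentBatches); // prints [[a, b, c], [d]]
    }
}
```

The fourth message does not fit under the assumed count threshold, so the 
first three are sent as one batch before it is added.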
   
   The "size or number of messages threshold" check happens in the 
`ProducerImpl#canAddToCurrentBatch` method, before the message is put into 
`msgList`. If the check returns false, the `msgsContainer` batch action 
is triggered. See lines 636 and 668:
   
https://github.com/apache/pulsar/blob/eddbdd8c5b2bde868758b7cbb3c4e7c11e87e097/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L636-L669
   
   All of the above is the current batch sending process. 
   
   The key to keeping order is that only one batch exists at any time, and 
the batch is sent to the channel by a single-threaded executor: the 
`SingleThreadEventExecutor` attached to the event loop. See line 2154:
   
https://github.com/apache/pulsar/blob/eddbdd8c5b2bde868758b7cbb3c4e7c11e87e097/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2145-L2155
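
To show why a single thread preserves order, here is a small sketch. It uses 
the JDK's `Executors.newSingleThreadExecutor()` as a stand-in for the Netty 
event loop (an assumption made for the example; the client code linked above 
does not use this class), and `sendInOrder` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: tasks submitted to one thread run in FIFO order.
class SingleThreadOrderSketch {
    static List<String> sendInOrder(List<String> batches) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<String> wire = new CopyOnWriteArrayList<>(); // what reached the "channel"
        for (String batch : batches) {
            executor.execute(() -> wire.add(batch)); // one task per batch
        }
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(wire);
    }

    public static void main(String[] args) {
        // prints [batch-1, batch-2, batch-3]
        System.out.println(sendInOrder(List.of("batch-1", "batch-2", "batch-3")));
    }
}
```

Because the batches are handed over one at a time and executed by one thread, 
they reach the channel in the order they were created.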
   
   What I will do in this proposal is just:
   -  Extract the properties of the first message in the batch and fill them 
into `BatchMessageContainerImpl#messageMetadata`.
   -  Additionally check, in the `ProducerImpl#canAddToCurrentBatch` method, 
whether the message being sent has the same properties as those in 
`BatchMessageContainerImpl#messageMetadata`.
   
   So for the case you gave above, the process will be:
   
   - Sending M1(region=eu): 
         -  `ProducerImpl#canAddToCurrentBatch` returns true.
         -  Add M1 to `msgList`, and fill `(region=eu)` into 
`BatchMessageContainerImpl#messageMetadata`.
   - Sending M2(region=us): 
        - `ProducerImpl#canAddToCurrentBatch` returns false, because M2 has 
different properties from `BatchMessageContainerImpl#messageMetadata`. 
        - This triggers `doBatchSendAndAdd`.
        -  The `msgsContainer` batches M1 (`msgList` only has M1) and sends 
it, then clears `msgList` to prepare for the next batch.
        -  Add M2(region=us) to `msgList`, and fill `(region=us)` into 
`BatchMessageContainerImpl#messageMetadata`.
   - Sending .....
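
The walkthrough above can be replayed with a minimal sketch of the proposed 
check. This is only a model, not the real `ProducerImpl`: the `Msg` class, 
the `batchProperties` field and the `sentBatches` list are invented for the 
example, the size/count thresholds are left out, and M3 is a hypothetical 
extra message added to show two messages with equal properties sharing a 
batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of the proposed properties check; not Pulsar's real API.
class PropertyBatchSketch {
    static class Msg {
        final String id;
        final Map<String, String> properties;

        Msg(String id, Map<String, String> properties) {
            this.id = id;
            this.properties = properties;
        }
    }

    final List<Msg> msgList = new ArrayList<>();
    // Stands in for the properties stored in the batch's messageMetadata.
    Map<String, String> batchProperties = null;
    final List<List<String>> sentBatches = new ArrayList<>();

    // Proposed extra condition: same properties as the first message in the batch.
    boolean canAddToCurrentBatch(Msg msg) {
        return msgList.isEmpty() || msg.properties.equals(batchProperties);
    }

    void send(Msg msg) {
        if (!canAddToCurrentBatch(msg)) {
            flush(); // send the current batch, then start a new one
        }
        if (msgList.isEmpty()) {
            batchProperties = msg.properties; // first message fills the metadata
        }
        msgList.add(msg);
    }

    void flush() {
        if (msgList.isEmpty()) {
            return;
        }
        List<String> ids = new ArrayList<>();
        for (Msg m : msgList) {
            ids.add(m.id);
        }
        sentBatches.add(ids);
        msgList.clear();
        batchProperties = null;
    }

    public static void main(String[] args) {
        PropertyBatchSketch producer = new PropertyBatchSketch();
        producer.send(new Msg("M1", Map.of("region", "eu")));
        producer.send(new Msg("M2", Map.of("region", "us")));
        producer.send(new Msg("M3", Map.of("region", "us"))); // hypothetical
        producer.flush();
        System.out.println(producer.sentBatches); // prints [[M1], [M2, M3]]
    }
}
```

M1 goes out alone because M2 carries different properties, while M2 and the 
hypothetical M3 end up in the same batch.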
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
