dferstay commented on a change in pull request #694:
URL: https://github.com/apache/pulsar-client-go/pull/694#discussion_r776845550



##########
File path: pulsar/default_router.go
##########
@@ -80,32 +79,27 @@ func NewDefaultRouter(
                // spread the data on different partitions but not necessarily in a specific sequence.
                var now int64
                size := uint32(len(message.Payload))
-               previousMessageCount := atomic.LoadUint32(&state.msgCounter)
-               previousBatchingMaxSize := atomic.LoadUint32(&state.cumulativeBatchSize)
-               previousLastChange := atomic.LoadInt64(&state.lastChangeTimestamp)
+               messageCount := atomic.AddUint32(&state.msgCounter, 1)
+               batchSize := atomic.AddUint32(&state.cumulativeBatchSize, size)
 
-               messageCountReached := previousMessageCount >= uint32(maxBatchingMessages-1)
-               sizeReached := (size >= uint32(maxBatchingSize)-previousBatchingMaxSize)
+               messageCountReached := messageCount%uint32(maxBatchingMessages) == 0

Review comment:
   Upon further reflection, I think the use of a running total may have introduced a regression whereby we switch partitions prematurely. The behaviour we want is documented in the comment on L74:
   ```
   // ...sticking with a given partition for a certain amount of messages or volume buffered or the max delay to
   // batch is reached so that we ensure having a decent amount of batching of the messages.
   ```
   
   If the partition switch is triggered by either size or delay, the messages routed to the previous partition are still counted toward the new partition, because the message count is not reset on a switch. The new partition can therefore reach the message-count threshold before it has received `maxBatchingMessages` messages of its own.
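
   To make the suspected regression concrete, here is a minimal, single-goroutine sketch. The `stickyRouter` type, its thresholds (4 messages, 10 bytes) and the `simulate` helper are hypothetical stand-ins, not the pulsar-client-go router: atomics and the delay threshold are left out, and the size total is assumed to be reset on every switch. The `resetCountOnSwitch` flag toggles between the running total and the proposed reset.

   ```go
   package main

   import "fmt"

   // stickyRouter is a hypothetical model of the round-robin router discussed
   // in this review; it is not the pulsar-client-go implementation. A partition
   // is kept until the message-count or size threshold is reached.
   type stickyRouter struct {
   	maxMessages        uint32
   	maxSize            uint32
   	resetCountOnSwitch bool // false: running total; true: proposed reset

   	msgCounter uint32
   	batchSize  uint32
   	runLen     int   // messages routed to the current partition so far
   	runs       []int // completed stints: messages per partition visit
   }

   // route accounts for one message of the given payload size and moves to
   // the next partition afterwards if either threshold has been reached.
   func (r *stickyRouter) route(size uint32) {
   	r.msgCounter++
   	r.batchSize += size
   	r.runLen++

   	countReached := r.msgCounter%r.maxMessages == 0
   	sizeReached := r.batchSize >= r.maxSize
   	if countReached || sizeReached {
   		r.runs = append(r.runs, r.runLen)
   		r.runLen = 0
   		r.batchSize = 0 // assumed: the size total is always reset on switch
   		if r.resetCountOnSwitch {
   			r.msgCounter = 0
   		}
   	}
   }

   // simulate routes messages with the given sizes and returns the completed stints.
   func simulate(resetCountOnSwitch bool, sizes []uint32) []int {
   	r := &stickyRouter{maxMessages: 4, maxSize: 10, resetCountOnSwitch: resetCountOnSwitch}
   	for _, s := range sizes {
   		r.route(s)
   	}
   	return r.runs
   }

   func main() {
   	// The second message is large enough to trigger a size-based switch.
   	sizes := []uint32{1, 10, 1, 1, 1, 1, 1, 1, 1, 1}
   	fmt.Println("running total:", simulate(false, sizes))  // running total: [2 2 4]
   	fmt.Println("reset on switch:", simulate(true, sizes)) // reset on switch: [2 4 4]
   }
   ```

   With the running total, the stint that follows the size-triggered switch ends after only two messages, because the counter already stood at 2 when that partition was chosen. Resetting the count on switch gives that partition its full four messages.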
   
   I think we can safely reset the message count on a partition switch. Incrementing the message count atomically will still effectively latch the `if` expression that guards the clock read on L88.
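
   The latching argument rests on `atomic.AddUint32` returning a distinct value to every caller. The following is a hypothetical stand-alone demonstration, not router code (`countClockReads` and its `readClockAfter` interval are illustrative names): each goroutine bumps a shared counter, and only the caller that receives a multiple of the interval takes the branch where the clock would be read. It leaves out the proposed reset; once the counter is stored back to zero the returned values restart from 1, so the one-caller-per-multiple guarantee holds between resets rather than across them.

   ```go
   package main

   import (
   	"fmt"
   	"sync"
   	"sync/atomic"
   )

   // countClockReads starts `goroutines` goroutines that each route
   // `perGoroutine` messages, bumping a shared counter with atomic.AddUint32.
   // Because every call returns a distinct value, exactly one caller observes
   // each multiple of readClockAfter and takes the "read the clock" branch.
   func countClockReads(goroutines, perGoroutine int, readClockAfter uint32) uint32 {
   	var msgCounter, clockReads uint32
   	var wg sync.WaitGroup
   	for g := 0; g < goroutines; g++ {
   		wg.Add(1)
   		go func() {
   			defer wg.Done()
   			for i := 0; i < perGoroutine; i++ {
   				n := atomic.AddUint32(&msgCounter, 1)
   				if n%readClockAfter == 0 {
   					// In the router, this is where time.Now() would be read.
   					atomic.AddUint32(&clockReads, 1)
   				}
   			}
   		}()
   	}
   	wg.Wait() // all increments happen-before this return
   	return clockReads
   }

   func main() {
   	// 8 goroutines x 1000 messages = 8000 increments; 8000/10 = 800 clock reads.
   	fmt.Println(countClockReads(8, 1000, 10)) // 800
   }
   ```

   The count is the same regardless of scheduling. A plain load of the counter followed by a comparison gives no such guarantee, because several goroutines can load the same value before any of them increments it.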
   
   I'll make this change, gather bench results, and report back.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


