This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new cd210a1  fix: batch flush method (#476)
cd210a1 is described below

commit cd210a1a88575eb7fd66c1bf03c04877af5f72e9
Author: jony montana <[email protected]>
AuthorDate: Fri Mar 5 17:36:58 2021 +0800

    fix: batch flush method (#476)
    
    Signed-off-by: jonyhy96 <[email protected]>
    
    Fixes #475
    
    ### Motivation
    
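    Fix a bug where a full batch was always flushed with the single-batch
    flush method, even when the batch builder holds multiple batches.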
    
    ### Modifications
    
    Check whether the batchBuilder holds multiple batches before flushing,
    and call the matching flush method.
---
 pulsar/producer_partition.go | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 41a1b9b..02ee7cf 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -397,7 +397,11 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
                msg.ReplicationClusters, deliverAt)
        if !added {
                // The current batch is full.. flush it and retry
-               p.internalFlushCurrentBatch()
+               if p.batchBuilder.IsMultiBatches() {
+                       p.internalFlushCurrentBatches()
+               } else {
+                       p.internalFlushCurrentBatch()
+               }
 
                // after flushing try again to add the current payload
                if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, payload, request,

Reply via email to