This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 9867c29  [fix] [issue 1057]:  Fix the producer flush operation not 
being guaranteed to flush all messages (#1058)
9867c29 is described below

commit 9867c29ca329302e97ddd9c6a99f66853c7f447f
Author: Jiaqi Shen <[email protected]>
AuthorDate: Thu Jul 20 19:35:05 2023 +0800

    [fix] [issue 1057]:  Fix the producer flush operation not being guaranteed 
to flush all messages (#1058)
    
    Fixes #1057
    
    ### Motivation
    
    `dataChan` was introduced by #1029 to fix a problem with reconnectToBroker. 
However, that change overlooked that when a flush operation is executed, there 
may still be some messages in `dataChan`, and these messages are not flushed.
    
    ### Modifications
    
    - Fix the producer flush operation not being guaranteed to flush all messages
---
 pulsar/producer_partition.go | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 7d514d5..11d5f65 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1051,6 +1051,15 @@ func (p *partitionProducer) 
internalFlushCurrentBatches() {
 }
 
 func (p *partitionProducer) internalFlush(fr *flushRequest) {
+       // clear all the messages which have sent to dataChan before flush
+       if len(p.dataChan) != 0 {
+               oldDataChan := p.dataChan
+               p.dataChan = make(chan *sendRequest, 
p.options.MaxPendingMessages)
+               for len(oldDataChan) != 0 {
+                       pendingData := <-oldDataChan
+                       p.internalSend(pendingData)
+               }
+       }
 
        p.internalFlushCurrentBatch()
 

Reply via email to