This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.11.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit 5c5af6ada9e7352cb6fd5bef09cc23d8d880cf39 Author: Jiaqi Shen <[email protected]> AuthorDate: Thu Jul 20 19:35:05 2023 +0800 [fix] [issue 1057]: Fix the producer flush operation not being guaranteed to flush all messages (#1058) Fixes #1057 ### Motivation `dataChan` was introduced by #1029 to fix the problem of reconnectToBroker. But it missed that when a flush operation is executed, there may still be some messages in `dataChan`, and these messages can't be flushed. ### Modifications - Fix the producer flush operation not being guaranteed to flush all messages (cherry picked from commit 9867c29ca329302e97ddd9c6a99f66853c7f447f) --- pulsar/producer_partition.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index e8ad1953..00d4e6f0 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1048,6 +1048,15 @@ func (p *partitionProducer) internalFlushCurrentBatches() { } func (p *partitionProducer) internalFlush(fr *flushRequest) { + // clear all the messages which have sent to dataChan before flush + if len(p.dataChan) != 0 { + oldDataChan := p.dataChan + p.dataChan = make(chan *sendRequest, p.options.MaxPendingMessages) + for len(oldDataChan) != 0 { + pendingData := <-oldDataChan + p.internalSend(pendingData) + } + } p.internalFlushCurrentBatch()
