This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 9867c29 [fix] [issue 1057]: Fix the producer flush opertion is not
guarantee to flush all messages (#1058)
9867c29 is described below
commit 9867c29ca329302e97ddd9c6a99f66853c7f447f
Author: Jiaqi Shen <[email protected]>
AuthorDate: Thu Jul 20 19:35:05 2023 +0800
[fix] [issue 1057]: Fix the producer flush opertion is not guarantee to
flush all messages (#1058)
Fixes #1057
### Motivation
`dataChan` is introduced by #1029 to fix the problem of reconnectToBroker.
But it missed that if a flush operation excuted, there may still be some
messages in `dataChan`. And these messages can't be flushed.
### Modifications
- Fix the producer flush opertion is not guarantee to flush all messages
---
pulsar/producer_partition.go | 9 +++++++++
1 file changed, 9 insertions(+)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 7d514d5..11d5f65 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1051,6 +1051,15 @@ func (p *partitionProducer)
internalFlushCurrentBatches() {
}
func (p *partitionProducer) internalFlush(fr *flushRequest) {
+ // clear all the messages which have sent to dataChan before flush
+ if len(p.dataChan) != 0 {
+ oldDataChan := p.dataChan
+ p.dataChan = make(chan *sendRequest,
p.options.MaxPendingMessages)
+ for len(oldDataChan) != 0 {
+ pendingData := <-oldDataChan
+ p.internalSend(pendingData)
+ }
+ }
p.internalFlushCurrentBatch()