zengguan opened a new pull request, #1059:
URL: https://github.com/apache/pulsar-client-go/pull/1059
Fixes #1057
### Motivation
Data still queued in `dataChan` may never be processed once `internalClose()` is called.
### Modifications
1. Use `internalSend(data)` to process the data remaining in `dataChan`.
2. After flushing the data with `internalFlush(flushReq)`, wait for the flush
request to complete.
3. `Flush()` already exists, but it cannot be used here because it would block on
`<-flushReq.doneCh` once `runEventsLoop()` is running `p.internalClose(v)`.
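The drain-then-flush idea in the list above can be sketched in isolation. This is only a minimal illustration, not the code from this pull request: `toyProducer`, its fields, and the simplified method signatures are hypothetical stand-ins for `partitionProducer`, and the flush here completes synchronously, whereas the real flush completes later.

```go
package main

import "fmt"

// toyProducer is a hypothetical stand-in for partitionProducer.
// It is NOT the pulsar-client-go type; it only mirrors the names used above.
type toyProducer struct {
	dataChan chan string // send requests that have been queued but not handled
	batch    []string    // messages accepted by internalSend, not yet flushed
	sent     []string    // messages that have been flushed out
}

// internalSend moves one queued request into the current batch.
func (p *toyProducer) internalSend(msg string) {
	p.batch = append(p.batch, msg)
}

// internalFlush writes out the current batch and signals completion on doneCh.
func (p *toyProducer) internalFlush(doneCh chan struct{}) {
	p.sent = append(p.sent, p.batch...)
	p.batch = nil
	close(doneCh)
}

// internalClose drains dataChan without blocking, then flushes directly and
// waits for the flush to finish. It calls internalFlush itself instead of
// posting a request to an event loop, because in this sketch the caller is
// the only goroutine that could service such a request.
func (p *toyProducer) internalClose() {
drain:
	for {
		select {
		case msg := <-p.dataChan:
			p.internalSend(msg)
		default:
			// Nothing left in the channel: stop draining.
			break drain
		}
	}

	doneCh := make(chan struct{})
	p.internalFlush(doneCh)
	<-doneCh
}

func main() {
	p := &toyProducer{dataChan: make(chan string, 4)}
	p.dataChan <- "m1"
	p.dataChan <- "m2"
	p.dataChan <- "m3"

	p.internalClose()
	fmt.Printf("sent=%d remaining=%d\n", len(p.sent), len(p.dataChan))
}
```

The `select` with a `default` branch keeps the drain from blocking when the channel is empty, which matters because nothing else will wake the closing goroutine.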
### Steps to reproduce
To reproduce the issue, add the following code to `producer_partition`.
Add a `time.Sleep`:
```
func (p *partitionProducer) internalSend(request *sendRequest) {
	p.log.Debug("Received send request: ", *request.msg)
	// reproduce code: slow down each send so requests pile up in dataChan
	time.Sleep(200 * time.Millisecond)
	msg := request.msg
	......
```
Add an info log:
```
func (p *partitionProducer) internalClose(req *closeProducer) {
	p.log.Info("Closing producer")
	// reproduce code: report how many requests are still queued at close time
	p.log.Infof("Remaining data size before Closed producer in dataChan: %d", len(p.dataChan))
	......
```
Also set `p.options.DisableBlockIfQueueFull = true`. After that, we can see
that the data remaining in `dataChan` is sent.
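What the added log line measures can be shown without the client at all. The sketch below is a simplified, deterministic illustration and makes no claim about the real event loop: `remainingAtClose` is a hypothetical helper that queues some requests in a buffered channel, handles only a few of them (as a slow `internalSend` would), and then reports `len` of the channel the way the log line does.

```go
package main

import "fmt"

// remainingAtClose is a hypothetical helper, not part of pulsar-client-go.
// It queues `queued` requests in a buffered channel, handles `handled` of
// them, and returns how many are still waiting, as len(p.dataChan) would
// report at the start of a close.
func remainingAtClose(queued, handled int) int {
	if handled > queued {
		handled = queued // never try to receive more than was queued
	}

	dataChan := make(chan int, queued)
	for i := 0; i < queued; i++ {
		dataChan <- i
	}

	// A slow consumer only gets through part of the backlog before close.
	for i := 0; i < handled; i++ {
		<-dataChan
	}

	// len on a buffered channel is the number of elements still queued.
	return len(dataChan)
}

func main() {
	fmt.Println(remainingAtClose(5, 2)) // prints 3
}
```

A non-zero result corresponds to the situation this pull request addresses: requests that were accepted into the channel but not yet handled when the close begins.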
### Does this pull request potentially affect one of the following parts:
*If `yes` was chosen, please highlight the changes*
- Dependencies (does it add or upgrade a dependency): (no)
- The public API: (no)
- The schema: (no)
- The default values of configurations: (no)
- The wire protocol: (no)