zengguan opened a new pull request, #1059:
URL: https://github.com/apache/pulsar-client-go/pull/1059

   Fixes #1057
   
   ### Motivation
   
   Data in `dataChan`  may not be processed when `internalClose()` was called
   
   ### Modifications
   
   1. use `internalSend(data)` to process the remaining data
   2. wait for the flush request to complete after flushing data using 
`internalFlush(flushReq)`
   3. We have `Flush()` but we can not use it because it will be blocked in 
`<-flushReq.doneCh` after runEventsLoop() runs `p.internalClose(v)`
   
   ### Steps to reproduce
   we can add some code in `producer_partition` to reproduce it
   
   Add time.Sleep
   ```
   func (p *partitionProducer) internalSend(request *sendRequest) {
   
        p.log.Debug("Received send request: ", *request.msg)
   
        // reproduce code
        time.Sleep(200*time.Millisecond)
   
        msg := request.msg
   
        ......
   
   ```
   Add info log
   
   ```
   func (p *partitionProducer) internalClose(req *closeProducer) {
   
        p.log.Info("Closing producer")
   
        // reproduce code
        p.log.Infof("Remaining data size before Closed producer in dataChan: 
%d", len(p.dataChan))
   
       ......
   ```
   
   And set p.options.DisableBlockIfQueueFull = true. After that, we can see 
remaining data in `dataChan` will be send 
   
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (no)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to