This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f91b2b  [Issue 1027][producer] fix: split sendRequest and make 
reconnectToBroker and other operate in the same coroutine (#1029)
7f91b2b is described below

commit 7f91b2bcd798e1dc6ff27b10a557c8ee5440a83a
Author: grayson <[email protected]>
AuthorDate: Fri Jun 16 17:43:33 2023 +0800

    [Issue 1027][producer] fix: split sendRequest and make reconnectToBroker 
and other operate in the same coroutine (#1029)
---
 pulsar/producer_partition.go | 38 +++++++++++++-------------------------
 1 file changed, 13 insertions(+), 25 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index c4a460e..6bd9081 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -84,8 +84,8 @@ type partitionProducer struct {
        compressionProvider      compression.Provider
 
        // Channel where app is posting messages to be published
-       eventsChan      chan interface{}
-       closeCh         chan struct{}
+       dataChan        chan *sendRequest
+       cmdChan         chan interface{}
        connectClosedCh chan connectionClosed
 
        publishSemaphore internal.Semaphore
@@ -150,9 +150,9 @@ func newPartitionProducer(client *client, topic string, 
options *ProducerOptions
                log:              logger,
                options:          options,
                producerID:       client.rpcClient.NewProducerID(),
-               eventsChan:       make(chan interface{}, maxPendingMessages),
+               dataChan:         make(chan *sendRequest, maxPendingMessages),
+               cmdChan:          make(chan interface{}, 10),
                connectClosedCh:  make(chan connectionClosed, 10),
-               closeCh:          make(chan struct{}),
                batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
                compressionProvider: 
internal.GetCompressionProvider(pb.CompressionType(options.CompressionType),
                        compression.Level(options.CompressionLevel)),
@@ -438,31 +438,21 @@ func (p *partitionProducer) reconnectToBroker() {
 }
 
 func (p *partitionProducer) runEventsLoop() {
-       go func() {
-               for {
-                       select {
-                       case <-p.closeCh:
-                               p.log.Info("close producer, exit reconnect")
-                               return
-                       case <-p.connectClosedCh:
-                               p.log.Info("runEventsLoop will reconnect in 
producer")
-                               p.reconnectToBroker()
-                       }
-               }
-       }()
-
        for {
                select {
-               case i := <-p.eventsChan:
+               case data := <-p.dataChan:
+                       p.internalSend(data)
+               case i := <-p.cmdChan:
                        switch v := i.(type) {
-                       case *sendRequest:
-                               p.internalSend(v)
                        case *flushRequest:
                                p.internalFlush(v)
                        case *closeProducer:
                                p.internalClose(v)
                                return
                        }
+               case <-p.connectClosedCh:
+                       p.log.Info("runEventsLoop will reconnect in producer")
+                       p.reconnectToBroker()
                case <-p.batchFlushTicker.C:
                        p.internalFlushCurrentBatch()
                }
@@ -1165,7 +1155,7 @@ func (p *partitionProducer) internalSendAsync(ctx 
context.Context, msg *Producer
        }
        p.options.Interceptors.BeforeSend(p, msg)
 
-       p.eventsChan <- sr
+       p.dataChan <- sr
 
        if !p.options.DisableBlockIfQueueFull {
                // block if queue full
@@ -1304,8 +1294,6 @@ func (p *partitionProducer) internalClose(req 
*closeProducer) {
        p.setProducerState(producerClosed)
        p._getConn().UnregisterListener(p.producerID)
        p.batchFlushTicker.Stop()
-
-       close(p.closeCh)
 }
 
 func (p *partitionProducer) LastSequenceID() int64 {
@@ -1317,7 +1305,7 @@ func (p *partitionProducer) Flush() error {
                doneCh: make(chan struct{}),
                err:    nil,
        }
-       p.eventsChan <- flushReq
+       p.cmdChan <- flushReq
 
        // wait for the flush request to complete
        <-flushReq.doneCh
@@ -1345,7 +1333,7 @@ func (p *partitionProducer) Close() {
        }
 
        cp := &closeProducer{doneCh: make(chan struct{})}
-       p.eventsChan <- cp
+       p.cmdChan <- cp
 
        // wait for close producer request to complete
        <-cp.doneCh

Reply via email to