This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 7f91b2b  [Issue 1027][producer] fix: split sendRequest and make reconnectToBroker and other operate in the same coroutine (#1029)
7f91b2b is described below
commit 7f91b2bcd798e1dc6ff27b10a557c8ee5440a83a
Author: grayson <[email protected]>
AuthorDate: Fri Jun 16 17:43:33 2023 +0800
[Issue 1027][producer] fix: split sendRequest and make reconnectToBroker and other operate in the same coroutine (#1029)
---
pulsar/producer_partition.go | 38 +++++++++++++-------------------------
1 file changed, 13 insertions(+), 25 deletions(-)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index c4a460e..6bd9081 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -84,8 +84,8 @@ type partitionProducer struct {
compressionProvider compression.Provider
// Channel where app is posting messages to be published
- eventsChan chan interface{}
- closeCh chan struct{}
+ dataChan chan *sendRequest
+ cmdChan chan interface{}
connectClosedCh chan connectionClosed
publishSemaphore internal.Semaphore
@@ -150,9 +150,9 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
log: logger,
options: options,
producerID: client.rpcClient.NewProducerID(),
- eventsChan: make(chan interface{}, maxPendingMessages),
+ dataChan: make(chan *sendRequest, maxPendingMessages),
+ cmdChan: make(chan interface{}, 10),
connectClosedCh: make(chan connectionClosed, 10),
- closeCh: make(chan struct{}),
batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
compressionProvider: internal.GetCompressionProvider(pb.CompressionType(options.CompressionType),
compression.Level(options.CompressionLevel)),
@@ -438,31 +438,21 @@ func (p *partitionProducer) reconnectToBroker() {
}
func (p *partitionProducer) runEventsLoop() {
- go func() {
- for {
- select {
- case <-p.closeCh:
- p.log.Info("close producer, exit reconnect")
- return
- case <-p.connectClosedCh:
- p.log.Info("runEventsLoop will reconnect in
producer")
- p.reconnectToBroker()
- }
- }
- }()
-
for {
select {
- case i := <-p.eventsChan:
+ case data := <-p.dataChan:
+ p.internalSend(data)
+ case i := <-p.cmdChan:
switch v := i.(type) {
- case *sendRequest:
- p.internalSend(v)
case *flushRequest:
p.internalFlush(v)
case *closeProducer:
p.internalClose(v)
return
}
+ case <-p.connectClosedCh:
+ p.log.Info("runEventsLoop will reconnect in producer")
+ p.reconnectToBroker()
case <-p.batchFlushTicker.C:
p.internalFlushCurrentBatch()
}
@@ -1165,7 +1155,7 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer
}
p.options.Interceptors.BeforeSend(p, msg)
- p.eventsChan <- sr
+ p.dataChan <- sr
if !p.options.DisableBlockIfQueueFull {
// block if queue full
@@ -1304,8 +1294,6 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
p.setProducerState(producerClosed)
p._getConn().UnregisterListener(p.producerID)
p.batchFlushTicker.Stop()
-
- close(p.closeCh)
}
func (p *partitionProducer) LastSequenceID() int64 {
@@ -1317,7 +1305,7 @@ func (p *partitionProducer) Flush() error {
doneCh: make(chan struct{}),
err: nil,
}
- p.eventsChan <- flushReq
+ p.cmdChan <- flushReq
// wait for the flush request to complete
<-flushReq.doneCh
@@ -1345,7 +1333,7 @@ func (p *partitionProducer) Close() {
}
cp := &closeProducer{doneCh: make(chan struct{})}
- p.eventsChan <- cp
+ p.cmdChan <- cp
// wait for close producer request to complete
<-cp.doneCh