This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new e3f625a Fix producer unable register when cnx closed (#761)
e3f625a is described below
commit e3f625ae8da938f5d147bdddd3a2cadced69c07b
Author: xiaolong ran <[email protected]>
AuthorDate: Wed Apr 20 21:33:07 2022 +0800
Fix producer unable register when cnx closed (#761)
* Fix producer unable register when cnx closed
Signed-off-by: xiaolongran <[email protected]>
* fix code style
Signed-off-by: xiaolongran <[email protected]>
---
pulsar/internal/connection.go | 10 ++++++----
pulsar/producer_partition.go | 7 +++++--
2 files changed, 11 insertions(+), 6 deletions(-)
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 6055252..11c9d49 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -53,7 +53,8 @@ type TLSOptions struct {
}
var (
- errConnectionClosed = errors.New("connection closed")
+ errConnectionClosed = errors.New("connection closed")
+ errUnableRegisterListener = errors.New("unable register listener when con closed")
)
// ConnectionListener is a user of a connection (eg. a producer or
@@ -72,7 +73,7 @@ type Connection interface {
SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error))
SendRequestNoWait(req *pb.BaseCommand) error
WriteData(data Buffer)
- RegisterListener(id uint64, listener ConnectionListener)
+ RegisterListener(id uint64, listener ConnectionListener) error
UnregisterListener(id uint64)
AddConsumeHandler(id uint64, handler ConsumerHandler)
DeleteConsumeHandler(id uint64)
@@ -847,17 +848,18 @@ func (c *connection) handleCloseProducer(closeProducer *pb.CommandCloseProducer)
}
}
-func (c *connection) RegisterListener(id uint64, listener ConnectionListener) {
+func (c *connection) RegisterListener(id uint64, listener ConnectionListener) error {
// do not add if connection is closed
if c.closed() {
c.log.Warnf("Connection closed unable register listener id=%+v", id)
- return
+ return errUnableRegisterListener
}
c.listenersLock.Lock()
defer c.listenersLock.Unlock()
c.listeners[id] = listener
+ return nil
}
func (c *connection) UnregisterListener(id uint64) {
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index d37d60e..bc775e9 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -262,11 +262,14 @@ func (p *partitionProducer) grabCnx() error {
p.sequenceIDGenerator = &nextSequenceID
}
p._setConn(res.Cnx)
- p._getConn().RegisterListener(p.producerID, p)
+ err = p._getConn().RegisterListener(p.producerID, p)
+ if err != nil {
+ return err
+ }
p.log.WithFields(log.Fields{
"cnx": res.Cnx.ID(),
"epoch": atomic.LoadUint64(&p.epoch),
- }).Debug("Connected producer")
+ }).Info("Connected producer")
pendingItems := p.pendingQueue.ReadableSlice()
viewSize := len(pendingItems)