This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new ad16fa2 Fix concurrent map read and map write (#179)
ad16fa2 is described below
commit ad16fa296bdd6aec3090f9f4b682ba3510face2e
Author: 冉小龙 <[email protected]>
AuthorDate: Sun Feb 2 16:22:50 2020 +0800
Fix concurrent map read and map write (#179)
Signed-off-by: xiaolong.ran <[email protected]>
### Modifications
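Fix concurrent map read and map write: `handleSendReceipt`, `handleCloseConsumer`, and `handleCloseProducer` now acquire the connection lock (`c.Lock()` with a deferred `c.Unlock()`) before looking up or deleting handlers.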
---
pulsar/internal/connection.go | 10 ++++++++++
1 file changed, 10 insertions(+)
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 6007909..b31e70f 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -509,6 +509,10 @@ func (c *connection) handleResponseError(serverError
*pb.CommandError) {
func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
producerID := response.GetProducerId()
+
+ c.Lock()
+ defer c.Unlock()
+
if producer, ok := c.listeners[producerID]; ok {
producer.ReceivedSendReceipt(response)
} else {
@@ -582,6 +586,10 @@ func (c *connection) handleAuthChallenge(authChallenge
*pb.CommandAuthChallenge)
func (c *connection) handleCloseConsumer(closeConsumer
*pb.CommandCloseConsumer) {
c.log.Infof("Broker notification of Closed consumer: %d",
closeConsumer.GetConsumerId())
consumerID := closeConsumer.GetConsumerId()
+
+ c.Lock()
+ defer c.Unlock()
+
if consumer, ok := c.consumerHandler(consumerID); ok {
consumer.ConnectionClosed()
delete(c.listeners, consumerID)
@@ -594,6 +602,8 @@ func (c *connection) handleCloseProducer(closeProducer
*pb.CommandCloseProducer)
c.log.Infof("Broker notification of Closed producer: %d",
closeProducer.GetProducerId())
producerID := closeProducer.GetProducerId()
+ c.Lock()
+ defer c.Unlock()
if producer, ok := c.listeners[producerID]; ok {
producer.ConnectionClosed()
delete(c.listeners, producerID)