This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new ad16fa2  Fix concurrent map read and map write (#179)
ad16fa2 is described below

commit ad16fa296bdd6aec3090f9f4b682ba3510face2e
Author: 冉小龙 <[email protected]>
AuthorDate: Sun Feb 2 16:22:50 2020 +0800

    Fix concurrent map read and map write (#179)
    
    Signed-off-by: xiaolong.ran <[email protected]>
    
    ### Modifications
    
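    Fix a concurrent map read and map write on the connection's
    `listeners` map: `handleSendReceipt`, `handleCloseConsumer` and
    `handleCloseProducer` now take the connection lock before looking up
    or deleting entries.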
---
 pulsar/internal/connection.go | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 6007909..b31e70f 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -509,6 +509,10 @@ func (c *connection) handleResponseError(serverError 
*pb.CommandError) {
 
 func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
        producerID := response.GetProducerId()
+
+       c.Lock()
+       defer c.Unlock()
+
        if producer, ok := c.listeners[producerID]; ok {
                producer.ReceivedSendReceipt(response)
        } else {
@@ -582,6 +586,10 @@ func (c *connection) handleAuthChallenge(authChallenge 
*pb.CommandAuthChallenge)
 func (c *connection) handleCloseConsumer(closeConsumer 
*pb.CommandCloseConsumer) {
        c.log.Infof("Broker notification of Closed consumer: %d", 
closeConsumer.GetConsumerId())
        consumerID := closeConsumer.GetConsumerId()
+
+       c.Lock()
+       defer c.Unlock()
+
        if consumer, ok := c.consumerHandler(consumerID); ok {
                consumer.ConnectionClosed()
                delete(c.listeners, consumerID)
@@ -594,6 +602,8 @@ func (c *connection) handleCloseProducer(closeProducer 
*pb.CommandCloseProducer)
        c.log.Infof("Broker notification of Closed producer: %d", 
closeProducer.GetProducerId())
        producerID := closeProducer.GetProducerId()
 
+       c.Lock()
+       defer c.Unlock()
        if producer, ok := c.listeners[producerID]; ok {
                producer.ConnectionClosed()
                delete(c.listeners, producerID)

Reply via email to