This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 58ee244  [issue 516]Add lock for compressionProviders to fix data race problem (#533)
58ee244 is described below

commit 58ee24465bd19b4cf32ea52c0c3c113f74444029
Author: yukun <[email protected]>
AuthorDate: Mon Jul 12 15:53:44 2021 +0800

    [issue 516]Add lock for compressionProviders to fix data race problem (#533)
    
    * Add lock for compressionProviders to fix data race problem
    
    Signed-off-by: fishpenguin <[email protected]>
    
    * Fix the code without releasing the lock
    
    Signed-off-by: fishpenguin <[email protected]>
    
    * Change Mutex to RWMutex
    
    Signed-off-by: fishpenguin <[email protected]>
---
 pulsar/consumer_partition.go | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index daaf759..cf92949 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -138,6 +138,7 @@ type partitionConsumer struct {
 
        log log.Logger
 
+       providersMutex       sync.RWMutex
        compressionProviders map[pb.CompressionType]compression.Provider
        metrics              *internal.TopicMetrics
 }
@@ -850,9 +851,11 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
                pc.log.Info("Closed consumer")
        }
 
+       pc.providersMutex.Lock()
        for _, provider := range pc.compressionProviders {
                provider.Close()
        }
+       pc.providersMutex.Unlock()
 
        pc.setConsumerState(consumerClosed)
        pc._getConn().DeleteConsumeHandler(pc.consumerID)
@@ -1062,7 +1065,9 @@ func getPreviousMessage(mid trackingMessageID) trackingMessageID {
 }
 
 func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload 
internal.Buffer) (internal.Buffer, error) {
+       pc.providersMutex.RLock()
        provider, ok := pc.compressionProviders[msgMeta.GetCompression()]
+       pc.providersMutex.RUnlock()
        if !ok {
                var err error
                if provider, err = 
pc.initializeCompressionProvider(msgMeta.GetCompression()); err != nil {
@@ -1070,7 +1075,9 @@ func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload int
                        return nil, err
                }
 
+               pc.providersMutex.Lock()
                pc.compressionProviders[msgMeta.GetCompression()] = provider
+               pc.providersMutex.Unlock()
        }
 
        uncompressed, err := provider.Decompress(nil, payload.ReadableSlice(), 
int(msgMeta.GetUncompressedSize()))

Reply via email to