This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 58ee244 [issue 516]Add lock for compressionProviders to fix data race problem (#533)
58ee244 is described below
commit 58ee24465bd19b4cf32ea52c0c3c113f74444029
Author: yukun <[email protected]>
AuthorDate: Mon Jul 12 15:53:44 2021 +0800
[issue 516]Add lock for compressionProviders to fix data race problem (#533)
* Add lock for compressionProviders to fix data race problem
Signed-off-by: fishpenguin <[email protected]>
* Fix the code without releasing the lock
Signed-off-by: fishpenguin <[email protected]>
* Change Mutex to RWMutex
Signed-off-by: fishpenguin <[email protected]>
---
pulsar/consumer_partition.go | 7 +++++++
1 file changed, 7 insertions(+)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index daaf759..cf92949 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -138,6 +138,7 @@ type partitionConsumer struct {
log log.Logger
+ providersMutex sync.RWMutex
compressionProviders map[pb.CompressionType]compression.Provider
metrics *internal.TopicMetrics
}
@@ -850,9 +851,11 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
pc.log.Info("Closed consumer")
}
+ pc.providersMutex.Lock()
for _, provider := range pc.compressionProviders {
provider.Close()
}
+ pc.providersMutex.Unlock()
pc.setConsumerState(consumerClosed)
pc._getConn().DeleteConsumeHandler(pc.consumerID)
@@ -1062,7 +1065,9 @@ func getPreviousMessage(mid trackingMessageID) trackingMessageID {
}
func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload
internal.Buffer) (internal.Buffer, error) {
+ pc.providersMutex.RLock()
provider, ok := pc.compressionProviders[msgMeta.GetCompression()]
+ pc.providersMutex.RUnlock()
if !ok {
var err error
if provider, err =
pc.initializeCompressionProvider(msgMeta.GetCompression()); err != nil {
@@ -1070,7 +1075,9 @@ func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload int
return nil, err
}
+ pc.providersMutex.Lock()
pc.compressionProviders[msgMeta.GetCompression()] = provider
+ pc.providersMutex.Unlock()
}
uncompressed, err := provider.Decompress(nil, payload.ReadableSlice(),
int(msgMeta.GetUncompressedSize()))