This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 08cd71e  [ISSUE #139]add locker for updating data (#146)
08cd71e is described below

commit 08cd71ef98c146e87ce44c9c7af0bee136d18047
Author: wolftankk <[email protected]>
AuthorDate: Tue Aug 13 11:10:40 2019 +0800

    [ISSUE #139]add locker for updating data (#146)
---
 consumer/offset_store.go | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index 8bc5b5a..1eb246c 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -122,6 +122,8 @@ func (local *localFileOffsetStore) read(mq *primitive.MessageQueue, t readType)
 }
 
 func (local *localFileOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {
+       local.mutex.Lock()
+       defer local.mutex.Unlock()
        rlog.Debugf("update offset: %s to %d", mq, offset)
        localOffset, exist := local.OffsetTable[mq.Topic]
        if !exist {
@@ -149,6 +151,8 @@ func (local *localFileOffsetStore) persist(mqs []*primitive.MessageQueue) {
        if len(mqs) == 0 {
                return
        }
+       local.mutex.Lock()
+       defer local.mutex.Unlock()
        table := make(map[string]map[int]*queueOffset)
        for idx := range mqs {
                mq := mqs[idx]

Reply via email to