This is an automated email from the ASF dual-hosted git repository.

wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 0e9d5b1  [ISSUE #299] offset store read missing fallthrough for 
_ReadMemoryThenStore
0e9d5b1 is described below

commit 0e9d5b16fe102d226d6945ced2f0e17045ec4bb1
Author: jalesys <[email protected]>
AuthorDate: Tue Nov 19 11:01:59 2019 +0800

    [ISSUE #299] offset store read missing fallthrough for _ReadMemoryThenStore
---
 consumer/offset_store.go      |  5 ++++-
 consumer/offset_store_test.go | 22 +++++++++++++++++-----
 2 files changed, 21 insertions(+), 6 deletions(-)

diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index e86ec3b..1b9a552 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -161,6 +161,7 @@ func (local *localFileOffsetStore) read(mq 
*primitive.MessageQueue, t readType)
                if off >= 0 || (off == -1 && t == _ReadFromMemory) {
                        return off
                }
+               fallthrough
        case _ReadFromStore:
                local.load()
                return readFromMemory(local.OffsetTable, mq)
@@ -277,14 +278,16 @@ func (r *remoteBrokerOffsetStore) read(mq 
*primitive.MessageQueue, t readType) i
        r.mutex.RLock()
        switch t {
        case _ReadFromMemory, _ReadMemoryThenStore:
-               defer r.mutex.RUnlock()
                off, exist := r.OffsetTable[*mq]
                if exist {
+                       r.mutex.RUnlock()
                        return off
                }
                if t == _ReadFromMemory {
+                       r.mutex.RUnlock()
                        return -1
                }
+               fallthrough
        case _ReadFromStore:
                off, err := r.fetchConsumeOffsetFromBroker(r.group, mq)
                if err != nil {
diff --git a/consumer/offset_store_test.go b/consumer/offset_store_test.go
index 542c5a2..f8bb3f6 100644
--- a/consumer/offset_store_test.go
+++ b/consumer/offset_store_test.go
@@ -18,6 +18,7 @@ limitations under the License.
 package consumer
 
 import (
+       "path/filepath"
        "testing"
 
        "github.com/golang/mock/gomock"
@@ -41,21 +42,21 @@ func TestNewLocalFileOffsetStore(t *testing.T) {
                                group:    "testGroup",
                                expectedResult: &localFileOffsetStore{
                                        group: "testGroup",
-                                       path:  _LocalOffsetStorePath + 
"/testGroup/offset.json",
+                                       path:  
filepath.Join(_LocalOffsetStorePath, "/testGroup/offset.json"),
                                },
                        }, {
                                clientId: "192.168.24.1@default",
                                group:    "",
                                expectedResult: &localFileOffsetStore{
                                        group: "",
-                                       path:  _LocalOffsetStorePath + 
"/192.168.24.1@default/offset.json",
+                                       path:  
filepath.Join(_LocalOffsetStorePath, "/192.168.24.1@default/offset.json"),
                                },
                        }, {
                                clientId: "192.168.24.1@default",
                                group:    "testGroup",
                                expectedResult: &localFileOffsetStore{
                                        group: "testGroup",
-                                       path:  _LocalOffsetStorePath + 
"/192.168.24.1@default/testGroup/offset.json",
+                                       path:  
filepath.Join(_LocalOffsetStorePath, 
"/192.168.24.1@default/testGroup/offset.json"),
                                },
                        },
                }
@@ -134,6 +135,10 @@ func TestLocalFileOffsetStore(t *testing.T) {
                        localStore.persist(queues)
                        offset = localStore.read(mq, _ReadFromStore)
                        So(offset, ShouldEqual, 1)
+
+                       delete(localStore.(*localFileOffsetStore).OffsetTable, 
MessageQueueKey(*mq))
+                       offset = localStore.read(mq, _ReadMemoryThenStore)
+                       So(offset, ShouldEqual, 1)
                })
        })
 }
@@ -204,7 +209,7 @@ func TestRemoteBrokerOffsetStore(t *testing.T) {
                Convey("test persist", func() {
                        queues := []*primitive.MessageQueue{mq}
 
-                       
namesrv.EXPECT().FindBrokerAddrByName(gomock.Any()).Return("192.168.24.1:10911")
+                       
namesrv.EXPECT().FindBrokerAddrByName(gomock.Any()).Return("192.168.24.1:10911").MaxTimes(2)
 
                        ret := &remote.RemotingCommand{
                                Code: internal.ResSuccess,
@@ -212,11 +217,18 @@ func TestRemoteBrokerOffsetStore(t *testing.T) {
                                        "offset": "1",
                                },
                        }
-                       rmqClient.EXPECT().InvokeSync(gomock.Any(), 
gomock.Any(), gomock.Any(), gomock.Any()).Return(ret, nil)
+                       rmqClient.EXPECT().InvokeSync(gomock.Any(), 
gomock.Any(), gomock.Any(), gomock.Any()).Return(ret, nil).MaxTimes(2)
 
                        remoteStore.persist(queues)
                        offset := remoteStore.read(mq, _ReadFromStore)
                        So(offset, ShouldEqual, 1)
+
+                       remoteStore.remove(mq)
+                       offset = remoteStore.read(mq, _ReadFromMemory)
+                       So(offset, ShouldEqual, -1)
+                       offset = remoteStore.read(mq, _ReadMemoryThenStore)
+                       So(offset, ShouldEqual, 1)
+
                })
 
                Convey("test remove", func() {

Reply via email to