This is an automated email from the ASF dual-hosted git repository.
wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 0e9d5b1 [ISSUE #299] offset store read missing fallthrough for
_ReadMemoryThenStore
0e9d5b1 is described below
commit 0e9d5b16fe102d226d6945ced2f0e17045ec4bb1
Author: jalesys <[email protected]>
AuthorDate: Tue Nov 19 11:01:59 2019 +0800
[ISSUE #299] offset store read missing fallthrough for _ReadMemoryThenStore
---
consumer/offset_store.go | 5 ++++-
consumer/offset_store_test.go | 22 +++++++++++++++++-----
2 files changed, 21 insertions(+), 6 deletions(-)
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index e86ec3b..1b9a552 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -161,6 +161,7 @@ func (local *localFileOffsetStore) read(mq
*primitive.MessageQueue, t readType)
if off >= 0 || (off == -1 && t == _ReadFromMemory) {
return off
}
+ fallthrough
case _ReadFromStore:
local.load()
return readFromMemory(local.OffsetTable, mq)
@@ -277,14 +278,16 @@ func (r *remoteBrokerOffsetStore) read(mq
*primitive.MessageQueue, t readType) i
r.mutex.RLock()
switch t {
case _ReadFromMemory, _ReadMemoryThenStore:
- defer r.mutex.RUnlock()
off, exist := r.OffsetTable[*mq]
if exist {
+ r.mutex.RUnlock()
return off
}
if t == _ReadFromMemory {
+ r.mutex.RUnlock()
return -1
}
+ fallthrough
case _ReadFromStore:
off, err := r.fetchConsumeOffsetFromBroker(r.group, mq)
if err != nil {
diff --git a/consumer/offset_store_test.go b/consumer/offset_store_test.go
index 542c5a2..f8bb3f6 100644
--- a/consumer/offset_store_test.go
+++ b/consumer/offset_store_test.go
@@ -18,6 +18,7 @@ limitations under the License.
package consumer
import (
+ "path/filepath"
"testing"
"github.com/golang/mock/gomock"
@@ -41,21 +42,21 @@ func TestNewLocalFileOffsetStore(t *testing.T) {
group: "testGroup",
expectedResult: &localFileOffsetStore{
group: "testGroup",
- path: _LocalOffsetStorePath +
"/testGroup/offset.json",
+ path:
filepath.Join(_LocalOffsetStorePath, "/testGroup/offset.json"),
},
}, {
clientId: "192.168.24.1@default",
group: "",
expectedResult: &localFileOffsetStore{
group: "",
- path: _LocalOffsetStorePath +
"/192.168.24.1@default/offset.json",
+ path:
filepath.Join(_LocalOffsetStorePath, "/192.168.24.1@default/offset.json"),
},
}, {
clientId: "192.168.24.1@default",
group: "testGroup",
expectedResult: &localFileOffsetStore{
group: "testGroup",
- path: _LocalOffsetStorePath +
"/192.168.24.1@default/testGroup/offset.json",
+ path:
filepath.Join(_LocalOffsetStorePath,
"/192.168.24.1@default/testGroup/offset.json"),
},
},
}
@@ -134,6 +135,10 @@ func TestLocalFileOffsetStore(t *testing.T) {
localStore.persist(queues)
offset = localStore.read(mq, _ReadFromStore)
So(offset, ShouldEqual, 1)
+
+ delete(localStore.(*localFileOffsetStore).OffsetTable,
MessageQueueKey(*mq))
+ offset = localStore.read(mq, _ReadMemoryThenStore)
+ So(offset, ShouldEqual, 1)
})
})
}
@@ -204,7 +209,7 @@ func TestRemoteBrokerOffsetStore(t *testing.T) {
Convey("test persist", func() {
queues := []*primitive.MessageQueue{mq}
-
namesrv.EXPECT().FindBrokerAddrByName(gomock.Any()).Return("192.168.24.1:10911")
+
namesrv.EXPECT().FindBrokerAddrByName(gomock.Any()).Return("192.168.24.1:10911").MaxTimes(2)
ret := &remote.RemotingCommand{
Code: internal.ResSuccess,
@@ -212,11 +217,18 @@ func TestRemoteBrokerOffsetStore(t *testing.T) {
"offset": "1",
},
}
- rmqClient.EXPECT().InvokeSync(gomock.Any(),
gomock.Any(), gomock.Any(), gomock.Any()).Return(ret, nil)
+ rmqClient.EXPECT().InvokeSync(gomock.Any(),
gomock.Any(), gomock.Any(), gomock.Any()).Return(ret, nil).MaxTimes(2)
remoteStore.persist(queues)
offset := remoteStore.read(mq, _ReadFromStore)
So(offset, ShouldEqual, 1)
+
+ remoteStore.remove(mq)
+ offset = remoteStore.read(mq, _ReadFromMemory)
+ So(offset, ShouldEqual, -1)
+ offset = remoteStore.read(mq, _ReadMemoryThenStore)
+ So(offset, ShouldEqual, 1)
+
})
Convey("test remove", func() {