This is an automated email from the ASF dual-hosted git repository.
wenfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 83f60c1 [ISSUE #759]
83f60c1 is described below
commit 83f60c154236bb92a5d5e3d40276b546b6079f1b
Author: WJL3333 <[email protected]>
AuthorDate: Mon Jan 3 20:50:55 2022 +0800
[ISSUE #759]
[ISSUE #759] Change ResetOffsetBody response parse method to support
fastjson schema
---
internal/model.go | 77 +++++++++++++++++++++++++++++++++++++++++++++++---
internal/model_test.go | 50 +++++++++++++++++++++++++++++++-
2 files changed, 122 insertions(+), 5 deletions(-)
diff --git a/internal/model.go b/internal/model.go
index d7f2057..7a011d7 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -295,12 +295,39 @@ type ResetOffsetBody struct {
OffsetTable map[primitive.MessageQueue]int64 `json:"offsetTable"`
}
+// Decode note: the original implementation parsed the JSON in gson format.
+// This func should support both the gson and fastjson schemas.
func (resetOffsetBody *ResetOffsetBody) Decode(body []byte) {
+ validJSON := gjson.ValidBytes(body)
+
+ var offsetTable map[primitive.MessageQueue]int64
+
+ if validJSON {
+ offsetTable = parseGsonFormat(body)
+ } else {
+ offsetTable = parseFastJsonFormat(body)
+ }
+
+ resetOffsetBody.OffsetTable = offsetTable
+}
+
+func parseGsonFormat(body []byte) map[primitive.MessageQueue]int64 {
result := gjson.ParseBytes(body)
+
rlog.Debug("offset table string "+result.Get("offsetTable").String(),
nil)
offsetTable := make(map[primitive.MessageQueue]int64, 0)
- offsetTableArray := strings.Split(result.Get("offsetTable").String(),
"],[")
+
+ offsetStr := result.Get("offsetTable").String()
+ if len(offsetStr) <= 2 {
+ rlog.Warning("parse reset offset table json get nothing in
body", map[string]interface{}{
+ "origin json": offsetStr,
+ })
+ return offsetTable
+ }
+
+ offsetTableArray := strings.Split(offsetStr, "],[")
+
for index, v := range offsetTableArray {
kvArray := strings.Split(v, "},")
@@ -315,7 +342,7 @@ func (resetOffsetBody *ResetOffsetBody) Decode(body []byte)
{
rlog.Error("Unmarshal offset error",
map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
- return
+ return nil
}
if index == 0 {
@@ -329,9 +356,51 @@ func (resetOffsetBody *ResetOffsetBody) Decode(body
[]byte) {
rlog.Error("Unmarshal message queue error",
map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
- return
+ return nil
}
offsetTable[*kObj] = offset
}
- resetOffsetBody.OffsetTable = offsetTable
+
+ return offsetTable
+}
+
+func parseFastJsonFormat(body []byte) map[primitive.MessageQueue]int64 {
+ offsetTable := make(map[primitive.MessageQueue]int64)
+
+ jsonStr := string(body)
+ offsetStr := gjson.Get(jsonStr, "offsetTable").String()
+
+ if len(offsetStr) <= 2 {
+ rlog.Warning("parse reset offset table json get nothing in
body", map[string]interface{}{
+ "origin json": jsonStr,
+ })
+ return offsetTable
+ }
+
+ trimStr := offsetStr[2 : len(offsetStr)-1]
+
+ split := strings.Split(trimStr, ",{")
+
+ for _, v := range split {
+ tuple := strings.Split(v, "}:")
+
+ queueStr := "{" + tuple[0] + "}"
+
+ var err error
+ // ignore err for now
+ offset, err := strconv.Atoi(tuple[1])
+
+ var queue primitive.MessageQueue
+ err = json.Unmarshal([]byte(queueStr), &queue)
+
+ if err != nil {
+ rlog.Error("parse reset offset table json get nothing
in body", map[string]interface{}{
+ "origin json": jsonStr,
+ })
+ }
+
+ offsetTable[queue] = int64(offset)
+ }
+
+ return offsetTable
}
diff --git a/internal/model_test.go b/internal/model_test.go
index a505f3e..a653fa4 100644
--- a/internal/model_test.go
+++ b/internal/model_test.go
@@ -419,7 +419,7 @@ func TestConsumeMessageDirectlyResult_MarshalJSON(t
*testing.T) {
}
func TestRestOffsetBody_MarshalJSON(t *testing.T) {
- Convey("test ResetOffset Body Decode", t, func() {
+ Convey("test ResetOffset Body Decode gson json schema", t, func() {
body :=
"{\"offsetTable\":[[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":5},23354233],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":4},23354245],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":7},23354203],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":6},23354312],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"q
[...]
resetOffsetBody := new(ResetOffsetBody)
resetOffsetBody.Decode([]byte(body))
@@ -433,4 +433,52 @@ func TestRestOffsetBody_MarshalJSON(t *testing.T) {
}
So(offsetTable[messageQueue], ShouldEqual, 23354233)
})
+
+ Convey("test ResetOffset Body Decode fast json schema", t, func() {
+ body :=
"{\"offsetTable\":{{\"brokerName\":\"RaftNode00\",\"queueId\":0,\"topic\":\"topicB\"}:11110,{\"brokerName\":\"RaftNode00\",\"queueId\":1,\"topic\":\"topicB\"}:0,{\"brokerName\":\"RaftNode00\",\"queueId\":2,\"topic\":\"topicB\"}:0,{\"brokerName\":\"RaftNode00\",\"queueId\":3,\"topic\":\"topicB\"}:0}}"
+ resetOffsetBody := new(ResetOffsetBody)
+ resetOffsetBody.Decode([]byte(body))
+ offsetTable := resetOffsetBody.OffsetTable
+ So(offsetTable, ShouldNotBeNil)
+ So(len(offsetTable), ShouldEqual, 4)
+ messageQueue := primitive.MessageQueue{
+ Topic: "topicB",
+ BrokerName: "RaftNode00",
+ QueueId: 0,
+ }
+ So(offsetTable[messageQueue], ShouldEqual, 11110)
+ })
+
+ Convey("test ResetOffset Body Decode fast json schema with one item",
t, func() {
+ body :=
"{\"offsetTable\":{{\"brokerName\":\"RaftNode00\",\"queueId\":0,\"topic\":\"topicB\"}:11110}}"
+ resetOffsetBody := new(ResetOffsetBody)
+ resetOffsetBody.Decode([]byte(body))
+ offsetTable := resetOffsetBody.OffsetTable
+ So(offsetTable, ShouldNotBeNil)
+ So(len(offsetTable), ShouldEqual, 1)
+ messageQueue := primitive.MessageQueue{
+ Topic: "topicB",
+ BrokerName: "RaftNode00",
+ QueueId: 0,
+ }
+ So(offsetTable[messageQueue], ShouldEqual, 11110)
+ })
+
+ Convey("test ResetOffset Body Decode empty fast json ", t, func() {
+ body := "{\"offsetTable\":{}}"
+ resetOffsetBody := new(ResetOffsetBody)
+ resetOffsetBody.Decode([]byte(body))
+ offsetTable := resetOffsetBody.OffsetTable
+ So(offsetTable, ShouldNotBeNil)
+ So(len(offsetTable), ShouldEqual, 0)
+ })
+
+ Convey("test ResetOffset Body Decode empty gson json ", t, func() {
+ body := "{\"offsetTable\":[]}"
+ resetOffsetBody := new(ResetOffsetBody)
+ resetOffsetBody.Decode([]byte(body))
+ offsetTable := resetOffsetBody.OffsetTable
+ So(offsetTable, ShouldNotBeNil)
+ So(len(offsetTable), ShouldEqual, 0)
+ })
}