This is an automated email from the ASF dual-hosted git repository.

wenfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 83f60c1  [ISSUE #759]
83f60c1 is described below

commit 83f60c154236bb92a5d5e3d40276b546b6079f1b
Author: WJL3333 <[email protected]>
AuthorDate: Mon Jan 3 20:50:55 2022 +0800

    [ISSUE #759]
    
    [ISSUE #759] Change ResetOffsetBody response parse method to support
    fastjson schema
---
 internal/model.go      | 77 +++++++++++++++++++++++++++++++++++++++++++++++---
 internal/model_test.go | 50 +++++++++++++++++++++++++++++++-
 2 files changed, 122 insertions(+), 5 deletions(-)

diff --git a/internal/model.go b/internal/model.go
index d7f2057..7a011d7 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -295,12 +295,39 @@ type ResetOffsetBody struct {
        OffsetTable map[primitive.MessageQueue]int64 `json:"offsetTable"`
 }
 
+// Decode note: the original implementation parsed JSON in the gson format.
+// This func should support both the gson and fastjson schemas.
 func (resetOffsetBody *ResetOffsetBody) Decode(body []byte) {
+       validJSON := gjson.ValidBytes(body)
+
+       var offsetTable map[primitive.MessageQueue]int64
+
+       if validJSON {
+               offsetTable = parseGsonFormat(body)
+       } else {
+               offsetTable = parseFastJsonFormat(body)
+       }
+
+       resetOffsetBody.OffsetTable = offsetTable
+}
+
+func parseGsonFormat(body []byte) map[primitive.MessageQueue]int64 {
        result := gjson.ParseBytes(body)
+
        rlog.Debug("offset table string "+result.Get("offsetTable").String(), nil)
 
        offsetTable := make(map[primitive.MessageQueue]int64, 0)
-       offsetTableArray := strings.Split(result.Get("offsetTable").String(), "],[")
+
+       offsetStr := result.Get("offsetTable").String()
+       if len(offsetStr) <= 2 {
+               rlog.Warning("parse reset offset table json get nothing in body", map[string]interface{}{
+                       "origin json": offsetStr,
+               })
+               return offsetTable
+       }
+
+       offsetTableArray := strings.Split(offsetStr, "],[")
+
        for index, v := range offsetTableArray {
                kvArray := strings.Split(v, "},")
 
@@ -315,7 +342,7 @@ func (resetOffsetBody *ResetOffsetBody) Decode(body []byte) {
                        rlog.Error("Unmarshal offset error", map[string]interface{}{
                                rlog.LogKeyUnderlayError: err,
                        })
-                       return
+                       return nil
                }
 
                if index == 0 {
@@ -329,9 +356,51 @@ func (resetOffsetBody *ResetOffsetBody) Decode(body []byte) {
                        rlog.Error("Unmarshal message queue error", map[string]interface{}{
                                rlog.LogKeyUnderlayError: err,
                        })
-                       return
+                       return nil
                }
                offsetTable[*kObj] = offset
        }
-       resetOffsetBody.OffsetTable = offsetTable
+
+       return offsetTable
+}
+
+func parseFastJsonFormat(body []byte) map[primitive.MessageQueue]int64 {
+       offsetTable := make(map[primitive.MessageQueue]int64)
+
+       jsonStr := string(body)
+       offsetStr := gjson.Get(jsonStr, "offsetTable").String()
+
+       if len(offsetStr) <= 2 {
+               rlog.Warning("parse reset offset table json get nothing in body", map[string]interface{}{
+                       "origin json": jsonStr,
+               })
+               return offsetTable
+       }
+
+       trimStr := offsetStr[2 : len(offsetStr)-1]
+
+       split := strings.Split(trimStr, ",{")
+
+       for _, v := range split {
+               tuple := strings.Split(v, "}:")
+
+               queueStr := "{" + tuple[0] + "}"
+
+               var err error
+               // ignore err for now
+               offset, err := strconv.Atoi(tuple[1])
+
+               var queue primitive.MessageQueue
+               err = json.Unmarshal([]byte(queueStr), &queue)
+
+               if err != nil {
+                       rlog.Error("parse reset offset table json get nothing in body", map[string]interface{}{
+                               "origin json": jsonStr,
+                       })
+               }
+
+               offsetTable[queue] = int64(offset)
+       }
+
+       return offsetTable
 }
diff --git a/internal/model_test.go b/internal/model_test.go
index a505f3e..a653fa4 100644
--- a/internal/model_test.go
+++ b/internal/model_test.go
@@ -419,7 +419,7 @@ func TestConsumeMessageDirectlyResult_MarshalJSON(t *testing.T) {
 }
 
 func TestRestOffsetBody_MarshalJSON(t *testing.T) {
-       Convey("test ResetOffset Body Decode", t, func() {
+       Convey("test ResetOffset Body Decode gson json schema", t, func() {
                body := 
"{\"offsetTable\":[[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":5},23354233],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":4},23354245],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":7},23354203],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":6},23354312],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"q
 [...]
                resetOffsetBody := new(ResetOffsetBody)
                resetOffsetBody.Decode([]byte(body))
@@ -433,4 +433,52 @@ func TestRestOffsetBody_MarshalJSON(t *testing.T) {
                }
                So(offsetTable[messageQueue], ShouldEqual, 23354233)
        })
+
+       Convey("test ResetOffset Body Decode fast json schema", t, func() {
+               body := "{\"offsetTable\":{{\"brokerName\":\"RaftNode00\",\"queueId\":0,\"topic\":\"topicB\"}:11110,{\"brokerName\":\"RaftNode00\",\"queueId\":1,\"topic\":\"topicB\"}:0,{\"brokerName\":\"RaftNode00\",\"queueId\":2,\"topic\":\"topicB\"}:0,{\"brokerName\":\"RaftNode00\",\"queueId\":3,\"topic\":\"topicB\"}:0}}"
+               resetOffsetBody := new(ResetOffsetBody)
+               resetOffsetBody.Decode([]byte(body))
+               offsetTable := resetOffsetBody.OffsetTable
+               So(offsetTable, ShouldNotBeNil)
+               So(len(offsetTable), ShouldEqual, 4)
+               messageQueue := primitive.MessageQueue{
+                       Topic:      "topicB",
+                       BrokerName: "RaftNode00",
+                       QueueId:    0,
+               }
+               So(offsetTable[messageQueue], ShouldEqual, 11110)
+       })
+
+       Convey("test ResetOffset Body Decode fast json schema with one item", t, func() {
+               body := "{\"offsetTable\":{{\"brokerName\":\"RaftNode00\",\"queueId\":0,\"topic\":\"topicB\"}:11110}}"
+               resetOffsetBody := new(ResetOffsetBody)
+               resetOffsetBody.Decode([]byte(body))
+               offsetTable := resetOffsetBody.OffsetTable
+               So(offsetTable, ShouldNotBeNil)
+               So(len(offsetTable), ShouldEqual, 1)
+               messageQueue := primitive.MessageQueue{
+                       Topic:      "topicB",
+                       BrokerName: "RaftNode00",
+                       QueueId:    0,
+               }
+               So(offsetTable[messageQueue], ShouldEqual, 11110)
+       })
+
+       Convey("test ResetOffset Body Decode empty fast json ", t, func() {
+               body := "{\"offsetTable\":{}}"
+               resetOffsetBody := new(ResetOffsetBody)
+               resetOffsetBody.Decode([]byte(body))
+               offsetTable := resetOffsetBody.OffsetTable
+               So(offsetTable, ShouldNotBeNil)
+               So(len(offsetTable), ShouldEqual, 0)
+       })
+
+       Convey("test ResetOffset Body Decode empty gson json ", t, func() {
+               body := "{\"offsetTable\":[]}"
+               resetOffsetBody := new(ResetOffsetBody)
+               resetOffsetBody.Decode([]byte(body))
+               offsetTable := resetOffsetBody.OffsetTable
+               So(offsetTable, ShouldNotBeNil)
+               So(len(offsetTable), ShouldEqual, 0)
+       })
 }

Reply via email to