This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 9df5e28  [ISSUE #65] Fix message missing tag  properties  (#94)
9df5e28 is described below

commit 9df5e28d8c1149138f784a27d639edcc9451a690
Author: xujianhai666 <[email protected]>
AuthorDate: Sun Jul 7 22:38:25 2019 +0800

    [ISSUE #65] Fix message missing tag  properties  (#94)
    
    * fix message properties. fix #65
    
    * simplify commit
    
    * simplify commit
---
 examples/producer/interceptor/main.go |  6 ++++--
 internal/kernel/request.go            |  2 ++
 internal/producer/producer.go         | 13 ++-----------
 primitive/result.go                   | 34 +++++++++++++++++++++++++++++++---
 primitive/result_test.go              | 34 ++++++++++++++++++++++++++++++++++
 5 files changed, 73 insertions(+), 16 deletions(-)

diff --git a/examples/producer/interceptor/main.go 
b/examples/producer/interceptor/main.go
index f7bcf7a..c70eab3 100644
--- a/examples/producer/interceptor/main.go
+++ b/examples/producer/interceptor/main.go
@@ -22,6 +22,7 @@ import (
        "context"
        "fmt"
        "os"
+       "strconv"
 
        "github.com/apache/rocketmq-client-go/internal/producer"
        "github.com/apache/rocketmq-client-go/primitive"
@@ -41,6 +42,7 @@ func main() {
                        //Topic: "test",
                        Topic: "TopicTest",
                        Body:  []byte("Hello RocketMQ Go Client!"),
+                       Properties: map[string]string{"order": strconv.Itoa(i)},
                })
 
                if err != nil {
@@ -57,7 +59,7 @@ func main() {
 
 func UserFirstInterceptor() primitive.PInterceptor {
        return func(ctx context.Context, req, reply interface{}, next 
primitive.PInvoker) error {
-               fmt.Printf("user first interceptor before invoke: req:%v, 
reply: %v\n", req, reply)
+               fmt.Printf("user first interceptor before invoke: req:%v\n", 
req)
                err := next(ctx, req, reply)
                fmt.Printf("user first interceptor after invoke: req: %v, 
reply: %v \n", req, reply)
                return err
@@ -66,7 +68,7 @@ func UserFirstInterceptor() primitive.PInterceptor {
 
 func UserSecondInterceptor() primitive.PInterceptor {
        return func(ctx context.Context, req, reply interface{}, next 
primitive.PInvoker) error {
-               fmt.Printf("user second interceptor before invoke: req: %v, 
reply: %v\n", req, reply)
+               fmt.Printf("user second interceptor before invoke: req: %v\n", 
req)
                err := next(ctx, req, reply)
                fmt.Printf("user second interceptor after invoke: req: %v, 
reply: %v \n", req, reply)
                return err
diff --git a/internal/kernel/request.go b/internal/kernel/request.go
index 5fe1a44..ad3345b 100644
--- a/internal/kernel/request.go
+++ b/internal/kernel/request.go
@@ -71,6 +71,8 @@ func (request *SendMessageRequest) Encode() map[string]string 
{
        maps["defaultTopic"] = "TBW102"
        maps["defaultTopicQueueNums"] = "4"
        maps["batch"] = strconv.FormatBool(request.Batch)
+       maps["properties"] = request.Properties
+
        return maps
 }
 
diff --git a/internal/producer/producer.go b/internal/producer/producer.go
index fbcf5ee..a559a98 100644
--- a/internal/producer/producer.go
+++ b/internal/producer/producer.go
@@ -228,11 +228,12 @@ func (p *defaultProducer) buildSendRequest(mq 
*primitive.MessageQueue,
                SysFlag:        0,
                BornTimestamp:  time.Now().UnixNano() / int64(time.Millisecond),
                Flag:           msg.Flag,
-               Properties:     propertiesToString(msg.Properties),
+               Properties:     primitive.MarshalPropeties(msg.Properties),
                ReconsumeTimes: 0,
                UnitMode:       p.options.UnitMode,
                Batch:          false,
        }
+
        return remote.NewRemotingCommand(kernel.ReqSendMessage, req, msg.Body)
 }
 
@@ -289,13 +290,3 @@ func (p *defaultProducer) IsUnitMode() bool {
        return false
 }
 
-func propertiesToString(properties map[string]string) string {
-       if properties == nil {
-               return ""
-       }
-       var str string
-       for k, v := range properties {
-               str += fmt.Sprintf("%s%v%s%v", k, byte(1), v, byte(2))
-       }
-       return str
-}
diff --git a/primitive/result.go b/primitive/result.go
index 628f243..217a8f3 100644
--- a/primitive/result.go
+++ b/primitive/result.go
@@ -36,6 +36,9 @@ const (
 
        FlagCompressed = 0x1
        MsgIdLength    = 8 + 8
+
+       propertySeparator  = '\002'
+       nameValueSeparator = '\001'
 )
 
 // SendResult RocketMQ send result
@@ -191,10 +194,11 @@ func DecodeMessage(data []byte) []*MessageExt {
                msg.Topic = string(buf.Next(int(_byte)))
                count += 1 + int(_byte)
 
+               // 17. properties
                var propertiesLength int16
                binary.Read(buf, binary.BigEndian, &propertiesLength)
                if propertiesLength > 0 {
-                       msg.Properties = 
parseProperties(buf.Next(int(propertiesLength)))
+                       msg.Properties = 
unmarshalProperties(buf.Next(int(propertiesLength)))
                }
                count += 2 + int(propertiesLength)
 
@@ -211,8 +215,32 @@ func createMessageId(addr []byte, offset int64) string {
        return "msgID" // TODO
 }
 
-func parseProperties(data []byte) map[string]string {
-       return make(map[string]string, 0)
+// unmarshalProperties parse data into property kv pairs.
+func unmarshalProperties(data []byte) map[string]string {
+       m := make(map[string]string)
+       items := bytes.Split(data, []byte{propertySeparator})
+       for _, item := range items {
+               kv := bytes.Split(item, []byte{nameValueSeparator})
+               if len(kv) == 2 {
+                       m[ string(kv[0]) ] = string(kv[1])
+               }
+       }
+       return m
+}
+
+func MarshalPropeties(properties map[string]string) string {
+       if properties == nil {
+               return ""
+       }
+       buffer := bytes.NewBufferString("")
+
+       for k, v := range properties {
+               buffer.WriteString(k)
+               buffer.WriteRune(nameValueSeparator)
+               buffer.WriteString(v)
+               buffer.WriteRune(propertySeparator)
+       }
+       return buffer.String()
 }
 
 func toMessages(messageExts []*MessageExt) []*Message {
diff --git a/primitive/result_test.go b/primitive/result_test.go
new file mode 100644
index 0000000..131db8c
--- /dev/null
+++ b/primitive/result_test.go
@@ -0,0 +1,34 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package primitive
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func Test(t *testing.T) {
+       kv := map[string]string{
+               "k1": "v1",
+               "k2": "v2",
+       }
+       str := MarshalPropeties(kv)
+       kv2 := unmarshalProperties([]byte(str))
+       assert.Equal(t, kv, kv2)
+}

Reply via email to