This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new f5c1f61  fit(produce): support message compression (#608)
f5c1f61 is described below

commit f5c1f6161d9e7678576aa16fcda6db48e6465c6c
Author: shenhui0509 <[email protected]>
AuthorDate: Tue Mar 16 20:18:21 2021 +0800

    fit(produce): support message compression (#608)
---
 internal/utils/{helper.go => compression.go} | 50 +++++++++++++-
 internal/utils/compression_test.go           | 97 ++++++++++++++++++++++++++++
 internal/utils/helper_test.go                | 37 -----------
 internal/utils/net.go                        |  4 ++
 primitive/message.go                         |  5 ++
 producer/option.go                           | 34 ++++++++--
 producer/producer.go                         | 22 +++++++
 7 files changed, 202 insertions(+), 47 deletions(-)

diff --git a/internal/utils/helper.go b/internal/utils/compression.go
similarity index 50%
rename from internal/utils/helper.go
rename to internal/utils/compression.go
index ccadd3f..4ad2ced 100644
--- a/internal/utils/helper.go
+++ b/internal/utils/compression.go
@@ -20,12 +20,56 @@ package utils
 import (
        "bytes"
        "compress/zlib"
+       "errors"
        "io/ioutil"
-       "net"
+       "sync"
 )
 
-func GetAddressByBytes(data []byte) string {
-       return net.IPv4(data[0], data[1], data[2], data[3]).String()
+var zlibWriterPools []sync.Pool
+
+var bufPool = sync.Pool{
+       New: func() interface{} {
+               return &bytes.Buffer{}
+       },
+}
+
+func init() {
+       zlibWriterPools = make([]sync.Pool, zlib.BestCompression)
+       for i := 0; i < zlib.BestCompression; i++ {
+               compressLevel := i
+               zlibWriterPools[i] = sync.Pool{
+                       New: func() interface{} {
+                               z, _ := zlib.NewWriterLevel(nil, 
compressLevel+1)
+                               return z
+                       },
+               }
+       }
+}
+
+func Compress(raw []byte, compressLevel int) ([]byte, error) {
+       if compressLevel < zlib.BestSpeed || compressLevel > 
zlib.BestCompression {
+               return nil, errors.New("unsupported compress level")
+       }
+
+       buf := bufPool.Get().(*bytes.Buffer)
+       defer bufPool.Put(buf)
+       writerPool := zlibWriterPools[compressLevel-1]
+       writer := writerPool.Get().(*zlib.Writer)
+       defer writerPool.Put(writer)
+       buf.Reset()
+       writer.Reset(buf)
+       _, e := writer.Write(raw)
+       if e != nil {
+               return nil, e
+       }
+
+       e = writer.Close()
+       if e != nil {
+               return nil, e
+       }
+       result := make([]byte, buf.Len())
+       buf.Read(result)
+       return result, nil
 }
 
 func UnCompress(data []byte) []byte {
diff --git a/internal/utils/compression_test.go 
b/internal/utils/compression_test.go
new file mode 100644
index 0000000..4d10893
--- /dev/null
+++ b/internal/utils/compression_test.go
@@ -0,0 +1,97 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+import (
+       "bytes"
+       "compress/zlib"
+       "encoding/json"
+       "fmt"
+       "math/rand"
+       "testing"
+)
+
+func TestUnCompress(t *testing.T) {
+       var b bytes.Buffer
+       var oriStr string = "hello, go"
+       zr := zlib.NewWriter(&b)
+       zr.Write([]byte(oriStr))
+       zr.Close()
+
+       retBytes := UnCompress(b.Bytes())
+       if string(retBytes) != oriStr {
+               t.Errorf("UnCompress was incorrect, got %s, want: %s .", 
retBytes, []byte(oriStr))
+       }
+}
+
+func TestCompress(t *testing.T) {
+       raw := []byte("The quick brown fox jumps over the lazy dog")
+       for i := zlib.BestSpeed; i <= zlib.BestCompression; i++ {
+               compressed, e := Compress(raw, i)
+               if e != nil {
+                       t.Errorf("Compress data:%s returns error: %v", 
string(raw), e)
+                       return
+               }
+               decompressed := UnCompress(compressed)
+               if string(decompressed) != string(raw) {
+                       t.Errorf("data is corrupt, got: %s, want: %s", 
string(decompressed), string(raw))
+               }
+       }
+}
+
+func testCase(data []byte, level int, t *testing.T) {
+       compressed, e := Compress(data, level)
+       if e != nil {
+               t.Errorf("Compress data:%v returns error: %v", data, e)
+       }
+       decompressed := UnCompress(compressed)
+       if string(data) != string(decompressed) {
+               t.Errorf("data is corrupt, got: %s, want: %s", 
string(decompressed), string(data))
+       }
+}
+
+func generateRandTestData(n int) []byte {
+       data := make([]byte, n)
+       rand.Read(data)
+       return data
+}
+
+func generateJsonString(n int) []byte {
+       x := make(map[string]string)
+       for i := 0; i < n; i++ {
+               k := fmt.Sprintf("compression_key_%d", i)
+               v := fmt.Sprintf("compression_value_%d", i)
+               x[k] = v
+       }
+       data, _ := json.Marshal(x)
+       return data
+}
+
+func TestCompressThreadSafe(t *testing.T) {
+       for i := 0; i < 100; i++ {
+               data := generateRandTestData(i * 100)
+               level := i%zlib.BestCompression + 1
+               go testCase(data, level, t)
+       }
+
+       for i := 0; i < 100; i++ {
+               data := generateJsonString(i * 100)
+               level := i%zlib.BestCompression + 1
+               go testCase(data, level, t)
+       }
+}
diff --git a/internal/utils/helper_test.go b/internal/utils/helper_test.go
deleted file mode 100644
index 837c5ab..0000000
--- a/internal/utils/helper_test.go
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package utils
-
-import (
-       "bytes"
-       "compress/zlib"
-       "testing"
-)
-
-func TestUnCompress(t *testing.T) {
-       var b bytes.Buffer
-       var oriStr string = "hello, go"
-       zr := zlib.NewWriter(&b)
-       zr.Write([]byte(oriStr))
-       zr.Close()
-
-       retBytes := UnCompress(b.Bytes())
-       if string(retBytes) != oriStr {
-               t.Errorf("UnCompress was incorrect, got %s, want: %s .", 
retBytes, []byte(oriStr))
-       }
-}
diff --git a/internal/utils/net.go b/internal/utils/net.go
index c9444a9..0dfcff8 100644
--- a/internal/utils/net.go
+++ b/internal/utils/net.go
@@ -59,3 +59,7 @@ func FakeIP() []byte {
        
buf.WriteString(strconv.FormatInt(time.Now().UnixNano()/int64(time.Millisecond),
 10))
        return buf.Bytes()[4:8]
 }
+
+func GetAddressByBytes(data []byte) string {
+       return net.IPv4(data[0], data[1], data[2], data[3]).String()
+}
diff --git a/primitive/message.go b/primitive/message.go
index 6a84477..fd7e9c6 100644
--- a/primitive/message.go
+++ b/primitive/message.go
@@ -67,6 +67,7 @@ type Message struct {
        Flag          int32
        TransactionId string
        Batch         bool
+       Compress      bool
        // Queue is the queue that messages will be sent to. the value must be 
set if want to custom the queue of message,
        // just ignore if not.
        Queue *MessageQueue
@@ -498,6 +499,10 @@ func ClearCompressedFlag(flag int) int {
        return flag & (^CompressedFlag)
 }
 
+func SetCompressedFlag(flag int) int {
+       return flag | CompressedFlag
+}
+
 var (
        counter        int16 = 0
        startTimestamp int64 = 0
diff --git a/producer/option.go b/producer/option.go
index bacef7b..5839402 100644
--- a/producer/option.go
+++ b/producer/option.go
@@ -26,12 +26,14 @@ import (
 
 func defaultProducerOptions() producerOptions {
        opts := producerOptions{
-               ClientOptions:         internal.DefaultClientOptions(),
-               Selector:              NewRoundRobinQueueSelector(),
-               SendMsgTimeout:        3 * time.Second,
-               DefaultTopicQueueNums: 4,
-               CreateTopicKey:        "TBW102",
-               Resolver:              primitive.NewHttpResolver("DEFAULT"),
+               ClientOptions:              internal.DefaultClientOptions(),
+               Selector:                   NewRoundRobinQueueSelector(),
+               SendMsgTimeout:             3 * time.Second,
+               DefaultTopicQueueNums:      4,
+               CreateTopicKey:             "TBW102",
+               Resolver:                   
primitive.NewHttpResolver("DEFAULT"),
+               CompressMsgBodyOverHowmuch: 4096,
+               CompressLevel:              5,
        }
        opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
        return opts
@@ -44,7 +46,9 @@ type producerOptions struct {
        DefaultTopicQueueNums int
        CreateTopicKey        string // "TBW102" Will be created at broker when 
isAutoCreateTopicEnable. when topic is not created,
        // and broker open isAutoCreateTopicEnable, topic will use "TBW102" 
config to create topic
-       Resolver primitive.NsResolver
+       Resolver                   primitive.NsResolver
+       CompressMsgBodyOverHowmuch int
+       CompressLevel              int
 }
 
 type Option func(*producerOptions)
@@ -142,3 +146,19 @@ func WithNameServerDomain(nameServerUrl string) Option {
                opts.Resolver = primitive.NewHttpResolver("DEFAULT", 
nameServerUrl)
        }
 }
+
+// WithCompressMsgBodyOverHowmuch set compression threshold
+func WithCompressMsgBodyOverHowmuch(threshold int) Option {
+       return func(opts *producerOptions) {
+               opts.CompressMsgBodyOverHowmuch = threshold
+       }
+}
+
+// WithCompressLevel set compress level (0~9)
+// 0 stands for best speed
+// 9 stands for best compression ratio
+func WithCompressLevel(level int) Option {
+       return func(opts *producerOptions) {
+               opts.CompressLevel = level
+       }
+}
diff --git a/producer/producer.go b/producer/producer.go
index 7b33960..65e39c2 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -302,12 +302,34 @@ func (p *defaultProducer) sendOneWay(ctx context.Context, 
msg *primitive.Message
        return err
 }
 
+func (p *defaultProducer) tryCompressMsg(msg *primitive.Message) bool {
+       if msg.Compress {
+               return true
+       }
+       if msg.Batch {
+               return false
+       }
+       if len(msg.Body) < p.options.CompressMsgBodyOverHowmuch {
+               return false
+       }
+       compressedBody, e := utils.Compress(msg.Body, p.options.CompressLevel)
+       if e != nil {
+               return false
+       }
+       msg.Body = compressedBody
+       msg.Compress = true
+       return true
+}
+
 func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue,
        msg *primitive.Message) *remote.RemotingCommand {
        if !msg.Batch && 
msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex) == "" {
                
msg.WithProperty(primitive.PropertyUniqueClientMessageIdKeyIndex, 
primitive.CreateUniqID())
        }
        sysFlag := 0
+       if p.tryCompressMsg(msg) {
+               sysFlag = primitive.SetCompressedFlag(sysFlag)
+       }
        v := msg.GetProperty(primitive.PropertyTransactionPrepared)
        if v != "" {
                tranMsg, err := strconv.ParseBool(v)

Reply via email to