This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 322e9a2d golang: support zlib decode (#957)
322e9a2d is described below
commit 322e9a2d0b3f6b4b74d4eebb330c5e9238d93b36
Author: guyinyou <[email protected]>
AuthorDate: Mon Mar 10 11:49:56 2025 +0800
golang: support zlib decode (#957)
Co-authored-by: guyinyou <[email protected]>
---
golang/message.go | 2 +-
golang/pkg/utils/utils.go | 21 +++++++++++++++++++++
golang/pkg/utils/utils_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++
3 files changed, 64 insertions(+), 1 deletion(-)
diff --git a/golang/message.go b/golang/message.go
index 9d1007f1..80675424 100644
--- a/golang/message.go
+++ b/golang/message.go
@@ -231,7 +231,7 @@ func fromProtobuf_MessageView2(message *v2.Message,
messageQueue *v2.MessageQueu
bodyEncoding := systemProperties.GetBodyEncoding()
switch bodyEncoding {
case v2.Encoding_GZIP:
- unCompressBody, err := utils.GZIPDecode(message.GetBody())
+ unCompressBody, err := utils.AutoDecode(message.GetBody())
if err != nil {
sugarBaseLogger.Errorf("failed to uncompress message
body, topic=%s, messageId=%s, err=%w", mv.topic, mv.messageId, err)
corrupted = true
diff --git a/golang/pkg/utils/utils.go b/golang/pkg/utils/utils.go
index db145e4d..a8343ef5 100644
--- a/golang/pkg/utils/utils.go
+++ b/golang/pkg/utils/utils.go
@@ -20,6 +20,7 @@ package utils
import (
"bytes"
"compress/gzip"
+ "compress/zlib"
"context"
"encoding/hex"
"fmt"
@@ -140,6 +141,26 @@ func MatchMessageType(mq *v2.MessageQueue, messageType
v2.MessageType) bool {
return false
}
+func AutoDecode(in []byte) ([]byte, error) {
+ if len(in) < 2 {
+ return in, fmt.Errorf("unknown format")
+ }
+ if in[0] == 0x1f && in[1] == 0x8b {
+ return GZIPDecode(in)
+ }
+ return ZlibDecode(in)
+}
+
+func ZlibDecode(in []byte) ([]byte, error) {
+ reader, err := zlib.NewReader(bytes.NewReader(in))
+ if err != nil {
+ var out []byte
+ return out, err
+ }
+ defer reader.Close()
+ return ioutil.ReadAll(reader)
+}
+
func GZIPDecode(in []byte) ([]byte, error) {
reader, err := gzip.NewReader(bytes.NewReader(in))
if err != nil {
diff --git a/golang/pkg/utils/utils_test.go b/golang/pkg/utils/utils_test.go
index bf43349a..514f3746 100644
--- a/golang/pkg/utils/utils_test.go
+++ b/golang/pkg/utils/utils_test.go
@@ -19,6 +19,7 @@ package utils
import (
"compress/gzip"
+ "compress/zlib"
"testing"
v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
@@ -111,6 +112,33 @@ func TestMatchMessageType(t *testing.T) {
}
}
+func TestAutoDecode(t *testing.T) {
+ _, err := AutoDecode([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
+ if err != zlib.ErrHeader {
+ t.Error()
+ }
+ _, err = AutoDecode([]byte{0})
+ if err == nil {
+ t.Error()
+ }
+ // gzip
+ bytes, err := AutoDecode([]byte{31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 42,
202, 79, 206, 78, 45, 201, 45, 212, 77, 206, 201, 76, 205, 43, 209, 77, 207, 7,
0, 0, 0, 255, 255, 1, 0, 0, 255, 255, 97, 36, 132, 114, 18, 0, 0, 0})
+ if err != nil {
+ t.Error()
+ }
+ if string(bytes) != "rocketmq-client-go" {
+ t.Error()
+ }
+ // zlib
+ bytes, err = AutoDecode([]byte{120, 156, 42, 202, 79, 206, 78, 45, 201,
45, 212, 77, 206, 201, 76, 205, 43, 209, 77, 207, 7, 4, 0, 0, 255, 255, 68,
223, 7, 22})
+ if err != nil {
+ t.Error()
+ }
+ if string(bytes) != "rocketmq-client-go" {
+ t.Error()
+ }
+}
+
func TestGZIPDecode(t *testing.T) {
_, err := GZIPDecode([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
if err != gzip.ErrHeader {
@@ -125,6 +153,20 @@ func TestGZIPDecode(t *testing.T) {
}
}
+func TestZlibDecode(t *testing.T) {
+ _, err := ZlibDecode([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
+ if err != zlib.ErrHeader {
+ t.Error()
+ }
+ bytes, err := ZlibDecode([]byte{120, 156, 42, 202, 79, 206, 78, 45,
201, 45, 212, 77, 206, 201, 76, 205, 43, 209, 77, 207, 7, 4, 0, 0, 255, 255,
68, 223, 7, 22})
+ if err != nil {
+ t.Error()
+ }
+ if string(bytes) != "rocketmq-client-go" {
+ t.Error()
+ }
+}
+
func TestSelectAnAddress(t *testing.T) {
if SelectAnAddress(nil) != nil {
t.Error()