This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/develop by this push:
new b3e6c94ef feat: decode hessian from stream(#2840) (#2841)
b3e6c94ef is described below
commit b3e6c94ef79ba71ef7cc791a36c4410da3eef956
Author: Evan <[email protected]>
AuthorDate: Sat Jun 14 18:05:03 2025 +0800
feat: decode hessian from stream(#2840) (#2841)
* feat: decode hessian from stream (#2840)
* fix: assert NoError test case
* rename streaming to stream
* fix ci issues
* fix var format
---
protocol/dubbo/hessian2/hessian_dubbo.go | 129 +++++++++++++++++++-------
protocol/dubbo/hessian2/hessian_dubbo_test.go | 65 +++++++++++++
2 files changed, 161 insertions(+), 33 deletions(-)
diff --git a/protocol/dubbo/hessian2/hessian_dubbo.go b/protocol/dubbo/hessian2/hessian_dubbo.go
index 3458650f1..ed38db950 100644
--- a/protocol/dubbo/hessian2/hessian_dubbo.go
+++ b/protocol/dubbo/hessian2/hessian_dubbo.go
@@ -67,6 +67,7 @@ type HessianCodec struct {
pkgType PackageType
reader *bufio.Reader
bodyLen int
+ stream bool
}
// NewHessianCodec generate a new hessian codec instance
@@ -85,6 +86,16 @@ func NewHessianCodecCustom(pkgType PackageType, reader *bufio.Reader, bodyLen in
}
}
+// NewStreamHessianCodecCustom generate a new hessian codec instance
+func NewStreamHessianCodecCustom(pkgType PackageType, reader *bufio.Reader, bodyLen int) *HessianCodec {
+ return &HessianCodec{
+ pkgType: pkgType,
+ reader: reader,
+ bodyLen: bodyLen,
+ stream: true,
+ }
+}
+
func (h *HessianCodec) Write(service Service, header DubboHeader, body any) ([]byte, error) {
switch header.Type {
case PackageHeartbeat:
@@ -108,18 +119,36 @@ func (h *HessianCodec) Write(service Service, header DubboHeader, body any) ([]b
// ReadHeader uses hessian codec to read dubbo header
func (h *HessianCodec) ReadHeader(header *DubboHeader) error {
- var err error
-
- if h.reader.Size() < HEADER_LENGTH {
- return ErrHeaderNotEnough
- }
- buf, err := h.reader.Peek(HEADER_LENGTH)
- if err != nil { // this is impossible
- return perrors.WithStack(err)
- }
- _, err = h.reader.Discard(HEADER_LENGTH)
- if err != nil { // this is impossible
- return perrors.WithStack(err)
+ var (
+ err error
+ n int
+ buf []byte
+ )
+
+ if h.stream {
+ buf = make([]byte, HEADER_LENGTH)
+ n, err = h.reader.Read(buf)
+ if err != nil {
+ return perrors.WithStack(err)
+ }
+ if n < HEADER_LENGTH {
+ _, err = h.reader.Read(buf[n:])
+ if err != nil { // this is impossible
+ return perrors.WithStack(err)
+ }
+ }
+ } else {
+ if h.reader.Size() < HEADER_LENGTH {
+ return ErrHeaderNotEnough
+ }
+ buf, err = h.reader.Peek(HEADER_LENGTH)
+ if err != nil { // this is impossible
+ return perrors.WithStack(err)
+ }
+ _, err = h.reader.Discard(HEADER_LENGTH)
+ if err != nil { // this is impossible
+ return perrors.WithStack(err)
+ }
}
//// read header
@@ -164,7 +193,7 @@ func (h *HessianCodec) ReadHeader(header *DubboHeader) error {
h.pkgType = header.Type
h.bodyLen = header.BodyLen
- if h.reader.Buffered() < h.bodyLen {
+ if h.reader.Buffered() < h.bodyLen && !h.stream {
return ErrBodyNotEnough
}
@@ -173,16 +202,33 @@ func (h *HessianCodec) ReadHeader(header *DubboHeader) error {
// ReadBody uses hessian codec to read response body
func (h *HessianCodec) ReadBody(rspObj any) error {
- if h.reader.Buffered() < h.bodyLen {
- return ErrBodyNotEnough
- }
- buf, err := h.reader.Peek(h.bodyLen)
- if err != nil {
- return perrors.WithStack(err)
- }
- _, err = h.reader.Discard(h.bodyLen)
- if err != nil { // this is impossible
- return perrors.WithStack(err)
+ var (
+ err error
+ buf []byte
+ )
+
+ if h.stream {
+ buf = make([]byte, h.bodyLen)
+ readLen, n := 0, 0
+ for readLen < h.bodyLen {
+ n, err = h.reader.Read(buf[readLen:])
+ if err != nil {
+ return perrors.WithStack(err)
+ }
+ readLen += n
+ }
+ } else {
+ if h.reader.Buffered() < h.bodyLen {
+ return ErrBodyNotEnough
+ }
+ buf, err = h.reader.Peek(h.bodyLen)
+ if err != nil {
+ return perrors.WithStack(err)
+ }
+ _, err = h.reader.Discard(h.bodyLen)
+ if err != nil { // this is impossible
+ return perrors.WithStack(err)
+ }
}
switch h.pkgType & PackageType_BitSize {
@@ -218,16 +264,33 @@ func (h *HessianCodec) ReadBody(rspObj any) error {
// ignore body, but only read attachments
func (h *HessianCodec) ReadAttachments() (map[string]any, error) {
- if h.reader.Buffered() < h.bodyLen {
- return nil, ErrBodyNotEnough
- }
- buf, err := h.reader.Peek(h.bodyLen)
- if err != nil {
- return nil, perrors.WithStack(err)
- }
- _, err = h.reader.Discard(h.bodyLen)
- if err != nil { // this is impossible
- return nil, perrors.WithStack(err)
+ var (
+ err error
+ buf []byte
+ )
+
+ if h.stream {
+ buf = make([]byte, h.bodyLen)
+ readLen, n := 0, 0
+ for readLen < h.bodyLen {
+ n, err = h.reader.Read(buf[readLen:])
+ if err != nil {
+ return nil, perrors.WithStack(err)
+ }
+ readLen += n
+ }
+ } else {
+ if h.reader.Buffered() < h.bodyLen {
+ return nil, ErrBodyNotEnough
+ }
+ buf, err = h.reader.Peek(h.bodyLen)
+ if err != nil {
+ return nil, perrors.WithStack(err)
+ }
+ _, err = h.reader.Discard(h.bodyLen)
+ if err != nil { // this is impossible
+ return nil, perrors.WithStack(err)
+ }
}
switch h.pkgType & PackageType_BitSize {
diff --git a/protocol/dubbo/hessian2/hessian_dubbo_test.go b/protocol/dubbo/hessian2/hessian_dubbo_test.go
index cd441824e..436c527f7 100644
--- a/protocol/dubbo/hessian2/hessian_dubbo_test.go
+++ b/protocol/dubbo/hessian2/hessian_dubbo_test.go
@@ -245,3 +245,68 @@ type AttachTestObject struct {
func (AttachTestObject) JavaClassName() string {
return "com.test.Test"
}
+
+type CaseStream struct {
+ Payload string
+}
+
+func (CaseStream) JavaClassName() string {
+ return "com.test.CaseStream"
+}
+
+func TestDecodeFromTcpStream(t *testing.T) {
+ payload := make([]byte, 1024)
+ alphabet := "abcdefghijklmnopqrstuvwxyz"
+ for i := range payload {
+ payload[i] = alphabet[i%26]
+ }
+ cs := &CaseStream{
+ Payload: string(payload),
+ }
+
+ hessian.RegisterPOJO(cs)
+ codecW := NewHessianCodec(nil)
+ service := Service{
+ Path: "test",
+ Interface: "ITest",
+ Version: "v1.0",
+ Method: "test",
+ Timeout: time.Second * 10,
+ }
+ header := DubboHeader{
+ SerialID: 2,
+ Type: PackageRequest,
+ ID: 1,
+ ResponseStatus: Zero,
+ }
+ resp, err := codecW.Write(service, header, []interface{}{cs})
+ assert.NoError(t, err)
+
+ // set reader buffer = 1024 to split resp into two parts
+ codec := NewStreamHessianCodecCustom(0, bufio.NewReaderSize(bytes.NewReader(resp), 1024), 0)
+ h := &DubboHeader{}
+ assert.NoError(t, codec.ReadHeader(h))
+ assert.Equal(t, h.SerialID, header.SerialID)
+ assert.Equal(t, h.Type, header.Type)
+ assert.Equal(t, h.ID, header.ID)
+ assert.Equal(t, h.ResponseStatus, header.ResponseStatus)
+
+ reqBody := make([]interface{}, 7)
+
+ err = codec.ReadBody(reqBody)
+ assert.NoError(t, err)
+ assert.Equal(t, reqBody[1], service.Path)
+ assert.Equal(t, reqBody[2], service.Version)
+ assert.Equal(t, reqBody[3], service.Method)
+
+ if list, ok := reqBody[5].([]interface{}); ok {
+ assert.Len(t, list, 1)
+ if infoPtr, ok2 := list[0].(*CaseStream); ok2 {
+ assert.Equal(t, len(infoPtr.Payload), 1024)
+ }
+ }
+
+ codec = NewHessianCodecCustom(0, bufio.NewReaderSize(bytes.NewReader(resp), 1024), 0)
+ err = codec.ReadHeader(h)
+ assert.ErrorIs(t, err, ErrBodyNotEnough)
+}