This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/develop by this push:
     new b3e6c94ef feat: decode hessian from stream(#2840) (#2841)
b3e6c94ef is described below

commit b3e6c94ef79ba71ef7cc791a36c4410da3eef956
Author: Evan <[email protected]>
AuthorDate: Sat Jun 14 18:05:03 2025 +0800

    feat: decode hessian from stream(#2840) (#2841)
    
    * feat: decode hessian from stream (#2840)
    
    * fix: assert NoError test case
    
    * rename streaming to stream
    
    * fix ci issues
    
    * fix var format
---
 protocol/dubbo/hessian2/hessian_dubbo.go      | 129 +++++++++++++++++++-------
 protocol/dubbo/hessian2/hessian_dubbo_test.go |  65 +++++++++++++
 2 files changed, 161 insertions(+), 33 deletions(-)

diff --git a/protocol/dubbo/hessian2/hessian_dubbo.go b/protocol/dubbo/hessian2/hessian_dubbo.go
index 3458650f1..ed38db950 100644
--- a/protocol/dubbo/hessian2/hessian_dubbo.go
+++ b/protocol/dubbo/hessian2/hessian_dubbo.go
@@ -67,6 +67,7 @@ type HessianCodec struct {
        pkgType PackageType
        reader  *bufio.Reader
        bodyLen int
+       stream  bool
 }
 
 // NewHessianCodec generate a new hessian codec instance
@@ -85,6 +86,16 @@ func NewHessianCodecCustom(pkgType PackageType, reader *bufio.Reader, bodyLen in
        }
 }
 
+// NewStreamHessianCodecCustom generate a new hessian codec instance
+func NewStreamHessianCodecCustom(pkgType PackageType, reader *bufio.Reader, bodyLen int) *HessianCodec {
+       return &HessianCodec{
+               pkgType: pkgType,
+               reader:  reader,
+               bodyLen: bodyLen,
+               stream:  true,
+       }
+}
+
 func (h *HessianCodec) Write(service Service, header DubboHeader, body any) ([]byte, error) {
        switch header.Type {
        case PackageHeartbeat:
@@ -108,18 +119,36 @@ func (h *HessianCodec) Write(service Service, header DubboHeader, body any) ([]b
 
 // ReadHeader uses hessian codec to read dubbo header
 func (h *HessianCodec) ReadHeader(header *DubboHeader) error {
-       var err error
-
-       if h.reader.Size() < HEADER_LENGTH {
-               return ErrHeaderNotEnough
-       }
-       buf, err := h.reader.Peek(HEADER_LENGTH)
-       if err != nil { // this is impossible
-               return perrors.WithStack(err)
-       }
-       _, err = h.reader.Discard(HEADER_LENGTH)
-       if err != nil { // this is impossible
-               return perrors.WithStack(err)
+       var (
+               err error
+               n   int
+               buf []byte
+       )
+
+       if h.stream {
+               buf = make([]byte, HEADER_LENGTH)
+               n, err = h.reader.Read(buf)
+               if err != nil {
+                       return perrors.WithStack(err)
+               }
+               if n < HEADER_LENGTH {
+                       _, err = h.reader.Read(buf[n:])
+                       if err != nil { // this is impossible
+                               return perrors.WithStack(err)
+                       }
+               }
+       } else {
+               if h.reader.Size() < HEADER_LENGTH {
+                       return ErrHeaderNotEnough
+               }
+               buf, err = h.reader.Peek(HEADER_LENGTH)
+               if err != nil { // this is impossible
+                       return perrors.WithStack(err)
+               }
+               _, err = h.reader.Discard(HEADER_LENGTH)
+               if err != nil { // this is impossible
+                       return perrors.WithStack(err)
+               }
        }
 
        //// read header
@@ -164,7 +193,7 @@ func (h *HessianCodec) ReadHeader(header *DubboHeader) error {
        h.pkgType = header.Type
        h.bodyLen = header.BodyLen
 
-       if h.reader.Buffered() < h.bodyLen {
+       if h.reader.Buffered() < h.bodyLen && !h.stream {
                return ErrBodyNotEnough
        }
 
@@ -173,16 +202,33 @@ func (h *HessianCodec) ReadHeader(header *DubboHeader) error {
 
 // ReadBody uses hessian codec to read response body
 func (h *HessianCodec) ReadBody(rspObj any) error {
-       if h.reader.Buffered() < h.bodyLen {
-               return ErrBodyNotEnough
-       }
-       buf, err := h.reader.Peek(h.bodyLen)
-       if err != nil {
-               return perrors.WithStack(err)
-       }
-       _, err = h.reader.Discard(h.bodyLen)
-       if err != nil { // this is impossible
-               return perrors.WithStack(err)
+       var (
+               err error
+               buf []byte
+       )
+
+       if h.stream {
+               buf = make([]byte, h.bodyLen)
+               readLen, n := 0, 0
+               for readLen < h.bodyLen {
+                       n, err = h.reader.Read(buf[readLen:])
+                       if err != nil {
+                               return perrors.WithStack(err)
+                       }
+                       readLen += n
+               }
+       } else {
+               if h.reader.Buffered() < h.bodyLen {
+                       return ErrBodyNotEnough
+               }
+               buf, err = h.reader.Peek(h.bodyLen)
+               if err != nil {
+                       return perrors.WithStack(err)
+               }
+               _, err = h.reader.Discard(h.bodyLen)
+               if err != nil { // this is impossible
+                       return perrors.WithStack(err)
+               }
        }
 
        switch h.pkgType & PackageType_BitSize {
@@ -218,16 +264,33 @@ func (h *HessianCodec) ReadBody(rspObj any) error {
 
 // ignore body, but only read attachments
 func (h *HessianCodec) ReadAttachments() (map[string]any, error) {
-       if h.reader.Buffered() < h.bodyLen {
-               return nil, ErrBodyNotEnough
-       }
-       buf, err := h.reader.Peek(h.bodyLen)
-       if err != nil {
-               return nil, perrors.WithStack(err)
-       }
-       _, err = h.reader.Discard(h.bodyLen)
-       if err != nil { // this is impossible
-               return nil, perrors.WithStack(err)
+       var (
+               err error
+               buf []byte
+       )
+
+       if h.stream {
+               buf = make([]byte, h.bodyLen)
+               readLen, n := 0, 0
+               for readLen < h.bodyLen {
+                       n, err = h.reader.Read(buf[readLen:])
+                       if err != nil {
+                               return nil, perrors.WithStack(err)
+                       }
+                       readLen += n
+               }
+       } else {
+               if h.reader.Buffered() < h.bodyLen {
+                       return nil, ErrBodyNotEnough
+               }
+               buf, err = h.reader.Peek(h.bodyLen)
+               if err != nil {
+                       return nil, perrors.WithStack(err)
+               }
+               _, err = h.reader.Discard(h.bodyLen)
+               if err != nil { // this is impossible
+                       return nil, perrors.WithStack(err)
+               }
        }
 
        switch h.pkgType & PackageType_BitSize {
diff --git a/protocol/dubbo/hessian2/hessian_dubbo_test.go b/protocol/dubbo/hessian2/hessian_dubbo_test.go
index cd441824e..436c527f7 100644
--- a/protocol/dubbo/hessian2/hessian_dubbo_test.go
+++ b/protocol/dubbo/hessian2/hessian_dubbo_test.go
@@ -245,3 +245,68 @@ type AttachTestObject struct {
 func (AttachTestObject) JavaClassName() string {
        return "com.test.Test"
 }
+
+type CaseStream struct {
+       Payload string
+}
+
+func (CaseStream) JavaClassName() string {
+       return "com.test.CaseStream"
+}
+
+func TestDecodeFromTcpStream(t *testing.T) {
+       payload := make([]byte, 1024)
+       alphabet := "abcdefghijklmnopqrstuvwxyz"
+       for i := range payload {
+               payload[i] = alphabet[i%26]
+       }
+       cs := &CaseStream{
+               Payload: string(payload),
+       }
+
+       hessian.RegisterPOJO(cs)
+       codecW := NewHessianCodec(nil)
+       service := Service{
+               Path:      "test",
+               Interface: "ITest",
+               Version:   "v1.0",
+               Method:    "test",
+               Timeout:   time.Second * 10,
+       }
+       header := DubboHeader{
+               SerialID:       2,
+               Type:           PackageRequest,
+               ID:             1,
+               ResponseStatus: Zero,
+       }
+       resp, err := codecW.Write(service, header, []interface{}{cs})
+       assert.NoError(t, err)
+
+       // set reader buffer = 1024 to split resp into two parts
+       codec := NewStreamHessianCodecCustom(0, bufio.NewReaderSize(bytes.NewReader(resp), 1024), 0)
+       h := &DubboHeader{}
+       assert.NoError(t, codec.ReadHeader(h))
+       assert.Equal(t, h.SerialID, header.SerialID)
+       assert.Equal(t, h.Type, header.Type)
+       assert.Equal(t, h.ID, header.ID)
+       assert.Equal(t, h.ResponseStatus, header.ResponseStatus)
+
+       reqBody := make([]interface{}, 7)
+
+       err = codec.ReadBody(reqBody)
+       assert.NoError(t, err)
+       assert.Equal(t, reqBody[1], service.Path)
+       assert.Equal(t, reqBody[2], service.Version)
+       assert.Equal(t, reqBody[3], service.Method)
+
+       if list, ok := reqBody[5].([]interface{}); ok {
+               assert.Len(t, list, 1)
+               if infoPtr, ok2 := list[0].(*CaseStream); ok2 {
+                       assert.Equal(t, len(infoPtr.Payload), 1024)
+               }
+       }
+
+       codec = NewHessianCodecCustom(0, bufio.NewReaderSize(bytes.NewReader(resp), 1024), 0)
+       err = codec.ReadHeader(h)
+       assert.ErrorIs(t, err, ErrBodyNotEnough)
+}

Reply via email to