This is an automated email from the ASF dual-hosted git repository.

chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fory.git


The following commit(s) were added to refs/heads/main by this push:
     new af6e8b25e feat(go): add go deserialization support via io streams 
(#3374)
af6e8b25e is described below

commit af6e8b25e8c2d188cba2d99ccb276c47ec6c9c31
Author: Ayush Kumar <[email protected]>
AuthorDate: Fri Mar 6 20:15:47 2026 +0530

    feat(go): add go deserialization support via io streams (#3374)
    
    ## Why?
    
    To enable stream-based deserialization in Fory's Go library, allowing
    for direct reading from `io.Reader` without pre-buffering the entire
    payload. This improves efficiency for network and file-based transport
    and brings the Go implementation into feature parity with the Python
    and C++ libraries.
    
    ## What does this PR do?
    
    ### 1. Stream Infrastructure in `go/fory/buffer.go`
    
    Enhanced `ByteBuffer` to support `io.Reader` with an internal sliding
    window and automatic filling.
    
    - Added `reader io.Reader` and `bufferSize int` fields.
    - Implemented `fill(n int, err *Error) bool` for on-demand data fetching
    and buffer compaction.
    - Added `CheckReadable(n)` and `Skip(n)` memory-safe routines that pull
    from the underlying stream when necessary to avoid out-of-bounds panics.
    - Updated `ReadBinary` and `ReadBytes` to safely copy slices when
    streaming to prevent silent data corruption on compaction.
    - Updated all `Read*` methods (fixed-size, varint, tagged) to fetch data
    from the reader safely if not cached.
    
    ### 2. Stateful InputStream in `go/fory/stream.go`
    
    Added the `InputStream` feature to support true, stateful sequential
    stream reads.
    
    - Introduced `InputStream` which persists the buffered byte window and
    TypeResolver metadata (Meta Sharing) across multiple object decodes on
    the same stream, decoupled from `Fory` to mirror the C++
    `ForyInputStream` implementation.
    - Added `fory.DeserializeFromStream(is, target)` method to process
    continuous streamed data.
    - Added `Shrink()` method to compact the internal buffer and reclaim
    memory during long-lived streams.
    - Added `DeserializeFromReader` method as an API for simple *one-off*
    stream object reads.
    
    ### 3. Stream-Safe Deserialization Paths
    
    Updated internal deserialization pipelines in `struct.go` and
    `type_def.go` to be stream-safe:
    - Integrated `CheckReadable` bounds-checking into the `struct.go` fast
    paths for fixed-size primitives.
    - Safely rewrote schema-evolution skips (`skipTypeDef`) in `type_def.go`
    to use bounds-checked `Skip()` rather than unbounded `readerIndex`
    overrides.
    
    ### 4. Comprehensive Stream Tests
    
    - Built a custom `oneByteReader` wrapper (`go/fory/test_helper_test.go`)
    that artificially feeds the deserialization engine exactly 1 byte at a
    time.
    - Migrated the global test suite (`struct_test.go`, `primitive_test.go`,
    `slice_primitive_test.go`, etc.) to run all standard tests through this
    aggressive 1-byte fragmented stream reader via a new `testDeserialize`
    helper to guarantee total stream robustness.
    
    ## Related issues
    
    Closes #3302
    
    ## Does this PR introduce any user-facing change?
    
    - [x] Does this PR introduce any public API change? (`NewInputStream`,
    `DeserializeFromStream`, `DeserializeFromReader`,
    `NewByteBufferFromReader`)
    - [ ] Does this PR introduce any binary protocol compatibility change?
    
    ## Benchmark
    Main branch -
    <img width="1304" height="390" alt="image"
    
src="https://github.com/user-attachments/assets/c07e1f07-9284-4164-a166-73bb1f82018e"
    />
    
    This branch -
    <img width="1304" height="390" alt="image"
    
src="https://github.com/user-attachments/assets/c716d01e-7738-4958-8d80-25d3d84ab906"
    />
---
 go/fory/array_primitive_test.go |  18 +--
 go/fory/buffer.go               | 298 +++++++++++++++++++++++++++++-----------
 go/fory/primitive_test.go       |   8 +-
 go/fory/reader.go               |   1 +
 go/fory/slice_primitive_test.go |  66 ++++-----
 go/fory/stream.go               | 164 ++++++++++++++++++++++
 go/fory/stream_test.go          | 238 ++++++++++++++++++++++++++++++++
 go/fory/struct.go               |   5 +
 go/fory/struct_test.go          |  10 +-
 go/fory/test_helper_test.go     |  82 +++++++++++
 go/fory/type_def.go             |   2 +-
 11 files changed, 759 insertions(+), 133 deletions(-)

diff --git a/go/fory/array_primitive_test.go b/go/fory/array_primitive_test.go
index e8c99fb2f..c1b989efc 100644
--- a/go/fory/array_primitive_test.go
+++ b/go/fory/array_primitive_test.go
@@ -36,7 +36,7 @@ func TestPrimitiveArraySerializer(t *testing.T) {
                assert.NoError(t, err)
 
                var result [3]uint16
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.Equal(t, arr, result)
        })
@@ -48,7 +48,7 @@ func TestPrimitiveArraySerializer(t *testing.T) {
                assert.NoError(t, err)
 
                var result [3]uint32
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.Equal(t, arr, result)
        })
@@ -60,7 +60,7 @@ func TestPrimitiveArraySerializer(t *testing.T) {
                assert.NoError(t, err)
 
                var result [3]uint64
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.Equal(t, arr, result)
        })
@@ -86,7 +86,7 @@ func TestPrimitiveArraySerializer(t *testing.T) {
                assert.NoError(t, err)
 
                var result [3]bfloat16.BFloat16
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.Equal(t, arr, result)
        })
@@ -103,7 +103,7 @@ func TestArraySliceInteroperability(t *testing.T) {
 
                // Deserialize into Slice []int32
                var slice []int32
-               err = f.Deserialize(data, &slice)
+               err = testDeserialize(t, f, data, &slice)
                assert.NoError(t, err)
                assert.Equal(t, []int32{1, 2, 3}, slice)
        })
@@ -116,7 +116,7 @@ func TestArraySliceInteroperability(t *testing.T) {
 
                // Deserialize into Array [3]int32
                var arr [3]int32
-               err = f.Deserialize(data, &arr)
+               err = testDeserialize(t, f, data, &arr)
                assert.NoError(t, err)
                assert.Equal(t, [3]int32{4, 5, 6}, arr)
        })
@@ -128,7 +128,7 @@ func TestArraySliceInteroperability(t *testing.T) {
                assert.NoError(t, err)
 
                var slice []int64 // different type
-               err = f.Deserialize(data, &slice)
+               err = testDeserialize(t, f, data, &slice)
                // Strict checking means this should error immediately upon 
reading wrong TypeID
                assert.Error(t, err)
        })
@@ -141,7 +141,7 @@ func TestArraySliceInteroperability(t *testing.T) {
 
                // Deserialize into Array [3]int32 - should fail size check
                var arr [3]int32
-               err = f.Deserialize(data, &arr)
+               err = testDeserialize(t, f, data, &arr)
                // Serialized as list with len 2. Array expects 3.
                assert.Error(t, err)
                assert.Contains(t, err.Error(), "array length")
@@ -161,7 +161,7 @@ func TestFloat16Array(t *testing.T) {
                assert.NoError(t, err)
 
                var result [3]float16.Float16
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.Equal(t, arr, result)
        })
diff --git a/go/fory/buffer.go b/go/fory/buffer.go
index f00833300..70d2fc535 100644
--- a/go/fory/buffer.go
+++ b/go/fory/buffer.go
@@ -20,6 +20,7 @@ package fory
 import (
        "encoding/binary"
        "fmt"
+       "io"
        "math"
        "unsafe"
 )
@@ -28,12 +29,86 @@ type ByteBuffer struct {
        data        []byte // Most accessed field first for cache locality
        writerIndex int
        readerIndex int
+       reader      io.Reader
+       bufferSize  int
 }
 
 func NewByteBuffer(data []byte) *ByteBuffer {
        return &ByteBuffer{data: data}
 }
 
+func NewByteBufferFromReader(r io.Reader, bufferSize int) *ByteBuffer {
+       if bufferSize <= 0 {
+               bufferSize = 4096
+       }
+       return &ByteBuffer{
+               data:       make([]byte, 0, bufferSize),
+               reader:     r,
+               bufferSize: bufferSize,
+       }
+}
+
+//go:noinline
+func (b *ByteBuffer) fill(n int, errOut *Error) bool {
+       if b.reader == nil {
+               if errOut != nil {
+                       *errOut = BufferOutOfBoundError(b.readerIndex, n, 
len(b.data))
+               }
+               return false
+       }
+
+       available := len(b.data) - b.readerIndex
+       if available >= n {
+               return true
+       }
+
+       if b.readerIndex > 0 {
+               copy(b.data, b.data[b.readerIndex:])
+               b.writerIndex -= b.readerIndex
+               b.readerIndex = 0
+               b.data = b.data[:b.writerIndex]
+       }
+
+       if cap(b.data) < n {
+               newCap := cap(b.data) * 2
+               if newCap < n {
+                       newCap = n
+               }
+               if newCap < b.bufferSize {
+                       newCap = b.bufferSize
+               }
+               newData := make([]byte, len(b.data), newCap)
+               copy(newData, b.data)
+               b.data = newData
+       }
+
+       for len(b.data) < n {
+               spare := b.data[len(b.data):cap(b.data)]
+               if len(spare) == 0 {
+                       return false
+               }
+               readBytes, err := b.reader.Read(spare)
+               if readBytes > 0 {
+                       b.data = b.data[:len(b.data)+readBytes]
+                       b.writerIndex += readBytes
+               }
+               if err != nil {
+                       if len(b.data) >= n {
+                               return true
+                       }
+                       if errOut != nil {
+                               if err == io.EOF {
+                                       *errOut = 
BufferOutOfBoundError(b.readerIndex, n, len(b.data))
+                               } else {
+                                       *errOut = 
DeserializationError(fmt.Sprintf("stream read error: %v", err))
+                               }
+                       }
+                       return false
+               }
+       }
+       return true
+}
+
 // grow ensures there's space for n more bytes. Hot path is inlined.
 //
 //go:inline
@@ -185,8 +260,9 @@ func (b *ByteBuffer) WriteBinary(p []byte) {
 //go:inline
 func (b *ByteBuffer) ReadBool(err *Error) bool {
        if b.readerIndex+1 > len(b.data) {
-               *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data))
-               return false
+               if !b.fill(1, err) {
+                       return false
+               }
        }
        v := b.data[b.readerIndex]
        b.readerIndex++
@@ -198,8 +274,9 @@ func (b *ByteBuffer) ReadBool(err *Error) bool {
 //go:inline
 func (b *ByteBuffer) ReadByte(err *Error) byte {
        if b.readerIndex+1 > len(b.data) {
-               *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data))
-               return 0
+               if !b.fill(1, err) {
+                       return 0
+               }
        }
        v := b.data[b.readerIndex]
        b.readerIndex++
@@ -211,8 +288,9 @@ func (b *ByteBuffer) ReadByte(err *Error) byte {
 //go:inline
 func (b *ByteBuffer) ReadInt8(err *Error) int8 {
        if b.readerIndex+1 > len(b.data) {
-               *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data))
-               return 0
+               if !b.fill(1, err) {
+                       return 0
+               }
        }
        v := int8(b.data[b.readerIndex])
        b.readerIndex++
@@ -224,8 +302,9 @@ func (b *ByteBuffer) ReadInt8(err *Error) int8 {
 //go:inline
 func (b *ByteBuffer) ReadInt16(err *Error) int16 {
        if b.readerIndex+2 > len(b.data) {
-               *err = BufferOutOfBoundError(b.readerIndex, 2, len(b.data))
-               return 0
+               if !b.fill(2, err) {
+                       return 0
+               }
        }
        v := int16(binary.LittleEndian.Uint16(b.data[b.readerIndex:]))
        b.readerIndex += 2
@@ -237,8 +316,9 @@ func (b *ByteBuffer) ReadInt16(err *Error) int16 {
 //go:inline
 func (b *ByteBuffer) ReadUint16(err *Error) uint16 {
        if b.readerIndex+2 > len(b.data) {
-               *err = BufferOutOfBoundError(b.readerIndex, 2, len(b.data))
-               return 0
+               if !b.fill(2, err) {
+                       return 0
+               }
        }
        v := binary.LittleEndian.Uint16(b.data[b.readerIndex:])
        b.readerIndex += 2
@@ -250,8 +330,9 @@ func (b *ByteBuffer) ReadUint16(err *Error) uint16 {
 //go:inline
 func (b *ByteBuffer) ReadUint32(err *Error) uint32 {
        if b.readerIndex+4 > len(b.data) {
-               *err = BufferOutOfBoundError(b.readerIndex, 4, len(b.data))
-               return 0
+               if !b.fill(4, err) {
+                       return 0
+               }
        }
        i := binary.LittleEndian.Uint32(b.data[b.readerIndex:])
        b.readerIndex += 4
@@ -263,8 +344,9 @@ func (b *ByteBuffer) ReadUint32(err *Error) uint32 {
 //go:inline
 func (b *ByteBuffer) ReadUint64(err *Error) uint64 {
        if b.readerIndex+8 > len(b.data) {
-               *err = BufferOutOfBoundError(b.readerIndex, 8, len(b.data))
-               return 0
+               if !b.fill(8, err) {
+                       return 0
+               }
        }
        i := binary.LittleEndian.Uint64(b.data[b.readerIndex:])
        b.readerIndex += 8
@@ -300,17 +382,46 @@ func (b *ByteBuffer) ReadFloat64(err *Error) float64 {
 }
 
 func (b *ByteBuffer) Read(p []byte) (n int, err error) {
+       if len(p) == 0 {
+               return 0, nil
+       }
+
+       if b.readerIndex+len(p) > len(b.data) && b.reader != nil {
+               var errOut Error
+               if !b.fill(len(p), &errOut) {
+                       copied := copy(p, b.data[b.readerIndex:])
+                       b.readerIndex += copied
+                       if errOut.Kind() == ErrKindBufferOutOfBound {
+                               return copied, io.EOF
+                       }
+                       return copied, errOut
+               }
+       }
+
        copied := copy(p, b.data[b.readerIndex:])
        b.readerIndex += copied
+       if copied == 0 {
+               return 0, io.EOF
+       }
        return copied, nil
 }
 
 // ReadBinary reads n bytes and sets error on bounds violation
 func (b *ByteBuffer) ReadBinary(length int, err *Error) []byte {
        if b.readerIndex+length > len(b.data) {
-               *err = BufferOutOfBoundError(b.readerIndex, length, len(b.data))
-               return nil
+               if !b.fill(length, err) {
+                       return nil
+               }
        }
+
+       if b.reader != nil {
+               // In stream mode, compaction might overwrite these bytes, so 
we must copy
+               result := make([]byte, length)
+               copy(result, b.data[b.readerIndex:b.readerIndex+length])
+               b.readerIndex += length
+               return result
+       }
+
        v := b.data[b.readerIndex : b.readerIndex+length]
        b.readerIndex += length
        return v
@@ -360,13 +471,27 @@ func (b *ByteBuffer) SetReaderIndex(index int) {
 func (b *ByteBuffer) Reset() {
        b.readerIndex = 0
        b.writerIndex = 0
+       b.reader = nil
        // Keep the underlying buffer if it's reasonable sized to reduce 
allocations
        // Only nil it out if we want to release memory
        if cap(b.data) > 64*1024 {
                b.data = nil
        }
-       // Note: We keep b.data as-is (with its current length) to avoid issues
-       // with grow() needing to expand the slice on first write
+}
+
+func (b *ByteBuffer) ResetWithReader(r io.Reader, bufferSize int) {
+       b.readerIndex = 0
+       b.writerIndex = 0
+       b.reader = r
+       if bufferSize <= 0 {
+               bufferSize = 4096
+       }
+       b.bufferSize = bufferSize
+       if cap(b.data) < bufferSize {
+               b.data = make([]byte, 0, bufferSize)
+       } else {
+               b.data = b.data[:0]
+       }
 }
 
 // Reserve ensures buffer has at least n bytes available for writing from 
current position.
@@ -921,7 +1046,12 @@ func (b *ByteBuffer) readVaruint36SmallSlow(err *Error) 
uint64 {
        var result uint64
        var shift uint
 
-       for b.readerIndex < len(b.data) {
+       for {
+               if b.readerIndex >= len(b.data) {
+                       if !b.fill(1, err) {
+                               return 0
+                       }
+               }
                byteVal := b.data[b.readerIndex]
                b.readerIndex++
                result |= uint64(byteVal&0x7F) << shift
@@ -934,8 +1064,6 @@ func (b *ByteBuffer) readVaruint36SmallSlow(err *Error) 
uint64 {
                        return 0
                }
        }
-       *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data))
-       return 0
 }
 
 // ReadVarint64 reads the varint encoded with zig-zag (compatible with Java's 
readVarint64).
@@ -975,8 +1103,9 @@ func (b *ByteBuffer) WriteTaggedInt64(value int64) {
 // Otherwise, skip flag byte and read 8 bytes as int64.
 func (b *ByteBuffer) ReadTaggedInt64(err *Error) int64 {
        if b.readerIndex+4 > len(b.data) {
-               *err = BufferOutOfBoundError(b.readerIndex, 4, len(b.data))
-               return 0
+               if !b.fill(4, err) {
+                       return 0
+               }
        }
        var i int32
        if isLittleEndian {
@@ -989,8 +1118,9 @@ func (b *ByteBuffer) ReadTaggedInt64(err *Error) int64 {
                return int64(i >> 1) // arithmetic right shift
        }
        if b.readerIndex+9 > len(b.data) {
-               *err = BufferOutOfBoundError(b.readerIndex, 9, len(b.data))
-               return 0
+               if !b.fill(9, err) {
+                       return 0
+               }
        }
        var value int64
        if isLittleEndian {
@@ -1026,8 +1156,9 @@ func (b *ByteBuffer) WriteTaggedUint64(value uint64) {
 // Otherwise, skip flag byte and read 8 bytes as uint64.
 func (b *ByteBuffer) ReadTaggedUint64(err *Error) uint64 {
        if b.readerIndex+4 > len(b.data) {
-               *err = BufferOutOfBoundError(b.readerIndex, 4, len(b.data))
-               return 0
+               if !b.fill(4, err) {
+                       return 0
+               }
        }
        var i uint32
        if isLittleEndian {
@@ -1040,8 +1171,9 @@ func (b *ByteBuffer) ReadTaggedUint64(err *Error) uint64 {
                return uint64(i >> 1)
        }
        if b.readerIndex+9 > len(b.data) {
-               *err = BufferOutOfBoundError(b.readerIndex, 9, len(b.data))
-               return 0
+               if !b.fill(9, err) {
+                       return 0
+               }
        }
        var value uint64
        if isLittleEndian {
@@ -1122,8 +1254,9 @@ func (b *ByteBuffer) readVarUint64Slow(err *Error) uint64 
{
        var shift uint
        for i := 0; i < 8; i++ {
                if b.readerIndex >= len(b.data) {
-                       *err = BufferOutOfBoundError(b.readerIndex, 1, 
len(b.data))
-                       return 0
+                       if !b.fill(1, err) {
+                               return 0
+                       }
                }
                byteVal := b.data[b.readerIndex]
                b.readerIndex++
@@ -1134,8 +1267,9 @@ func (b *ByteBuffer) readVarUint64Slow(err *Error) uint64 
{
                shift += 7
        }
        if b.readerIndex >= len(b.data) {
-               *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data))
-               return 0
+               if !b.fill(1, err) {
+                       return 0
+               }
        }
        byteVal := b.data[b.readerIndex]
        b.readerIndex++
@@ -1154,8 +1288,9 @@ func (b *ByteBuffer) remaining() int {
 //go:inline
 func (b *ByteBuffer) ReadUint8(err *Error) uint8 {
        if b.readerIndex >= len(b.data) {
-               *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data))
-               return 0
+               if !b.fill(1, err) {
+                       return 0
+               }
        }
        v := b.data[b.readerIndex]
        b.readerIndex++
@@ -1276,8 +1411,9 @@ func (b *ByteBuffer) readVarUint32Slow(err *Error) uint32 
{
        var shift uint
        for {
                if b.readerIndex >= len(b.data) {
-                       *err = BufferOutOfBoundError(b.readerIndex, 1, 
len(b.data))
-                       return 0
+                       if !b.fill(1, err) {
+                               return 0
+                       }
                }
                byteVal := b.data[b.readerIndex]
                b.readerIndex++
@@ -1437,8 +1573,9 @@ func (b *ByteBuffer) UnsafePutTaggedUint64(offset int, 
value uint64) int {
 // ReadVarUint32Small7 reads a VarUint32 in small-7 format with error checking
 func (b *ByteBuffer) ReadVarUint32Small7(err *Error) uint32 {
        if b.readerIndex >= len(b.data) {
-               *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data))
-               return 0
+               if !b.fill(1, err) {
+                       return 0
+               }
        }
        readIdx := b.readerIndex
        v := b.data[readIdx]
@@ -1486,47 +1623,24 @@ func (b *ByteBuffer) continueReadVarUint32(readIdx int, 
bulkRead, value uint32)
 }
 
 func (b *ByteBuffer) readVaruint36Slow(err *Error) uint64 {
-       if b.readerIndex >= len(b.data) {
-               *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data))
-               return 0
-       }
-       b0 := b.data[b.readerIndex]
-       b.readerIndex++
-       result := uint64(b0 & 0x7F)
-       if b0&0x80 != 0 {
+       var shift uint
+       var result uint64
+       for {
                if b.readerIndex >= len(b.data) {
-                       *err = BufferOutOfBoundError(b.readerIndex, 1, 
len(b.data))
-                       return 0
-               }
-               b1 := b.data[b.readerIndex]
-               b.readerIndex++
-               result |= uint64(b1&0x7F) << 7
-               if b1&0x80 != 0 {
-                       if b.readerIndex >= len(b.data) {
-                               *err = BufferOutOfBoundError(b.readerIndex, 1, 
len(b.data))
+                       if !b.fill(1, err) {
                                return 0
                        }
-                       b2 := b.data[b.readerIndex]
-                       b.readerIndex++
-                       result |= uint64(b2&0x7F) << 14
-                       if b2&0x80 != 0 {
-                               if b.readerIndex >= len(b.data) {
-                                       *err = 
BufferOutOfBoundError(b.readerIndex, 1, len(b.data))
-                                       return 0
-                               }
-                               b3 := b.data[b.readerIndex]
-                               b.readerIndex++
-                               result |= uint64(b3&0x7F) << 21
-                               if b3&0x80 != 0 {
-                                       if b.readerIndex >= len(b.data) {
-                                               *err = 
BufferOutOfBoundError(b.readerIndex, 1, len(b.data))
-                                               return 0
-                                       }
-                                       b4 := b.data[b.readerIndex]
-                                       b.readerIndex++
-                                       result |= uint64(b4) << 28
-                               }
-                       }
+               }
+               b0 := b.data[b.readerIndex]
+               b.readerIndex++
+               result |= uint64(b0&0x7F) << shift
+               if b0&0x80 == 0 {
+                       break
+               }
+               shift += 7
+               if shift >= 35 {
+                       *err = DeserializationError("varuint36 overflow")
+                       return 0
                }
        }
        return result
@@ -1549,9 +1663,19 @@ func (b *ByteBuffer) IncreaseReaderIndex(n int) {
 // ReadBytes reads n bytes and sets error on bounds violation
 func (b *ByteBuffer) ReadBytes(n int, err *Error) []byte {
        if b.readerIndex+n > len(b.data) {
-               *err = BufferOutOfBoundError(b.readerIndex, n, len(b.data))
-               return nil
+               if !b.fill(n, err) {
+                       return nil
+               }
        }
+
+       if b.reader != nil {
+               // In stream mode, compaction might overwrite these bytes, so 
we must copy
+               result := make([]byte, n)
+               copy(result, b.data[b.readerIndex:b.readerIndex+n])
+               b.readerIndex += n
+               return result
+       }
+
        p := b.data[b.readerIndex : b.readerIndex+n]
        b.readerIndex += n
        return p
@@ -1560,8 +1684,20 @@ func (b *ByteBuffer) ReadBytes(n int, err *Error) []byte 
{
 // Skip skips n bytes and sets error on bounds violation
 func (b *ByteBuffer) Skip(length int, err *Error) {
        if b.readerIndex+length > len(b.data) {
-               *err = BufferOutOfBoundError(b.readerIndex, length, len(b.data))
-               return
+               if !b.fill(length, err) {
+                       return
+               }
        }
        b.readerIndex += length
 }
+
+// CheckReadable ensures that at least n bytes are available to read.
+// In stream mode, it will attempt to fill the buffer if necessary.
+//
+//go:inline
+func (b *ByteBuffer) CheckReadable(n int, err *Error) bool {
+       if b.readerIndex+n > len(b.data) {
+               return b.fill(n, err)
+       }
+       return true
+}
diff --git a/go/fory/primitive_test.go b/go/fory/primitive_test.go
index 0f478590e..f5bf7b08c 100644
--- a/go/fory/primitive_test.go
+++ b/go/fory/primitive_test.go
@@ -34,7 +34,7 @@ func TestFloat16Primitive(t *testing.T) {
        require.NoError(t, err)
 
        var res float16.Float16
-       err = f.Deserialize(data, &res)
+       err = testDeserialize(t, f, data, &res)
        require.NoError(t, err)
 
        require.True(t, f16.Equal(res))
@@ -53,7 +53,7 @@ func TestFloat16PrimitiveSliceDirect(t *testing.T) {
        require.NoError(t, err)
 
        var resSlice []float16.Float16
-       err = f.Deserialize(data, &resSlice)
+       err = testDeserialize(t, f, data, &resSlice)
        require.NoError(t, err)
        require.Equal(t, slice, resSlice)
 }
@@ -67,7 +67,7 @@ func TestBFloat16Primitive(t *testing.T) {
        require.NoError(t, err)
 
        var res bfloat16.BFloat16
-       err = f.Deserialize(data, &res)
+       err = testDeserialize(t, f, data, &res)
        require.NoError(t, err)
 
        require.Equal(t, bf16.Bits(), res.Bits())
@@ -86,7 +86,7 @@ func TestBFloat16PrimitiveSliceDirect(t *testing.T) {
        require.NoError(t, err)
 
        var resSlice []bfloat16.BFloat16
-       err = f.Deserialize(data, &resSlice)
+       err = testDeserialize(t, f, data, &resSlice)
        require.NoError(t, err)
        require.Equal(t, slice, resSlice)
 }
diff --git a/go/fory/reader.go b/go/fory/reader.go
index e7a1df171..a0a37d92f 100644
--- a/go/fory/reader.go
+++ b/go/fory/reader.go
@@ -83,6 +83,7 @@ func (c *ReadContext) SetData(data []byte) {
                c.buffer.data = data
                c.buffer.readerIndex = 0
                c.buffer.writerIndex = len(data)
+               c.buffer.reader = nil
        }
 }
 
diff --git a/go/fory/slice_primitive_test.go b/go/fory/slice_primitive_test.go
index e03a4cb00..b072078e3 100644
--- a/go/fory/slice_primitive_test.go
+++ b/go/fory/slice_primitive_test.go
@@ -39,7 +39,7 @@ func TestFloat16Slice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []float16.Float16
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.Equal(t, slice, result)
        })
@@ -50,7 +50,7 @@ func TestFloat16Slice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []float16.Float16
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.NotNil(t, result)
                assert.Empty(t, result)
@@ -62,7 +62,7 @@ func TestFloat16Slice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []float16.Float16
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.Nil(t, result)
        })
@@ -81,7 +81,7 @@ func TestBFloat16Slice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []bfloat16.BFloat16
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.Equal(t, slice, result)
        })
@@ -92,7 +92,7 @@ func TestBFloat16Slice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []bfloat16.BFloat16
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.NotNil(t, result)
                assert.Empty(t, result)
@@ -104,7 +104,7 @@ func TestBFloat16Slice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []bfloat16.BFloat16
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.Nil(t, result)
        })
@@ -119,7 +119,7 @@ func TestIntSlice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []int
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.Equal(t, slice, result)
        })
@@ -130,7 +130,7 @@ func TestIntSlice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []int
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.NotNil(t, result)
                assert.Empty(t, result)
@@ -142,7 +142,7 @@ func TestIntSlice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []int
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.Nil(t, result)
        })
@@ -157,7 +157,7 @@ func TestUintSlice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []uint
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.Equal(t, slice, result)
        })
@@ -168,7 +168,7 @@ func TestUintSlice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []uint
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.NotNil(t, result)
                assert.Empty(t, result)
@@ -180,7 +180,7 @@ func TestUintSlice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []uint
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.Nil(t, result)
        })
@@ -195,7 +195,7 @@ func TestInt8Slice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []int8
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.Equal(t, slice, result)
        })
@@ -206,7 +206,7 @@ func TestInt8Slice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []int8
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.NotNil(t, result)
                assert.Empty(t, result)
@@ -218,7 +218,7 @@ func TestInt8Slice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []int8
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.Nil(t, result)
        })
@@ -233,7 +233,7 @@ func TestInt16Slice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []int16
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.Equal(t, slice, result)
        })
@@ -244,7 +244,7 @@ func TestInt16Slice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []int16
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.NotNil(t, result)
                assert.Empty(t, result)
@@ -256,7 +256,7 @@ func TestInt16Slice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []int16
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.Nil(t, result)
        })
@@ -271,7 +271,7 @@ func TestInt32Slice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []int32
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.Equal(t, slice, result)
        })
@@ -282,7 +282,7 @@ func TestInt32Slice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []int32
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.NotNil(t, result)
                assert.Empty(t, result)
@@ -294,7 +294,7 @@ func TestInt32Slice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []int32
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.Nil(t, result)
        })
@@ -309,7 +309,7 @@ func TestInt64Slice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []int64
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.Equal(t, slice, result)
        })
@@ -320,7 +320,7 @@ func TestInt64Slice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []int64
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.NotNil(t, result)
                assert.Empty(t, result)
@@ -332,7 +332,7 @@ func TestInt64Slice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []int64
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.Nil(t, result)
        })
@@ -347,7 +347,7 @@ func TestUint16Slice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []uint16
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.Equal(t, slice, result)
        })
@@ -358,7 +358,7 @@ func TestUint16Slice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []uint16
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.NotNil(t, result)
                assert.Empty(t, result)
@@ -370,7 +370,7 @@ func TestUint16Slice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []uint16
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.Nil(t, result)
        })
@@ -385,7 +385,7 @@ func TestUint32Slice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []uint32
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.Equal(t, slice, result)
        })
@@ -396,7 +396,7 @@ func TestUint32Slice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []uint32
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.NotNil(t, result)
                assert.Empty(t, result)
@@ -408,7 +408,7 @@ func TestUint32Slice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []uint32
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.Nil(t, result)
        })
@@ -423,7 +423,7 @@ func TestUint64Slice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []uint64
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.Equal(t, slice, result)
        })
@@ -434,7 +434,7 @@ func TestUint64Slice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []uint64
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.NotNil(t, result)
                assert.Empty(t, result)
@@ -446,7 +446,7 @@ func TestUint64Slice(t *testing.T) {
                assert.NoError(t, err)
 
                var result []uint64
-               err = f.Deserialize(data, &result)
+               err = testDeserialize(t, f, data, &result)
                assert.NoError(t, err)
                assert.Nil(t, result)
        })
diff --git a/go/fory/stream.go b/go/fory/stream.go
new file mode 100644
index 000000000..420a6d482
--- /dev/null
+++ b/go/fory/stream.go
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package fory
+
+import (
+       "io"
+       "reflect"
+)
+
// InputStream supports robust sequential deserialization from a stream.
// It owns a single ByteBuffer whose prefetched byte window survives across
// multiple Deserialize calls, preventing data loss from read-ahead buffering.
// ReadContext / TypeResolver state (Meta Sharing) lives on the Fory instance
// that decodes from this stream; InputStream itself only persists the bytes.
type InputStream struct {
	// buffer is the sliding-window ByteBuffer backed by the underlying io.Reader.
	buffer *ByteBuffer
}
+
// NewInputStream creates a new InputStream that reads from the provided io.Reader.
// The InputStream owns the buffer and maintains state across sequential Deserialize calls.
// A buffer size of 0 delegates to the ByteBuffer's default minimum capacity.
func NewInputStream(r io.Reader) *InputStream {
	return NewInputStreamWithBufferSize(r, 0)
}
+
+// NewInputStreamWithBufferSize creates a new InputStream with a specified 
minimum buffer size.
+func NewInputStreamWithBufferSize(r io.Reader, bufferSize int) *InputStream {
+       buf := NewByteBufferFromReader(r, bufferSize)
+       return &InputStream{
+               buffer: buf,
+       }
+}
+
// Shrink compacts the internal buffer, dropping already-read bytes to reclaim memory.
// It applies a heuristic to avoid tiny frequent compactions and reallocates the backing
// slice if the capacity becomes excessively large compared to the remaining data.
// Safe to call at any time between Deserialize calls; it never discards unread bytes.
func (is *InputStream) Shrink() {
	b := is.buffer
	if b == nil {
		return
	}

	readPos := b.readerIndex
	// Best-effort policy: keep a 4096-byte floor to avoid tiny frequent compactions
	if readPos <= 4096 || readPos < b.bufferSize {
		return
	}

	// Bytes that have been filled but not yet consumed; these must survive.
	remaining := b.writerIndex - readPos
	currentCapacity := cap(b.data)
	targetCapacity := currentCapacity

	// Only consider reallocating when capacity already exceeds the configured
	// minimum; otherwise an in-place compaction (below) is sufficient.
	if currentCapacity > b.bufferSize {
		if remaining == 0 {
			targetCapacity = b.bufferSize
		} else if remaining <= currentCapacity/4 {
			// Buffer is at most 25% utilized: shrink to 2x the live data,
			// but never below the configured minimum buffer size.
			doubled := remaining * 2
			targetCapacity = doubled
			if targetCapacity < b.bufferSize {
				targetCapacity = b.bufferSize
			}
		}
	}

	if targetCapacity < currentCapacity {
		// Actually reclaim memory by copying to a new, smaller slice
		newData := make([]byte, remaining, targetCapacity)
		copy(newData, b.data[readPos:b.writerIndex])
		b.data = newData
		b.writerIndex = remaining
		b.readerIndex = 0
	} else if readPos > 0 {
		// Just compact without reallocating
		copy(b.data, b.data[readPos:b.writerIndex])
		b.writerIndex = remaining
		b.readerIndex = 0
		b.data = b.data[:remaining]
	}
}
+
+// DeserializeFromStream reads the next object from the stream into the 
provided value.
+// It uses a shared ReadContext for the lifetime of the InputStream, clearing
+// temporary state between calls but preserving the buffer and TypeResolver 
state.
+func (f *Fory) DeserializeFromStream(is *InputStream, v any) error {
+
+       // We only reset the temporary read state (like refTracker and 
outOfBand buffers),
+       // NOT the buffer or the type mapping, which must persist.
+       defer func() {
+               f.readCtx.refReader.Reset()
+               f.readCtx.outOfBandBuffers = nil
+               f.readCtx.outOfBandIndex = 0
+               f.readCtx.err = Error{}
+               if f.readCtx.refResolver != nil {
+                       f.readCtx.refResolver.resetRead()
+               }
+       }()
+
+       // Temporarily swap buffer
+       origBuffer := f.readCtx.buffer
+       f.readCtx.buffer = is.buffer
+
+       isNull := readHeader(f.readCtx)
+       if f.readCtx.HasError() {
+               f.readCtx.buffer = origBuffer
+               return f.readCtx.TakeError()
+       }
+
+       if isNull {
+               f.readCtx.buffer = origBuffer
+               return nil
+       }
+
+       target := reflect.ValueOf(v).Elem()
+       f.readCtx.ReadValue(target, RefModeTracking, true)
+       if f.readCtx.HasError() {
+               f.readCtx.buffer = origBuffer
+               return f.readCtx.TakeError()
+       }
+
+       // Restore original buffer
+       f.readCtx.buffer = origBuffer
+
+       return nil
+}
+
+// DeserializeFromReader deserializes a single object from a stream.
+// It is strictly stateless: the buffer and all read state are always reset 
before
+// each call, discarding any prefetched data and type metadata.
+// For sequential multi-object reads on the same stream, use NewInputStream 
instead.
+func (f *Fory) DeserializeFromReader(r io.Reader, v any) error {
+       defer f.resetReadState()
+       // Always reset to enforce stateless semantics.
+       f.readCtx.buffer.ResetWithReader(r, 0)
+
+       isNull := readHeader(f.readCtx)
+       if f.readCtx.HasError() {
+               return f.readCtx.TakeError()
+       }
+
+       if isNull {
+               return nil
+       }
+
+       target := reflect.ValueOf(v).Elem()
+       f.readCtx.ReadValue(target, RefModeTracking, true)
+       if f.readCtx.HasError() {
+               return f.readCtx.TakeError()
+       }
+
+       return nil
+}
diff --git a/go/fory/stream_test.go b/go/fory/stream_test.go
new file mode 100644
index 000000000..098254587
--- /dev/null
+++ b/go/fory/stream_test.go
@@ -0,0 +1,238 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package fory
+
+import (
+       "bytes"
+       "io"
+       "testing"
+)
+
// StreamTestStruct is a small fixture type exercising a fixed-size integer,
// a string, and a byte-slice field through the stream deserialization paths.
type StreamTestStruct struct {
	ID   int32
	Name string
	Data []byte
}
+
+func TestStreamDeserialization(t *testing.T) {
+       f := New()
+       f.RegisterStruct(&StreamTestStruct{}, 100)
+
+       original := &StreamTestStruct{
+               ID:   42,
+               Name: "Stream Test",
+               Data: []byte{1, 2, 3, 4, 5},
+       }
+
+       data, err := f.Serialize(original)
+       if err != nil {
+               t.Fatalf("Serialize failed: %v", err)
+       }
+
+       // 1. Test normal reader
+       reader := bytes.NewReader(data)
+       var decoded StreamTestStruct
+       err = f.DeserializeFromReader(reader, &decoded)
+       if err != nil {
+               t.Fatalf("DeserializeFromReader failed: %v", err)
+       }
+
+       if decoded.ID != original.ID || decoded.Name != original.Name || 
!bytes.Equal(decoded.Data, original.Data) {
+               t.Errorf("Decoded value mismatch. Got: %+v, Want: %+v", 
decoded, original)
+       }
+}
+
+// slowReader returns data byte by byte to test fill() logic and compaction
+type slowReader struct {
+       data []byte
+       pos  int
+}
+
+func (r *slowReader) Read(p []byte) (n int, err error) {
+       if r.pos >= len(r.data) {
+               return 0, io.EOF
+       }
+       if len(p) == 0 {
+               return 0, nil
+       }
+       p[0] = r.data[r.pos]
+       r.pos++
+       return 1, nil
+}
+
+func TestStreamDeserializationSlow(t *testing.T) {
+       f := New()
+       f.RegisterStruct(&StreamTestStruct{}, 100)
+
+       original := &StreamTestStruct{
+               ID:   42,
+               Name: "Slow Stream Test with a reasonably long string and some 
data to trigger multiple fills",
+               Data: bytes.Repeat([]byte{0xAA}, 100),
+       }
+
+       data, err := f.Serialize(original)
+       if err != nil {
+               t.Fatalf("Serialize failed: %v", err)
+       }
+
+       // Test with slow reader and small bufferSize to force compaction/growth
+       reader := &slowReader{data: data}
+       var decoded StreamTestStruct
+
+       // Create an InputStream with a small bufferSize (16) to force frequent 
fills and compactions
+       stream := NewInputStreamWithBufferSize(reader, 16)
+       err = f.DeserializeFromStream(stream, &decoded)
+       if err != nil {
+               t.Fatalf("DeserializeFromReader (slow) failed: %v", err)
+       }
+
+       if decoded.ID != original.ID || decoded.Name != original.Name || 
!bytes.Equal(decoded.Data, original.Data) {
+               t.Errorf("Decoded value mismatch (slow). Got: %+v, Want: %+v", 
decoded, original)
+       }
+}
+
+func TestStreamDeserializationEOF(t *testing.T) {
+       f := New()
+       f.RegisterStruct(&StreamTestStruct{}, 100)
+
+       original := &StreamTestStruct{
+               ID:   42,
+               Name: "EOF Test",
+       }
+
+       data, err := f.Serialize(original)
+       if err != nil {
+               t.Fatalf("Serialize failed: %v", err)
+       }
+
+       // Truncate data to cause unexpected EOF during reading Name
+       truncated := data[:len(data)-2]
+       reader := bytes.NewReader(truncated)
+       var decoded StreamTestStruct
+       err = f.DeserializeFromReader(reader, &decoded)
+       if err == nil {
+               t.Fatal("Expected error on truncated stream, got nil")
+       }
+
+       // Ideally it should be a BufferOutOfBoundError
+       if _, ok := err.(Error); !ok {
+               t.Errorf("Expected fory.Error, got %T: %v", err, err)
+       }
+}
+
+func TestInputStreamSequential(t *testing.T) {
+       f := New()
+       // Register type in compatible mode to test Meta Sharing across 
sequential reads
+       f.config.Compatible = true
+       f.RegisterStruct(&StreamTestStruct{}, 100)
+
+       msg1 := &StreamTestStruct{ID: 1, Name: "Msg 1", Data: []byte{1, 1}}
+       msg2 := &StreamTestStruct{ID: 2, Name: "Msg 2", Data: []byte{2, 2}}
+       msg3 := &StreamTestStruct{ID: 3, Name: "Msg 3", Data: []byte{3, 3}}
+
+       var buf bytes.Buffer
+
+       // Serialize sequentially into one stream
+       data1, _ := f.Serialize(msg1)
+       buf.Write(data1)
+       data2, _ := f.Serialize(msg2)
+       buf.Write(data2)
+       data3, _ := f.Serialize(msg3)
+       buf.Write(data3)
+
+       fDec := New()
+       fDec.config.Compatible = true
+       fDec.RegisterStruct(&StreamTestStruct{}, 100)
+
+       // Create a InputStream
+       sr := NewInputStream(&buf)
+
+       // Deserialize sequentially
+       var out1, out2, out3 StreamTestStruct
+
+       err := fDec.DeserializeFromStream(sr, &out1)
+       if err != nil {
+               t.Fatalf("Deserialize 1 failed: %v", err)
+       }
+       if out1.ID != msg1.ID || out1.Name != msg1.Name || 
!bytes.Equal(out1.Data, msg1.Data) {
+               t.Errorf("Msg 1 mismatch. Got: %+v, Want: %+v", out1, msg1)
+       }
+
+       err = fDec.DeserializeFromStream(sr, &out2)
+       if err != nil {
+               t.Fatalf("Deserialize 2 failed: %v", err)
+       }
+       if out2.ID != msg2.ID || out2.Name != msg2.Name || 
!bytes.Equal(out2.Data, msg2.Data) {
+               t.Errorf("Msg 2 mismatch. Got: %+v, Want: %+v", out2, msg2)
+       }
+
+       err = fDec.DeserializeFromStream(sr, &out3)
+       if err != nil {
+               t.Fatalf("Deserialize 3 failed: %v", err)
+       }
+       if out3.ID != msg3.ID || out3.Name != msg3.Name || 
!bytes.Equal(out3.Data, msg3.Data) {
+               t.Errorf("Msg 3 mismatch. Got: %+v, Want: %+v", out3, msg3)
+       }
+}
+
// TestInputStreamShrink white-box tests Shrink: it fills the buffer past the
// compaction floor, fakes a large consumed prefix, then checks that Shrink
// resets indices, reclaims capacity, and preserves the unread bytes.
func TestInputStreamShrink(t *testing.T) {
	// Create a large payload that easily escapes the bufferSize (4096)
	data := make([]byte, 10000)
	for i := range data {
		data[i] = byte(i % 256)
	}

	// Create a stream reader with a tiny bufferSize so we can trigger Shrink reliably
	buf := bytes.NewReader(data)
	sr := NewInputStreamWithBufferSize(buf, 100)

	// Force a read/fill to pull a chunk into memory
	// NOTE(review): fill is passed a nil *Error here — confirm fill tolerates a
	// nil error sink on its failure paths before relying on this pattern.
	err := sr.buffer.fill(5000, nil)
	if !err {
		t.Fatalf("Failed to fill buffer")
	}

	// Fake an artificial read that consumed a massive portion of the buffer
	originalCapacity := cap(sr.buffer.data)
	sr.buffer.readerIndex = 4500

	// Trigger Shrink
	sr.Shrink()

	// 1. Validate reader index was successfully reset
	if sr.buffer.readerIndex != 0 {
		t.Errorf("Expected readerIndex to reset to 0, got %d", sr.buffer.readerIndex)
	}

	// 2. Validate the capacity actually shrank (reclaimed memory)
	newCapacity := cap(sr.buffer.data)
	if newCapacity >= originalCapacity {
		t.Errorf("Expected capacity to shrink (was %d, now %d)", originalCapacity, newCapacity)
	}

	// 3. Validate the remaining unread data remained intact
	// (fill(5000) is assumed to have buffered exactly 5000 bytes, leaving 500 after
	// the faked 4500-byte consume — TODO confirm fill does not over-read.)
	if sr.buffer.writerIndex != 500 {
		t.Errorf("Expected writerIndex to be 500 remaining bytes, got %d", sr.buffer.writerIndex)
	}
	for i := 0; i < 500; i++ {
		if sr.buffer.data[i] != byte((4500+i)%256) {
			t.Errorf("Data corruption post-shrink at index %d", i)
			break
		}
	}
}
diff --git a/go/fory/struct.go b/go/fory/struct.go
index 2cd21991b..52e53ad99 100644
--- a/go/fory/struct.go
+++ b/go/fory/struct.go
@@ -1424,6 +1424,11 @@ func (s *structSerializer) ReadData(ctx *ReadContext, 
value reflect.Value) {
 
        // Phase 1: Fixed-size primitives (inline unsafe reads with endian 
handling)
        if s.fieldGroup.FixedSize > 0 {
+               var errOut Error
+               if !buf.CheckReadable(int(s.fieldGroup.FixedSize), &errOut) {
+                       ctx.SetError(errOut)
+                       return
+               }
                baseOffset := buf.ReaderIndex()
                data := buf.GetData()
 
diff --git a/go/fory/struct_test.go b/go/fory/struct_test.go
index d97bd1f3b..430babdcf 100644
--- a/go/fory/struct_test.go
+++ b/go/fory/struct_test.go
@@ -56,7 +56,7 @@ func TestUnsignedTypeSerialization(t *testing.T) {
        }
 
        var result any
-       err = f.Deserialize(data, &result)
+       err = testDeserialize(t, f, data, &result)
        if err != nil {
                t.Fatalf("Deserialize failed: %v", err)
        }
@@ -104,7 +104,7 @@ func TestOptionFieldSerialization(t *testing.T) {
        require.NoError(t, err)
 
        var result any
-       err = f.Deserialize(data, &result)
+       err = testDeserialize(t, f, data, &result)
        require.NoError(t, err)
 
        out := result.(*OptionStruct)
@@ -357,7 +357,7 @@ func TestSetFieldSerializationSchemaConsistent(t 
*testing.T) {
 
        // Deserialize
        var result any
-       err = f.Deserialize(data, &result)
+       err = testDeserialize(t, f, data, &result)
        require.NoError(t, err, "Deserialize failed")
 
        resultObj := result.(*SetFieldsStruct)
@@ -404,7 +404,7 @@ func TestSetFieldSerializationCompatible(t *testing.T) {
 
        // Deserialize
        var result any
-       err = f.Deserialize(data, &result)
+       err = testDeserialize(t, f, data, &result)
        require.NoError(t, err, "Deserialize failed")
 
        resultObj := result.(*SetFieldsStruct)
@@ -525,7 +525,7 @@ func TestFloat16StructField(t *testing.T) {
 
        // Create new instance
        res := &StructWithFloat16{}
-       err = f.Deserialize(data, res)
+       err = testDeserialize(t, f, data, res)
        require.NoError(t, err)
 
        // Verify
diff --git a/go/fory/test_helper_test.go b/go/fory/test_helper_test.go
new file mode 100644
index 000000000..73106acf2
--- /dev/null
+++ b/go/fory/test_helper_test.go
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package fory
+
+import (
+       "io"
+       "reflect"
+       "testing"
+)
+
+// oneByteReader returns data byte by byte to ensure aggressively that all
+// `fill()` boundaries, loops, and buffering conditions are tested.
+type oneByteReader struct {
+       data []byte
+       pos  int
+}
+
+func (r *oneByteReader) Read(p []byte) (n int, err error) {
+       if r.pos >= len(r.data) {
+               return 0, io.EOF
+       }
+       if len(p) == 0 {
+               return 0, nil
+       }
+       p[0] = r.data[r.pos]
+       r.pos++
+       return 1, nil
+}
+
+// testDeserialize is a testing helper that performs standard in-memory
+// deserialization and additionally wraps the payload in a slow one-byte
+// stream reader to verify that stream decoding handles fragmented reads 
correctly.
+func testDeserialize(t *testing.T, f *Fory, data []byte, v any) error {
+       t.Helper()
+
+       // 1. First, deserialize from bytes (the fast path)
+       err := f.Deserialize(data, v)
+       if err != nil {
+               return err
+       }
+
+       // 2. Deserialize from oneByteReader (the slow stream path)
+       // We create a new instance of the same target type to ensure clean 
state
+       vType := reflect.TypeOf(v)
+       if vType == nil || vType.Kind() != reflect.Ptr {
+               t.Fatalf("testDeserialize requires a pointer to a value, got 
%v", vType)
+       }
+
+       freshV := reflect.New(vType.Elem()).Interface()
+
+       stream := &oneByteReader{data: data, pos: 0}
+
+       // Create a new stream reader. The stream context handles boundaries 
and compactions.
+       streamReader := NewInputStream(stream)
+       err = f.DeserializeFromStream(streamReader, freshV)
+       if err != nil {
+               t.Fatalf("Stream deserialization via OneByteStream failed: %v", 
err)
+       }
+
+       // 3. Compare the two results
+       if !reflect.DeepEqual(v, freshV) {
+               t.Fatalf("Stream deserialization mismatched byte 
deserialization.\nExpected: %+v\nGot:      %+v", v, freshV)
+       }
+
+       // Returns the original error from standard deserialization
+       return err
+}
diff --git a/go/fory/type_def.go b/go/fory/type_def.go
index da669ccc3..bd55de459 100644
--- a/go/fory/type_def.go
+++ b/go/fory/type_def.go
@@ -291,7 +291,7 @@ func skipTypeDef(buffer *ByteBuffer, header int64, err 
*Error) {
        if sz == META_SIZE_MASK {
                sz += int(buffer.ReadVarUint32(err))
        }
-       buffer.IncreaseReaderIndex(sz)
+       buffer.Skip(sz, err)
 }
 
 const BIG_NAME_THRESHOLD = 0b111111 // 6 bits for size when using 2 bits for 
encoding


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to