This is an automated email from the ASF dual-hosted git repository.
chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fory.git
The following commit(s) were added to refs/heads/main by this push:
new af6e8b25e feat(go): add go deserialization support via io streams
(#3374)
af6e8b25e is described below
commit af6e8b25e8c2d188cba2d99ccb276c47ec6c9c31
Author: Ayush Kumar <[email protected]>
AuthorDate: Fri Mar 6 20:15:47 2026 +0530
feat(go): add go deserialization support via io streams (#3374)
## Why?
To enable stream-based deserialization in Fory's Go library, allowing
for direct reading from `io.Reader` without pre-buffering the entire
payload. This improves efficiency for network and file-based transport
and brings the Go implementation into feature-parity with the Python and
C++ libraries.
## What does this PR do?
### 1. Stream Infrastructure in `go/fory/buffer.go`
Enhanced `ByteBuffer` to support `io.Reader` with an internal sliding
window and automatic filling.
- Added `reader io.Reader` and `bufferSize int` fields.
- Implemented `fill(n int, errOut *Error) bool` for on-demand data fetching
and buffer compaction.
- Added `CheckReadable(n)` and `Skip(n)` memory-safe routines that pull
from the underlying stream when necessary to avoid out-of-bounds panics.
- Updated `ReadBinary` and `ReadBytes` to safely copy slices when
streaming to prevent silent data corruption on compaction.
- Updated all `Read*` methods (fixed-size, varint, tagged) to fetch data
from the reader safely if not cached.
### 2. Stateful InputStream in `go/fory/stream.go`
Added the `InputStream` feature to support true, stateful sequential
stream reads.
- Introduced `InputStream` which persists the buffered byte window and
TypeResolver metadata (Meta Sharing) across multiple object decodes on
the same stream, decoupled from `Fory` to mirror the C++
`ForyInputStream` implementation.
- Added `fory.DeserializeFromStream(is, target)` method to process
continuous streamed data.
- Added `Shrink()` method to compact the internal buffer and reclaim
memory during long-lived streams.
- Added `DeserializeFromReader` method as an API for simple *one-off*
stream object reads.
### 3. Stream-Safe Deserialization Paths
Updated internal deserialization pipelines in `struct.go` and
`type_def.go` to be stream-safe:
- Integrated `CheckReadable` bounds-checking into the `struct.go` fast
paths for fixed-size primitives.
- Safely rewrote schema-evolution skips (`skipTypeDef`) in `type_def.go`
to use bounds-checked `Skip()` rather than unbounded `readerIndex`
overrides.
### 4. Comprehensive Stream Tests
- Built a custom `oneByteReader` wrapper (`go/fory/test_helper_test.go`)
that artificially feeds the deserialization engine exactly 1 byte at a
time.
- Migrated the global test suite (`struct_test.go`, `primitive_test.go`,
`slice_primitive_test.go`, etc.) to run all standard tests through this
aggressive 1-byte fragmented stream reader via a new `testDeserialize`
helper to guarantee total stream robustness.
## Related issues
Closes #3302
## Does this PR introduce any user-facing change?
- [x] Does this PR introduce any public API change? (`NewInputStream`,
`DeserializeFromStream`, `DeserializeFromReader`,
`NewByteBufferFromReader`)
- [ ] Does this PR introduce any binary protocol compatibility change?
## Benchmark
Main branch -
<img width="1304" height="390" alt="image"
src="https://github.com/user-attachments/assets/c07e1f07-9284-4164-a166-73bb1f82018e"
/>
This branch -
<img width="1304" height="390" alt="image"
src="https://github.com/user-attachments/assets/c716d01e-7738-4958-8d80-25d3d84ab906"
/>
---
go/fory/array_primitive_test.go | 18 +--
go/fory/buffer.go | 298 +++++++++++++++++++++++++++++-----------
go/fory/primitive_test.go | 8 +-
go/fory/reader.go | 1 +
go/fory/slice_primitive_test.go | 66 ++++-----
go/fory/stream.go | 164 ++++++++++++++++++++++
go/fory/stream_test.go | 238 ++++++++++++++++++++++++++++++++
go/fory/struct.go | 5 +
go/fory/struct_test.go | 10 +-
go/fory/test_helper_test.go | 82 +++++++++++
go/fory/type_def.go | 2 +-
11 files changed, 759 insertions(+), 133 deletions(-)
diff --git a/go/fory/array_primitive_test.go b/go/fory/array_primitive_test.go
index e8c99fb2f..c1b989efc 100644
--- a/go/fory/array_primitive_test.go
+++ b/go/fory/array_primitive_test.go
@@ -36,7 +36,7 @@ func TestPrimitiveArraySerializer(t *testing.T) {
assert.NoError(t, err)
var result [3]uint16
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.Equal(t, arr, result)
})
@@ -48,7 +48,7 @@ func TestPrimitiveArraySerializer(t *testing.T) {
assert.NoError(t, err)
var result [3]uint32
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.Equal(t, arr, result)
})
@@ -60,7 +60,7 @@ func TestPrimitiveArraySerializer(t *testing.T) {
assert.NoError(t, err)
var result [3]uint64
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.Equal(t, arr, result)
})
@@ -86,7 +86,7 @@ func TestPrimitiveArraySerializer(t *testing.T) {
assert.NoError(t, err)
var result [3]bfloat16.BFloat16
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.Equal(t, arr, result)
})
@@ -103,7 +103,7 @@ func TestArraySliceInteroperability(t *testing.T) {
// Deserialize into Slice []int32
var slice []int32
- err = f.Deserialize(data, &slice)
+ err = testDeserialize(t, f, data, &slice)
assert.NoError(t, err)
assert.Equal(t, []int32{1, 2, 3}, slice)
})
@@ -116,7 +116,7 @@ func TestArraySliceInteroperability(t *testing.T) {
// Deserialize into Array [3]int32
var arr [3]int32
- err = f.Deserialize(data, &arr)
+ err = testDeserialize(t, f, data, &arr)
assert.NoError(t, err)
assert.Equal(t, [3]int32{4, 5, 6}, arr)
})
@@ -128,7 +128,7 @@ func TestArraySliceInteroperability(t *testing.T) {
assert.NoError(t, err)
var slice []int64 // different type
- err = f.Deserialize(data, &slice)
+ err = testDeserialize(t, f, data, &slice)
// Strict checking means this should error immediately upon
reading wrong TypeID
assert.Error(t, err)
})
@@ -141,7 +141,7 @@ func TestArraySliceInteroperability(t *testing.T) {
// Deserialize into Array [3]int32 - should fail size check
var arr [3]int32
- err = f.Deserialize(data, &arr)
+ err = testDeserialize(t, f, data, &arr)
// Serialized as list with len 2. Array expects 3.
assert.Error(t, err)
assert.Contains(t, err.Error(), "array length")
@@ -161,7 +161,7 @@ func TestFloat16Array(t *testing.T) {
assert.NoError(t, err)
var result [3]float16.Float16
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.Equal(t, arr, result)
})
diff --git a/go/fory/buffer.go b/go/fory/buffer.go
index f00833300..70d2fc535 100644
--- a/go/fory/buffer.go
+++ b/go/fory/buffer.go
@@ -20,6 +20,7 @@ package fory
import (
"encoding/binary"
"fmt"
+ "io"
"math"
"unsafe"
)
@@ -28,12 +29,86 @@ type ByteBuffer struct {
data []byte // Most accessed field first for cache locality
writerIndex int
readerIndex int
+ reader io.Reader
+ bufferSize int
}
func NewByteBuffer(data []byte) *ByteBuffer {
return &ByteBuffer{data: data}
}
+func NewByteBufferFromReader(r io.Reader, bufferSize int) *ByteBuffer {
+ if bufferSize <= 0 {
+ bufferSize = 4096
+ }
+ return &ByteBuffer{
+ data: make([]byte, 0, bufferSize),
+ reader: r,
+ bufferSize: bufferSize,
+ }
+}
+
+//go:noinline
+func (b *ByteBuffer) fill(n int, errOut *Error) bool {
+ if b.reader == nil {
+ if errOut != nil {
+ *errOut = BufferOutOfBoundError(b.readerIndex, n,
len(b.data))
+ }
+ return false
+ }
+
+ available := len(b.data) - b.readerIndex
+ if available >= n {
+ return true
+ }
+
+ if b.readerIndex > 0 {
+ copy(b.data, b.data[b.readerIndex:])
+ b.writerIndex -= b.readerIndex
+ b.readerIndex = 0
+ b.data = b.data[:b.writerIndex]
+ }
+
+ if cap(b.data) < n {
+ newCap := cap(b.data) * 2
+ if newCap < n {
+ newCap = n
+ }
+ if newCap < b.bufferSize {
+ newCap = b.bufferSize
+ }
+ newData := make([]byte, len(b.data), newCap)
+ copy(newData, b.data)
+ b.data = newData
+ }
+
+ for len(b.data) < n {
+ spare := b.data[len(b.data):cap(b.data)]
+ if len(spare) == 0 {
+ return false
+ }
+ readBytes, err := b.reader.Read(spare)
+ if readBytes > 0 {
+ b.data = b.data[:len(b.data)+readBytes]
+ b.writerIndex += readBytes
+ }
+ if err != nil {
+ if len(b.data) >= n {
+ return true
+ }
+ if errOut != nil {
+ if err == io.EOF {
+ *errOut =
BufferOutOfBoundError(b.readerIndex, n, len(b.data))
+ } else {
+ *errOut =
DeserializationError(fmt.Sprintf("stream read error: %v", err))
+ }
+ }
+ return false
+ }
+ }
+ return true
+}
+
// grow ensures there's space for n more bytes. Hot path is inlined.
//
//go:inline
@@ -185,8 +260,9 @@ func (b *ByteBuffer) WriteBinary(p []byte) {
//go:inline
func (b *ByteBuffer) ReadBool(err *Error) bool {
if b.readerIndex+1 > len(b.data) {
- *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data))
- return false
+ if !b.fill(1, err) {
+ return false
+ }
}
v := b.data[b.readerIndex]
b.readerIndex++
@@ -198,8 +274,9 @@ func (b *ByteBuffer) ReadBool(err *Error) bool {
//go:inline
func (b *ByteBuffer) ReadByte(err *Error) byte {
if b.readerIndex+1 > len(b.data) {
- *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data))
- return 0
+ if !b.fill(1, err) {
+ return 0
+ }
}
v := b.data[b.readerIndex]
b.readerIndex++
@@ -211,8 +288,9 @@ func (b *ByteBuffer) ReadByte(err *Error) byte {
//go:inline
func (b *ByteBuffer) ReadInt8(err *Error) int8 {
if b.readerIndex+1 > len(b.data) {
- *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data))
- return 0
+ if !b.fill(1, err) {
+ return 0
+ }
}
v := int8(b.data[b.readerIndex])
b.readerIndex++
@@ -224,8 +302,9 @@ func (b *ByteBuffer) ReadInt8(err *Error) int8 {
//go:inline
func (b *ByteBuffer) ReadInt16(err *Error) int16 {
if b.readerIndex+2 > len(b.data) {
- *err = BufferOutOfBoundError(b.readerIndex, 2, len(b.data))
- return 0
+ if !b.fill(2, err) {
+ return 0
+ }
}
v := int16(binary.LittleEndian.Uint16(b.data[b.readerIndex:]))
b.readerIndex += 2
@@ -237,8 +316,9 @@ func (b *ByteBuffer) ReadInt16(err *Error) int16 {
//go:inline
func (b *ByteBuffer) ReadUint16(err *Error) uint16 {
if b.readerIndex+2 > len(b.data) {
- *err = BufferOutOfBoundError(b.readerIndex, 2, len(b.data))
- return 0
+ if !b.fill(2, err) {
+ return 0
+ }
}
v := binary.LittleEndian.Uint16(b.data[b.readerIndex:])
b.readerIndex += 2
@@ -250,8 +330,9 @@ func (b *ByteBuffer) ReadUint16(err *Error) uint16 {
//go:inline
func (b *ByteBuffer) ReadUint32(err *Error) uint32 {
if b.readerIndex+4 > len(b.data) {
- *err = BufferOutOfBoundError(b.readerIndex, 4, len(b.data))
- return 0
+ if !b.fill(4, err) {
+ return 0
+ }
}
i := binary.LittleEndian.Uint32(b.data[b.readerIndex:])
b.readerIndex += 4
@@ -263,8 +344,9 @@ func (b *ByteBuffer) ReadUint32(err *Error) uint32 {
//go:inline
func (b *ByteBuffer) ReadUint64(err *Error) uint64 {
if b.readerIndex+8 > len(b.data) {
- *err = BufferOutOfBoundError(b.readerIndex, 8, len(b.data))
- return 0
+ if !b.fill(8, err) {
+ return 0
+ }
}
i := binary.LittleEndian.Uint64(b.data[b.readerIndex:])
b.readerIndex += 8
@@ -300,17 +382,46 @@ func (b *ByteBuffer) ReadFloat64(err *Error) float64 {
}
func (b *ByteBuffer) Read(p []byte) (n int, err error) {
+ if len(p) == 0 {
+ return 0, nil
+ }
+
+ if b.readerIndex+len(p) > len(b.data) && b.reader != nil {
+ var errOut Error
+ if !b.fill(len(p), &errOut) {
+ copied := copy(p, b.data[b.readerIndex:])
+ b.readerIndex += copied
+ if errOut.Kind() == ErrKindBufferOutOfBound {
+ return copied, io.EOF
+ }
+ return copied, errOut
+ }
+ }
+
copied := copy(p, b.data[b.readerIndex:])
b.readerIndex += copied
+ if copied == 0 {
+ return 0, io.EOF
+ }
return copied, nil
}
// ReadBinary reads n bytes and sets error on bounds violation
func (b *ByteBuffer) ReadBinary(length int, err *Error) []byte {
if b.readerIndex+length > len(b.data) {
- *err = BufferOutOfBoundError(b.readerIndex, length, len(b.data))
- return nil
+ if !b.fill(length, err) {
+ return nil
+ }
}
+
+ if b.reader != nil {
+ // In stream mode, compaction might overwrite these bytes, so
we must copy
+ result := make([]byte, length)
+ copy(result, b.data[b.readerIndex:b.readerIndex+length])
+ b.readerIndex += length
+ return result
+ }
+
v := b.data[b.readerIndex : b.readerIndex+length]
b.readerIndex += length
return v
@@ -360,13 +471,27 @@ func (b *ByteBuffer) SetReaderIndex(index int) {
func (b *ByteBuffer) Reset() {
b.readerIndex = 0
b.writerIndex = 0
+ b.reader = nil
// Keep the underlying buffer if it's reasonable sized to reduce
allocations
// Only nil it out if we want to release memory
if cap(b.data) > 64*1024 {
b.data = nil
}
- // Note: We keep b.data as-is (with its current length) to avoid issues
- // with grow() needing to expand the slice on first write
+}
+
+func (b *ByteBuffer) ResetWithReader(r io.Reader, bufferSize int) {
+ b.readerIndex = 0
+ b.writerIndex = 0
+ b.reader = r
+ if bufferSize <= 0 {
+ bufferSize = 4096
+ }
+ b.bufferSize = bufferSize
+ if cap(b.data) < bufferSize {
+ b.data = make([]byte, 0, bufferSize)
+ } else {
+ b.data = b.data[:0]
+ }
}
// Reserve ensures buffer has at least n bytes available for writing from
current position.
@@ -921,7 +1046,12 @@ func (b *ByteBuffer) readVaruint36SmallSlow(err *Error)
uint64 {
var result uint64
var shift uint
- for b.readerIndex < len(b.data) {
+ for {
+ if b.readerIndex >= len(b.data) {
+ if !b.fill(1, err) {
+ return 0
+ }
+ }
byteVal := b.data[b.readerIndex]
b.readerIndex++
result |= uint64(byteVal&0x7F) << shift
@@ -934,8 +1064,6 @@ func (b *ByteBuffer) readVaruint36SmallSlow(err *Error)
uint64 {
return 0
}
}
- *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data))
- return 0
}
// ReadVarint64 reads the varint encoded with zig-zag (compatible with Java's
readVarint64).
@@ -975,8 +1103,9 @@ func (b *ByteBuffer) WriteTaggedInt64(value int64) {
// Otherwise, skip flag byte and read 8 bytes as int64.
func (b *ByteBuffer) ReadTaggedInt64(err *Error) int64 {
if b.readerIndex+4 > len(b.data) {
- *err = BufferOutOfBoundError(b.readerIndex, 4, len(b.data))
- return 0
+ if !b.fill(4, err) {
+ return 0
+ }
}
var i int32
if isLittleEndian {
@@ -989,8 +1118,9 @@ func (b *ByteBuffer) ReadTaggedInt64(err *Error) int64 {
return int64(i >> 1) // arithmetic right shift
}
if b.readerIndex+9 > len(b.data) {
- *err = BufferOutOfBoundError(b.readerIndex, 9, len(b.data))
- return 0
+ if !b.fill(9, err) {
+ return 0
+ }
}
var value int64
if isLittleEndian {
@@ -1026,8 +1156,9 @@ func (b *ByteBuffer) WriteTaggedUint64(value uint64) {
// Otherwise, skip flag byte and read 8 bytes as uint64.
func (b *ByteBuffer) ReadTaggedUint64(err *Error) uint64 {
if b.readerIndex+4 > len(b.data) {
- *err = BufferOutOfBoundError(b.readerIndex, 4, len(b.data))
- return 0
+ if !b.fill(4, err) {
+ return 0
+ }
}
var i uint32
if isLittleEndian {
@@ -1040,8 +1171,9 @@ func (b *ByteBuffer) ReadTaggedUint64(err *Error) uint64 {
return uint64(i >> 1)
}
if b.readerIndex+9 > len(b.data) {
- *err = BufferOutOfBoundError(b.readerIndex, 9, len(b.data))
- return 0
+ if !b.fill(9, err) {
+ return 0
+ }
}
var value uint64
if isLittleEndian {
@@ -1122,8 +1254,9 @@ func (b *ByteBuffer) readVarUint64Slow(err *Error) uint64
{
var shift uint
for i := 0; i < 8; i++ {
if b.readerIndex >= len(b.data) {
- *err = BufferOutOfBoundError(b.readerIndex, 1,
len(b.data))
- return 0
+ if !b.fill(1, err) {
+ return 0
+ }
}
byteVal := b.data[b.readerIndex]
b.readerIndex++
@@ -1134,8 +1267,9 @@ func (b *ByteBuffer) readVarUint64Slow(err *Error) uint64
{
shift += 7
}
if b.readerIndex >= len(b.data) {
- *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data))
- return 0
+ if !b.fill(1, err) {
+ return 0
+ }
}
byteVal := b.data[b.readerIndex]
b.readerIndex++
@@ -1154,8 +1288,9 @@ func (b *ByteBuffer) remaining() int {
//go:inline
func (b *ByteBuffer) ReadUint8(err *Error) uint8 {
if b.readerIndex >= len(b.data) {
- *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data))
- return 0
+ if !b.fill(1, err) {
+ return 0
+ }
}
v := b.data[b.readerIndex]
b.readerIndex++
@@ -1276,8 +1411,9 @@ func (b *ByteBuffer) readVarUint32Slow(err *Error) uint32
{
var shift uint
for {
if b.readerIndex >= len(b.data) {
- *err = BufferOutOfBoundError(b.readerIndex, 1,
len(b.data))
- return 0
+ if !b.fill(1, err) {
+ return 0
+ }
}
byteVal := b.data[b.readerIndex]
b.readerIndex++
@@ -1437,8 +1573,9 @@ func (b *ByteBuffer) UnsafePutTaggedUint64(offset int,
value uint64) int {
// ReadVarUint32Small7 reads a VarUint32 in small-7 format with error checking
func (b *ByteBuffer) ReadVarUint32Small7(err *Error) uint32 {
if b.readerIndex >= len(b.data) {
- *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data))
- return 0
+ if !b.fill(1, err) {
+ return 0
+ }
}
readIdx := b.readerIndex
v := b.data[readIdx]
@@ -1486,47 +1623,24 @@ func (b *ByteBuffer) continueReadVarUint32(readIdx int,
bulkRead, value uint32)
}
func (b *ByteBuffer) readVaruint36Slow(err *Error) uint64 {
- if b.readerIndex >= len(b.data) {
- *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data))
- return 0
- }
- b0 := b.data[b.readerIndex]
- b.readerIndex++
- result := uint64(b0 & 0x7F)
- if b0&0x80 != 0 {
+ var shift uint
+ var result uint64
+ for {
if b.readerIndex >= len(b.data) {
- *err = BufferOutOfBoundError(b.readerIndex, 1,
len(b.data))
- return 0
- }
- b1 := b.data[b.readerIndex]
- b.readerIndex++
- result |= uint64(b1&0x7F) << 7
- if b1&0x80 != 0 {
- if b.readerIndex >= len(b.data) {
- *err = BufferOutOfBoundError(b.readerIndex, 1,
len(b.data))
+ if !b.fill(1, err) {
return 0
}
- b2 := b.data[b.readerIndex]
- b.readerIndex++
- result |= uint64(b2&0x7F) << 14
- if b2&0x80 != 0 {
- if b.readerIndex >= len(b.data) {
- *err =
BufferOutOfBoundError(b.readerIndex, 1, len(b.data))
- return 0
- }
- b3 := b.data[b.readerIndex]
- b.readerIndex++
- result |= uint64(b3&0x7F) << 21
- if b3&0x80 != 0 {
- if b.readerIndex >= len(b.data) {
- *err =
BufferOutOfBoundError(b.readerIndex, 1, len(b.data))
- return 0
- }
- b4 := b.data[b.readerIndex]
- b.readerIndex++
- result |= uint64(b4) << 28
- }
- }
+ }
+ b0 := b.data[b.readerIndex]
+ b.readerIndex++
+ result |= uint64(b0&0x7F) << shift
+ if b0&0x80 == 0 {
+ break
+ }
+ shift += 7
+ if shift >= 35 {
+ *err = DeserializationError("varuint36 overflow")
+ return 0
}
}
return result
@@ -1549,9 +1663,19 @@ func (b *ByteBuffer) IncreaseReaderIndex(n int) {
// ReadBytes reads n bytes and sets error on bounds violation
func (b *ByteBuffer) ReadBytes(n int, err *Error) []byte {
if b.readerIndex+n > len(b.data) {
- *err = BufferOutOfBoundError(b.readerIndex, n, len(b.data))
- return nil
+ if !b.fill(n, err) {
+ return nil
+ }
}
+
+ if b.reader != nil {
+ // In stream mode, compaction might overwrite these bytes, so
we must copy
+ result := make([]byte, n)
+ copy(result, b.data[b.readerIndex:b.readerIndex+n])
+ b.readerIndex += n
+ return result
+ }
+
p := b.data[b.readerIndex : b.readerIndex+n]
b.readerIndex += n
return p
@@ -1560,8 +1684,20 @@ func (b *ByteBuffer) ReadBytes(n int, err *Error) []byte
{
// Skip skips n bytes and sets error on bounds violation
func (b *ByteBuffer) Skip(length int, err *Error) {
if b.readerIndex+length > len(b.data) {
- *err = BufferOutOfBoundError(b.readerIndex, length, len(b.data))
- return
+ if !b.fill(length, err) {
+ return
+ }
}
b.readerIndex += length
}
+
+// CheckReadable ensures that at least n bytes are available to read.
+// In stream mode, it will attempt to fill the buffer if necessary.
+//
+//go:inline
+func (b *ByteBuffer) CheckReadable(n int, err *Error) bool {
+ if b.readerIndex+n > len(b.data) {
+ return b.fill(n, err)
+ }
+ return true
+}
diff --git a/go/fory/primitive_test.go b/go/fory/primitive_test.go
index 0f478590e..f5bf7b08c 100644
--- a/go/fory/primitive_test.go
+++ b/go/fory/primitive_test.go
@@ -34,7 +34,7 @@ func TestFloat16Primitive(t *testing.T) {
require.NoError(t, err)
var res float16.Float16
- err = f.Deserialize(data, &res)
+ err = testDeserialize(t, f, data, &res)
require.NoError(t, err)
require.True(t, f16.Equal(res))
@@ -53,7 +53,7 @@ func TestFloat16PrimitiveSliceDirect(t *testing.T) {
require.NoError(t, err)
var resSlice []float16.Float16
- err = f.Deserialize(data, &resSlice)
+ err = testDeserialize(t, f, data, &resSlice)
require.NoError(t, err)
require.Equal(t, slice, resSlice)
}
@@ -67,7 +67,7 @@ func TestBFloat16Primitive(t *testing.T) {
require.NoError(t, err)
var res bfloat16.BFloat16
- err = f.Deserialize(data, &res)
+ err = testDeserialize(t, f, data, &res)
require.NoError(t, err)
require.Equal(t, bf16.Bits(), res.Bits())
@@ -86,7 +86,7 @@ func TestBFloat16PrimitiveSliceDirect(t *testing.T) {
require.NoError(t, err)
var resSlice []bfloat16.BFloat16
- err = f.Deserialize(data, &resSlice)
+ err = testDeserialize(t, f, data, &resSlice)
require.NoError(t, err)
require.Equal(t, slice, resSlice)
}
diff --git a/go/fory/reader.go b/go/fory/reader.go
index e7a1df171..a0a37d92f 100644
--- a/go/fory/reader.go
+++ b/go/fory/reader.go
@@ -83,6 +83,7 @@ func (c *ReadContext) SetData(data []byte) {
c.buffer.data = data
c.buffer.readerIndex = 0
c.buffer.writerIndex = len(data)
+ c.buffer.reader = nil
}
}
diff --git a/go/fory/slice_primitive_test.go b/go/fory/slice_primitive_test.go
index e03a4cb00..b072078e3 100644
--- a/go/fory/slice_primitive_test.go
+++ b/go/fory/slice_primitive_test.go
@@ -39,7 +39,7 @@ func TestFloat16Slice(t *testing.T) {
assert.NoError(t, err)
var result []float16.Float16
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.Equal(t, slice, result)
})
@@ -50,7 +50,7 @@ func TestFloat16Slice(t *testing.T) {
assert.NoError(t, err)
var result []float16.Float16
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.NotNil(t, result)
assert.Empty(t, result)
@@ -62,7 +62,7 @@ func TestFloat16Slice(t *testing.T) {
assert.NoError(t, err)
var result []float16.Float16
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.Nil(t, result)
})
@@ -81,7 +81,7 @@ func TestBFloat16Slice(t *testing.T) {
assert.NoError(t, err)
var result []bfloat16.BFloat16
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.Equal(t, slice, result)
})
@@ -92,7 +92,7 @@ func TestBFloat16Slice(t *testing.T) {
assert.NoError(t, err)
var result []bfloat16.BFloat16
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.NotNil(t, result)
assert.Empty(t, result)
@@ -104,7 +104,7 @@ func TestBFloat16Slice(t *testing.T) {
assert.NoError(t, err)
var result []bfloat16.BFloat16
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.Nil(t, result)
})
@@ -119,7 +119,7 @@ func TestIntSlice(t *testing.T) {
assert.NoError(t, err)
var result []int
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.Equal(t, slice, result)
})
@@ -130,7 +130,7 @@ func TestIntSlice(t *testing.T) {
assert.NoError(t, err)
var result []int
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.NotNil(t, result)
assert.Empty(t, result)
@@ -142,7 +142,7 @@ func TestIntSlice(t *testing.T) {
assert.NoError(t, err)
var result []int
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.Nil(t, result)
})
@@ -157,7 +157,7 @@ func TestUintSlice(t *testing.T) {
assert.NoError(t, err)
var result []uint
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.Equal(t, slice, result)
})
@@ -168,7 +168,7 @@ func TestUintSlice(t *testing.T) {
assert.NoError(t, err)
var result []uint
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.NotNil(t, result)
assert.Empty(t, result)
@@ -180,7 +180,7 @@ func TestUintSlice(t *testing.T) {
assert.NoError(t, err)
var result []uint
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.Nil(t, result)
})
@@ -195,7 +195,7 @@ func TestInt8Slice(t *testing.T) {
assert.NoError(t, err)
var result []int8
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.Equal(t, slice, result)
})
@@ -206,7 +206,7 @@ func TestInt8Slice(t *testing.T) {
assert.NoError(t, err)
var result []int8
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.NotNil(t, result)
assert.Empty(t, result)
@@ -218,7 +218,7 @@ func TestInt8Slice(t *testing.T) {
assert.NoError(t, err)
var result []int8
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.Nil(t, result)
})
@@ -233,7 +233,7 @@ func TestInt16Slice(t *testing.T) {
assert.NoError(t, err)
var result []int16
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.Equal(t, slice, result)
})
@@ -244,7 +244,7 @@ func TestInt16Slice(t *testing.T) {
assert.NoError(t, err)
var result []int16
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.NotNil(t, result)
assert.Empty(t, result)
@@ -256,7 +256,7 @@ func TestInt16Slice(t *testing.T) {
assert.NoError(t, err)
var result []int16
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.Nil(t, result)
})
@@ -271,7 +271,7 @@ func TestInt32Slice(t *testing.T) {
assert.NoError(t, err)
var result []int32
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.Equal(t, slice, result)
})
@@ -282,7 +282,7 @@ func TestInt32Slice(t *testing.T) {
assert.NoError(t, err)
var result []int32
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.NotNil(t, result)
assert.Empty(t, result)
@@ -294,7 +294,7 @@ func TestInt32Slice(t *testing.T) {
assert.NoError(t, err)
var result []int32
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.Nil(t, result)
})
@@ -309,7 +309,7 @@ func TestInt64Slice(t *testing.T) {
assert.NoError(t, err)
var result []int64
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.Equal(t, slice, result)
})
@@ -320,7 +320,7 @@ func TestInt64Slice(t *testing.T) {
assert.NoError(t, err)
var result []int64
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.NotNil(t, result)
assert.Empty(t, result)
@@ -332,7 +332,7 @@ func TestInt64Slice(t *testing.T) {
assert.NoError(t, err)
var result []int64
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.Nil(t, result)
})
@@ -347,7 +347,7 @@ func TestUint16Slice(t *testing.T) {
assert.NoError(t, err)
var result []uint16
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.Equal(t, slice, result)
})
@@ -358,7 +358,7 @@ func TestUint16Slice(t *testing.T) {
assert.NoError(t, err)
var result []uint16
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.NotNil(t, result)
assert.Empty(t, result)
@@ -370,7 +370,7 @@ func TestUint16Slice(t *testing.T) {
assert.NoError(t, err)
var result []uint16
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.Nil(t, result)
})
@@ -385,7 +385,7 @@ func TestUint32Slice(t *testing.T) {
assert.NoError(t, err)
var result []uint32
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.Equal(t, slice, result)
})
@@ -396,7 +396,7 @@ func TestUint32Slice(t *testing.T) {
assert.NoError(t, err)
var result []uint32
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.NotNil(t, result)
assert.Empty(t, result)
@@ -408,7 +408,7 @@ func TestUint32Slice(t *testing.T) {
assert.NoError(t, err)
var result []uint32
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.Nil(t, result)
})
@@ -423,7 +423,7 @@ func TestUint64Slice(t *testing.T) {
assert.NoError(t, err)
var result []uint64
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.Equal(t, slice, result)
})
@@ -434,7 +434,7 @@ func TestUint64Slice(t *testing.T) {
assert.NoError(t, err)
var result []uint64
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.NotNil(t, result)
assert.Empty(t, result)
@@ -446,7 +446,7 @@ func TestUint64Slice(t *testing.T) {
assert.NoError(t, err)
var result []uint64
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
assert.NoError(t, err)
assert.Nil(t, result)
})
diff --git a/go/fory/stream.go b/go/fory/stream.go
new file mode 100644
index 000000000..420a6d482
--- /dev/null
+++ b/go/fory/stream.go
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package fory
+
+import (
+ "io"
+ "reflect"
+)
+
+// InputStream supports robust sequential deserialization from a stream.
+// It maintains the ByteBuffer and ReadContext state across multiple
Deserialize calls,
+// preventing data loss from prefetched buffers and preserving TypeResolver
metadata
+// (Meta Sharing) across object boundaries.
+type InputStream struct {
+ buffer *ByteBuffer
+}
+
+// NewInputStream creates a new InputStream that reads from the provided
io.Reader.
+// The InputStream owns the buffer and maintains state across sequential
Deserialize calls.
+func NewInputStream(r io.Reader) *InputStream {
+ return NewInputStreamWithBufferSize(r, 0)
+}
+
+// NewInputStreamWithBufferSize creates a new InputStream with a specified
minimum buffer size.
+func NewInputStreamWithBufferSize(r io.Reader, bufferSize int) *InputStream {
+ buf := NewByteBufferFromReader(r, bufferSize)
+ return &InputStream{
+ buffer: buf,
+ }
+}
+
+// Shrink compacts the internal buffer, dropping already-read bytes to reclaim
memory.
+// It applies a heuristic to avoid tiny frequent compactions and reallocates
the backing
+// slice if the capacity becomes excessively large compared to the remaining
data.
+func (is *InputStream) Shrink() {
+ b := is.buffer
+ if b == nil {
+ return
+ }
+
+ readPos := b.readerIndex
+ // Best-effort policy: keep a 4096-byte floor to avoid tiny frequent
compactions
+ if readPos <= 4096 || readPos < b.bufferSize {
+ return
+ }
+
+ remaining := b.writerIndex - readPos
+ currentCapacity := cap(b.data)
+ targetCapacity := currentCapacity
+
+ if currentCapacity > b.bufferSize {
+ if remaining == 0 {
+ targetCapacity = b.bufferSize
+ } else if remaining <= currentCapacity/4 {
+ doubled := remaining * 2
+ targetCapacity = doubled
+ if targetCapacity < b.bufferSize {
+ targetCapacity = b.bufferSize
+ }
+ }
+ }
+
+ if targetCapacity < currentCapacity {
+ // Actually reclaim memory by copying to a new, smaller slice
+ newData := make([]byte, remaining, targetCapacity)
+ copy(newData, b.data[readPos:b.writerIndex])
+ b.data = newData
+ b.writerIndex = remaining
+ b.readerIndex = 0
+ } else if readPos > 0 {
+ // Just compact without reallocating
+ copy(b.data, b.data[readPos:b.writerIndex])
+ b.writerIndex = remaining
+ b.readerIndex = 0
+ b.data = b.data[:remaining]
+ }
+}
+
+// DeserializeFromStream reads the next object from the stream into the
provided value.
+// It uses a shared ReadContext for the lifetime of the InputStream, clearing
+// temporary state between calls but preserving the buffer and TypeResolver
state.
+func (f *Fory) DeserializeFromStream(is *InputStream, v any) error {
+
+ // We only reset the temporary read state (the ref reader/resolver and
out-of-band buffers),
+ // NOT the buffer or the type mapping, which must persist.
+ defer func() {
+ f.readCtx.refReader.Reset()
+ f.readCtx.outOfBandBuffers = nil
+ f.readCtx.outOfBandIndex = 0
+ f.readCtx.err = Error{}
+ if f.readCtx.refResolver != nil {
+ f.readCtx.refResolver.resetRead()
+ }
+ }()
+
+ // Temporarily swap buffer
+ origBuffer := f.readCtx.buffer
+ f.readCtx.buffer = is.buffer
+
+ isNull := readHeader(f.readCtx)
+ if f.readCtx.HasError() {
+ f.readCtx.buffer = origBuffer
+ return f.readCtx.TakeError()
+ }
+
+ if isNull {
+ f.readCtx.buffer = origBuffer
+ return nil
+ }
+
+ target := reflect.ValueOf(v).Elem()
+ f.readCtx.ReadValue(target, RefModeTracking, true)
+ if f.readCtx.HasError() {
+ f.readCtx.buffer = origBuffer
+ return f.readCtx.TakeError()
+ }
+
+ // Restore original buffer
+ f.readCtx.buffer = origBuffer
+
+ return nil
+}
+
+// DeserializeFromReader deserializes a single object from a stream.
+// It is strictly stateless: the buffer and all read state are always reset
before
+// each call, discarding any prefetched data and type metadata.
+// For sequential multi-object reads on the same stream, use NewInputStream
instead.
+func (f *Fory) DeserializeFromReader(r io.Reader, v any) error {
+ defer f.resetReadState()
+ // Always reset to enforce stateless semantics.
+ f.readCtx.buffer.ResetWithReader(r, 0)
+
+ isNull := readHeader(f.readCtx)
+ if f.readCtx.HasError() {
+ return f.readCtx.TakeError()
+ }
+
+ if isNull {
+ return nil
+ }
+
+ target := reflect.ValueOf(v).Elem()
+ f.readCtx.ReadValue(target, RefModeTracking, true)
+ if f.readCtx.HasError() {
+ return f.readCtx.TakeError()
+ }
+
+ return nil
+}
diff --git a/go/fory/stream_test.go b/go/fory/stream_test.go
new file mode 100644
index 000000000..098254587
--- /dev/null
+++ b/go/fory/stream_test.go
@@ -0,0 +1,238 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package fory
+
+import (
+ "bytes"
+ "io"
+ "testing"
+)
+
+type StreamTestStruct struct {
+ ID int32
+ Name string
+ Data []byte
+}
+
+func TestStreamDeserialization(t *testing.T) {
+ f := New()
+ f.RegisterStruct(&StreamTestStruct{}, 100)
+
+ original := &StreamTestStruct{
+ ID: 42,
+ Name: "Stream Test",
+ Data: []byte{1, 2, 3, 4, 5},
+ }
+
+ data, err := f.Serialize(original)
+ if err != nil {
+ t.Fatalf("Serialize failed: %v", err)
+ }
+
+ // 1. Test normal reader
+ reader := bytes.NewReader(data)
+ var decoded StreamTestStruct
+ err = f.DeserializeFromReader(reader, &decoded)
+ if err != nil {
+ t.Fatalf("DeserializeFromReader failed: %v", err)
+ }
+
+ if decoded.ID != original.ID || decoded.Name != original.Name ||
!bytes.Equal(decoded.Data, original.Data) {
+ t.Errorf("Decoded value mismatch. Got: %+v, Want: %+v",
decoded, original)
+ }
+}
+
+// slowReader returns data byte by byte to test fill() logic and compaction
+type slowReader struct {
+ data []byte
+ pos int
+}
+
+func (r *slowReader) Read(p []byte) (n int, err error) {
+ if r.pos >= len(r.data) {
+ return 0, io.EOF
+ }
+ if len(p) == 0 {
+ return 0, nil
+ }
+ p[0] = r.data[r.pos]
+ r.pos++
+ return 1, nil
+}
+
+func TestStreamDeserializationSlow(t *testing.T) {
+ f := New()
+ f.RegisterStruct(&StreamTestStruct{}, 100)
+
+ original := &StreamTestStruct{
+ ID: 42,
+ Name: "Slow Stream Test with a reasonably long string and some
data to trigger multiple fills",
+ Data: bytes.Repeat([]byte{0xAA}, 100),
+ }
+
+ data, err := f.Serialize(original)
+ if err != nil {
+ t.Fatalf("Serialize failed: %v", err)
+ }
+
+ // Test with slow reader and small bufferSize to force compaction/growth
+ reader := &slowReader{data: data}
+ var decoded StreamTestStruct
+
+ // Create an InputStream with a small bufferSize (16) to force frequent
fills and compactions
+ stream := NewInputStreamWithBufferSize(reader, 16)
+ err = f.DeserializeFromStream(stream, &decoded)
+ if err != nil {
+ t.Fatalf("DeserializeFromStream (slow) failed: %v", err)
+ }
+
+ if decoded.ID != original.ID || decoded.Name != original.Name ||
!bytes.Equal(decoded.Data, original.Data) {
+ t.Errorf("Decoded value mismatch (slow). Got: %+v, Want: %+v",
decoded, original)
+ }
+}
+
+func TestStreamDeserializationEOF(t *testing.T) {
+ f := New()
+ f.RegisterStruct(&StreamTestStruct{}, 100)
+
+ original := &StreamTestStruct{
+ ID: 42,
+ Name: "EOF Test",
+ }
+
+ data, err := f.Serialize(original)
+ if err != nil {
+ t.Fatalf("Serialize failed: %v", err)
+ }
+
+ // Truncate data to cause unexpected EOF during reading Name
+ truncated := data[:len(data)-2]
+ reader := bytes.NewReader(truncated)
+ var decoded StreamTestStruct
+ err = f.DeserializeFromReader(reader, &decoded)
+ if err == nil {
+ t.Fatal("Expected error on truncated stream, got nil")
+ }
+
+ // Ideally it should be a BufferOutOfBoundError
+ if _, ok := err.(Error); !ok {
+ t.Errorf("Expected fory.Error, got %T: %v", err, err)
+ }
+}
+
+func TestInputStreamSequential(t *testing.T) {
+ f := New()
+ // Register type in compatible mode to test Meta Sharing across
sequential reads
+ f.config.Compatible = true
+ f.RegisterStruct(&StreamTestStruct{}, 100)
+
+ msg1 := &StreamTestStruct{ID: 1, Name: "Msg 1", Data: []byte{1, 1}}
+ msg2 := &StreamTestStruct{ID: 2, Name: "Msg 2", Data: []byte{2, 2}}
+ msg3 := &StreamTestStruct{ID: 3, Name: "Msg 3", Data: []byte{3, 3}}
+
+ var buf bytes.Buffer
+
+ // Serialize sequentially into one stream
+ data1, _ := f.Serialize(msg1)
+ buf.Write(data1)
+ data2, _ := f.Serialize(msg2)
+ buf.Write(data2)
+ data3, _ := f.Serialize(msg3)
+ buf.Write(data3)
+
+ fDec := New()
+ fDec.config.Compatible = true
+ fDec.RegisterStruct(&StreamTestStruct{}, 100)
+
+ // Create an InputStream
+ sr := NewInputStream(&buf)
+
+ // Deserialize sequentially
+ var out1, out2, out3 StreamTestStruct
+
+ err := fDec.DeserializeFromStream(sr, &out1)
+ if err != nil {
+ t.Fatalf("Deserialize 1 failed: %v", err)
+ }
+ if out1.ID != msg1.ID || out1.Name != msg1.Name ||
!bytes.Equal(out1.Data, msg1.Data) {
+ t.Errorf("Msg 1 mismatch. Got: %+v, Want: %+v", out1, msg1)
+ }
+
+ err = fDec.DeserializeFromStream(sr, &out2)
+ if err != nil {
+ t.Fatalf("Deserialize 2 failed: %v", err)
+ }
+ if out2.ID != msg2.ID || out2.Name != msg2.Name ||
!bytes.Equal(out2.Data, msg2.Data) {
+ t.Errorf("Msg 2 mismatch. Got: %+v, Want: %+v", out2, msg2)
+ }
+
+ err = fDec.DeserializeFromStream(sr, &out3)
+ if err != nil {
+ t.Fatalf("Deserialize 3 failed: %v", err)
+ }
+ if out3.ID != msg3.ID || out3.Name != msg3.Name ||
!bytes.Equal(out3.Data, msg3.Data) {
+ t.Errorf("Msg 3 mismatch. Got: %+v, Want: %+v", out3, msg3)
+ }
+}
+
+func TestInputStreamShrink(t *testing.T) {
+ // Create a large payload that easily escapes the bufferSize (4096)
+ data := make([]byte, 10000)
+ for i := range data {
+ data[i] = byte(i % 256)
+ }
+
+ // Create a stream reader with a tiny bufferSize so we can trigger
Shrink reliably
+ buf := bytes.NewReader(data)
+ sr := NewInputStreamWithBufferSize(buf, 100)
+
+ // Force a read/fill to pull a chunk into memory
+ err := sr.buffer.fill(5000, nil)
+ if !err {
+ t.Fatalf("Failed to fill buffer")
+ }
+
+ // Fake an artificial read that consumed a massive portion of the buffer
+ originalCapacity := cap(sr.buffer.data)
+ sr.buffer.readerIndex = 4500
+
+ // Trigger Shrink
+ sr.Shrink()
+
+ // 1. Validate reader index was successfully reset
+ if sr.buffer.readerIndex != 0 {
+ t.Errorf("Expected readerIndex to reset to 0, got %d",
sr.buffer.readerIndex)
+ }
+
+ // 2. Validate the capacity actually shrank (reclaimed memory)
+ newCapacity := cap(sr.buffer.data)
+ if newCapacity >= originalCapacity {
+ t.Errorf("Expected capacity to shrink (was %d, now %d)",
originalCapacity, newCapacity)
+ }
+
+ // 3. Validate the remaining unread data remained intact
+ if sr.buffer.writerIndex != 500 {
+ t.Errorf("Expected writerIndex to be 500 remaining bytes, got
%d", sr.buffer.writerIndex)
+ }
+ for i := 0; i < 500; i++ {
+ if sr.buffer.data[i] != byte((4500+i)%256) {
+ t.Errorf("Data corruption post-shrink at index %d", i)
+ break
+ }
+ }
+}
diff --git a/go/fory/struct.go b/go/fory/struct.go
index 2cd21991b..52e53ad99 100644
--- a/go/fory/struct.go
+++ b/go/fory/struct.go
@@ -1424,6 +1424,11 @@ func (s *structSerializer) ReadData(ctx *ReadContext,
value reflect.Value) {
// Phase 1: Fixed-size primitives (inline unsafe reads with endian
handling)
if s.fieldGroup.FixedSize > 0 {
+ var errOut Error
+ if !buf.CheckReadable(int(s.fieldGroup.FixedSize), &errOut) {
+ ctx.SetError(errOut)
+ return
+ }
baseOffset := buf.ReaderIndex()
data := buf.GetData()
diff --git a/go/fory/struct_test.go b/go/fory/struct_test.go
index d97bd1f3b..430babdcf 100644
--- a/go/fory/struct_test.go
+++ b/go/fory/struct_test.go
@@ -56,7 +56,7 @@ func TestUnsignedTypeSerialization(t *testing.T) {
}
var result any
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
if err != nil {
t.Fatalf("Deserialize failed: %v", err)
}
@@ -104,7 +104,7 @@ func TestOptionFieldSerialization(t *testing.T) {
require.NoError(t, err)
var result any
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
require.NoError(t, err)
out := result.(*OptionStruct)
@@ -357,7 +357,7 @@ func TestSetFieldSerializationSchemaConsistent(t
*testing.T) {
// Deserialize
var result any
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
require.NoError(t, err, "Deserialize failed")
resultObj := result.(*SetFieldsStruct)
@@ -404,7 +404,7 @@ func TestSetFieldSerializationCompatible(t *testing.T) {
// Deserialize
var result any
- err = f.Deserialize(data, &result)
+ err = testDeserialize(t, f, data, &result)
require.NoError(t, err, "Deserialize failed")
resultObj := result.(*SetFieldsStruct)
@@ -525,7 +525,7 @@ func TestFloat16StructField(t *testing.T) {
// Create new instance
res := &StructWithFloat16{}
- err = f.Deserialize(data, res)
+ err = testDeserialize(t, f, data, res)
require.NoError(t, err)
// Verify
diff --git a/go/fory/test_helper_test.go b/go/fory/test_helper_test.go
new file mode 100644
index 000000000..73106acf2
--- /dev/null
+++ b/go/fory/test_helper_test.go
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package fory
+
+import (
+ "io"
+ "reflect"
+ "testing"
+)
+
+// oneByteReader returns data one byte at a time to aggressively exercise all
+// `fill()` boundaries, loops, and buffering conditions.
+type oneByteReader struct {
+ data []byte
+ pos int
+}
+
+func (r *oneByteReader) Read(p []byte) (n int, err error) {
+ if r.pos >= len(r.data) {
+ return 0, io.EOF
+ }
+ if len(p) == 0 {
+ return 0, nil
+ }
+ p[0] = r.data[r.pos]
+ r.pos++
+ return 1, nil
+}
+
+// testDeserialize is a testing helper that performs standard in-memory
+// deserialization and additionally wraps the payload in a slow one-byte
+// stream reader to verify that stream decoding handles fragmented reads
correctly.
+func testDeserialize(t *testing.T, f *Fory, data []byte, v any) error {
+ t.Helper()
+
+ // 1. First, deserialize from bytes (the fast path)
+ err := f.Deserialize(data, v)
+ if err != nil {
+ return err
+ }
+
+ // 2. Deserialize from oneByteReader (the slow stream path)
+ // We create a new instance of the same target type to ensure clean
state
+ vType := reflect.TypeOf(v)
+ if vType == nil || vType.Kind() != reflect.Ptr {
+ t.Fatalf("testDeserialize requires a pointer to a value, got
%v", vType)
+ }
+
+ freshV := reflect.New(vType.Elem()).Interface()
+
+ stream := &oneByteReader{data: data, pos: 0}
+
+ // Create a new stream reader. The stream context handles boundaries
and compactions.
+ streamReader := NewInputStream(stream)
+ err = f.DeserializeFromStream(streamReader, freshV)
+ if err != nil {
+ t.Fatalf("Stream deserialization via OneByteStream failed: %v",
err)
+ }
+
+ // 3. Compare the two results
+ if !reflect.DeepEqual(v, freshV) {
+ t.Fatalf("Stream deserialization mismatched byte
deserialization.\nExpected: %+v\nGot: %+v", v, freshV)
+ }
+
+ // Both deserialization paths succeeded at this point, so err is nil.
+ return err
+}
diff --git a/go/fory/type_def.go b/go/fory/type_def.go
index da669ccc3..bd55de459 100644
--- a/go/fory/type_def.go
+++ b/go/fory/type_def.go
@@ -291,7 +291,7 @@ func skipTypeDef(buffer *ByteBuffer, header int64, err
*Error) {
if sz == META_SIZE_MASK {
sz += int(buffer.ReadVarUint32(err))
}
- buffer.IncreaseReaderIndex(sz)
+ buffer.Skip(sz, err)
}
const BIG_NAME_THRESHOLD = 0b111111 // 6 bits for size when using 2 bits for
encoding
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]