This is an automated email from the ASF dual-hosted git repository.
chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fory.git
The following commit(s) were added to refs/heads/main by this push:
new 1e2bb638e feat(python/c++): shrink stream buffers after struct deserialization (#3453)
1e2bb638e is described below
commit 1e2bb638e699b1c487cc9dd5bd2cdd83497e545e
Author: Shawn Yang <[email protected]>
AuthorDate: Wed Mar 4 12:15:16 2026 +0800
feat(python/c++): shrink stream buffers after struct deserialization (#3453)
## Why?
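## What does this PR do?
- Renames `Buffer::is_stream_backed()` / `is_output_stream_backed()` to
  `has_input_stream()` / `has_output_stream()`, and renames
  `shrink_stream_buffer()` to `shrink_input_buffer()`.
- `Buffer::shrink_input_buffer()` only calls `InputStream::shrink_buffer()`
  once more than 4096 bytes have been consumed.
- `StdInputStream::shrink_buffer()` and the Python-backed input stream also
  return early unless the read position is above 4096 bytes and at least the
  configured initial buffer size. As a result, small reads no longer compact
  the buffer on every call.
- Calls `shrink_input_buffer()` after each struct is deserialized, both in the
  C++ struct serializer and in the Python `DataClassSerializer`
  (`struct.pxi` / `struct.py`).
- For buffers with an input stream, `read_buffer_object` now copies the
  payload with `read_bytes(size)` instead of returning a slice. Later buffer
  compaction therefore cannot change previously returned payloads.
- Adds C++ (`stream_test.cc`, `buffer_test.cc`) and Python (`test_stream.py`)
  tests for these behaviors.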
## Related issues
#3449
#3307
## Does this PR introduce any user-facing change?
- [ ] Does this PR introduce any public API change?
- [ ] Does this PR introduce any binary protocol compatibility change?
## Benchmark
---
cpp/fory/serialization/stream_test.cc | 53 +++++++++++++++++++++++++++--
cpp/fory/serialization/struct_serializer.h | 8 +++--
cpp/fory/util/buffer.h | 12 ++++---
cpp/fory/util/buffer_test.cc | 33 ++++++++++++++++--
cpp/fory/util/stream.cc | 8 +++++
cpp/fory/util/stream.h | 3 ++
python/pyfory/_fory.py | 4 +++
python/pyfory/buffer.pxi | 6 ++++
python/pyfory/cpp/pyfory.cc | 12 +++++--
python/pyfory/includes/libutil.pxd | 4 +++
python/pyfory/serialization.pyx | 4 +++
python/pyfory/struct.pxi | 1 +
python/pyfory/struct.py | 1 +
python/pyfory/tests/test_stream.py | 54 ++++++++++++++++++++++++++++++
14 files changed, 190 insertions(+), 13 deletions(-)
diff --git a/cpp/fory/serialization/stream_test.cc b/cpp/fory/serialization/stream_test.cc
index 98783a37e..0c73bb7ac 100644
--- a/cpp/fory/serialization/stream_test.cc
+++ b/cpp/fory/serialization/stream_test.cc
@@ -21,6 +21,7 @@
#include <istream>
#include <map>
#include <memory>
+#include <sstream>
#include <streambuf>
#include <string>
#include <utility>
@@ -218,17 +219,20 @@ TEST(StreamSerializationTest, SequentialDeserializeFromSingleStream) {
auto first = fory.deserialize<int32_t>(stream);
ASSERT_TRUE(first.ok()) << first.error().to_string();
EXPECT_EQ(first.value(), 12345);
- EXPECT_EQ(stream.get_buffer().reader_index(), 0U);
+ const uint32_t first_reader_index = stream.get_buffer().reader_index();
+ EXPECT_GT(first_reader_index, 0U);
auto second = fory.deserialize<std::string>(stream);
ASSERT_TRUE(second.ok()) << second.error().to_string();
EXPECT_EQ(second.value(), "next-value");
- EXPECT_EQ(stream.get_buffer().reader_index(), 0U);
+ const uint32_t second_reader_index = stream.get_buffer().reader_index();
+ EXPECT_GT(second_reader_index, first_reader_index);
auto third = fory.deserialize<StreamEnvelope>(stream);
ASSERT_TRUE(third.ok()) << third.error().to_string();
EXPECT_EQ(third.value(), envelope);
- EXPECT_EQ(stream.get_buffer().reader_index(), 0U);
+ const uint32_t third_reader_index = stream.get_buffer().reader_index();
+ EXPECT_GT(third_reader_index, second_reader_index);
EXPECT_EQ(stream.get_buffer().remaining_size(), 0U);
}
@@ -310,6 +314,49 @@ TEST(StreamSerializationTest, SerializeToOStreamOverloadParity) {
EXPECT_EQ(out.data(), expected.value());
}
+TEST(StreamSerializationTest,
+ StructDeserializeFromStreamBackedBufferShrinksAfterEachStruct) {
+ auto fory = Fory::builder().xlang(true).track_ref(true).build();
+ register_stream_types(fory);
+
+ std::vector<int32_t> first_values;
+ std::vector<int32_t> second_values;
+ first_values.reserve(6000);
+ second_values.reserve(6000);
+ for (int32_t i = 0; i < 6000; ++i) {
+ first_values.push_back(i);
+ second_values.push_back(6000 - i);
+ }
+
+ StreamEnvelope first{
+ "first", std::move(first_values), {{"a", 11}, {"b", 22}}, {7, 8}, true,
+ };
+ StreamEnvelope second{
+ "second", std::move(second_values), {{"c", 33}, {"d", 44}}, {9, 10},
+ false,
+ };
+
+ std::vector<uint8_t> bytes;
+ ASSERT_TRUE(fory.serialize_to(bytes, first).ok());
+ ASSERT_TRUE(fory.serialize_to(bytes, second).ok());
+
+ std::string payload(reinterpret_cast<const char *>(bytes.data()),
+ bytes.size());
+ std::istringstream source(payload);
+ StdInputStream stream(source, 4096);
+ Buffer &buffer = stream.get_buffer();
+
+ auto first_result = fory.deserialize<StreamEnvelope>(buffer);
+ ASSERT_TRUE(first_result.ok()) << first_result.error().to_string();
+ EXPECT_EQ(first_result.value(), first);
+ EXPECT_EQ(buffer.reader_index(), 0U);
+
+ auto second_result = fory.deserialize<StreamEnvelope>(buffer);
+ ASSERT_TRUE(second_result.ok()) << second_result.error().to_string();
+ EXPECT_EQ(second_result.value(), second);
+ EXPECT_EQ(buffer.reader_index(), 0U);
+}
+
} // namespace test
} // namespace serialization
} // namespace fory
diff --git a/cpp/fory/serialization/struct_serializer.h b/cpp/fory/serialization/struct_serializer.h
index ccde0e480..4fe76cff2 100644
--- a/cpp/fory/serialization/struct_serializer.h
+++ b/cpp/fory/serialization/struct_serializer.h
@@ -2773,7 +2773,7 @@ void read_struct_fields_impl(T &obj, ReadContext &ctx,
// Note: varint bounds checking is done per-byte during reading since
// varint lengths are variable (actual size << max possible size)
if constexpr (varint_count > 0) {
- if (FORY_PREDICT_FALSE(buffer.is_stream_backed())) {
+ if (FORY_PREDICT_FALSE(buffer.has_input_stream())) {
// Stream-backed buffers may not have all varint bytes materialized yet.
// Fall back to per-field readers that propagate stream read errors.
read_remaining_fields<T, fixed_count, total_count>(obj, ctx);
@@ -2828,7 +2828,7 @@ read_struct_fields_impl_fast(T &obj, ReadContext &ctx,
// Phase 2: Read consecutive varint primitives (int32, int64) if any
if constexpr (varint_count > 0) {
- if (FORY_PREDICT_FALSE(buffer.is_stream_backed())) {
+ if (FORY_PREDICT_FALSE(buffer.has_input_stream())) {
// Stream-backed buffers may not have all varint bytes materialized yet.
// Fall back to per-field readers that propagate stream read errors.
read_remaining_fields<T, fixed_count, total_count>(obj, ctx);
@@ -3270,6 +3270,7 @@ struct Serializer<T, std::enable_if_t<is_fory_serializable_v<T>>> {
if (FORY_PREDICT_FALSE(ctx.has_error())) {
return T{};
}
+ ctx.buffer().shrink_input_buffer();
return obj;
}
@@ -3279,6 +3280,7 @@ struct Serializer<T, std::enable_if_t<is_fory_serializable_v<T>>> {
if (FORY_PREDICT_FALSE(ctx.has_error())) {
return T{};
}
+ ctx.buffer().shrink_input_buffer();
return obj;
}
@@ -3290,6 +3292,7 @@ struct Serializer<T, std::enable_if_t<is_fory_serializable_v<T>>> {
return T{};
}
+ ctx.buffer().shrink_input_buffer();
return obj;
}
@@ -3333,6 +3336,7 @@ struct Serializer<T, std::enable_if_t<is_fory_serializable_v<T>>> {
return T{};
}
+ ctx.buffer().shrink_input_buffer();
return obj;
}
diff --git a/cpp/fory/util/buffer.h b/cpp/fory/util/buffer.h
index a44de2d75..a9dc0e1d2 100644
--- a/cpp/fory/util/buffer.h
+++ b/cpp/fory/util/buffer.h
@@ -106,11 +106,11 @@ public:
FORY_ALWAYS_INLINE bool own_data() const { return own_data_; }
- FORY_ALWAYS_INLINE bool is_stream_backed() const {
+ FORY_ALWAYS_INLINE bool has_input_stream() const {
return input_stream_ != nullptr;
}
- FORY_ALWAYS_INLINE bool is_output_stream_backed() const {
+ FORY_ALWAYS_INLINE bool has_output_stream() const {
return output_stream_ != nullptr;
}
@@ -135,8 +135,12 @@ public:
output_stream_ = nullptr;
}
- FORY_ALWAYS_INLINE void shrink_stream_buffer() {
- if (input_stream_ != nullptr) {
+ // Best-effort stream buffer compaction entry point.
+ // Stage 1 guard: avoid calling into stream shrinking for very small progress.
+ // Stage 2 guard lives in InputStream::shrink_buffer(), which can decide based
+ FORY_ALWAYS_INLINE void shrink_input_buffer() {
+ if (FORY_PREDICT_FALSE(input_stream_ != nullptr && reader_index_ > 4096)) {
input_stream_->shrink_buffer();
}
}
diff --git a/cpp/fory/util/buffer_test.cc b/cpp/fory/util/buffer_test.cc
index 7f32c0685..97043a4fd 100644
--- a/cpp/fory/util/buffer_test.cc
+++ b/cpp/fory/util/buffer_test.cc
@@ -19,6 +19,8 @@
#include <iostream>
#include <limits>
+#include <sstream>
+#include <string>
#include <utility>
#include <vector>
@@ -330,6 +332,33 @@ TEST(Buffer, StreamSkipAndUnread) {
EXPECT_EQ(view.reader_index(), 2U);
}
+TEST(Buffer, StreamShrinkBufferBestEffortUsesConfiguredBufferSize) {
+ constexpr uint32_t kConfiguredBufferSize = 32768;
+ constexpr uint32_t kPayloadSize = kConfiguredBufferSize * 2;
+ std::string payload(kPayloadSize, '\x7');
+ std::istringstream source(payload);
+ StdInputStream stream(source, kConfiguredBufferSize);
+ Buffer reader(stream);
+ Error error;
+
+ reader.skip(5000, error);
+ ASSERT_TRUE(error.ok()) << error.to_string();
+ EXPECT_EQ(reader.reader_index(), 5000U);
+
+ // Below configured input buffer size, shrink should be a no-op.
+ reader.shrink_input_buffer();
+ EXPECT_EQ(reader.reader_index(), 5000U);
+
+ reader.skip(kConfiguredBufferSize, error);
+ ASSERT_TRUE(error.ok()) << error.to_string();
+ ASSERT_GT(reader.reader_index(), kConfiguredBufferSize);
+
+ const uint32_t remaining_before = reader.remaining_size();
+ reader.shrink_input_buffer();
+ EXPECT_EQ(reader.reader_index(), 0U);
+ EXPECT_EQ(reader.size(), remaining_before);
+}
+
TEST(Buffer, StreamReadErrorWhenInsufficientData) {
std::vector<uint8_t> raw{0x01, 0x02, 0x03};
OneByteIStream one_byte_stream(raw);
@@ -398,8 +427,8 @@ TEST(Buffer, OutputStreamRebindDetachesPreviousBufferBacklink) {
first->bind_output_stream(&writer);
second->bind_output_stream(&writer);
- EXPECT_FALSE(first->is_output_stream_backed());
- EXPECT_TRUE(second->is_output_stream_backed());
+ EXPECT_FALSE(first->has_output_stream());
+ EXPECT_TRUE(second->has_output_stream());
writer.enter_flush_barrier();
std::vector<uint8_t> second_payload(5000, 7);
diff --git a/cpp/fory/util/stream.cc b/cpp/fory/util/stream.cc
index 0d06ec7de..d694c3f84 100644
--- a/cpp/fory/util/stream.cc
+++ b/cpp/fory/util/stream.cc
@@ -204,6 +204,14 @@ void StdInputStream::shrink_buffer() {
}
const uint32_t read_pos = buffer_->reader_index_;
+ // Best-effort policy:
+ // 1) keep a hard 4096-byte floor to avoid tiny frequent compactions;
+ // 2) for larger configured input buffers, require at least one full initial
+ // buffer worth of consumed bytes before moving unread data.
+ if (FORY_PREDICT_TRUE(read_pos <= 4096 || read_pos < initial_buffer_size_)) {
+ return;
+ }
+
const uint32_t remaining = remaining_size();
if (read_pos > 0) {
if (remaining > 0) {
diff --git a/cpp/fory/util/stream.h b/cpp/fory/util/stream.h
index c97ceafe0..0927adc27 100644
--- a/cpp/fory/util/stream.h
+++ b/cpp/fory/util/stream.h
@@ -133,6 +133,9 @@ public:
virtual Result<void, Error> unread(uint32_t size) = 0;
+ // Best-effort input-buffer compaction/reclaim hook. Callers may invoke this
+ // frequently; implementations should return quickly unless configured
+ // compaction thresholds are met.
virtual void shrink_buffer() = 0;
virtual Buffer &get_buffer() = 0;
diff --git a/python/pyfory/_fory.py b/python/pyfory/_fory.py
index 2ccb8115b..b57f639da 100644
--- a/python/pyfory/_fory.py
+++ b/python/pyfory/_fory.py
@@ -698,6 +698,8 @@ class Fory:
def read_buffer_object(self, buffer) -> Buffer:
if not self.is_peer_out_of_band_enabled:
size = buffer.read_var_uint32()
+ if buffer.has_input_stream():
+ return buffer.read_bytes(size)
reader_index = buffer.get_reader_index()
buf = buffer.slice(reader_index, size)
buffer.set_reader_index(reader_index + size)
@@ -707,6 +709,8 @@ class Fory:
assert self._buffers is not None
return next(self._buffers)
size = buffer.read_var_uint32()
+ if buffer.has_input_stream():
+ return buffer.read_bytes(size)
reader_index = buffer.get_reader_index()
buf = buffer.slice(reader_index, size)
buffer.set_reader_index(reader_index + size)
diff --git a/python/pyfory/buffer.pxi b/python/pyfory/buffer.pxi
index d9c66e2b5..2b98353f4 100644
--- a/python/pyfory/buffer.pxi
+++ b/python/pyfory/buffer.pxi
@@ -244,6 +244,12 @@ cdef class Buffer:
raise ValueError("writer_index must be >= 0")
self.c_buffer.writer_index(<uint32_t>value)
+ cpdef inline void shrink_input_buffer(self):
+ self.c_buffer.shrink_input_buffer()
+
+ cpdef inline c_bool has_input_stream(self):
+ return self.c_buffer.has_input_stream()
+
cpdef c_bool own_data(self):
return self.c_buffer.own_data()
diff --git a/python/pyfory/cpp/pyfory.cc b/python/pyfory/cpp/pyfory.cc
index 13c28103b..0f35f8a64 100644
--- a/python/pyfory/cpp/pyfory.cc
+++ b/python/pyfory/cpp/pyfory.cc
@@ -366,6 +366,14 @@ public:
}
const uint32_t read_pos = buffer_->reader_index_;
+ // Keep Python-backed InputStream shrink behavior aligned with C++:
+ // best-effort compaction only after both the global floor (4096) and the
+ // configured stream buffer size threshold are crossed.
+ if (FORY_PREDICT_TRUE(read_pos <= 4096 ||
+ read_pos < initial_buffer_size_)) {
+ return;
+ }
+
const uint32_t remaining = remaining_size();
if (read_pos > 0) {
if (remaining > 0) {
@@ -1491,13 +1499,13 @@ int Fory_PyPrimitiveCollectionReadFromBuffer(PyObject *collection,
"tuple collection size is smaller than requested read size");
return -1;
}
- if (!buffer->is_stream_backed() && kind == PythonCollectionKind::List) {
+ if (!buffer->has_input_stream() && kind == PythonCollectionKind::List) {
return read_primitive_sequence_indexed(
buffer, size, type_id, [collection](Py_ssize_t i, PyObject *item) {
PyList_SET_ITEM(collection, i, item);
});
}
- if (!buffer->is_stream_backed() && kind == PythonCollectionKind::Tuple) {
+ if (!buffer->has_input_stream() && kind == PythonCollectionKind::Tuple) {
return read_primitive_sequence_indexed(
buffer, size, type_id, [collection](Py_ssize_t i, PyObject *item) {
PyTuple_SET_ITEM(collection, i, item);
diff --git a/python/pyfory/includes/libutil.pxd b/python/pyfory/includes/libutil.pxd
index b0d8914e3..3371b0c02 100644
--- a/python/pyfory/includes/libutil.pxd
+++ b/python/pyfory/includes/libutil.pxd
@@ -205,6 +205,10 @@ cdef extern from "fory/util/buffer.h" namespace "fory" nogil:
void skip(uint32_t length, CError& error)
+ c_bool has_input_stream() const
+
+ void shrink_input_buffer()
+
void copy(uint32_t start, uint32_t nbytes,
uint8_t* out, uint32_t offset) const
diff --git a/python/pyfory/serialization.pyx b/python/pyfory/serialization.pyx
index 4cfe64894..dea890fbc 100644
--- a/python/pyfory/serialization.pyx
+++ b/python/pyfory/serialization.pyx
@@ -1630,6 +1630,8 @@ cdef class Fory:
cdef Buffer buf
if not self.is_peer_out_of_band_enabled:
size = buffer.read_var_uint32()
+ if buffer.has_input_stream():
+ return buffer.read_bytes(size)
reader_index = buffer.get_reader_index()
buf = buffer.slice(reader_index, size)
buffer.set_reader_index(reader_index + size)
@@ -1639,6 +1641,8 @@ cdef class Fory:
assert self._buffers is not None
return next(self._buffers)
size = buffer.read_var_uint32()
+ if buffer.has_input_stream():
+ return buffer.read_bytes(size)
reader_index = buffer.get_reader_index()
buf = buffer.slice(reader_index, size)
buffer.set_reader_index(reader_index + size)
diff --git a/python/pyfory/struct.pxi b/python/pyfory/struct.pxi
index 659f28121..6da7e7f9f 100644
--- a/python/pyfory/struct.pxi
+++ b/python/pyfory/struct.pxi
@@ -384,6 +384,7 @@ cdef class DataClassSerializer(Serializer):
self._apply_missing_defaults_slots(obj)
else:
self._apply_missing_defaults_dict(obj.__dict__)
+ buffer.shrink_input_buffer()
return obj
cdef inline void _read_dict(self, Buffer buffer, object obj):
diff --git a/python/pyfory/struct.py b/python/pyfory/struct.py
index cb61db342..bc24dea93 100644
--- a/python/pyfory/struct.py
+++ b/python/pyfory/struct.py
@@ -572,6 +572,7 @@ class DataClassSerializer(Serializer):
obj_dict[field_name] = value
else:
setattr(obj, field_name, value)
+ buffer.shrink_input_buffer()
return obj
diff --git a/python/pyfory/tests/test_stream.py b/python/pyfory/tests/test_stream.py
index 3276c4054..efa28d622 100644
--- a/python/pyfory/tests/test_stream.py
+++ b/python/pyfory/tests/test_stream.py
@@ -16,6 +16,8 @@
# under the License.
from dataclasses import dataclass
+import io
+import pickle
import pytest
@@ -111,6 +113,12 @@ class StreamStructValue:
values: list
+@dataclass
+class StreamPickleBufferValue:
+ idx: int
+ payload: pickle.PickleBuffer
+
+
@pytest.mark.parametrize("xlang", [False, True])
def test_stream_roundtrip_primitives_and_strings(xlang):
fory = pyfory.Fory(xlang=xlang, ref=True)
@@ -183,6 +191,52 @@ def test_stream_deserialize_multiple_objects_from_single_stream(xlang):
assert reader.get_reader_index() == reader.size()
+@pytest.mark.parametrize("xlang", [False, True])
+def test_stream_backed_buffer_struct_deserialize_shrinks_each_struct(xlang):
+ fory = pyfory.Fory(xlang=xlang, ref=True)
+ fory.register(StreamStructValue)
+ first = StreamStructValue(101, "first", list(range(6000)))
+ second = StreamStructValue(202, "second", list(range(6000, 0, -1)))
+
+ payload = fory.dumps(first) + fory.dumps(second)
+ reader = Buffer.from_stream(io.BytesIO(payload), 4096)
+
+ first_result = fory.deserialize(reader)
+ assert first_result == first
+ assert reader.get_reader_index() == 0
+
+ second_result = fory.deserialize(reader)
+ assert second_result == second
+ assert reader.get_reader_index() == 0
+
+
+def test_stream_backed_buffer_pickle_buffer_not_corrupted_after_next_struct():
+ fory = pyfory.Fory(xlang=False, ref=True, strict=False)
+ fory.register(StreamPickleBufferValue)
+ first_payload = b"a" * 7000
+ second_payload = b"b" * 7000
+ first = StreamPickleBufferValue(101, pickle.PickleBuffer(first_payload))
+ second = StreamPickleBufferValue(202, pickle.PickleBuffer(second_payload))
+
+ writer = Buffer.allocate(32768)
+ fory.serialize(first, writer)
+ fory.serialize(second, writer)
+ stream_data = writer.get_bytes(0, writer.get_writer_index())
+ reader = Buffer.from_stream(io.BytesIO(stream_data), 4096)
+
+ first_result = fory.deserialize(reader)
+ assert first_result.idx == first.idx
+ assert bytes(first_result.payload.raw()) == first_payload
+
+ second_result = fory.deserialize(reader)
+ assert second_result.idx == second.idx
+ assert bytes(second_result.payload.raw()) == second_payload
+
+ # Ensure previously returned zero-copy-like payloads remain stable even
+ # after later stream reads trigger shrink logic.
+ assert bytes(first_result.payload.raw()) == first_payload
+
+
@pytest.mark.parametrize("xlang", [False, True])
def test_stream_deserialize_truncated_error(xlang):
fory = pyfory.Fory(xlang=xlang, ref=True)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]