This is an automated email from the ASF dual-hosted git repository.

chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fory.git


The following commit(s) were added to refs/heads/main by this push:
     new 1e2bb638e feat(python/c++): shrink stream buffers after struct deserialization (#3453)
1e2bb638e is described below

commit 1e2bb638e699b1c487cc9dd5bd2cdd83497e545e
Author: Shawn Yang <[email protected]>
AuthorDate: Wed Mar 4 12:15:16 2026 +0800

    feat(python/c++): shrink stream buffers after struct deserialization (#3453)
    
    ## Why?
    
    
    
    ## What does this PR do?
    
    
    
    ## Related issues
    
    #3449
    #3307
    
    ## Does this PR introduce any user-facing change?
    
    
    
    - [ ] Does this PR introduce any public API change?
    - [ ] Does this PR introduce any binary protocol compatibility change?
    
    ## Benchmark
---
 cpp/fory/serialization/stream_test.cc      | 53 +++++++++++++++++++++++++++--
 cpp/fory/serialization/struct_serializer.h |  8 +++--
 cpp/fory/util/buffer.h                     | 12 ++++---
 cpp/fory/util/buffer_test.cc               | 33 ++++++++++++++++--
 cpp/fory/util/stream.cc                    |  8 +++++
 cpp/fory/util/stream.h                     |  3 ++
 python/pyfory/_fory.py                     |  4 +++
 python/pyfory/buffer.pxi                   |  6 ++++
 python/pyfory/cpp/pyfory.cc                | 12 +++++--
 python/pyfory/includes/libutil.pxd         |  4 +++
 python/pyfory/serialization.pyx            |  4 +++
 python/pyfory/struct.pxi                   |  1 +
 python/pyfory/struct.py                    |  1 +
 python/pyfory/tests/test_stream.py         | 54 ++++++++++++++++++++++++++++++
 14 files changed, 190 insertions(+), 13 deletions(-)

diff --git a/cpp/fory/serialization/stream_test.cc b/cpp/fory/serialization/stream_test.cc
index 98783a37e..0c73bb7ac 100644
--- a/cpp/fory/serialization/stream_test.cc
+++ b/cpp/fory/serialization/stream_test.cc
@@ -21,6 +21,7 @@
 #include <istream>
 #include <map>
 #include <memory>
+#include <sstream>
 #include <streambuf>
 #include <string>
 #include <utility>
@@ -218,17 +219,20 @@ TEST(StreamSerializationTest, SequentialDeserializeFromSingleStream) {
   auto first = fory.deserialize<int32_t>(stream);
   ASSERT_TRUE(first.ok()) << first.error().to_string();
   EXPECT_EQ(first.value(), 12345);
-  EXPECT_EQ(stream.get_buffer().reader_index(), 0U);
+  const uint32_t first_reader_index = stream.get_buffer().reader_index();
+  EXPECT_GT(first_reader_index, 0U);
 
   auto second = fory.deserialize<std::string>(stream);
   ASSERT_TRUE(second.ok()) << second.error().to_string();
   EXPECT_EQ(second.value(), "next-value");
-  EXPECT_EQ(stream.get_buffer().reader_index(), 0U);
+  const uint32_t second_reader_index = stream.get_buffer().reader_index();
+  EXPECT_GT(second_reader_index, first_reader_index);
 
   auto third = fory.deserialize<StreamEnvelope>(stream);
   ASSERT_TRUE(third.ok()) << third.error().to_string();
   EXPECT_EQ(third.value(), envelope);
-  EXPECT_EQ(stream.get_buffer().reader_index(), 0U);
+  const uint32_t third_reader_index = stream.get_buffer().reader_index();
+  EXPECT_GT(third_reader_index, second_reader_index);
 
   EXPECT_EQ(stream.get_buffer().remaining_size(), 0U);
 }
@@ -310,6 +314,49 @@ TEST(StreamSerializationTest, SerializeToOStreamOverloadParity) {
   EXPECT_EQ(out.data(), expected.value());
 }
 
+TEST(StreamSerializationTest,
+     StructDeserializeFromStreamBackedBufferShrinksAfterEachStruct) {
+  auto fory = Fory::builder().xlang(true).track_ref(true).build();
+  register_stream_types(fory);
+
+  std::vector<int32_t> first_values;
+  std::vector<int32_t> second_values;
+  first_values.reserve(6000);
+  second_values.reserve(6000);
+  for (int32_t i = 0; i < 6000; ++i) {
+    first_values.push_back(i);
+    second_values.push_back(6000 - i);
+  }
+
+  StreamEnvelope first{
+      "first", std::move(first_values), {{"a", 11}, {"b", 22}}, {7, 8}, true,
+  };
+  StreamEnvelope second{
+      "second", std::move(second_values), {{"c", 33}, {"d", 44}}, {9, 10},
+      false,
+  };
+
+  std::vector<uint8_t> bytes;
+  ASSERT_TRUE(fory.serialize_to(bytes, first).ok());
+  ASSERT_TRUE(fory.serialize_to(bytes, second).ok());
+
+  std::string payload(reinterpret_cast<const char *>(bytes.data()),
+                      bytes.size());
+  std::istringstream source(payload);
+  StdInputStream stream(source, 4096);
+  Buffer &buffer = stream.get_buffer();
+
+  auto first_result = fory.deserialize<StreamEnvelope>(buffer);
+  ASSERT_TRUE(first_result.ok()) << first_result.error().to_string();
+  EXPECT_EQ(first_result.value(), first);
+  EXPECT_EQ(buffer.reader_index(), 0U);
+
+  auto second_result = fory.deserialize<StreamEnvelope>(buffer);
+  ASSERT_TRUE(second_result.ok()) << second_result.error().to_string();
+  EXPECT_EQ(second_result.value(), second);
+  EXPECT_EQ(buffer.reader_index(), 0U);
+}
+
 } // namespace test
 } // namespace serialization
 } // namespace fory
diff --git a/cpp/fory/serialization/struct_serializer.h b/cpp/fory/serialization/struct_serializer.h
index ccde0e480..4fe76cff2 100644
--- a/cpp/fory/serialization/struct_serializer.h
+++ b/cpp/fory/serialization/struct_serializer.h
@@ -2773,7 +2773,7 @@ void read_struct_fields_impl(T &obj, ReadContext &ctx,
     // Note: varint bounds checking is done per-byte during reading since
     // varint lengths are variable (actual size << max possible size)
     if constexpr (varint_count > 0) {
-      if (FORY_PREDICT_FALSE(buffer.is_stream_backed())) {
+      if (FORY_PREDICT_FALSE(buffer.has_input_stream())) {
         // Stream-backed buffers may not have all varint bytes materialized yet.
         // Fall back to per-field readers that propagate stream read errors.
         read_remaining_fields<T, fixed_count, total_count>(obj, ctx);
@@ -2828,7 +2828,7 @@ read_struct_fields_impl_fast(T &obj, ReadContext &ctx,
 
   // Phase 2: Read consecutive varint primitives (int32, int64) if any
   if constexpr (varint_count > 0) {
-    if (FORY_PREDICT_FALSE(buffer.is_stream_backed())) {
+    if (FORY_PREDICT_FALSE(buffer.has_input_stream())) {
       // Stream-backed buffers may not have all varint bytes materialized yet.
       // Fall back to per-field readers that propagate stream read errors.
       read_remaining_fields<T, fixed_count, total_count>(obj, ctx);
@@ -3270,6 +3270,7 @@ struct Serializer<T, std::enable_if_t<is_fory_serializable_v<T>>> {
         if (FORY_PREDICT_FALSE(ctx.has_error())) {
           return T{};
         }
+        ctx.buffer().shrink_input_buffer();
         return obj;
       }
 
@@ -3279,6 +3280,7 @@ struct Serializer<T, std::enable_if_t<is_fory_serializable_v<T>>> {
       if (FORY_PREDICT_FALSE(ctx.has_error())) {
         return T{};
       }
+      ctx.buffer().shrink_input_buffer();
       return obj;
     }
 
@@ -3290,6 +3292,7 @@ struct Serializer<T, std::enable_if_t<is_fory_serializable_v<T>>> {
       return T{};
     }
 
+    ctx.buffer().shrink_input_buffer();
     return obj;
   }
 
@@ -3333,6 +3336,7 @@ struct Serializer<T, std::enable_if_t<is_fory_serializable_v<T>>> {
       return T{};
     }
 
+    ctx.buffer().shrink_input_buffer();
     return obj;
   }
 
diff --git a/cpp/fory/util/buffer.h b/cpp/fory/util/buffer.h
index a44de2d75..a9dc0e1d2 100644
--- a/cpp/fory/util/buffer.h
+++ b/cpp/fory/util/buffer.h
@@ -106,11 +106,11 @@ public:
 
   FORY_ALWAYS_INLINE bool own_data() const { return own_data_; }
 
-  FORY_ALWAYS_INLINE bool is_stream_backed() const {
+  FORY_ALWAYS_INLINE bool has_input_stream() const {
     return input_stream_ != nullptr;
   }
 
-  FORY_ALWAYS_INLINE bool is_output_stream_backed() const {
+  FORY_ALWAYS_INLINE bool has_output_stream() const {
     return output_stream_ != nullptr;
   }
 
@@ -135,8 +135,12 @@ public:
     output_stream_ = nullptr;
   }
 
-  FORY_ALWAYS_INLINE void shrink_stream_buffer() {
-    if (input_stream_ != nullptr) {
+  // Best-effort stream buffer compaction entry point.
+  // Stage 1 guard: avoid calling into stream shrinking for very small progress.
+  // Stage 2 guard lives in InputStream::shrink_buffer(), which can decide based
+  // on stream-specific configured buffer size.
+  FORY_ALWAYS_INLINE void shrink_input_buffer() {
+    if (FORY_PREDICT_FALSE(input_stream_ != nullptr && reader_index_ > 4096)) {
       input_stream_->shrink_buffer();
     }
   }
diff --git a/cpp/fory/util/buffer_test.cc b/cpp/fory/util/buffer_test.cc
index 7f32c0685..97043a4fd 100644
--- a/cpp/fory/util/buffer_test.cc
+++ b/cpp/fory/util/buffer_test.cc
@@ -19,6 +19,8 @@
 
 #include <iostream>
 #include <limits>
+#include <sstream>
+#include <string>
 #include <utility>
 #include <vector>
 
@@ -330,6 +332,33 @@ TEST(Buffer, StreamSkipAndUnread) {
   EXPECT_EQ(view.reader_index(), 2U);
 }
 
+TEST(Buffer, StreamShrinkBufferBestEffortUsesConfiguredBufferSize) {
+  constexpr uint32_t kConfiguredBufferSize = 32768;
+  constexpr uint32_t kPayloadSize = kConfiguredBufferSize * 2;
+  std::string payload(kPayloadSize, '\x7');
+  std::istringstream source(payload);
+  StdInputStream stream(source, kConfiguredBufferSize);
+  Buffer reader(stream);
+  Error error;
+
+  reader.skip(5000, error);
+  ASSERT_TRUE(error.ok()) << error.to_string();
+  EXPECT_EQ(reader.reader_index(), 5000U);
+
+  // Below configured input buffer size, shrink should be a no-op.
+  reader.shrink_input_buffer();
+  EXPECT_EQ(reader.reader_index(), 5000U);
+
+  reader.skip(kConfiguredBufferSize, error);
+  ASSERT_TRUE(error.ok()) << error.to_string();
+  ASSERT_GT(reader.reader_index(), kConfiguredBufferSize);
+
+  const uint32_t remaining_before = reader.remaining_size();
+  reader.shrink_input_buffer();
+  EXPECT_EQ(reader.reader_index(), 0U);
+  EXPECT_EQ(reader.size(), remaining_before);
+}
+
 TEST(Buffer, StreamReadErrorWhenInsufficientData) {
   std::vector<uint8_t> raw{0x01, 0x02, 0x03};
   OneByteIStream one_byte_stream(raw);
@@ -398,8 +427,8 @@ TEST(Buffer, OutputStreamRebindDetachesPreviousBufferBacklink) {
 
   first->bind_output_stream(&writer);
   second->bind_output_stream(&writer);
-  EXPECT_FALSE(first->is_output_stream_backed());
-  EXPECT_TRUE(second->is_output_stream_backed());
+  EXPECT_FALSE(first->has_output_stream());
+  EXPECT_TRUE(second->has_output_stream());
 
   writer.enter_flush_barrier();
   std::vector<uint8_t> second_payload(5000, 7);
diff --git a/cpp/fory/util/stream.cc b/cpp/fory/util/stream.cc
index 0d06ec7de..d694c3f84 100644
--- a/cpp/fory/util/stream.cc
+++ b/cpp/fory/util/stream.cc
@@ -204,6 +204,14 @@ void StdInputStream::shrink_buffer() {
   }
 
   const uint32_t read_pos = buffer_->reader_index_;
+  // Best-effort policy:
+  // 1) keep a hard 4096-byte floor to avoid tiny frequent compactions;
+  // 2) for larger configured input buffers, require at least one full initial
+  //    buffer worth of consumed bytes before moving unread data.
+  if (FORY_PREDICT_TRUE(read_pos <= 4096 || read_pos < initial_buffer_size_)) {
+    return;
+  }
+
   const uint32_t remaining = remaining_size();
   if (read_pos > 0) {
     if (remaining > 0) {
diff --git a/cpp/fory/util/stream.h b/cpp/fory/util/stream.h
index c97ceafe0..0927adc27 100644
--- a/cpp/fory/util/stream.h
+++ b/cpp/fory/util/stream.h
@@ -133,6 +133,9 @@ public:
 
   virtual Result<void, Error> unread(uint32_t size) = 0;
 
+  // Best-effort input-buffer compaction/reclaim hook. Callers may invoke this
+  // frequently; implementations should return quickly unless configured
+  // compaction thresholds are met.
   virtual void shrink_buffer() = 0;
 
   virtual Buffer &get_buffer() = 0;
diff --git a/python/pyfory/_fory.py b/python/pyfory/_fory.py
index 2ccb8115b..b57f639da 100644
--- a/python/pyfory/_fory.py
+++ b/python/pyfory/_fory.py
@@ -698,6 +698,8 @@ class Fory:
     def read_buffer_object(self, buffer) -> Buffer:
         if not self.is_peer_out_of_band_enabled:
             size = buffer.read_var_uint32()
+            if buffer.has_input_stream():
+                return buffer.read_bytes(size)
             reader_index = buffer.get_reader_index()
             buf = buffer.slice(reader_index, size)
             buffer.set_reader_index(reader_index + size)
@@ -707,6 +709,8 @@ class Fory:
             assert self._buffers is not None
             return next(self._buffers)
         size = buffer.read_var_uint32()
+        if buffer.has_input_stream():
+            return buffer.read_bytes(size)
         reader_index = buffer.get_reader_index()
         buf = buffer.slice(reader_index, size)
         buffer.set_reader_index(reader_index + size)
diff --git a/python/pyfory/buffer.pxi b/python/pyfory/buffer.pxi
index d9c66e2b5..2b98353f4 100644
--- a/python/pyfory/buffer.pxi
+++ b/python/pyfory/buffer.pxi
@@ -244,6 +244,12 @@ cdef class Buffer:
             raise ValueError("writer_index must be >= 0")
         self.c_buffer.writer_index(<uint32_t>value)
 
+    cpdef inline void shrink_input_buffer(self):
+        self.c_buffer.shrink_input_buffer()
+
+    cpdef inline c_bool has_input_stream(self):
+        return self.c_buffer.has_input_stream()
+
     cpdef c_bool own_data(self):
         return self.c_buffer.own_data()
 
diff --git a/python/pyfory/cpp/pyfory.cc b/python/pyfory/cpp/pyfory.cc
index 13c28103b..0f35f8a64 100644
--- a/python/pyfory/cpp/pyfory.cc
+++ b/python/pyfory/cpp/pyfory.cc
@@ -366,6 +366,14 @@ public:
     }
 
     const uint32_t read_pos = buffer_->reader_index_;
+    // Keep Python-backed InputStream shrink behavior aligned with C++:
+    // best-effort compaction only after both the global floor (4096) and the
+    // configured stream buffer size threshold are crossed.
+    if (FORY_PREDICT_TRUE(read_pos <= 4096 ||
+                          read_pos < initial_buffer_size_)) {
+      return;
+    }
+
     const uint32_t remaining = remaining_size();
     if (read_pos > 0) {
       if (remaining > 0) {
@@ -1491,13 +1499,13 @@ int Fory_PyPrimitiveCollectionReadFromBuffer(PyObject *collection,
         "tuple collection size is smaller than requested read size");
     return -1;
   }
-  if (!buffer->is_stream_backed() && kind == PythonCollectionKind::List) {
+  if (!buffer->has_input_stream() && kind == PythonCollectionKind::List) {
     return read_primitive_sequence_indexed(
         buffer, size, type_id, [collection](Py_ssize_t i, PyObject *item) {
           PyList_SET_ITEM(collection, i, item);
         });
   }
-  if (!buffer->is_stream_backed() && kind == PythonCollectionKind::Tuple) {
+  if (!buffer->has_input_stream() && kind == PythonCollectionKind::Tuple) {
     return read_primitive_sequence_indexed(
         buffer, size, type_id, [collection](Py_ssize_t i, PyObject *item) {
           PyTuple_SET_ITEM(collection, i, item);
diff --git a/python/pyfory/includes/libutil.pxd b/python/pyfory/includes/libutil.pxd
index b0d8914e3..3371b0c02 100644
--- a/python/pyfory/includes/libutil.pxd
+++ b/python/pyfory/includes/libutil.pxd
@@ -205,6 +205,10 @@ cdef extern from "fory/util/buffer.h" namespace "fory" nogil:
 
         void skip(uint32_t length, CError& error)
 
+        c_bool has_input_stream() const
+
+        void shrink_input_buffer()
+
         void copy(uint32_t start, uint32_t nbytes,
                   uint8_t* out, uint32_t offset) const
 
diff --git a/python/pyfory/serialization.pyx b/python/pyfory/serialization.pyx
index 4cfe64894..dea890fbc 100644
--- a/python/pyfory/serialization.pyx
+++ b/python/pyfory/serialization.pyx
@@ -1630,6 +1630,8 @@ cdef class Fory:
         cdef Buffer buf
         if not self.is_peer_out_of_band_enabled:
             size = buffer.read_var_uint32()
+            if buffer.has_input_stream():
+                return buffer.read_bytes(size)
             reader_index = buffer.get_reader_index()
             buf = buffer.slice(reader_index, size)
             buffer.set_reader_index(reader_index + size)
@@ -1639,6 +1641,8 @@ cdef class Fory:
             assert self._buffers is not None
             return next(self._buffers)
         size = buffer.read_var_uint32()
+        if buffer.has_input_stream():
+            return buffer.read_bytes(size)
         reader_index = buffer.get_reader_index()
         buf = buffer.slice(reader_index, size)
         buffer.set_reader_index(reader_index + size)
diff --git a/python/pyfory/struct.pxi b/python/pyfory/struct.pxi
index 659f28121..6da7e7f9f 100644
--- a/python/pyfory/struct.pxi
+++ b/python/pyfory/struct.pxi
@@ -384,6 +384,7 @@ cdef class DataClassSerializer(Serializer):
                 self._apply_missing_defaults_slots(obj)
             else:
                 self._apply_missing_defaults_dict(obj.__dict__)
+        buffer.shrink_input_buffer()
         return obj
 
     cdef inline void _read_dict(self, Buffer buffer, object obj):
diff --git a/python/pyfory/struct.py b/python/pyfory/struct.py
index cb61db342..bc24dea93 100644
--- a/python/pyfory/struct.py
+++ b/python/pyfory/struct.py
@@ -572,6 +572,7 @@ class DataClassSerializer(Serializer):
                     obj_dict[field_name] = value
                 else:
                     setattr(obj, field_name, value)
+        buffer.shrink_input_buffer()
         return obj
 
 
diff --git a/python/pyfory/tests/test_stream.py b/python/pyfory/tests/test_stream.py
index 3276c4054..efa28d622 100644
--- a/python/pyfory/tests/test_stream.py
+++ b/python/pyfory/tests/test_stream.py
@@ -16,6 +16,8 @@
 # under the License.
 
 from dataclasses import dataclass
+import io
+import pickle
 
 import pytest
 
@@ -111,6 +113,12 @@ class StreamStructValue:
     values: list
 
 
+@dataclass
+class StreamPickleBufferValue:
+    idx: int
+    payload: pickle.PickleBuffer
+
+
 @pytest.mark.parametrize("xlang", [False, True])
 def test_stream_roundtrip_primitives_and_strings(xlang):
     fory = pyfory.Fory(xlang=xlang, ref=True)
@@ -183,6 +191,52 @@ def test_stream_deserialize_multiple_objects_from_single_stream(xlang):
     assert reader.get_reader_index() == reader.size()
 
 
+@pytest.mark.parametrize("xlang", [False, True])
+def test_stream_backed_buffer_struct_deserialize_shrinks_each_struct(xlang):
+    fory = pyfory.Fory(xlang=xlang, ref=True)
+    fory.register(StreamStructValue)
+    first = StreamStructValue(101, "first", list(range(6000)))
+    second = StreamStructValue(202, "second", list(range(6000, 0, -1)))
+
+    payload = fory.dumps(first) + fory.dumps(second)
+    reader = Buffer.from_stream(io.BytesIO(payload), 4096)
+
+    first_result = fory.deserialize(reader)
+    assert first_result == first
+    assert reader.get_reader_index() == 0
+
+    second_result = fory.deserialize(reader)
+    assert second_result == second
+    assert reader.get_reader_index() == 0
+
+
+def test_stream_backed_buffer_pickle_buffer_not_corrupted_after_next_struct():
+    fory = pyfory.Fory(xlang=False, ref=True, strict=False)
+    fory.register(StreamPickleBufferValue)
+    first_payload = b"a" * 7000
+    second_payload = b"b" * 7000
+    first = StreamPickleBufferValue(101, pickle.PickleBuffer(first_payload))
+    second = StreamPickleBufferValue(202, pickle.PickleBuffer(second_payload))
+
+    writer = Buffer.allocate(32768)
+    fory.serialize(first, writer)
+    fory.serialize(second, writer)
+    stream_data = writer.get_bytes(0, writer.get_writer_index())
+    reader = Buffer.from_stream(io.BytesIO(stream_data), 4096)
+
+    first_result = fory.deserialize(reader)
+    assert first_result.idx == first.idx
+    assert bytes(first_result.payload.raw()) == first_payload
+
+    second_result = fory.deserialize(reader)
+    assert second_result.idx == second.idx
+    assert bytes(second_result.payload.raw()) == second_payload
+
+    # Ensure previously returned zero-copy-like payloads remain stable even
+    # after later stream reads trigger shrink logic.
+    assert bytes(first_result.payload.raw()) == first_payload
+
+
 @pytest.mark.parametrize("xlang", [False, True])
 def test_stream_deserialize_truncated_error(xlang):
     fory = pyfory.Fory(xlang=xlang, ref=True)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to