This is an automated email from the ASF dual-hosted git repository.

chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fory.git


The following commit(s) were added to refs/heads/main by this push:
     new 812ade8bb feat(c++/python): support stream deserialization for c++ and python (#3307)
812ade8bb is described below

commit 812ade8bbbcb50fb26e9a795a7f2c21b2b7732bb
Author: Shawn Yang <[email protected]>
AuthorDate: Fri Feb 27 00:30:04 2026 +0800

    feat(c++/python): support stream deserialization for c++ and python (#3307)
    
    ## Why?
    
    C++ and Python deserialization currently assumes data is already
    materialized in memory-backed buffers. This PR adds stream-backed
    deserialization support so payloads can be read incrementally from input
    streams while preserving existing serialization behavior and error
    handling.
    
    ## What does this PR do?
    
    - Adds stream infrastructure in C++
    (`ForyInputStreamBuf`/`ForyInputStream`) and integrates it with `Buffer`
    so reads can request more bytes on demand.
    - Adds Python-to-C++ stream bridge (`Fory_PyCreateBufferFromStream`) so
    `pyfory.buffer.Buffer` can be constructed from Python objects that
    implement `read(size)`.
    - Updates deserialization paths to be stream-safe by using `ensure_size`
    checks for header/string/fixed-field reads and falling back from batched
    varint reads on stream-backed buffers.
    - Extends build rules to compile/link new stream components and adds
    stream-focused C++ tests (`stream_test.cc`, buffer stream tests).
    - Adds Python stream tests (`python/pyfory/tests/test_stream.py`) and
    buffer stream coverage in `python/pyfory/tests/test_buffer.py`.
    
    ## Related issues
    
    N/A
    
    ## Does this PR introduce any user-facing change?
    
    - [x] Does this PR introduce any public API change?
    - [ ] Does this PR introduce any binary protocol compatibility change?
    
    ## Benchmark
    
    N/A
---
 AGENTS.md                                      |   1 +
 BUILD                                          |   1 +
 cpp/fory/python/pyfory.cc                      | 359 ++++++++++++++++++++
 cpp/fory/python/pyfory.h                       |   6 +-
 cpp/fory/serialization/BUILD                   |  10 +
 cpp/fory/serialization/CMakeLists.txt          |   4 +
 cpp/fory/serialization/collection_serializer.h |   5 +-
 cpp/fory/serialization/fory.h                  |  54 ++-
 cpp/fory/serialization/skip.cc                 |  37 +-
 cpp/fory/serialization/stream_test.cc          | 240 +++++++++++++
 cpp/fory/serialization/string_serializer.h     |  43 +--
 cpp/fory/serialization/struct_serializer.h     |  33 +-
 cpp/fory/util/CMakeLists.txt                   |   2 +
 cpp/fory/util/buffer.cc                        |  11 +
 cpp/fory/util/buffer.h                         | 450 +++++++++++++++++--------
 cpp/fory/util/buffer_test.cc                   | 143 ++++++++
 cpp/fory/util/stream.cc                        | 218 ++++++++++++
 cpp/fory/util/stream.h                         |  91 +++++
 python/pyfory/buffer.pxd                       |   2 -
 python/pyfory/buffer.pyx                       |  65 +++-
 python/pyfory/includes/libpyfory.pxd           |  26 ++
 python/pyfory/includes/libutil.pxd             |   6 +-
 python/pyfory/tests/test_buffer.py             | 104 ++++++
 python/pyfory/tests/test_stream.py             | 150 +++++++++
 24 files changed, 1838 insertions(+), 223 deletions(-)

diff --git a/AGENTS.md b/AGENTS.md
index 398237fd0..bcc882131 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -6,6 +6,7 @@ This file provides comprehensive guidance to AI coding agents when working with
 
 While working on Fory, please remember:
 
+- **Do not reserve any legacy code/docs unless requested clearly**.
 - **Performance First**: Performance is the top priority. Never introduce code that reduces performance without explicit justification.
 - **English Only**: Always use English in code, comments, and documentation.
 - **Meaningful Comments**: Only add comments when the code's behavior is difficult to understand or when documenting complex algorithms.
diff --git a/BUILD b/BUILD
index 809727199..51bdffc8d 100644
--- a/BUILD
+++ b/BUILD
@@ -31,6 +31,7 @@ pyx_library(
         linkstatic = 1,
     ),
     deps = [
+        "//cpp/fory/python:_pyfory",
         "//cpp/fory/util:fory_util",
     ],
 )
diff --git a/cpp/fory/python/pyfory.cc b/cpp/fory/python/pyfory.cc
index 04117a7b5..fb6abd184 100644
--- a/cpp/fory/python/pyfory.cc
+++ b/cpp/fory/python/pyfory.cc
@@ -19,6 +19,15 @@
 
 #include "fory/python/pyfory.h"
 
+#include <algorithm>
+#include <cstring>
+#include <exception>
+#include <limits>
+#include <memory>
+#include <vector>
+
+#include "fory/util/stream.h"
+
 static PyObject **py_sequence_get_items(PyObject *collection) {
   if (PyList_CheckExact(collection)) {
     return ((PyListObject *)collection)->ob_item;
@@ -29,6 +38,335 @@ static PyObject **py_sequence_get_items(PyObject *collection) {
 }
 
 namespace fory {
+
+static std::string fetch_python_error_message() {
+  PyObject *type = nullptr;
+  PyObject *value = nullptr;
+  PyObject *traceback = nullptr;
+  PyErr_Fetch(&type, &value, &traceback);
+  PyErr_NormalizeException(&type, &value, &traceback);
+  std::string message = "python stream read failed";
+  if (value != nullptr) {
+    PyObject *value_str = PyObject_Str(value);
+    if (value_str != nullptr) {
+      const char *c_str = PyUnicode_AsUTF8(value_str);
+      if (c_str != nullptr) {
+        message = c_str;
+      }
+      Py_DECREF(value_str);
+    } else {
+      PyErr_Clear();
+    }
+  }
+  Py_XDECREF(type);
+  Py_XDECREF(value);
+  Py_XDECREF(traceback);
+  return message;
+}
+
+enum class PythonStreamReadMethod {
+  ReadInto,
+  RecvInto,
+  RecvIntoUnderscore,
+};
+
+static const char *
+python_stream_read_method_name(PythonStreamReadMethod method) {
+  switch (method) {
+  case PythonStreamReadMethod::ReadInto:
+    return "readinto";
+  case PythonStreamReadMethod::RecvInto:
+    return "recvinto";
+  case PythonStreamReadMethod::RecvIntoUnderscore:
+    return "recv_into";
+  }
+  return "readinto";
+}
+
+static bool resolve_python_stream_read_method(PyObject *stream,
+                                              PythonStreamReadMethod *method,
+                                              std::string *error_message) {
+  struct MethodCandidate {
+    const char *name;
+    PythonStreamReadMethod method;
+  };
+  constexpr MethodCandidate k_candidates[] = {
+      {"readinto", PythonStreamReadMethod::ReadInto},
+      {"recv_into", PythonStreamReadMethod::RecvIntoUnderscore},
+      {"recvinto", PythonStreamReadMethod::RecvInto},
+  };
+  for (const auto &candidate : k_candidates) {
+    const int has_method = PyObject_HasAttrString(stream, candidate.name);
+    if (has_method < 0) {
+      *error_message = fetch_python_error_message();
+      return false;
+    }
+    if (has_method == 0) {
+      continue;
+    }
+    PyObject *method_obj = PyObject_GetAttrString(stream, candidate.name);
+    if (method_obj == nullptr) {
+      *error_message = fetch_python_error_message();
+      return false;
+    }
+    const bool is_callable = PyCallable_Check(method_obj) != 0;
+    Py_DECREF(method_obj);
+    if (is_callable) {
+      *method = candidate.method;
+      return true;
+    }
+  }
+  *error_message = "stream object must provide readinto(buffer), "
+                   "recv_into(buffer, size) or recvinto(buffer, size) method";
+  return false;
+}
+
+class PythonStreamReader final : public StreamReader {
+public:
+  explicit PythonStreamReader(PyObject *stream, uint32_t buffer_size,
+                              PythonStreamReadMethod read_method)
+      : stream_(stream), read_method_(read_method),
+        read_method_name_(python_stream_read_method_name(read_method)),
+        data_(std::max<uint32_t>(buffer_size, static_cast<uint32_t>(1))),
+        initial_buffer_size_(
+            std::max<uint32_t>(buffer_size, static_cast<uint32_t>(1))),
+        owned_buffer_(std::make_unique<Buffer>()) {
+    FORY_CHECK(stream_ != nullptr) << "stream must not be null";
+    Py_INCREF(stream_);
+    bind_buffer(owned_buffer_.get());
+  }
+
+  ~PythonStreamReader() override {
+    if (stream_ != nullptr) {
+      PyGILState_STATE gil_state = PyGILState_Ensure();
+      Py_DECREF(stream_);
+      PyGILState_Release(gil_state);
+      stream_ = nullptr;
+    }
+  }
+
+  Result<void, Error> fill_buffer(uint32_t min_fill_size) override {
+    if (min_fill_size == 0 || remaining_size() >= min_fill_size) {
+      return Result<void, Error>();
+    }
+
+    const uint32_t read_pos = buffer_->reader_index_;
+    const uint32_t deficit = min_fill_size - remaining_size();
+    constexpr uint64_t k_max_u32 = std::numeric_limits<uint32_t>::max();
+    const uint64_t required = static_cast<uint64_t>(buffer_->size_) + deficit;
+    if (required > k_max_u32) {
+      return Unexpected(
+          Error::out_of_bound("stream buffer size exceeds uint32 range"));
+    }
+    if (required > data_.size()) {
+      uint64_t new_size =
+          std::max<uint64_t>(required, static_cast<uint64_t>(data_.size()) * 2);
+      if (new_size > k_max_u32) {
+        new_size = k_max_u32;
+      }
+      reserve(static_cast<uint32_t>(new_size));
+    }
+
+    uint32_t write_pos = buffer_->size_;
+    while (remaining_size() < min_fill_size) {
+      uint32_t writable = static_cast<uint32_t>(data_.size()) - write_pos;
+      auto read_result = recv_into(data_.data() + write_pos, writable);
+      if (FORY_PREDICT_FALSE(!read_result.ok())) {
+        return Unexpected(std::move(read_result).error());
+      }
+      uint32_t read_bytes = std::move(read_result).value();
+      if (read_bytes == 0) {
+        return Unexpected(Error::buffer_out_of_bound(read_pos, min_fill_size,
+                                                     remaining_size()));
+      }
+      write_pos += read_bytes;
+      buffer_->size_ = write_pos;
+    }
+    return Result<void, Error>();
+  }
+
+  Result<void, Error> read_to(uint8_t *dst, uint32_t length) override {
+    if (length == 0) {
+      return Result<void, Error>();
+    }
+    Error error;
+    if (FORY_PREDICT_FALSE(!buffer_->ensure_readable(length, error))) {
+      return Unexpected(std::move(error));
+    }
+    std::memcpy(dst, buffer_->data_ + buffer_->reader_index_,
+                static_cast<size_t>(length));
+    buffer_->reader_index_ += length;
+    return Result<void, Error>();
+  }
+
+  Result<void, Error> skip(uint32_t size) override {
+    if (size == 0) {
+      return Result<void, Error>();
+    }
+    Error error;
+    buffer_->increase_reader_index(size, error);
+    if (FORY_PREDICT_FALSE(!error.ok())) {
+      return Unexpected(std::move(error));
+    }
+    return Result<void, Error>();
+  }
+
+  void shrink_buffer() override {
+    if (buffer_ == nullptr) {
+      return;
+    }
+
+    const uint32_t read_pos = buffer_->reader_index_;
+    const uint32_t remaining = remaining_size();
+    if (read_pos > 0) {
+      if (remaining > 0) {
+        std::memmove(data_.data(), data_.data() + read_pos,
+                     static_cast<size_t>(remaining));
+      }
+      buffer_->reader_index_ = 0;
+      buffer_->size_ = remaining;
+      buffer_->writer_index_ = remaining;
+    }
+
+    const uint32_t current_capacity = static_cast<uint32_t>(data_.size());
+    uint32_t target_capacity = current_capacity;
+    if (current_capacity > initial_buffer_size_) {
+      if (remaining == 0) {
+        target_capacity = initial_buffer_size_;
+      } else if (remaining <= current_capacity / 4) {
+        const uint32_t doubled =
+            remaining > std::numeric_limits<uint32_t>::max() / 2
+                ? std::numeric_limits<uint32_t>::max()
+                : remaining * 2;
+        target_capacity = std::max<uint32_t>(
+            initial_buffer_size_,
+            std::max<uint32_t>(doubled, static_cast<uint32_t>(1)));
+      }
+    }
+    if (target_capacity < current_capacity) {
+      data_.resize(target_capacity);
+      data_.shrink_to_fit();
+      buffer_->data_ = data_.data();
+    }
+  }
+
+  Result<void, Error> unread(uint32_t size) override {
+    if (FORY_PREDICT_FALSE(size > buffer_->reader_index_)) {
+      return Unexpected(Error::buffer_out_of_bound(buffer_->reader_index_, size,
+                                                   buffer_->size_));
+    }
+    buffer_->reader_index_ -= size;
+    return Result<void, Error>();
+  }
+
+  Buffer &get_buffer() override { return *buffer_; }
+
+  void bind_buffer(Buffer *buffer) override {
+    Buffer *target = buffer == nullptr ? owned_buffer_.get() : buffer;
+    if (target == nullptr) {
+      if (buffer_ != nullptr) {
+        buffer_->stream_reader_ = nullptr;
+      }
+      buffer_ = nullptr;
+      return;
+    }
+
+    if (buffer_ == target) {
+      buffer_->data_ = data_.data();
+      buffer_->own_data_ = false;
+      buffer_->wrapped_vector_ = nullptr;
+      buffer_->stream_reader_ = this;
+      return;
+    }
+
+    Buffer *source = buffer_;
+    if (source != nullptr) {
+      target->size_ = source->size_;
+      target->writer_index_ = source->writer_index_;
+      target->reader_index_ = source->reader_index_;
+      source->stream_reader_ = nullptr;
+    } else {
+      target->size_ = 0;
+      target->writer_index_ = 0;
+      target->reader_index_ = 0;
+    }
+
+    buffer_ = target;
+    buffer_->data_ = data_.data();
+    buffer_->own_data_ = false;
+    buffer_->wrapped_vector_ = nullptr;
+    buffer_->stream_reader_ = this;
+  }
+
+private:
+  Result<uint32_t, Error> recv_into(void *dst, uint32_t length) {
+    if (length == 0) {
+      return 0U;
+    }
+    PyGILState_STATE gil_state = PyGILState_Ensure();
+    PyObject *memory_view =
+        PyMemoryView_FromMemory(reinterpret_cast<char *>(dst),
+                                static_cast<Py_ssize_t>(length), PyBUF_WRITE);
+    if (memory_view == nullptr) {
+      std::string message = fetch_python_error_message();
+      PyGILState_Release(gil_state);
+      return Unexpected(Error::io_error(message));
+    }
+    PyObject *read_bytes_obj = nullptr;
+    switch (read_method_) {
+    case PythonStreamReadMethod::ReadInto:
+      read_bytes_obj =
+          PyObject_CallMethod(stream_, read_method_name_, "O", memory_view);
+      break;
+    case PythonStreamReadMethod::RecvInto:
+    case PythonStreamReadMethod::RecvIntoUnderscore:
+      read_bytes_obj =
+          PyObject_CallMethod(stream_, read_method_name_, "On", memory_view,
+                              static_cast<Py_ssize_t>(length));
+      break;
+    }
+    Py_DECREF(memory_view);
+    if (read_bytes_obj == nullptr) {
+      std::string message = fetch_python_error_message();
+      PyGILState_Release(gil_state);
+      return Unexpected(Error::io_error(message));
+    }
+
+    Py_ssize_t read_bytes = PyLong_AsSsize_t(read_bytes_obj);
+    Py_DECREF(read_bytes_obj);
+    if (read_bytes == -1 && PyErr_Occurred()) {
+      std::string message = fetch_python_error_message();
+      PyGILState_Release(gil_state);
+      return Unexpected(Error::io_error(message));
+    }
+    PyGILState_Release(gil_state);
+    if (read_bytes < 0 ||
+        static_cast<uint64_t>(read_bytes) > static_cast<uint64_t>(length)) {
+      return Unexpected(Error::io_error("python stream " +
+                                        std::string(read_method_name_) +
+                                        " returned invalid length"));
+    }
+    return static_cast<uint32_t>(read_bytes);
+  }
+
+  uint32_t remaining_size() const {
+    return buffer_->size_ - buffer_->reader_index_;
+  }
+
+  void reserve(uint32_t new_size) {
+    data_.resize(new_size);
+    buffer_->data_ = data_.data();
+  }
+
+  PyObject *stream_ = nullptr;
+  PythonStreamReadMethod read_method_;
+  const char *read_method_name_ = nullptr;
+  std::vector<uint8_t> data_;
+  uint32_t initial_buffer_size_ = 1;
+  Buffer *buffer_ = nullptr;
+  std::unique_ptr<Buffer> owned_buffer_;
+};
+
 int Fory_PyBooleanSequenceWriteToBuffer(PyObject *collection, Buffer *buffer,
                                         Py_ssize_t start_index) {
   PyObject **items = py_sequence_get_items(collection);
@@ -58,4 +396,25 @@ int Fory_PyFloatSequenceWriteToBuffer(PyObject *collection, Buffer *buffer,
   }
   return 0;
 }
+
+int Fory_PyCreateBufferFromStream(PyObject *stream, uint32_t buffer_size,
+                                  Buffer **out, std::string *error_message) {
+  if (stream == nullptr) {
+    *error_message = "stream must not be null";
+    return -1;
+  }
+  PythonStreamReadMethod read_method = PythonStreamReadMethod::ReadInto;
+  if (!resolve_python_stream_read_method(stream, &read_method, error_message)) {
+    return -1;
+  }
+  try {
+    auto stream_reader =
+        std::make_shared<PythonStreamReader>(stream, buffer_size, read_method);
+    *out = new Buffer(*stream_reader);
+    return 0;
+  } catch (const std::exception &e) {
+    *error_message = e.what();
+    return -1;
+  }
+}
 } // namespace fory
diff --git a/cpp/fory/python/pyfory.h b/cpp/fory/python/pyfory.h
index 467a86970..780ef3534 100644
--- a/cpp/fory/python/pyfory.h
+++ b/cpp/fory/python/pyfory.h
@@ -18,6 +18,8 @@
  */
 
 #pragma once
+#include <string>
+
 #include "Python.h"
 #include "fory/util/buffer.h"
 
@@ -26,4 +28,6 @@ int Fory_PyBooleanSequenceWriteToBuffer(PyObject *collection, Buffer *buffer,
                                         Py_ssize_t start_index);
 int Fory_PyFloatSequenceWriteToBuffer(PyObject *collection, Buffer *buffer,
                                       Py_ssize_t start_index);
-} // namespace fory
\ No newline at end of file
+int Fory_PyCreateBufferFromStream(PyObject *stream, uint32_t buffer_size,
+                                  Buffer **out, std::string *error_message);
+} // namespace fory
diff --git a/cpp/fory/serialization/BUILD b/cpp/fory/serialization/BUILD
index 5f737ae5f..c3dfe5bd5 100644
--- a/cpp/fory/serialization/BUILD
+++ b/cpp/fory/serialization/BUILD
@@ -174,6 +174,16 @@ cc_test(
     ],
 )
 
+cc_test(
+    name = "stream_test",
+    srcs = ["stream_test.cc"],
+    deps = [
+        ":fory_serialization",
+        "@googletest//:gtest",
+        "@googletest//:gtest_main",
+    ],
+)
+
 cc_binary(
     name = "xlang_test_main",
     srcs = ["xlang_test_main.cc"],
diff --git a/cpp/fory/serialization/CMakeLists.txt b/cpp/fory/serialization/CMakeLists.txt
index d4852742e..0582554c7 100644
--- a/cpp/fory/serialization/CMakeLists.txt
+++ b/cpp/fory/serialization/CMakeLists.txt
@@ -113,6 +113,10 @@ if(FORY_BUILD_TESTS)
     add_executable(fory_serialization_any_test any_serializer_test.cc)
     target_link_libraries(fory_serialization_any_test fory_serialization GTest::gtest GTest::gtest_main)
     gtest_discover_tests(fory_serialization_any_test)
+
+    add_executable(fory_serialization_stream_test stream_test.cc)
+    target_link_libraries(fory_serialization_stream_test fory_serialization GTest::gtest GTest::gtest_main)
+    gtest_discover_tests(fory_serialization_stream_test)
 endif()
 
 # xlang test binary
diff --git a/cpp/fory/serialization/collection_serializer.h b/cpp/fory/serialization/collection_serializer.h
index a780d1c0c..3768275d1 100644
--- a/cpp/fory/serialization/collection_serializer.h
+++ b/cpp/fory/serialization/collection_serializer.h
@@ -905,7 +905,10 @@ template <typename Alloc> struct Serializer<std::vector<bool, Alloc>> {
       for (uint32_t i = 0; i < size; ++i) {
         result[i] = (src[i] != 0);
       }
-      buffer.increase_reader_index(size);
+      buffer.increase_reader_index(size, ctx.error());
+      if (FORY_PREDICT_FALSE(ctx.has_error())) {
+        return std::vector<bool, Alloc>();
+      }
     } else {
       // Fallback: read byte-by-byte with bounds checking
       for (uint32_t i = 0; i < size; ++i) {
diff --git a/cpp/fory/serialization/fory.h b/cpp/fory/serialization/fory.h
index 53735400b..09a0bfa48 100644
--- a/cpp/fory/serialization/fory.h
+++ b/cpp/fory/serialization/fory.h
@@ -37,6 +37,7 @@
 #include "fory/util/error.h"
 #include "fory/util/pool.h"
 #include "fory/util/result.h"
+#include "fory/util/stream.h"
 #include <cstring>
 #include <memory>
 #include <mutex>
@@ -513,9 +514,9 @@ public:
       ensure_finalized();
     }
     // Swap in the caller's buffer so all writes go there.
-    std::swap(buffer, write_ctx_->buffer());
+    buffer.swap(write_ctx_->buffer());
     auto result = serialize_impl(obj, write_ctx_->buffer());
-    std::swap(buffer, write_ctx_->buffer());
+    buffer.swap(write_ctx_->buffer());
     // reset internal state after use without clobbering caller buffer.
     write_ctx_->reset();
     return result;
@@ -606,7 +607,11 @@ public:
     if (FORY_PREDICT_FALSE(!finalized_)) {
       ensure_finalized();
     }
-    FORY_TRY(header, read_header(buffer));
+    auto header_result = read_header(buffer);
+    if (FORY_PREDICT_FALSE(!header_result.ok())) {
+      return Unexpected(std::move(header_result).error());
+    }
+    auto header = std::move(header_result).value();
     if (header.is_null) {
       return Unexpected(Error::invalid_data("Cannot deserialize null object"));
     }
@@ -622,6 +627,38 @@ public:
     return deserialize_impl<T>(buffer);
   }
 
+  /// Deserialize an object from a stream reader.
+  ///
+  /// This overload obtains the reader-owned Buffer via get_buffer() and
+  /// continues deserialization on that buffer.
+  ///
+  /// @tparam T The type of object to deserialize.
+  /// @param stream_reader Stream reader to read from.
+  /// @return Deserialized object, or error.
+  template <typename T>
+  Result<T, Error> deserialize(StreamReader &stream_reader) {
+    struct StreamShrinkGuard {
+      StreamReader *stream_reader = nullptr;
+      ~StreamShrinkGuard() {
+        if (stream_reader != nullptr) {
+          stream_reader->shrink_buffer();
+        }
+      }
+    };
+    StreamShrinkGuard shrink_guard{&stream_reader};
+    Buffer &buffer = stream_reader.get_buffer();
+    return deserialize<T>(buffer);
+  }
+
+  /// Deserialize an object from ForyInputStream.
+  ///
+  /// @tparam T The type of object to deserialize.
+  /// @param stream Input stream wrapper to read from.
+  /// @return Deserialized object, or error.
+  template <typename T> Result<T, Error> deserialize(ForyInputStream &stream) {
+    return deserialize<T>(static_cast<StreamReader &>(stream));
+  }
+
   // ==========================================================================
   // Advanced Access
   // ==========================================================================
@@ -792,6 +829,17 @@ public:
     return deserialize<T>(data.data(), data.size());
   }
 
+  template <typename T>
+  Result<T, Error> deserialize(StreamReader &stream_reader) {
+    auto fory_handle = fory_pool_.acquire();
+    return fory_handle->template deserialize<T>(stream_reader);
+  }
+
+  template <typename T> Result<T, Error> deserialize(ForyInputStream &stream) {
+    auto fory_handle = fory_pool_.acquire();
+    return fory_handle->template deserialize<T>(stream);
+  }
+
 private:
   explicit ThreadSafeFory(const Config &config,
                           std::shared_ptr<TypeResolver> resolver)
diff --git a/cpp/fory/serialization/skip.cc b/cpp/fory/serialization/skip.cc
index 946d6fbf4..3a89e295a 100644
--- a/cpp/fory/serialization/skip.cc
+++ b/cpp/fory/serialization/skip.cc
@@ -48,7 +48,7 @@ void skip_string(ReadContext &ctx) {
   uint64_t size = size_encoding >> 2;
 
   // skip string data
-  ctx.buffer().increase_reader_index(size);
+  ctx.buffer().increase_reader_index(size, ctx.error());
 }
 
 void skip_list(ReadContext &ctx, const FieldType &field_type) {
@@ -494,13 +494,13 @@ void skip_field_value(ReadContext &ctx, const FieldType &field_type,
   case TypeId::BOOL:
   case TypeId::INT8:
   case TypeId::FLOAT8:
-    ctx.buffer().increase_reader_index(1);
+    ctx.buffer().increase_reader_index(1, ctx.error());
     return;
 
   case TypeId::INT16:
   case TypeId::BFLOAT16:
   case TypeId::FLOAT16:
-    ctx.buffer().increase_reader_index(2);
+    ctx.buffer().increase_reader_index(2, ctx.error());
     return;
 
   case TypeId::INT32: {
@@ -512,7 +512,7 @@ void skip_field_value(ReadContext &ctx, const FieldType &field_type,
   }
 
   case TypeId::FLOAT32:
-    ctx.buffer().increase_reader_index(4);
+    ctx.buffer().increase_reader_index(4, ctx.error());
     return;
 
   case TypeId::INT64: {
@@ -524,7 +524,7 @@ void skip_field_value(ReadContext &ctx, const FieldType &field_type,
   }
 
   case TypeId::FLOAT64:
-    ctx.buffer().increase_reader_index(8);
+    ctx.buffer().increase_reader_index(8, ctx.error());
     return;
 
   case TypeId::VARINT32:
@@ -551,36 +551,21 @@ void skip_field_value(ReadContext &ctx, const FieldType &field_type,
   case TypeId::DURATION: {
     // Duration is stored as fixed 8-byte nanosecond count.
     constexpr uint32_t k_bytes = static_cast<uint32_t>(sizeof(int64_t));
-    if (ctx.buffer().reader_index() + k_bytes > ctx.buffer().size()) {
-      ctx.set_error(Error::buffer_out_of_bound(ctx.buffer().reader_index(),
-                                               k_bytes, ctx.buffer().size()));
-      return;
-    }
-    ctx.buffer().increase_reader_index(k_bytes);
+    ctx.buffer().increase_reader_index(k_bytes, ctx.error());
     return;
   }
   case TypeId::TIMESTAMP: {
     // Timestamp is stored as int64 seconds + uint32 nanoseconds.
     constexpr uint32_t k_bytes =
         static_cast<uint32_t>(sizeof(int64_t) + sizeof(uint32_t));
-    if (ctx.buffer().reader_index() + k_bytes > ctx.buffer().size()) {
-      ctx.set_error(Error::buffer_out_of_bound(ctx.buffer().reader_index(),
-                                               k_bytes, ctx.buffer().size()));
-      return;
-    }
-    ctx.buffer().increase_reader_index(k_bytes);
+    ctx.buffer().increase_reader_index(k_bytes, ctx.error());
     return;
   }
 
   case TypeId::DATE: {
     // Date is stored as fixed 4-byte day count.
     constexpr uint32_t k_bytes = static_cast<uint32_t>(sizeof(int32_t));
-    if (ctx.buffer().reader_index() + k_bytes > ctx.buffer().size()) {
-      ctx.set_error(Error::buffer_out_of_bound(ctx.buffer().reader_index(),
-                                               k_bytes, ctx.buffer().size()));
-      return;
-    }
-    ctx.buffer().increase_reader_index(k_bytes);
+    ctx.buffer().increase_reader_index(k_bytes, ctx.error());
     return;
   }
 
@@ -603,7 +588,7 @@ void skip_field_value(ReadContext &ctx, const FieldType &field_type,
   case TypeId::FLOAT32_ARRAY:
   case TypeId::FLOAT64_ARRAY: {
     if (tid == TypeId::FLOAT8_ARRAY) {
-      ctx.buffer().increase_reader_index(1);
+      ctx.buffer().increase_reader_index(1, ctx.error());
       if (FORY_PREDICT_FALSE(ctx.has_error())) {
         return;
       }
@@ -634,7 +619,7 @@ void skip_field_value(ReadContext &ctx, const FieldType &field_type,
       break;
     }
 
-    ctx.buffer().increase_reader_index(len * elem_size);
+    ctx.buffer().increase_reader_index(len * elem_size, ctx.error());
     return;
   }
 
@@ -644,7 +629,7 @@ void skip_field_value(ReadContext &ctx, const FieldType &field_type,
     if (FORY_PREDICT_FALSE(ctx.has_error())) {
       return;
     }
-    ctx.buffer().increase_reader_index(len);
+    ctx.buffer().increase_reader_index(len, ctx.error());
     return;
   }
 
diff --git a/cpp/fory/serialization/stream_test.cc b/cpp/fory/serialization/stream_test.cc
new file mode 100644
index 000000000..edbefd63d
--- /dev/null
+++ b/cpp/fory/serialization/stream_test.cc
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdint>
+#include <istream>
+#include <map>
+#include <memory>
+#include <streambuf>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "gtest/gtest.h"
+
+#include "fory/serialization/fory.h"
+#include "fory/util/stream.h"
+
+namespace fory {
+namespace serialization {
+namespace test {
+
+struct StreamPoint {
+  int32_t x;
+  int32_t y;
+
+  bool operator==(const StreamPoint &other) const {
+    return x == other.x && y == other.y;
+  }
+
+  FORY_STRUCT(StreamPoint, x, y);
+};
+
+struct StreamEnvelope {
+  std::string name;
+  std::vector<int32_t> values;
+  std::map<std::string, int64_t> metrics;
+  StreamPoint point;
+  bool active;
+
+  bool operator==(const StreamEnvelope &other) const {
+    return name == other.name && values == other.values &&
+           metrics == other.metrics && point == other.point &&
+           active == other.active;
+  }
+
+  FORY_STRUCT(StreamEnvelope, name, values, metrics, point, active);
+};
+
+struct SharedIntPair {
+  std::shared_ptr<int32_t> first;
+  std::shared_ptr<int32_t> second;
+
+  FORY_STRUCT(SharedIntPair, first, second);
+};
+
+class OneByteStreamBuf final : public std::streambuf {
+public:
+  explicit OneByteStreamBuf(std::vector<uint8_t> data)
+      : data_(std::move(data)), pos_(0) {}
+
+protected:
+  std::streamsize xsgetn(char *s, std::streamsize count) override {
+    if (pos_ >= data_.size() || count <= 0) {
+      return 0;
+    }
+    s[0] = static_cast<char>(data_[pos_]);
+    ++pos_;
+    return 1;
+  }
+
+  int_type underflow() override {
+    if (pos_ >= data_.size()) {
+      return traits_type::eof();
+    }
+    current_ = static_cast<char>(data_[pos_]);
+    setg(&current_, &current_, &current_ + 1);
+    return traits_type::to_int_type(current_);
+  }
+
+private:
+  std::vector<uint8_t> data_;
+  size_t pos_;
+  char current_ = 0;
+};
+
+class OneByteIStream final : public std::istream {
+public:
+  explicit OneByteIStream(std::vector<uint8_t> data)
+      : std::istream(nullptr), buf_(std::move(data)) {
+    rdbuf(&buf_);
+  }
+
+private:
+  OneByteStreamBuf buf_;
+};
+
+static inline void register_stream_types(Fory &fory) {
+  uint32_t type_id = 1;
+  fory.register_struct<StreamPoint>(type_id++);
+  fory.register_struct<StreamEnvelope>(type_id++);
+  fory.register_struct<SharedIntPair>(type_id++);
+}
+
+TEST(StreamSerializationTest, PrimitiveAndStringRoundTrip) {
+  auto fory = Fory::builder().xlang(true).track_ref(false).build();
+
+  auto number_bytes_result = fory.serialize<int64_t>(-9876543212345LL);
+  ASSERT_TRUE(number_bytes_result.ok())
+      << number_bytes_result.error().to_string();
+  OneByteIStream number_source(std::move(number_bytes_result).value());
+  ForyInputStream number_stream(number_source, 8);
+  auto number_result = fory.deserialize<int64_t>(number_stream);
+  ASSERT_TRUE(number_result.ok()) << number_result.error().to_string();
+  EXPECT_EQ(number_result.value(), -9876543212345LL);
+
+  auto string_bytes_result = fory.serialize<std::string>("stream-hello-世界");
+  ASSERT_TRUE(string_bytes_result.ok())
+      << string_bytes_result.error().to_string();
+  OneByteIStream string_source(std::move(string_bytes_result).value());
+  ForyInputStream string_stream(string_source, 8);
+  auto string_result = fory.deserialize<std::string>(string_stream);
+  ASSERT_TRUE(string_result.ok()) << string_result.error().to_string();
+  EXPECT_EQ(string_result.value(), "stream-hello-世界");
+}
+
+TEST(StreamSerializationTest, StructRoundTrip) {
+  auto fory = Fory::builder().xlang(true).track_ref(true).build();
+  register_stream_types(fory);
+
+  StreamEnvelope original{
+      "payload-name",
+      {1, 3, 5, 7, 9},
+      {{"count", 5}, {"sum", 25}, {"max", 9}},
+      {42, -7},
+      true,
+  };
+
+  auto bytes_result = fory.serialize(original);
+  ASSERT_TRUE(bytes_result.ok()) << bytes_result.error().to_string();
+
+  OneByteIStream source(std::move(bytes_result).value());
+  ForyInputStream stream(source, 4);
+  auto result = fory.deserialize<StreamEnvelope>(stream);
+  ASSERT_TRUE(result.ok()) << result.error().to_string();
+  EXPECT_EQ(result.value(), original);
+}
+
+TEST(StreamSerializationTest, SequentialDeserializeFromSingleStream) {
+  auto fory = Fory::builder().xlang(true).track_ref(true).build();
+  register_stream_types(fory);
+
+  StreamEnvelope envelope{
+      "batch", {10, 20, 30}, {{"a", 1}, {"b", 2}}, {9, 8}, false,
+  };
+
+  std::vector<uint8_t> bytes;
+  ASSERT_TRUE(fory.serialize_to(bytes, static_cast<int32_t>(12345)).ok());
+  ASSERT_TRUE(fory.serialize_to(bytes, std::string("next-value")).ok());
+  ASSERT_TRUE(fory.serialize_to(bytes, envelope).ok());
+
+  OneByteIStream source(bytes);
+  ForyInputStream stream(source, 3);
+
+  auto first = fory.deserialize<int32_t>(stream);
+  ASSERT_TRUE(first.ok()) << first.error().to_string();
+  EXPECT_EQ(first.value(), 12345);
+  EXPECT_EQ(stream.get_buffer().reader_index(), 0U);
+
+  auto second = fory.deserialize<std::string>(stream);
+  ASSERT_TRUE(second.ok()) << second.error().to_string();
+  EXPECT_EQ(second.value(), "next-value");
+  EXPECT_EQ(stream.get_buffer().reader_index(), 0U);
+
+  auto third = fory.deserialize<StreamEnvelope>(stream);
+  ASSERT_TRUE(third.ok()) << third.error().to_string();
+  EXPECT_EQ(third.value(), envelope);
+  EXPECT_EQ(stream.get_buffer().reader_index(), 0U);
+
+  EXPECT_EQ(stream.get_buffer().remaining_size(), 0U);
+}
+
+TEST(StreamSerializationTest, SharedPointerIdentityRoundTrip) {
+  auto fory = Fory::builder().xlang(true).track_ref(true).build();
+  register_stream_types(fory);
+
+  auto shared = std::make_shared<int32_t>(2026);
+  SharedIntPair pair{shared, shared};
+
+  auto bytes_result = fory.serialize(pair);
+  ASSERT_TRUE(bytes_result.ok()) << bytes_result.error().to_string();
+
+  OneByteIStream source(std::move(bytes_result).value());
+  ForyInputStream stream(source, 2);
+  auto result = fory.deserialize<SharedIntPair>(stream);
+  ASSERT_TRUE(result.ok()) << result.error().to_string();
+  ASSERT_NE(result.value().first, nullptr);
+  ASSERT_NE(result.value().second, nullptr);
+  EXPECT_EQ(*result.value().first, 2026);
+  EXPECT_EQ(result.value().first, result.value().second);
+}
+
+TEST(StreamSerializationTest, TruncatedStreamReturnsError) {
+  auto fory = Fory::builder().xlang(true).track_ref(true).build();
+  register_stream_types(fory);
+
+  StreamEnvelope original{
+      "truncated", {1, 2, 3, 4}, {{"k", 99}}, {7, 7}, true,
+  };
+  auto bytes_result = fory.serialize(original);
+  ASSERT_TRUE(bytes_result.ok()) << bytes_result.error().to_string();
+
+  std::vector<uint8_t> truncated = std::move(bytes_result).value();
+  ASSERT_GT(truncated.size(), 1u);
+  truncated.pop_back();
+
+  OneByteIStream source(truncated);
+  ForyInputStream stream(source, 4);
+  auto result = fory.deserialize<StreamEnvelope>(stream);
+  EXPECT_FALSE(result.ok());
+}
+
+} // namespace test
+} // namespace serialization
+} // namespace fory
diff --git a/cpp/fory/serialization/string_serializer.h b/cpp/fory/serialization/string_serializer.h
index 139564b93..c570e0c25 100644
--- a/cpp/fory/serialization/string_serializer.h
+++ b/cpp/fory/serialization/string_serializer.h
@@ -25,6 +25,7 @@
 #include "fory/util/error.h"
 #include "fory/util/string_util.h"
 #include <cstdint>
+#include <limits>
 #include <string>
 #include <string_view>
 #include <vector>
@@ -113,29 +114,29 @@ inline std::string read_string_data(ReadContext &ctx) {
     return std::string();
   }
 
-  // Validate length against buffer remaining size
-  if (length > ctx.buffer().remaining_size()) {
-    ctx.set_error(Error::invalid_data("String length exceeds buffer size"));
+  if (FORY_PREDICT_FALSE(length > std::numeric_limits<uint32_t>::max())) {
+    ctx.set_error(Error::invalid_data("String length exceeds uint32 range"));
     return std::string();
   }
+  const uint32_t length_u32 = static_cast<uint32_t>(length);
 
   // Handle different encodings
   switch (encoding) {
   case StringEncoding::LATIN1: {
-    std::vector<uint8_t> bytes(length);
-    ctx.read_bytes(bytes.data(), length, ctx.error());
+    std::vector<uint8_t> bytes(length_u32);
+    ctx.read_bytes(bytes.data(), length_u32, ctx.error());
     if (FORY_PREDICT_FALSE(ctx.has_error())) {
       return std::string();
     }
-    return latin1_to_utf8(bytes.data(), length);
+    return latin1_to_utf8(bytes.data(), length_u32);
   }
   case StringEncoding::UTF16: {
-    if (length % 2 != 0) {
+    if ((length_u32 & 1) != 0) {
       ctx.set_error(Error::invalid_data("UTF-16 length must be even"));
       return std::string();
     }
-    std::vector<uint16_t> utf16_chars(length / 2);
-    ctx.read_bytes(reinterpret_cast<uint8_t *>(utf16_chars.data()), length,
+    std::vector<uint16_t> utf16_chars(length_u32 / 2);
+    ctx.read_bytes(reinterpret_cast<uint8_t *>(utf16_chars.data()), length_u32,
                    ctx.error());
     if (FORY_PREDICT_FALSE(ctx.has_error())) {
       return std::string();
@@ -144,8 +145,8 @@ inline std::string read_string_data(ReadContext &ctx) {
   }
   case StringEncoding::UTF8: {
     // UTF-8: read bytes directly
-    std::string result(length, '\0');
-    ctx.read_bytes(&result[0], length, ctx.error());
+    std::string result(length_u32, '\0');
+    ctx.read_bytes(&result[0], length_u32, ctx.error());
     if (FORY_PREDICT_FALSE(ctx.has_error())) {
       return std::string();
     }
@@ -176,18 +177,18 @@ inline std::u16string read_u16string_data(ReadContext &ctx) {
     return std::u16string();
   }
 
-  // Validate length against buffer remaining size
-  if (length > ctx.buffer().remaining_size()) {
-    ctx.set_error(Error::invalid_data("String length exceeds buffer size"));
+  if (FORY_PREDICT_FALSE(length > std::numeric_limits<uint32_t>::max())) {
+    ctx.set_error(Error::invalid_data("String length exceeds uint32 range"));
     return std::u16string();
   }
+  const uint32_t length_u32 = static_cast<uint32_t>(length);
 
   // Handle different encodings
   switch (encoding) {
   case StringEncoding::LATIN1: {
     // Latin1 bytes map directly to char16_t (codepoints 0-255)
-    std::u16string result(length, u'\0');
-    for (size_t i = 0; i < length; ++i) {
+    std::u16string result(length_u32, u'\0');
+    for (size_t i = 0; i < length_u32; ++i) {
       result[i] = static_cast<char16_t>(ctx.read_uint8(ctx.error()));
       if (FORY_PREDICT_FALSE(ctx.has_error())) {
         return std::u16string();
@@ -196,12 +197,12 @@ inline std::u16string read_u16string_data(ReadContext &ctx) {
     return result;
   }
   case StringEncoding::UTF16: {
-    if (length % 2 != 0) {
+    if ((length_u32 & 1) != 0) {
       ctx.set_error(Error::invalid_data("UTF-16 length must be even"));
       return std::u16string();
     }
-    std::u16string result(length / 2, u'\0');
-    ctx.read_bytes(reinterpret_cast<uint8_t *>(&result[0]), length,
+    std::u16string result(length_u32 / 2, u'\0');
+    ctx.read_bytes(reinterpret_cast<uint8_t *>(&result[0]), length_u32,
                    ctx.error());
     if (FORY_PREDICT_FALSE(ctx.has_error())) {
       return std::u16string();
@@ -210,8 +211,8 @@ inline std::u16string read_u16string_data(ReadContext &ctx) {
   }
   case StringEncoding::UTF8: {
     // Read UTF-8 bytes and convert to UTF-16
-    std::string utf8(length, '\0');
-    ctx.read_bytes(&utf8[0], length, ctx.error());
+    std::string utf8(length_u32, '\0');
+    ctx.read_bytes(&utf8[0], length_u32, ctx.error());
     if (FORY_PREDICT_FALSE(ctx.has_error())) {
       return std::u16string();
     }
diff --git a/cpp/fory/serialization/struct_serializer.h b/cpp/fory/serialization/struct_serializer.h
index bb4a24bb8..812586114 100644
--- a/cpp/fory/serialization/struct_serializer.h
+++ b/cpp/fory/serialization/struct_serializer.h
@@ -31,6 +31,7 @@
 #include "fory/util/string_util.h"
 #include <algorithm>
 #include <array>
+#include <limits>
 #include <memory>
 #include <numeric>
 #include <string_view>
@@ -831,6 +832,12 @@ template <typename T> struct CompileTimeFieldHelpers {
 
       if constexpr (is_configurable_int_v<FieldType>) {
         return configurable_int_max_varint_bytes<FieldType, T, Index>();
+      } else if constexpr (std::is_same_v<FieldType, int32_t> ||
+                           std::is_same_v<FieldType, int>) {
+        return 5;
+      } else if constexpr (std::is_same_v<FieldType, int64_t> ||
+                           std::is_same_v<FieldType, long long>) {
+        return 10;
       }
       return 0;
     }
@@ -2753,11 +2760,8 @@ void read_struct_fields_impl(T &obj, ReadContext &ctx,
 
     // Phase 1: Read leading fixed-size primitives if any
     if constexpr (fixed_count > 0 && fixed_bytes > 0) {
-      // Pre-check bounds for all fixed-size fields at once
-      if (FORY_PREDICT_FALSE(buffer.reader_index() + fixed_bytes >
-                             buffer.size())) {
-        ctx.set_error(Error::buffer_out_of_bound(buffer.reader_index(),
-                                                 fixed_bytes, buffer.size()));
+      if (FORY_PREDICT_FALSE(!buffer.ensure_readable(
+              static_cast<uint32_t>(fixed_bytes), ctx.error()))) {
         return;
       }
       // Fast read fixed-size primitives
@@ -2769,6 +2773,12 @@ void read_struct_fields_impl(T &obj, ReadContext &ctx,
     // Note: varint bounds checking is done per-byte during reading since
     // varint lengths are variable (actual size << max possible size)
     if constexpr (varint_count > 0) {
+      if (FORY_PREDICT_FALSE(buffer.is_stream_backed())) {
+        // Stream-backed buffers may not have all varint bytes materialized yet.
+        // Fall back to per-field readers that propagate stream read errors.
+        read_remaining_fields<T, fixed_count, total_count>(obj, ctx);
+        return;
+      }
       // Track offset locally for batch varint reading
       uint32_t offset = buffer.reader_index();
       // Fast read varint primitives (bounds checking happens in
@@ -2807,11 +2817,8 @@ read_struct_fields_impl_fast(T &obj, ReadContext &ctx,
 
   // Phase 1: Read leading fixed-size primitives if any
   if constexpr (fixed_count > 0 && fixed_bytes > 0) {
-    // Pre-check bounds for all fixed-size fields at once
-    if (FORY_PREDICT_FALSE(buffer.reader_index() + fixed_bytes >
-                           buffer.size())) {
-      ctx.set_error(Error::buffer_out_of_bound(buffer.reader_index(),
-                                               fixed_bytes, buffer.size()));
+    if (FORY_PREDICT_FALSE(!buffer.ensure_readable(
+            static_cast<uint32_t>(fixed_bytes), ctx.error()))) {
       return;
     }
     // Fast read fixed-size primitives
@@ -2821,6 +2828,12 @@ read_struct_fields_impl_fast(T &obj, ReadContext &ctx,
 
   // Phase 2: Read consecutive varint primitives (int32, int64) if any
   if constexpr (varint_count > 0) {
+    if (FORY_PREDICT_FALSE(buffer.is_stream_backed())) {
+      // Stream-backed buffers may not have all varint bytes materialized yet.
+      // Fall back to per-field readers that propagate stream read errors.
+      read_remaining_fields<T, fixed_count, total_count>(obj, ctx);
+      return;
+    }
     // Track offset locally for batch varint reading
     uint32_t offset = buffer.reader_index();
     // Fast read varint primitives (bounds checking happens in
diff --git a/cpp/fory/util/CMakeLists.txt b/cpp/fory/util/CMakeLists.txt
index 2bc4f30c5..9b7718d67 100644
--- a/cpp/fory/util/CMakeLists.txt
+++ b/cpp/fory/util/CMakeLists.txt
@@ -19,6 +19,7 @@ set(FORY_UTIL_SOURCES
     buffer.cc
     error.cc
     logging.cc
+    stream.cc
     string_util.cc
     time_util.cc
 )
@@ -32,6 +33,7 @@ set(FORY_UTIL_HEADERS
     platform.h
     pool.h
     result.h
+    stream.h
     string_util.h
     time_util.h
     flat_int_map.h
diff --git a/cpp/fory/util/buffer.cc b/cpp/fory/util/buffer.cc
index 4c1e5f593..e7006ae38 100644
--- a/cpp/fory/util/buffer.cc
+++ b/cpp/fory/util/buffer.cc
@@ -31,6 +31,7 @@ Buffer::Buffer() {
   writer_index_ = 0;
   reader_index_ = 0;
   wrapped_vector_ = nullptr;
+  stream_reader_ = nullptr;
 }
 
 Buffer::Buffer(Buffer &&buffer) noexcept {
@@ -40,6 +41,10 @@ Buffer::Buffer(Buffer &&buffer) noexcept {
   writer_index_ = buffer.writer_index_;
   reader_index_ = buffer.reader_index_;
   wrapped_vector_ = buffer.wrapped_vector_;
+  stream_reader_ = buffer.stream_reader_;
+  stream_reader_owner_ = std::move(buffer.stream_reader_owner_);
+  rebind_stream_reader_to_this();
+  buffer.stream_reader_ = nullptr;
   buffer.data_ = nullptr;
   buffer.size_ = 0;
   buffer.own_data_ = false;
@@ -47,6 +52,7 @@ Buffer::Buffer(Buffer &&buffer) noexcept {
 }
 
 Buffer &Buffer::operator=(Buffer &&buffer) noexcept {
+  detach_stream_reader_from_this();
   if (own_data_) {
     free(data_);
     data_ = nullptr;
@@ -57,6 +63,10 @@ Buffer &Buffer::operator=(Buffer &&buffer) noexcept {
   writer_index_ = buffer.writer_index_;
   reader_index_ = buffer.reader_index_;
   wrapped_vector_ = buffer.wrapped_vector_;
+  stream_reader_ = buffer.stream_reader_;
+  stream_reader_owner_ = std::move(buffer.stream_reader_owner_);
+  rebind_stream_reader_to_this();
+  buffer.stream_reader_ = nullptr;
   buffer.data_ = nullptr;
   buffer.size_ = 0;
   buffer.own_data_ = false;
@@ -65,6 +75,7 @@ Buffer &Buffer::operator=(Buffer &&buffer) noexcept {
 }
 
 Buffer::~Buffer() {
+  detach_stream_reader_from_this();
   if (own_data_) {
     free(data_);
     data_ = nullptr;
diff --git a/cpp/fory/util/buffer.h b/cpp/fory/util/buffer.h
index b626fb7d9..857902fc4 100644
--- a/cpp/fory/util/buffer.h
+++ b/cpp/fory/util/buffer.h
@@ -24,15 +24,20 @@
 #include <limits>
 #include <memory>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include "fory/util/bit_util.h"
 #include "fory/util/error.h"
 #include "fory/util/logging.h"
 #include "fory/util/result.h"
+#include "fory/util/stream.h"
 
 namespace fory {
 
+class ForyInputStream;
+class PythonStreamReader;
+
 // A buffer class for storing raw bytes with various methods for reading and
 // writing the bytes.
 class Buffer {
@@ -40,8 +45,8 @@ public:
   Buffer();
 
   Buffer(uint8_t *data, uint32_t size, bool own_data = true)
-      : data_(data), size_(size), own_data_(own_data),
-        wrapped_vector_(nullptr) {
+      : data_(data), size_(size), own_data_(own_data), wrapped_vector_(nullptr),
+        stream_reader_(nullptr) {
     writer_index_ = 0;
     reader_index_ = 0;
   }
@@ -54,7 +59,17 @@ public:
   explicit Buffer(std::vector<uint8_t> &vec)
       : data_(vec.data()), size_(static_cast<uint32_t>(vec.size())),
         own_data_(false), writer_index_(static_cast<uint32_t>(vec.size())),
-        reader_index_(0), wrapped_vector_(&vec) {}
+        reader_index_(0), wrapped_vector_(&vec), stream_reader_(nullptr) {}
+
+  explicit Buffer(StreamReader &stream_reader)
+      : data_(nullptr), size_(0), own_data_(false), writer_index_(0),
+        reader_index_(0), wrapped_vector_(nullptr),
+        stream_reader_(&stream_reader) {
+    stream_reader_->bind_buffer(this);
+    stream_reader_owner_ = stream_reader_->weak_from_this().lock();
+    FORY_CHECK(&stream_reader_->get_buffer() == this)
+        << "StreamReader must hold and return the same Buffer instance";
+  }
 
   Buffer(Buffer &&buffer) noexcept;
 
@@ -62,6 +77,23 @@ public:
 
   virtual ~Buffer();
 
+  FORY_ALWAYS_INLINE void swap(Buffer &other) noexcept {
+    if (this == &other) {
+      return;
+    }
+    using std::swap;
+    swap(data_, other.data_);
+    swap(size_, other.size_);
+    swap(own_data_, other.own_data_);
+    swap(writer_index_, other.writer_index_);
+    swap(reader_index_, other.reader_index_);
+    swap(wrapped_vector_, other.wrapped_vector_);
+    swap(stream_reader_, other.stream_reader_);
+    swap(stream_reader_owner_, other.stream_reader_owner_);
+    rebind_stream_reader_to_this();
+    other.rebind_stream_reader_to_this();
+  }
+
   /// \brief Return a pointer to the buffer's data
   FORY_ALWAYS_INLINE uint8_t *data() const { return data_; }
 
@@ -70,6 +102,16 @@ public:
 
   FORY_ALWAYS_INLINE bool own_data() const { return own_data_; }
 
+  FORY_ALWAYS_INLINE bool is_stream_backed() const {
+    return stream_reader_ != nullptr;
+  }
+
+  FORY_ALWAYS_INLINE void shrink_stream_buffer() {
+    if (stream_reader_ != nullptr) {
+      stream_reader_->shrink_buffer();
+    }
+  }
+
   FORY_ALWAYS_INLINE uint32_t writer_index() { return writer_index_; }
 
   FORY_ALWAYS_INLINE uint32_t reader_index() { return reader_index_; }
@@ -79,6 +121,22 @@ public:
     return size_ - reader_index_;
   }
 
+  FORY_ALWAYS_INLINE bool ensure_readable(uint32_t length, Error &error) {
+    if (FORY_PREDICT_TRUE(length <= size_ - reader_index_)) {
+      return true;
+    }
+    if (FORY_PREDICT_FALSE(length > std::numeric_limits<uint32_t>::max() -
+                                        reader_index_)) {
+      error.set_error(ErrorCode::OutOfBound,
+                      "reader index exceeds uint32 range");
+      return false;
+    }
+    if (FORY_PREDICT_FALSE(!fill_buffer(length, error))) {
+      return false;
+    }
+    return true;
+  }
+
   FORY_ALWAYS_INLINE void writer_index(uint32_t writer_index) {
     FORY_CHECK(writer_index < std::numeric_limits<uint32_t>::max())
         << "Buffer overflow writer_index" << writer_index_
@@ -93,18 +151,36 @@ public:
     writer_index_ = writer_index;
   }
 
-  FORY_ALWAYS_INLINE void reader_index(uint32_t reader_index) {
-    FORY_CHECK(reader_index < std::numeric_limits<uint32_t>::max())
-        << "Buffer overflow reader_index" << reader_index_
-        << " target reader_index " << reader_index;
+  FORY_ALWAYS_INLINE bool reader_index(uint32_t reader_index, Error &error) {
+    if (FORY_PREDICT_FALSE(reader_index > size_ && stream_reader_ != nullptr)) {
+      if (FORY_PREDICT_FALSE(
+              !fill_buffer(reader_index - reader_index_, error))) {
+        return false;
+      }
+    }
+    if (FORY_PREDICT_FALSE(reader_index > size_)) {
+      const uint32_t diff =
+          reader_index > reader_index_ ? reader_index - reader_index_ : 0;
+      error.set_buffer_out_of_bound(reader_index_, diff, size_);
+      return false;
+    }
     reader_index_ = reader_index;
+    return true;
   }
 
-  FORY_ALWAYS_INLINE void increase_reader_index(uint32_t diff) {
-    uint64_t reader_index = reader_index_ + diff;
-    FORY_CHECK(reader_index < std::numeric_limits<uint32_t>::max())
-        << "Buffer overflow reader_index" << reader_index_ << " diff " << diff;
-    reader_index_ = reader_index;
+  FORY_ALWAYS_INLINE void reader_index(uint32_t reader_index) {
+    Error error;
+    const bool ok = this->reader_index(reader_index, error);
+    FORY_CHECK(ok) << "Buffer overflow reader_index " << reader_index_
+                   << " target reader_index " << reader_index << " size "
+                   << size_ << ", " << error.to_string();
+  }
+
+  FORY_ALWAYS_INLINE void increase_reader_index(uint32_t diff, Error &error) {
+    if (FORY_PREDICT_FALSE(!ensure_readable(diff, error))) {
+      return;
+    }
+    reader_index_ += diff;
   }
 
   // Unsafe methods don't check bound
@@ -143,8 +219,9 @@ public:
   }
 
   template <typename T> FORY_ALWAYS_INLINE T get(uint32_t relative_offset) {
-    FORY_CHECK(relative_offset < size_) << "Out of range " << relative_offset
-                                        << " should be less than " << size_;
+    FORY_CHECK(relative_offset + sizeof(T) <= size_)
+        << "Out of range " << relative_offset << " should be less than "
+        << size_;
     T value = reinterpret_cast<const T *>(data_ + relative_offset)[0];
     return value;
   }
@@ -303,11 +380,11 @@ public:
       return result;
     }
     // Slow path: byte-by-byte read
-    return get_var_uint32_slow(offset, read_bytes_length);
+    return read_var_uint32_slow(offset, read_bytes_length);
   }
 
-  /// Slow path for get_var_uint32 when not enough bytes for bulk read.
-  uint32_t get_var_uint32_slow(uint32_t offset, uint32_t *read_bytes_length) {
+  /// Slow path for varuint32 decode when not enough bytes for bulk read.
+  uint32_t read_var_uint32_slow(uint32_t offset, uint32_t *read_bytes_length) {
     if (FORY_PREDICT_FALSE(offset >= size_)) {
       *read_bytes_length = 0;
       return 0;
@@ -739,8 +816,7 @@ public:
 
   /// Read uint8_t value from buffer. Sets error on bounds violation.
   FORY_ALWAYS_INLINE uint8_t read_uint8(Error &error) {
-    if (FORY_PREDICT_FALSE(reader_index_ + 1 > size_)) {
-      error.set_buffer_out_of_bound(reader_index_, 1, size_);
+    if (FORY_PREDICT_FALSE(!ensure_readable(1, error))) {
       return 0;
     }
     uint8_t value = data_[reader_index_];
@@ -750,8 +826,7 @@ public:
 
   /// Read int8_t value from buffer. Sets error on bounds violation.
   FORY_ALWAYS_INLINE int8_t read_int8(Error &error) {
-    if (FORY_PREDICT_FALSE(reader_index_ + 1 > size_)) {
-      error.set_buffer_out_of_bound(reader_index_, 1, size_);
+    if (FORY_PREDICT_FALSE(!ensure_readable(1, error))) {
       return 0;
     }
     int8_t value = static_cast<int8_t>(data_[reader_index_]);
@@ -761,8 +836,7 @@ public:
 
   /// Read uint16_t value from buffer. Sets error on bounds violation.
   FORY_ALWAYS_INLINE uint16_t read_uint16(Error &error) {
-    if (FORY_PREDICT_FALSE(reader_index_ + 2 > size_)) {
-      error.set_buffer_out_of_bound(reader_index_, 2, size_);
+    if (FORY_PREDICT_FALSE(!ensure_readable(2, error))) {
       return 0;
     }
     uint16_t value =
@@ -773,8 +847,7 @@ public:
 
   /// Read int16_t value from buffer. Sets error on bounds violation.
   FORY_ALWAYS_INLINE int16_t read_int16(Error &error) {
-    if (FORY_PREDICT_FALSE(reader_index_ + 2 > size_)) {
-      error.set_buffer_out_of_bound(reader_index_, 2, size_);
+    if (FORY_PREDICT_FALSE(!ensure_readable(2, error))) {
       return 0;
     }
     int16_t value = reinterpret_cast<const int16_t *>(data_ + reader_index_)[0];
@@ -784,8 +857,7 @@ public:
 
   /// Read int24 value from buffer. Sets error on bounds violation.
   FORY_ALWAYS_INLINE int32_t read_int24(Error &error) {
-    if (FORY_PREDICT_FALSE(reader_index_ + 3 > size_)) {
-      error.set_buffer_out_of_bound(reader_index_, 3, size_);
+    if (FORY_PREDICT_FALSE(!ensure_readable(3, error))) {
       return 0;
     }
     int32_t b0 = data_[reader_index_];
@@ -798,8 +870,7 @@ public:
   /// Read uint32_t value from buffer (fixed 4 bytes). Sets error on bounds
   /// violation.
   FORY_ALWAYS_INLINE uint32_t read_uint32(Error &error) {
-    if (FORY_PREDICT_FALSE(reader_index_ + 4 > size_)) {
-      error.set_buffer_out_of_bound(reader_index_, 4, size_);
+    if (FORY_PREDICT_FALSE(!ensure_readable(4, error))) {
       return 0;
     }
     uint32_t value =
@@ -811,8 +882,7 @@ public:
   /// Read int32_t value from buffer (fixed 4 bytes). Sets error on bounds
   /// violation.
   FORY_ALWAYS_INLINE int32_t read_int32(Error &error) {
-    if (FORY_PREDICT_FALSE(reader_index_ + 4 > size_)) {
-      error.set_buffer_out_of_bound(reader_index_, 4, size_);
+    if (FORY_PREDICT_FALSE(!ensure_readable(4, error))) {
       return 0;
     }
     int32_t value = reinterpret_cast<const int32_t *>(data_ + reader_index_)[0];
@@ -823,8 +893,7 @@ public:
   /// Read uint64_t value from buffer (fixed 8 bytes). Sets error on bounds
   /// violation.
   FORY_ALWAYS_INLINE uint64_t read_uint64(Error &error) {
-    if (FORY_PREDICT_FALSE(reader_index_ + 8 > size_)) {
-      error.set_buffer_out_of_bound(reader_index_, 8, size_);
+    if (FORY_PREDICT_FALSE(!ensure_readable(8, error))) {
       return 0;
     }
     uint64_t value =
@@ -836,8 +905,7 @@ public:
   /// Read int64_t value from buffer (fixed 8 bytes). Sets error on bounds
   /// violation.
   FORY_ALWAYS_INLINE int64_t read_int64(Error &error) {
-    if (FORY_PREDICT_FALSE(reader_index_ + 8 > size_)) {
-      error.set_buffer_out_of_bound(reader_index_, 8, size_);
+    if (FORY_PREDICT_FALSE(!ensure_readable(8, error))) {
       return 0;
     }
     int64_t value = reinterpret_cast<const int64_t *>(data_ + reader_index_)[0];
@@ -847,8 +915,7 @@ public:
 
   /// Read float value from buffer. Sets error on bounds violation.
   FORY_ALWAYS_INLINE float read_float(Error &error) {
-    if (FORY_PREDICT_FALSE(reader_index_ + 4 > size_)) {
-      error.set_buffer_out_of_bound(reader_index_, 4, size_);
+    if (FORY_PREDICT_FALSE(!ensure_readable(4, error))) {
       return 0.0f;
     }
     float value = reinterpret_cast<const float *>(data_ + reader_index_)[0];
@@ -858,8 +925,7 @@ public:
 
   /// Read double value from buffer. Sets error on bounds violation.
   FORY_ALWAYS_INLINE double read_double(Error &error) {
-    if (FORY_PREDICT_FALSE(reader_index_ + 8 > size_)) {
-      error.set_buffer_out_of_bound(reader_index_, 8, size_);
+    if (FORY_PREDICT_FALSE(!ensure_readable(8, error))) {
       return 0.0;
     }
     double value = reinterpret_cast<const double *>(data_ + reader_index_)[0];
@@ -869,50 +935,65 @@ public:
 
   /// Read uint32_t value as varint from buffer. Sets error on bounds violation.
   FORY_ALWAYS_INLINE uint32_t read_var_uint32(Error &error) {
-    if (FORY_PREDICT_FALSE(reader_index_ + 1 > size_)) {
-      error.set_buffer_out_of_bound(reader_index_, 1, size_);
+    if (FORY_PREDICT_FALSE(!ensure_readable(1, error))) {
       return 0;
     }
-    uint32_t read_bytes = 0;
-    uint32_t value = get_var_uint32(reader_index_, &read_bytes);
-    if (FORY_PREDICT_FALSE(read_bytes == 0)) {
-      error.set_buffer_out_of_bound(reader_index_, 1, size_);
-      return 0;
+    if (FORY_PREDICT_FALSE(size_ - reader_index_ < 5)) {
+      return read_var_uint32_slow(error);
     }
-    increase_reader_index(read_bytes);
-    return value;
+    uint32_t offset = reader_index_;
+    uint32_t bulk = *reinterpret_cast<uint32_t *>(data_ + offset);
+
+    uint32_t result = bulk & 0x7F;
+    if ((bulk & 0x80) == 0) {
+      reader_index_ = offset + 1;
+      return result;
+    }
+    result |= (bulk >> 1) & 0x3F80;
+    if ((bulk & 0x8000) == 0) {
+      reader_index_ = offset + 2;
+      return result;
+    }
+    result |= (bulk >> 2) & 0x1FC000;
+    if ((bulk & 0x800000) == 0) {
+      reader_index_ = offset + 3;
+      return result;
+    }
+    result |= (bulk >> 3) & 0xFE00000;
+    if ((bulk & 0x80000000) == 0) {
+      reader_index_ = offset + 4;
+      return result;
+    }
+    result |= static_cast<uint32_t>(data_[offset + 4] & 0x7F) << 28;
+    reader_index_ = offset + 5;
+    return result;
   }
 
   /// Read int32_t value as varint (zigzag encoded). Sets error on bounds
   /// violation.
   FORY_ALWAYS_INLINE int32_t read_var_int32(Error &error) {
-    if (FORY_PREDICT_FALSE(reader_index_ + 1 > size_)) {
-      error.set_buffer_out_of_bound(reader_index_, 1, size_);
+    uint32_t raw = read_var_uint32(error);
+    if (FORY_PREDICT_FALSE(!error.ok())) {
       return 0;
     }
-    uint32_t read_bytes = 0;
-    uint32_t raw = get_var_uint32(reader_index_, &read_bytes);
-    if (FORY_PREDICT_FALSE(read_bytes == 0)) {
-      error.set_buffer_out_of_bound(reader_index_, 1, size_);
-      return 0;
-    }
-    increase_reader_index(read_bytes);
     return static_cast<int32_t>((raw >> 1) ^ (~(raw & 1) + 1));
   }
 
   /// Read uint64_t value as varint from buffer. Sets error on bounds violation.
   FORY_ALWAYS_INLINE uint64_t read_var_uint64(Error &error) {
-    if (FORY_PREDICT_FALSE(reader_index_ + 1 > size_)) {
-      error.set_buffer_out_of_bound(reader_index_, 1, size_);
+    if (FORY_PREDICT_FALSE(!ensure_readable(1, error))) {
       return 0;
     }
+    if (FORY_PREDICT_FALSE(size_ - reader_index_ < 9)) {
+      return read_var_uint64_slow(error);
+    }
     uint32_t read_bytes = 0;
     uint64_t value = get_var_uint64(reader_index_, &read_bytes);
     if (FORY_PREDICT_FALSE(read_bytes == 0)) {
       error.set_buffer_out_of_bound(reader_index_, 1, size_);
       return 0;
     }
-    increase_reader_index(read_bytes);
+    reader_index_ += read_bytes;
     return value;
   }
 
@@ -943,8 +1024,7 @@ public:
   /// If bit 0 is 0, return value >> 1 (arithmetic shift).
   /// Otherwise, skip flag byte and read 8 bytes as int64.
   FORY_ALWAYS_INLINE int64_t read_tagged_int64(Error &error) {
-    if (FORY_PREDICT_FALSE(reader_index_ + 4 > size_)) {
-      error.set_buffer_out_of_bound(reader_index_, 4, size_);
+    if (FORY_PREDICT_FALSE(!ensure_readable(4, error))) {
       return 0;
     }
     int32_t i = reinterpret_cast<const int32_t *>(data_ + reader_index_)[0];
@@ -952,8 +1032,7 @@ public:
       reader_index_ += 4;
       return static_cast<int64_t>(i >> 1); // arithmetic right shift
     } else {
-      if (FORY_PREDICT_FALSE(reader_index_ + 9 > size_)) {
-        error.set_buffer_out_of_bound(reader_index_, 9, size_);
+      if (FORY_PREDICT_FALSE(!ensure_readable(9, error))) {
         return 0;
       }
       int64_t value =
@@ -982,8 +1061,7 @@ public:
   /// If bit 0 is 0, return value >> 1.
   /// Otherwise, skip flag byte and read 8 bytes as uint64.
   FORY_ALWAYS_INLINE uint64_t read_tagged_uint64(Error &error) {
-    if (FORY_PREDICT_FALSE(reader_index_ + 4 > size_)) {
-      error.set_buffer_out_of_bound(reader_index_, 4, size_);
+    if (FORY_PREDICT_FALSE(!ensure_readable(4, error))) {
       return 0;
     }
     uint32_t i = reinterpret_cast<const uint32_t *>(data_ + reader_index_)[0];
@@ -991,8 +1069,7 @@ public:
       reader_index_ += 4;
       return static_cast<uint64_t>(i >> 1);
     } else {
-      if (FORY_PREDICT_FALSE(reader_index_ + 9 > size_)) {
-        error.set_buffer_out_of_bound(reader_index_, 9, size_);
+      if (FORY_PREDICT_FALSE(!ensure_readable(9, error))) {
         return 0;
       }
       uint64_t value =
@@ -1004,98 +1081,54 @@ public:
 
   /// Read uint64_t value as varuint36small. Sets error on bounds violation.
   FORY_ALWAYS_INLINE uint64_t read_var_uint36_small(Error &error) {
-    if (FORY_PREDICT_FALSE(reader_index_ + 1 > size_)) {
-      error.set_buffer_out_of_bound(reader_index_, 1, size_);
+    if (FORY_PREDICT_FALSE(!ensure_readable(1, error))) {
       return 0;
     }
     uint32_t offset = reader_index_;
-    // Fast path: need at least 8 bytes for safe bulk read
-    if (FORY_PREDICT_TRUE(size_ - offset >= 8)) {
-      uint64_t bulk = *reinterpret_cast<uint64_t *>(data_ + offset);
-
-      uint64_t result = bulk & 0x7F;
-      if ((bulk & 0x80) == 0) {
-        increase_reader_index(1);
-        return result;
-      }
-      result |= (bulk >> 1) & 0x3F80;
-      if ((bulk & 0x8000) == 0) {
-        increase_reader_index(2);
-        return result;
-      }
-      result |= (bulk >> 2) & 0x1FC000;
-      if ((bulk & 0x800000) == 0) {
-        increase_reader_index(3);
-        return result;
-      }
-      result |= (bulk >> 3) & 0xFE00000;
-      if ((bulk & 0x80000000) == 0) {
-        increase_reader_index(4);
-        return result;
-      }
-      // 5th byte for bits 28-35 (up to 36 bits)
-      result |= (bulk >> 4) & 0xFF0000000ULL;
-      increase_reader_index(5);
+    if (FORY_PREDICT_FALSE(size_ - offset < 8)) {
+      return read_var_uint36_small_slow(error);
+    }
+    // Fast path: need at least 8 bytes for safe bulk read.
+    uint64_t bulk = *reinterpret_cast<uint64_t *>(data_ + offset);
+    uint64_t result = bulk & 0x7F;
+    if ((bulk & 0x80) == 0) {
+      reader_index_ = offset + 1;
       return result;
     }
-    // Slow path: byte-by-byte read
-    uint32_t position = offset;
-    uint8_t b = data_[position++];
-    uint64_t result = b & 0x7F;
-    if ((b & 0x80) != 0) {
-      if (FORY_PREDICT_FALSE(position >= size_)) {
-        error.set_buffer_out_of_bound(position, 1, size_);
-        return 0;
-      }
-      b = data_[position++];
-      result |= static_cast<uint64_t>(b & 0x7F) << 7;
-      if ((b & 0x80) != 0) {
-        if (FORY_PREDICT_FALSE(position >= size_)) {
-          error.set_buffer_out_of_bound(position, 1, size_);
-          return 0;
-        }
-        b = data_[position++];
-        result |= static_cast<uint64_t>(b & 0x7F) << 14;
-        if ((b & 0x80) != 0) {
-          if (FORY_PREDICT_FALSE(position >= size_)) {
-            error.set_buffer_out_of_bound(position, 1, size_);
-            return 0;
-          }
-          b = data_[position++];
-          result |= static_cast<uint64_t>(b & 0x7F) << 21;
-          if ((b & 0x80) != 0) {
-            if (FORY_PREDICT_FALSE(position >= size_)) {
-              error.set_buffer_out_of_bound(position, 1, size_);
-              return 0;
-            }
-            b = data_[position++];
-            result |= static_cast<uint64_t>(b & 0xFF) << 28;
-          }
-        }
-      }
+    result |= (bulk >> 1) & 0x3F80;
+    if ((bulk & 0x8000) == 0) {
+      reader_index_ = offset + 2;
+      return result;
+    }
+    result |= (bulk >> 2) & 0x1FC000;
+    if ((bulk & 0x800000) == 0) {
+      reader_index_ = offset + 3;
+      return result;
     }
-    increase_reader_index(position - offset);
+    result |= (bulk >> 3) & 0xFE00000;
+    if ((bulk & 0x80000000) == 0) {
+      reader_index_ = offset + 4;
+      return result;
+    }
+    // 5th byte for bits 28-35 (up to 36 bits)
+    result |= (bulk >> 4) & 0xFF0000000ULL;
+    reader_index_ = offset + 5;
     return result;
   }
 
   /// Read raw bytes from buffer. Sets error on bounds violation.
   FORY_ALWAYS_INLINE void read_bytes(void *data, uint32_t length,
                                      Error &error) {
-    if (FORY_PREDICT_FALSE(reader_index_ + length > size_)) {
-      error.set_buffer_out_of_bound(reader_index_, length, size_);
+    if (FORY_PREDICT_FALSE(!ensure_readable(length, error))) {
       return;
     }
     copy(reader_index_, length, static_cast<uint8_t *>(data));
-    increase_reader_index(length);
+    reader_index_ += length;
   }
 
   /// skip bytes in buffer. Sets error on bounds violation.
   FORY_ALWAYS_INLINE void skip(uint32_t length, Error &error) {
-    if (FORY_PREDICT_FALSE(reader_index_ + length > size_)) {
-      error.set_buffer_out_of_bound(reader_index_, length, size_);
-      return;
-    }
-    increase_reader_index(length);
+    increase_reader_index(length, error);
   }
 
   /// Return true if both buffers are the same size and contain the same bytes
@@ -1190,12 +1223,147 @@ public:
   std::string hex() const;
 
 private:
+  friend class ForyInputStream;
+  friend class PythonStreamReader;
+
+  FORY_ALWAYS_INLINE void rebind_stream_reader_to_this() {
+    if (stream_reader_ == nullptr) {
+      return;
+    }
+    stream_reader_->bind_buffer(this);
+    FORY_CHECK(&stream_reader_->get_buffer() == this)
+        << "StreamReader must hold and return the same Buffer instance";
+  }
+
+  FORY_ALWAYS_INLINE void detach_stream_reader_from_this() {
+    if (stream_reader_ == nullptr) {
+      return;
+    }
+    if (&stream_reader_->get_buffer() == this) {
+      stream_reader_->bind_buffer(nullptr);
+    }
+  }
+
+  FORY_ALWAYS_INLINE bool fill_buffer(uint32_t min_fill_size, Error &error) {
+    if (FORY_PREDICT_TRUE(min_fill_size <= size_ - reader_index_)) {
+      return true;
+    }
+    if (FORY_PREDICT_TRUE(stream_reader_ == nullptr)) {
+      error.set_buffer_out_of_bound(reader_index_, min_fill_size, size_);
+      return false;
+    }
+    auto fill_result = stream_reader_->fill_buffer(min_fill_size);
+    if (FORY_PREDICT_FALSE(!fill_result.ok())) {
+      error = std::move(fill_result).error();
+      return false;
+    }
+    if (FORY_PREDICT_FALSE(min_fill_size > size_ - reader_index_)) {
+      error.set_buffer_out_of_bound(reader_index_, min_fill_size, size_);
+      return false;
+    }
+    return true;
+  }
+
+  FORY_ALWAYS_INLINE uint32_t read_var_uint32_slow(Error &error) {
+    uint32_t position = reader_index_;
+    uint32_t result = 0;
+    for (int i = 0; i < 5; ++i) {
+      if (FORY_PREDICT_FALSE(!ensure_readable(i + 1, error))) {
+        return 0;
+      }
+      uint8_t b = data_[position++];
+      result |= static_cast<uint32_t>(b & 0x7F) << (i * 7);
+      if ((b & 0x80) == 0) {
+        reader_index_ = position;
+        return result;
+      }
+    }
+    error.set_error(ErrorCode::InvalidData, "Invalid var_uint32 encoding");
+    return 0;
+  }
+
+  FORY_ALWAYS_INLINE uint64_t read_var_uint64_slow(Error &error) {
+    uint32_t position = reader_index_;
+    uint64_t result = 0;
+    for (int i = 0; i < 8; ++i) {
+      if (FORY_PREDICT_FALSE(!ensure_readable(i + 1, error))) {
+        return 0;
+      }
+      uint8_t b = data_[position++];
+      result |= static_cast<uint64_t>(b & 0x7F) << (i * 7);
+      if ((b & 0x80) == 0) {
+        reader_index_ = position;
+        return result;
+      }
+    }
+    if (FORY_PREDICT_FALSE(!ensure_readable(9, error))) {
+      return 0;
+    }
+    uint8_t b = data_[position++];
+    result |= static_cast<uint64_t>(b) << 56;
+    reader_index_ = position;
+    return result;
+  }
+
+  FORY_ALWAYS_INLINE uint64_t read_var_uint36_small_slow(Error &error) {
+    uint32_t position = reader_index_;
+    if (FORY_PREDICT_FALSE(!ensure_readable(1, error))) {
+      return 0;
+    }
+    uint8_t b = data_[position++];
+    uint64_t result = b & 0x7F;
+    if ((b & 0x80) == 0) {
+      reader_index_ = position;
+      return result;
+    }
+
+    if (FORY_PREDICT_FALSE(!ensure_readable(2, error))) {
+      return 0;
+    }
+    b = data_[position++];
+    result |= static_cast<uint64_t>(b & 0x7F) << 7;
+    if ((b & 0x80) == 0) {
+      reader_index_ = position;
+      return result;
+    }
+
+    if (FORY_PREDICT_FALSE(!ensure_readable(3, error))) {
+      return 0;
+    }
+    b = data_[position++];
+    result |= static_cast<uint64_t>(b & 0x7F) << 14;
+    if ((b & 0x80) == 0) {
+      reader_index_ = position;
+      return result;
+    }
+
+    if (FORY_PREDICT_FALSE(!ensure_readable(4, error))) {
+      return 0;
+    }
+    b = data_[position++];
+    result |= static_cast<uint64_t>(b & 0x7F) << 21;
+    if ((b & 0x80) == 0) {
+      reader_index_ = position;
+      return result;
+    }
+
+    if (FORY_PREDICT_FALSE(!ensure_readable(5, error))) {
+      return 0;
+    }
+    b = data_[position++];
+    result |= static_cast<uint64_t>(b) << 28;
+    reader_index_ = position;
+    return result;
+  }
+
   uint8_t *data_;
   uint32_t size_;
   bool own_data_;
   uint32_t writer_index_;
   uint32_t reader_index_;
   std::vector<uint8_t> *wrapped_vector_ = nullptr;
+  StreamReader *stream_reader_ = nullptr;
+  std::shared_ptr<StreamReader> stream_reader_owner_;
 };
 
 /// \brief Allocate a fixed-size mutable buffer from the default memory pool
diff --git a/cpp/fory/util/buffer_test.cc b/cpp/fory/util/buffer_test.cc
index 1a49f02a5..d06a65ba9 100644
--- a/cpp/fory/util/buffer_test.cc
+++ b/cpp/fory/util/buffer_test.cc
@@ -25,9 +25,51 @@
 #include "gtest/gtest.h"
 
 #include "fory/util/buffer.h"
+#include "fory/util/stream.h"
 
 namespace fory {
 
+class OneByteStreamBuf : public std::streambuf {
+public:
+  explicit OneByteStreamBuf(std::vector<uint8_t> data)
+      : data_(std::move(data)), pos_(0) {}
+
+protected:
+  std::streamsize xsgetn(char *s, std::streamsize count) override {
+    if (pos_ >= data_.size() || count <= 0) {
+      return 0;
+    }
+    s[0] = static_cast<char>(data_[pos_]);
+    ++pos_;
+    return 1;
+  }
+
+  int_type underflow() override {
+    if (pos_ >= data_.size()) {
+      return traits_type::eof();
+    }
+    current_ = static_cast<char>(data_[pos_]);
+    setg(&current_, &current_, &current_ + 1);
+    return traits_type::to_int_type(current_);
+  }
+
+private:
+  std::vector<uint8_t> data_;
+  size_t pos_;
+  char current_ = 0;
+};
+
+class OneByteIStream : public std::istream {
+public:
+  explicit OneByteIStream(std::vector<uint8_t> data)
+      : std::istream(nullptr), buf_(std::move(data)) {
+    rdbuf(&buf_);
+  }
+
+private:
+  OneByteStreamBuf buf_;
+};
+
 TEST(Buffer, to_string) {
   std::shared_ptr<Buffer> buffer;
   allocate_buffer(16, &buffer);
@@ -170,6 +212,107 @@ TEST(Buffer, TestReadVarUint36SmallTruncated) {
   EXPECT_FALSE(error.ok());
   EXPECT_EQ(buffer.reader_index(), 0U);
 }
+
+TEST(Buffer, StreamReadFromOneByteSource) {
+  std::vector<uint8_t> raw;
+  raw.reserve(64);
+  Buffer writer(raw);
+  writer.write_uint32(0x01020304U);
+  writer.write_int64(-1234567890LL);
+  writer.write_var_uint32(300);
+  writer.write_var_int64(-4567890123LL);
+  writer.write_tagged_uint64(0x123456789ULL);
+  writer.write_var_uint36_small(0x1FFFFULL);
+
+  raw.resize(writer.writer_index());
+  OneByteIStream one_byte_stream(raw);
+  ForyInputStream stream(one_byte_stream, 8);
+  Buffer reader(stream);
+  Error error;
+
+  EXPECT_EQ(reader.read_uint32(error), 0x01020304U);
+  ASSERT_TRUE(error.ok()) << error.to_string();
+  EXPECT_EQ(reader.read_int64(error), -1234567890LL);
+  ASSERT_TRUE(error.ok()) << error.to_string();
+  EXPECT_EQ(reader.read_var_uint32(error), 300U);
+  ASSERT_TRUE(error.ok()) << error.to_string();
+  EXPECT_EQ(reader.read_var_int64(error), -4567890123LL);
+  ASSERT_TRUE(error.ok()) << error.to_string();
+  EXPECT_EQ(reader.read_tagged_uint64(error), 0x123456789ULL);
+  ASSERT_TRUE(error.ok()) << error.to_string();
+  EXPECT_EQ(reader.read_var_uint36_small(error), 0x1FFFFULL);
+  ASSERT_TRUE(error.ok()) << error.to_string();
+}
+
+TEST(Buffer, StreamGetAndReaderIndexFromOneByteSource) {
+  std::vector<uint8_t> raw{0x11, 0x22, 0x33, 0x44, 0x55};
+  OneByteIStream one_byte_stream(raw);
+  ForyInputStream stream(one_byte_stream, 2);
+  Buffer reader(stream);
+  Error error;
+  ASSERT_TRUE(reader.ensure_readable(4, error)) << error.to_string();
+
+  EXPECT_EQ(reader.get<uint32_t>(0), 0x44332211U);
+  reader.reader_index(4);
+  EXPECT_EQ(reader.read_uint8(error), 0x55);
+  ASSERT_TRUE(error.ok()) << error.to_string();
+}
+
+TEST(Buffer, StreamReadBytesAndSkipAdvanceReaderIndex) {
+  std::vector<uint8_t> raw{0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+  OneByteIStream one_byte_stream(raw);
+  ForyInputStream stream(one_byte_stream, 2);
+  Buffer reader(stream);
+  Error error;
+  uint8_t out[5] = {0};
+
+  reader.read_bytes(out, 5, error);
+  ASSERT_TRUE(error.ok()) << error.to_string();
+  EXPECT_EQ(reader.reader_index(), 5U);
+  EXPECT_EQ(out[0], 0U);
+  EXPECT_EQ(out[4], 4U);
+
+  reader.skip(3, error);
+  ASSERT_TRUE(error.ok()) << error.to_string();
+  EXPECT_EQ(reader.reader_index(), 8U);
+  EXPECT_EQ(reader.read_uint8(error), 8U);
+  ASSERT_TRUE(error.ok()) << error.to_string();
+}
+
+TEST(Buffer, StreamSkipAndUnread) {
+  std::vector<uint8_t> raw{0x01, 0x02, 0x03, 0x04, 0x05};
+  OneByteIStream one_byte_stream(raw);
+  ForyInputStream stream(one_byte_stream, 2);
+  auto fill_result = stream.fill_buffer(4);
+  ASSERT_TRUE(fill_result.ok()) << fill_result.error().to_string();
+
+  Buffer &view = stream.get_buffer();
+  EXPECT_EQ(view.size(), 4U);
+  EXPECT_EQ(view.reader_index(), 0U);
+
+  auto skip_result = stream.skip(3);
+  ASSERT_TRUE(skip_result.ok()) << skip_result.error().to_string();
+  EXPECT_EQ(view.reader_index(), 3U);
+
+  auto unread_result = stream.unread(2);
+  ASSERT_TRUE(unread_result.ok()) << unread_result.error().to_string();
+  EXPECT_EQ(view.reader_index(), 1U);
+
+  skip_result = stream.skip(1);
+  ASSERT_TRUE(skip_result.ok()) << skip_result.error().to_string();
+  EXPECT_EQ(view.reader_index(), 2U);
+}
+
+TEST(Buffer, StreamReadErrorWhenInsufficientData) {
+  std::vector<uint8_t> raw{0x01, 0x02, 0x03};
+  OneByteIStream one_byte_stream(raw);
+  ForyInputStream stream(one_byte_stream, 2);
+  Buffer reader(stream);
+  Error error;
+  EXPECT_EQ(reader.read_uint32(error), 0U);
+  EXPECT_FALSE(error.ok());
+  EXPECT_EQ(error.code(), ErrorCode::BufferOutOfBound);
+}
 } // namespace fory
 
 int main(int argc, char **argv) {
diff --git a/cpp/fory/util/stream.cc b/cpp/fory/util/stream.cc
new file mode 100644
index 000000000..a8466ab83
--- /dev/null
+++ b/cpp/fory/util/stream.cc
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "fory/util/stream.h"
+
+#include <algorithm>
+#include <cstring>
+#include <limits>
+
+#include "fory/util/buffer.h"
+#include "fory/util/logging.h"
+
+namespace fory {
+
+ForyInputStream::ForyInputStream(std::istream &stream, uint32_t buffer_size)
+    : stream_(&stream),
+      data_(std::max<uint32_t>(buffer_size, static_cast<uint32_t>(1))),
+      initial_buffer_size_(
+          std::max<uint32_t>(buffer_size, static_cast<uint32_t>(1))),
+      owned_buffer_(std::make_unique<Buffer>()) {
+  bind_buffer(owned_buffer_.get());
+}
+
+ForyInputStream::ForyInputStream(std::shared_ptr<std::istream> stream,
+                                 uint32_t buffer_size)
+    : stream_owner_(std::move(stream)), stream_(stream_owner_.get()),
+      data_(std::max<uint32_t>(buffer_size, static_cast<uint32_t>(1))),
+      initial_buffer_size_(
+          std::max<uint32_t>(buffer_size, static_cast<uint32_t>(1))),
+      owned_buffer_(std::make_unique<Buffer>()) {
+  FORY_CHECK(stream_owner_ != nullptr) << "stream must not be null";
+  bind_buffer(owned_buffer_.get());
+}
+
+ForyInputStream::~ForyInputStream() = default;
+
+Result<void, Error> ForyInputStream::fill_buffer(uint32_t min_fill_size) {
+  if (min_fill_size == 0 || remaining_size() >= min_fill_size) {
+    return Result<void, Error>();
+  }
+
+  const uint32_t read_pos = buffer_->reader_index_;
+  const uint32_t deficit = min_fill_size - remaining_size();
+  constexpr uint64_t k_max_u32 = std::numeric_limits<uint32_t>::max();
+  const uint64_t required = static_cast<uint64_t>(buffer_->size_) + deficit;
+  if (required > k_max_u32) {
+    return Unexpected(
+        Error::out_of_bound("stream buffer size exceeds uint32 range"));
+  }
+  if (required > data_.size()) {
+    uint64_t new_size =
+        std::max<uint64_t>(required, static_cast<uint64_t>(data_.size()) * 2);
+    if (new_size > k_max_u32) {
+      new_size = k_max_u32;
+    }
+    reserve(static_cast<uint32_t>(new_size));
+  }
+
+  std::streambuf *source = stream_->rdbuf();
+  if (source == nullptr) {
+    return Unexpected(Error::io_error("input stream has no stream buffer"));
+  }
+  uint32_t write_pos = buffer_->size_;
+  while (remaining_size() < min_fill_size) {
+    uint32_t writable = static_cast<uint32_t>(data_.size()) - write_pos;
+    const std::streamsize read_bytes =
+        source->sgetn(reinterpret_cast<char *>(data_.data() + write_pos),
+                      static_cast<std::streamsize>(writable));
+    if (read_bytes <= 0) {
+      return Unexpected(Error::buffer_out_of_bound(read_pos, min_fill_size,
+                                                   remaining_size()));
+    }
+    write_pos += static_cast<uint32_t>(read_bytes);
+    buffer_->size_ = write_pos;
+  }
+  return Result<void, Error>();
+}
+
+Result<void, Error> ForyInputStream::read_to(uint8_t *dst, uint32_t length) {
+  if (length == 0) {
+    return Result<void, Error>();
+  }
+  Error error;
+  if (FORY_PREDICT_FALSE(!buffer_->ensure_readable(length, error))) {
+    return Unexpected(std::move(error));
+  }
+  std::memcpy(dst, buffer_->data_ + buffer_->reader_index_,
+              static_cast<size_t>(length));
+  buffer_->reader_index_ += length;
+  return Result<void, Error>();
+}
+
+Result<void, Error> ForyInputStream::skip(uint32_t size) {
+  if (size == 0) {
+    return Result<void, Error>();
+  }
+  Error error;
+  buffer_->increase_reader_index(size, error);
+  if (FORY_PREDICT_FALSE(!error.ok())) {
+    return Unexpected(std::move(error));
+  }
+  return Result<void, Error>();
+}
+
+Result<void, Error> ForyInputStream::unread(uint32_t size) {
+  if (FORY_PREDICT_FALSE(size > buffer_->reader_index_)) {
+    return Unexpected(Error::buffer_out_of_bound(buffer_->reader_index_, size,
+                                                 buffer_->size_));
+  }
+  buffer_->reader_index_ -= size;
+  return Result<void, Error>();
+}
+
+void ForyInputStream::shrink_buffer() {
+  if (buffer_ == nullptr) {
+    return;
+  }
+
+  const uint32_t read_pos = buffer_->reader_index_;
+  const uint32_t remaining = remaining_size();
+  if (read_pos > 0) {
+    if (remaining > 0) {
+      std::memmove(data_.data(), data_.data() + read_pos,
+                   static_cast<size_t>(remaining));
+    }
+    buffer_->reader_index_ = 0;
+    buffer_->size_ = remaining;
+    buffer_->writer_index_ = remaining;
+  }
+
+  const uint32_t current_capacity = static_cast<uint32_t>(data_.size());
+  uint32_t target_capacity = current_capacity;
+  if (current_capacity > initial_buffer_size_) {
+    if (remaining == 0) {
+      target_capacity = initial_buffer_size_;
+    } else if (remaining <= current_capacity / 4) {
+      const uint32_t doubled =
+          remaining > std::numeric_limits<uint32_t>::max() / 2
+              ? std::numeric_limits<uint32_t>::max()
+              : remaining * 2;
+      target_capacity = std::max<uint32_t>(
+          initial_buffer_size_,
+          std::max<uint32_t>(doubled, static_cast<uint32_t>(1)));
+    }
+  }
+
+  if (target_capacity < current_capacity) {
+    data_.resize(target_capacity);
+    data_.shrink_to_fit();
+    buffer_->data_ = data_.data();
+  }
+}
+
+Buffer &ForyInputStream::get_buffer() { return *buffer_; }
+
+uint32_t ForyInputStream::remaining_size() const {
+  return buffer_->size_ - buffer_->reader_index_;
+}
+
+void ForyInputStream::reserve(uint32_t new_size) {
+  data_.resize(new_size);
+  buffer_->data_ = data_.data();
+}
+
+void ForyInputStream::bind_buffer(Buffer *buffer) {
+  Buffer *target = buffer == nullptr ? owned_buffer_.get() : buffer;
+  if (target == nullptr) {
+    if (buffer_ != nullptr) {
+      buffer_->stream_reader_ = nullptr;
+    }
+    buffer_ = nullptr;
+    return;
+  }
+
+  if (buffer_ == target) {
+    buffer_->data_ = data_.data();
+    buffer_->own_data_ = false;
+    buffer_->wrapped_vector_ = nullptr;
+    buffer_->stream_reader_ = this;
+    return;
+  }
+
+  Buffer *source = buffer_;
+  if (source != nullptr) {
+    target->size_ = source->size_;
+    target->writer_index_ = source->writer_index_;
+    target->reader_index_ = source->reader_index_;
+    source->stream_reader_ = nullptr;
+  } else {
+    target->size_ = 0;
+    target->writer_index_ = 0;
+    target->reader_index_ = 0;
+  }
+
+  buffer_ = target;
+  buffer_->data_ = data_.data();
+  buffer_->own_data_ = false;
+  buffer_->wrapped_vector_ = nullptr;
+  buffer_->stream_reader_ = this;
+}
+
+} // namespace fory
diff --git a/cpp/fory/util/stream.h b/cpp/fory/util/stream.h
new file mode 100644
index 000000000..be37e66b8
--- /dev/null
+++ b/cpp/fory/util/stream.h
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <istream>
+#include <memory>
+#include <vector>
+
+#include "fory/util/error.h"
+#include "fory/util/result.h"
+
+namespace fory {
+
+class Buffer;
+
+class StreamReader : public std::enable_shared_from_this<StreamReader> {
+public:
+  virtual ~StreamReader() = default;
+
+  virtual Result<void, Error> fill_buffer(uint32_t min_fill_size) = 0;
+
+  virtual Result<void, Error> read_to(uint8_t *dst, uint32_t length) = 0;
+
+  virtual Result<void, Error> skip(uint32_t size) = 0;
+
+  virtual Result<void, Error> unread(uint32_t size) = 0;
+
+  virtual void shrink_buffer() = 0;
+
+  virtual Buffer &get_buffer() = 0;
+
+  // Bind the reader to an external Buffer. Passing nullptr rebinds to the
+  // reader-owned internal buffer.
+  virtual void bind_buffer(Buffer *buffer) = 0;
+};
+
+class ForyInputStream final : public StreamReader {
+public:
+  explicit ForyInputStream(std::istream &stream, uint32_t buffer_size = 4096);
+
+  explicit ForyInputStream(std::shared_ptr<std::istream> stream,
+                           uint32_t buffer_size = 4096);
+
+  ~ForyInputStream() override;
+
+  Result<void, Error> fill_buffer(uint32_t min_fill_size) override;
+
+  Result<void, Error> read_to(uint8_t *dst, uint32_t length) override;
+
+  Result<void, Error> skip(uint32_t size) override;
+
+  Result<void, Error> unread(uint32_t size) override;
+
+  void shrink_buffer() override;
+
+  Buffer &get_buffer() override;
+
+  void bind_buffer(Buffer *buffer) override;
+
+private:
+  uint32_t remaining_size() const;
+
+  void reserve(uint32_t new_size);
+
+  std::shared_ptr<std::istream> stream_owner_;
+  std::istream *stream_ = nullptr;
+  std::vector<uint8_t> data_;
+  uint32_t initial_buffer_size_ = 1;
+  Buffer *buffer_ = nullptr;
+  std::unique_ptr<Buffer> owned_buffer_;
+};
+
+} // namespace fory
diff --git a/python/pyfory/buffer.pxd b/python/pyfory/buffer.pxd
index d6f02f133..a4a33667c 100644
--- a/python/pyfory/buffer.pxd
+++ b/python/pyfory/buffer.pxd
@@ -186,8 +186,6 @@ cdef class Buffer:
 
     cdef inline write_c_buffer(self, const uint8_t* value, int32_t length)
 
-    cdef inline int32_t read_c_buffer(self, uint8_t** buf)
-
     cpdef inline write_bytes_and_size(self, bytes value)
 
     cpdef inline bytes read_bytes_and_size(self)
diff --git a/python/pyfory/buffer.pyx b/python/pyfory/buffer.pyx
index 3f8e0935c..e0a3a0bf3 100644
--- a/python/pyfory/buffer.pyx
+++ b/python/pyfory/buffer.pyx
@@ -34,6 +34,7 @@ from pyfory.includes.libutil cimport(
     CBuffer, allocate_buffer, get_bit as c_get_bit, set_bit as c_set_bit, clear_bit as c_clear_bit,
     set_bit_to as c_set_bit_to, CError, CErrorCode, CResultVoidError, utf16_has_surrogate_pairs
 )
+from pyfory.includes.libpyfory cimport Fory_PyCreateBufferFromStream
 import os
 from pyfory.error import raise_fory_error
 
@@ -69,6 +70,24 @@ cdef class Buffer:
         self.c_buffer.reader_index(0)
         self.c_buffer.writer_index(0)
 
+    @classmethod
+    def from_stream(cls, stream not None, uint32_t buffer_size=4096):
+        cdef CBuffer* stream_buffer
+        cdef c_string stream_error
+        if Fory_PyCreateBufferFromStream(
+            <PyObject*>stream, buffer_size, &stream_buffer, &stream_error
+        ) != 0:
+            raise ValueError(stream_error.decode("UTF-8"))
+        if stream_buffer == NULL:
+            raise ValueError("failed to create stream buffer")
+        cdef Buffer buffer = Buffer.__new__(Buffer)
+        buffer.c_buffer = move(deref(stream_buffer))
+        del stream_buffer
+        buffer.data = stream
+        buffer.c_buffer.reader_index(0)
+        buffer.c_buffer.writer_index(0)
+        return buffer
+
     @staticmethod
     cdef Buffer wrap(shared_ptr[CBuffer] c_buffer):
         cdef Buffer buffer = Buffer.__new__(Buffer)
@@ -109,7 +128,8 @@ cdef class Buffer:
     cpdef inline void set_reader_index(self, int32_t value):
         if value < 0:
             raise ValueError("reader_index must be >= 0")
-        self.c_buffer.reader_index(<uint32_t>value)
+        if not self.c_buffer.reader_index(<uint32_t>value, self._error):
+            self._raise_if_error()
 
     cpdef inline int32_t get_writer_index(self):
         return <int32_t>self.c_buffer.writer_index()
@@ -287,11 +307,23 @@ cdef class Buffer:
 
     cpdef inline int64_t read_bytes_as_int64(self, int32_t length):
         cdef int64_t result = 0
-        cdef uint32_t offset = self.c_buffer.reader_index()
-        cdef CResultVoidError res = self.c_buffer.get_bytes_as_int64(offset, length,  &result)
+        cdef uint32_t offset
+        cdef CResultVoidError res
+        if length == 0:
+            return 0
+        if length < 0 or length > 8:
+            raise_fory_error(
+                CErrorCode.InvalidData,
+                f"get_bytes_as_int64 length should be in range [0, 8], but got {length}",
+            )
+        if not self.c_buffer.ensure_readable(<uint32_t>length, self._error):
+            self._raise_if_error()
+        offset = self.c_buffer.reader_index()
+        res = self.c_buffer.get_bytes_as_int64(offset, <uint32_t>length, &result)
         if not res.ok():
             raise_fory_error(res.error().code(), res.error().message())
-        self.c_buffer.increase_reader_index(length)
+        self.c_buffer.increase_reader_index(<uint32_t>length, self._error)
+        self._raise_if_error()
         return result
 
     cpdef inline put_bytes(self, uint32_t offset, bytes value):
@@ -502,15 +534,6 @@ cdef class Buffer:
         self.c_buffer.copy_from(offset, value, 0, length)
         self.c_buffer.increase_writer_index(length)
 
-    cdef inline int32_t read_c_buffer(self, uint8_t** buf):
-        cdef int32_t length = self.read_var_uint32()
-        cdef uint8_t* binary_data = self.c_buffer.data()
-        cdef uint32_t offset = self.c_buffer.reader_index()
-        self.check_bound(offset, length)
-        buf[0] = binary_data + offset
-        self.c_buffer.increase_reader_index(length)
-        return length
-
     cpdef inline write_string(self, str value):
         cdef Py_ssize_t length = PyUnicode_GET_LENGTH(value)
         cdef int32_t kind = PyUnicode_KIND(value)
@@ -540,11 +563,19 @@ cdef class Buffer:
     cpdef inline str read_string(self):
         cdef uint64_t header = self.read_var_uint64()
         cdef uint32_t size = header >> 2
-        cdef uint32_t offset = self.c_buffer.reader_index()
-        self.check_bound(offset, size)
-        cdef const char * buf = <const char *>(self.c_buffer.data() + offset)
-        self.c_buffer.increase_reader_index(size)
         cdef uint32_t encoding = header & <uint32_t>0b11
+        if size == 0:
+            return ""
+        cdef uint32_t offset = self.c_buffer.reader_index()
+        cdef uint32_t available = self.c_buffer.size() - offset
+        cdef const char * buf
+        cdef bytes py_bytes
+        if available >= size:
+            buf = <const char *>(self.c_buffer.data() + offset)
+            self.c_buffer.reader_index(offset + size)
+        else:
+            py_bytes = self.read_bytes(<int32_t>size)
+            buf = <const char *>PyBytes_AS_STRING(py_bytes)
         if encoding == 0:
             # PyUnicode_FromASCII
             return PyUnicode_DecodeLatin1(buf, size, "strict")
diff --git a/python/pyfory/includes/libpyfory.pxd b/python/pyfory/includes/libpyfory.pxd
new file mode 100644
index 000000000..c0e888c18
--- /dev/null
+++ b/python/pyfory/includes/libpyfory.pxd
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from cpython.object cimport PyObject
+from libc.stdint cimport uint32_t
+from libcpp.string cimport string as c_string
+
+from pyfory.includes.libutil cimport CBuffer
+
+cdef extern from "fory/python/pyfory.h" namespace "fory":
+    int Fory_PyCreateBufferFromStream(PyObject* stream, uint32_t buffer_size,
+                                      CBuffer** out, c_string* error_message)
diff --git a/python/pyfory/includes/libutil.pxd b/python/pyfory/includes/libutil.pxd
index 3328889b0..50373e19c 100644
--- a/python/pyfory/includes/libutil.pxd
+++ b/python/pyfory/includes/libutil.pxd
@@ -71,13 +71,17 @@ cdef extern from "fory/util/buffer.h" namespace "fory" nogil:
 
         inline uint32_t reader_index()
 
+        inline c_bool ensure_readable(uint32_t length, CError& error)
+
         inline void writer_index(uint32_t writer_index)
 
         inline void increase_writer_index(uint32_t diff)
 
         inline void reader_index(uint32_t reader_index)
 
-        inline void increase_reader_index(uint32_t diff)
+        inline c_bool reader_index(uint32_t reader_index, CError& error)
+
+        inline void increase_reader_index(uint32_t diff, CError& error)
 
         void grow(uint32_t min_capacity)
 
diff --git a/python/pyfory/tests/test_buffer.py b/python/pyfory/tests/test_buffer.py
index 9080d966b..c5e0c9007 100644
--- a/python/pyfory/tests/test_buffer.py
+++ b/python/pyfory/tests/test_buffer.py
@@ -15,13 +15,56 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import pytest
+
 from pyfory.buffer import Buffer
 from pyfory.tests.core import require_pyarrow
+from pyfory.tests.test_stream import OneByteStream
 from pyfory.utils import lazy_import
 
 pa = lazy_import("pyarrow")
 
 
+class RecvIntoOnlyStream:
+    def __init__(self, data: bytes):
+        self._data = data
+        self._offset = 0
+
+    def recv_into(self, buffer, size=-1):
+        if self._offset >= len(self._data):
+            return 0
+        view = memoryview(buffer).cast("B")
+        if size < 0 or size > len(view):
+            size = len(view)
+        if size == 0:
+            return 0
+        read_size = min(1, size, len(self._data) - self._offset)
+        start = self._offset
+        self._offset += read_size
+        view[:read_size] = self._data[start : start + read_size]
+        return read_size
+
+
+class LegacyRecvIntoOnlyStream:
+    def __init__(self, data: bytes):
+        self._data = data
+        self._offset = 0
+
+    def recvinto(self, buffer, size=-1):
+        if self._offset >= len(self._data):
+            return 0
+        view = memoryview(buffer).cast("B")
+        if size < 0 or size > len(view):
+            size = len(view)
+        if size == 0:
+            return 0
+        read_size = min(1, size, len(self._data) - self._offset)
+        start = self._offset
+        self._offset += read_size
+        view[:read_size] = self._data[start : start + read_size]
+        return read_size
+
+
 def test_buffer():
     buffer = Buffer.allocate(8)
     buffer.write_bool(True)
@@ -248,5 +291,66 @@ def test_read_bytes_as_int64():
     buf.read_bytes_as_int64(8)
 
 
+def test_stream_buffer_read():
+    writer = Buffer.allocate(32)
+    writer.write_uint32(0x01020304)
+    writer.write_int64(-1234567890)
+    writer.write_var_uint32(300)
+    writer.write_varint64(-4567890123)
+    writer.write_tagged_uint64(0x123456789)
+    writer.write_var_uint64(0x1FFFF)
+    writer.write_bytes_and_size(b"stream-data")
+    writer.write_string("hello-stream")
+
+    data = writer.get_bytes(0, writer.get_writer_index())
+    stream = OneByteStream(data)
+    reader = Buffer.from_stream(stream)
+
+    assert reader.read_uint32() == 0x01020304
+    assert reader.read_int64() == -1234567890
+    assert reader.read_var_uint32() == 300
+    assert reader.read_varint64() == -4567890123
+    assert reader.read_tagged_uint64() == 0x123456789
+    assert reader.read_var_uint64() == 0x1FFFF
+    assert reader.read_bytes_and_size() == b"stream-data"
+    assert reader.read_string() == "hello-stream"
+
+
+def test_stream_buffer_read_with_recv_into():
+    reader = Buffer.from_stream(RecvIntoOnlyStream(bytes([0x11, 0x22, 0x33, 0x44])))
+    assert reader.read_uint32() == 0x44332211
+
+
+def test_stream_buffer_read_with_legacy_recvinto():
+    reader = Buffer.from_stream(LegacyRecvIntoOnlyStream(bytes([0x11, 0x22, 0x33, 0x44])))
+    assert reader.read_uint32() == 0x44332211
+
+
+def test_stream_buffer_set_reader_index():
+    reader = Buffer.from_stream(OneByteStream(bytes([0x11, 0x22, 0x33, 0x44, 0x55])))
+    reader.set_reader_index(4)
+    assert reader.read_uint8() == 0x55
+
+
+def test_stream_buffer_set_reader_index_out_of_bound():
+    reader = Buffer.from_stream(OneByteStream(b"\x11\x22\x33"))
+    with pytest.raises(Exception, match="Buffer out of bound"):
+        reader.set_reader_index(10)
+
+
+def test_stream_buffer_read_bytes_and_skip_update_reader_index():
+    reader = Buffer.from_stream(OneByteStream(bytes(range(20))), buffer_size=2)
+    assert reader.read_bytes(5) == bytes([0, 1, 2, 3, 4])
+    assert reader.get_reader_index() == 5
+    reader.skip(5)
+    assert reader.get_reader_index() == 10
+
+
+def test_stream_buffer_short_read_error():
+    reader = Buffer.from_stream(OneByteStream(b"\x01\x02\x03"))
+    with pytest.raises(Exception, match="Buffer out of bound"):
+        reader.read_uint32()
+
+
 if __name__ == "__main__":
     test_grow()
diff --git a/python/pyfory/tests/test_stream.py b/python/pyfory/tests/test_stream.py
new file mode 100644
index 000000000..a8d6e6e68
--- /dev/null
+++ b/python/pyfory/tests/test_stream.py
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+import pyfory
+from pyfory.buffer import Buffer
+
+
+class OneByteStream:
+    """In-memory stream test double that hands out at most one byte per call.
+
+    Every read-style method caps the transfer with ``min(1, ...)``, so a
+    consumer has to call repeatedly to obtain a multi-byte value.
+    """
+
+    def __init__(self, data: bytes):
+        """Store ``data`` as the stream content and start at offset 0."""
+        self._data = data
+        self._offset = 0
+
+    def read(self, size=-1):
+        """Return the next byte as ``bytes``, or ``b""`` at end of data or when ``size == 0``.
+
+        A negative ``size`` is replaced by the number of remaining bytes, but
+        the result is still limited to a single byte.
+        """
+        if self._offset >= len(self._data):
+            return b""
+        if size < 0:
+            size = len(self._data) - self._offset
+        if size == 0:
+            return b""
+        # Never more than one byte, whatever the caller asked for.
+        read_size = min(1, size, len(self._data) - self._offset)
+        start = self._offset
+        self._offset += read_size
+        return self._data[start : start + read_size]
+
+    def readinto(self, buffer):
+        """Copy at most one byte into ``buffer`` and return the number of bytes copied.
+
+        Returns 0 at end of data or when ``buffer`` has zero length.
+        """
+        if self._offset >= len(self._data):
+            return 0
+        # Cast to unsigned bytes so slicing and len() operate per byte.
+        view = memoryview(buffer).cast("B")
+        if len(view) == 0:
+            return 0
+        read_size = min(1, len(view), len(self._data) - self._offset)
+        start = self._offset
+        self._offset += read_size
+        view[:read_size] = self._data[start : start + read_size]
+        return read_size
+
+    def recv_into(self, buffer, size=-1):
+        """Copy at most one byte into ``buffer`` and return the number of bytes copied.
+
+        A ``size`` that is negative or larger than ``buffer`` is clamped to the
+        buffer length. Returns 0 at end of data or when the effective size is 0.
+        """
+        if self._offset >= len(self._data):
+            return 0
+        view = memoryview(buffer).cast("B")
+        if size < 0 or size > len(view):
+            size = len(view)
+        if size == 0:
+            return 0
+        read_size = min(1, size, len(self._data) - self._offset)
+        start = self._offset
+        self._offset += read_size
+        view[:read_size] = self._data[start : start + read_size]
+        return read_size
+
+    def recvinto(self, buffer, size=-1):
+        """Alternate spelling that delegates to :meth:`recv_into`."""
+        return self.recv_into(buffer, size)
+
+
+@pytest.mark.parametrize("xlang", [False, True])
+def test_stream_roundtrip_primitives_and_strings(xlang):
+    """Round-trip scalar, text, bytes and list values through a stream-backed buffer.
+
+    Each value is serialized to bytes, wrapped in ``OneByteStream`` and read
+    back through ``Buffer.from_stream``; the result must equal the input.
+    Runs once with ``xlang=False`` and once with ``xlang=True``.
+    """
+    fory = pyfory.Fory(xlang=xlang, ref=True)
+    values = [
+        0,
+        -123456789,
+        3.1415926,
+        "stream-hello",
+        "stream-你好-😀",
+        b"binary-data" * 8,
+        [1, 2, 3, 5, 8],
+    ]
+
+    for value in values:
+        data = fory.serialize(value)
+        restored = fory.deserialize(Buffer.from_stream(OneByteStream(data)))
+        assert restored == value
+
+
+@pytest.mark.parametrize("xlang", [False, True])
+def test_stream_roundtrip_nested_collections(xlang):
+    """Round-trip a dict containing nested lists and dicts through a stream-backed buffer.
+
+    Runs once with ``xlang=False`` and once with ``xlang=True``; the
+    deserialized value must equal the original.
+    """
+    fory = pyfory.Fory(xlang=xlang, ref=True)
+    value = {
+        "name": "stream-object",
+        "items": [1, 2, {"k1": "v1", "k2": [3, 4, 5]}],
+        "scores": {"a": 10, "b": 20},
+        "flags": [True, False, True],
+    }
+
+    data = fory.serialize(value)
+    restored = fory.deserialize(Buffer.from_stream(OneByteStream(data)))
+    assert restored == value
+
+
+def test_stream_roundtrip_reference_graph_python_mode():
+    """Shared and cyclic references must survive deserialization from a stream.
+
+    Uses ``xlang=False`` with ``ref=True``. A list referenced from two dict
+    keys must come back as one shared object, and a list containing itself
+    must come back pointing at itself.
+    """
+    fory = pyfory.Fory(xlang=False, ref=True)
+    shared = ["x", 1, 2]
+    value = {"a": shared, "b": shared}
+    cycle = []
+    cycle.append(cycle)
+
+    data_ref = fory.serialize(value)
+    restored_ref = fory.deserialize(Buffer.from_stream(OneByteStream(data_ref)))
+    assert restored_ref["a"] == shared
+    # Identity, not just equality: both keys must refer to the same list object.
+    assert restored_ref["a"] is restored_ref["b"]
+
+    data_cycle = fory.serialize(cycle)
+    restored_cycle = fory.deserialize(Buffer.from_stream(OneByteStream(data_cycle)))
+    assert restored_cycle[0] is restored_cycle
+
+
+@pytest.mark.parametrize("xlang", [False, True])
+def test_stream_deserialize_multiple_objects_from_single_stream(xlang):
+    """Deserialize several back-to-back objects from one stream-backed buffer.
+
+    The objects are serialized consecutively into a single write buffer. The
+    written bytes are then exposed through ``OneByteStream`` and read back in
+    order. Afterwards the reader index must equal ``reader.size()``.
+    """
+    fory = pyfory.Fory(xlang=xlang, ref=True)
+    expected = [
+        2026,
+        "multi-object-stream",
+        {"k": [1, 2, 3], "nested": {"x": True}},
+        [10, 20, 30, 40],
+    ]
+
+    write_buffer = Buffer.allocate(1024)
+    for obj in expected:
+        fory.serialize(obj, write_buffer)
+
+    # Only the bytes actually written (up to the writer index) are streamed.
+    reader = Buffer.from_stream(OneByteStream(write_buffer.get_bytes(0, write_buffer.get_writer_index())))
+    for obj in expected:
+        assert fory.deserialize(reader) == obj
+
+    assert reader.get_reader_index() == reader.size()
+
+
+@pytest.mark.parametrize("xlang", [False, True])
+def test_stream_deserialize_truncated_error(xlang):
+    """Deserializing a payload with its last byte removed must raise an exception.
+
+    Runs once with ``xlang=False`` and once with ``xlang=True``.
+    """
+    fory = pyfory.Fory(xlang=xlang, ref=True)
+    data = fory.serialize({"k": "value", "numbers": [1, 2, 3, 4]})
+    # Drop the final byte so the stream ends before the payload is complete.
+    truncated = data[:-1]
+
+    with pytest.raises(Exception):
+        fory.deserialize(Buffer.from_stream(OneByteStream(truncated)))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@fory.apache.org
For additional commands, e-mail: commits-help@fory.apache.org

Reply via email to