This is an automated email from the ASF dual-hosted git repository.

chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fory.git


The following commit(s) were added to refs/heads/main by this push:
     new 9c4510af0 feat(python/cpp): add streaming serialization support to python and c++ (#3449)
9c4510af0 is described below

commit 9c4510af0eb48490e96e577fbaf2859b5d51ee44
Author: Shawn Yang <[email protected]>
AuthorDate: Wed Mar 4 10:56:05 2026 +0800

    feat(python/cpp): add streaming serialization support to python and c++ (#3449)
    
    ## Why?
    
    Add output-stream serialization support so large payloads can be written
    incrementally instead of materializing full byte arrays first, and align
    Python/C++ stream APIs for end-to-end stream roundtrips.
    
    
    ## What does this PR do?
    
    - Add C++ output-stream serialization APIs:
    `Fory::serialize(OutputStream&, const T&)`,
    `Fory::serialize(std::ostream&, const T&)`, and corresponding
    `ThreadSafeFory` overloads.
    - Rename the C++ stream input types (`StreamReader` to `InputStream`,
    `ForyInputStream` to `StdInputStream`) so they pair with the new
    `OutputStream`/`StdOutputStream` types, and wire output-stream
    binding/reset handling through `WriteContext` and `Buffer`.
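    - Introduce a flush barrier and incremental flushing (`try_flush`): the
    map serializer holds the barrier from writing a chunk header placeholder
    until the chunk size is written back, so chunk headers stay valid while
    streaming, and the struct serializer attempts a flush after writing its
    fields.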
    - Add Python stream-write bridge (`PyOutputStream`), `Buffer.wrap_stream`
    plus stream writer binding, and `Fory.dump(obj, stream)` for direct
    serialization to Python writable streams.
    - Expand tests in C++ and Python for stream output roundtrip/parity,
    partial-write flush behavior, and map chunk streaming paths.
    
    
    ## Related issues
    
    Closes #3445
    Closes #3444
    Closes #3452
    
    
    ## Does this PR introduce any user-facing change?
    
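    Yes. This adds new public stream serialization APIs in C++ (`serialize`
    to `OutputStream`/`std::ostream`) and Python (`dump` to writable
    streams), and renames the public C++ stream input types (`StreamReader`
    to `InputStream`, `ForyInputStream` to `StdInputStream`).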
    
    
    - [x] Does this PR introduce any public API change?
    - [ ] Does this PR introduce any binary protocol compatibility change?
    
    ## Benchmark
---
 cpp/fory/serialization/context.cc          |   2 +
 cpp/fory/serialization/context.h           |  68 ++++++++--
 cpp/fory/serialization/fory.h              |  87 +++++++++++--
 cpp/fory/serialization/map_serializer.h    |  13 ++
 cpp/fory/serialization/stream_test.cc      |  87 ++++++++++++-
 cpp/fory/serialization/struct_serializer.h |   8 ++
 cpp/fory/util/buffer.cc                    |  34 +++--
 cpp/fory/util/buffer.h                     | 100 ++++++++++-----
 cpp/fory/util/buffer_test.cc               | 114 +++++++++++++++-
 cpp/fory/util/stream.cc                    | 144 ++++++++++++++++++---
 cpp/fory/util/stream.h                     | 123 +++++++++++++++++-
 python/pyfory/_fory.py                     |  88 ++++++++++++-
 python/pyfory/buffer.pxi                   | 112 +++++++++++++++-
 python/pyfory/collection.pxi               |   3 +
 python/pyfory/collection.py                |   3 +
 python/pyfory/cpp/pyfory.cc                | 200 +++++++++++++++++++++++++++--
 python/pyfory/cpp/pyfory.h                 |   6 +
 python/pyfory/includes/libpyfory.pxd       |   7 +-
 python/pyfory/includes/libutil.pxd         |  13 ++
 python/pyfory/serialization.pyx            |  75 ++++++++++-
 python/pyfory/struct.pxi                   |   1 +
 python/pyfory/struct.py                    |   1 +
 python/pyfory/tests/test_buffer.py         |  75 ++++++++++-
 python/pyfory/tests/test_stream.py         | 102 +++++++++++++++
 24 files changed, 1331 insertions(+), 135 deletions(-)

diff --git a/cpp/fory/serialization/context.cc 
b/cpp/fory/serialization/context.cc
index 12c22deb7..28fbc6b1d 100644
--- a/cpp/fory/serialization/context.cc
+++ b/cpp/fory/serialization/context.cc
@@ -406,6 +406,8 @@ void WriteContext::reset() {
   has_first_type_info_ = false;
   type_info_index_map_active_ = false;
   current_dyn_depth_ = 0;
+  buffer_.clear_output_stream();
+  output_stream_ = nullptr;
   // reset buffer indices for reuse - no memory operations needed
   buffer_.writer_index(0);
   buffer_.reader_index(0);
diff --git a/cpp/fory/serialization/context.h b/cpp/fory/serialization/context.h
index 18e5e68bd..e080604f6 100644
--- a/cpp/fory/serialization/context.h
+++ b/cpp/fory/serialization/context.h
@@ -118,6 +118,10 @@ public:
   /// get const reference to internal output buffer.
   inline const Buffer &buffer() const { return buffer_; }
 
+  inline void set_output_stream(OutputStream *output_stream) {
+    output_stream_ = output_stream;
+  }
+
   /// get reference writer for tracking shared references.
   inline RefWriter &ref_writer() { return ref_writer_; }
 
@@ -167,70 +171,107 @@ public:
     }
   }
 
+  inline uint32_t flush_barrier_depth() const {
+    return output_stream_ == nullptr ? 0
+                                     : output_stream_->flush_barrier_depth();
+  }
+
+  inline void enter_flush_barrier() {
+    if (output_stream_ != nullptr) {
+      output_stream_->enter_flush_barrier();
+    }
+  }
+
+  inline void exit_flush_barrier() {
+    if (output_stream_ != nullptr) {
+      output_stream_->exit_flush_barrier();
+    }
+  }
+
+  inline void try_flush() {
+    if (output_stream_ == nullptr || buffer_.writer_index() <= 4096) {
+      return;
+    }
+    output_stream_->try_flush();
+    if (FORY_PREDICT_FALSE(output_stream_->has_error())) {
+      set_error(output_stream_->error());
+    }
+  }
+
+  inline void force_flush() {
+    if (output_stream_ == nullptr) {
+      return;
+    }
+    output_stream_->force_flush();
+    if (FORY_PREDICT_FALSE(output_stream_->has_error())) {
+      set_error(output_stream_->error());
+    }
+  }
+
   /// write uint8_t value to buffer.
   FORY_ALWAYS_INLINE void write_uint8(uint8_t value) {
-    buffer().write_uint8(value);
+    buffer_.write_uint8(value);
   }
 
   /// write int8_t value to buffer.
   FORY_ALWAYS_INLINE void write_int8(int8_t value) {
-    buffer().write_int8(value);
+    buffer_.write_int8(value);
   }
 
   /// write uint16_t value to buffer.
   FORY_ALWAYS_INLINE void write_uint16(uint16_t value) {
-    buffer().write_uint16(value);
+    buffer_.write_uint16(value);
   }
 
   /// write uint32_t value to buffer.
   FORY_ALWAYS_INLINE void write_uint32(uint32_t value) {
-    buffer().write_uint32(value);
+    buffer_.write_uint32(value);
   }
 
   /// write int64_t value to buffer.
   FORY_ALWAYS_INLINE void write_int64(int64_t value) {
-    buffer().write_int64(value);
+    buffer_.write_int64(value);
   }
 
   /// write uint32_t value as varint to buffer.
   FORY_ALWAYS_INLINE void write_var_uint32(uint32_t value) {
-    buffer().write_var_uint32(value);
+    buffer_.write_var_uint32(value);
   }
 
   /// write int32_t value as zigzag varint to buffer.
   FORY_ALWAYS_INLINE void write_varint32(int32_t value) {
-    buffer().write_var_int32(value);
+    buffer_.write_var_int32(value);
   }
 
   /// write uint64_t value as varint to buffer.
   FORY_ALWAYS_INLINE void write_var_uint64(uint64_t value) {
-    buffer().write_var_uint64(value);
+    buffer_.write_var_uint64(value);
   }
 
   /// write int64_t value as zigzag varint to buffer.
   FORY_ALWAYS_INLINE void write_varint64(int64_t value) {
-    buffer().write_var_int64(value);
+    buffer_.write_var_int64(value);
   }
 
   /// write uint64_t value using tagged encoding to buffer.
   FORY_ALWAYS_INLINE void write_tagged_uint64(uint64_t value) {
-    buffer().write_tagged_uint64(value);
+    buffer_.write_tagged_uint64(value);
   }
 
   /// write int64_t value using tagged encoding to buffer.
   FORY_ALWAYS_INLINE void write_tagged_int64(int64_t value) {
-    buffer().write_tagged_int64(value);
+    buffer_.write_tagged_int64(value);
   }
 
   /// write uint64_t value as varuint36small to buffer.
   /// This is the special variable-length encoding used for string headers.
   FORY_ALWAYS_INLINE void write_var_uint36_small(uint64_t value) {
-    buffer().write_var_uint36_small(value);
+    buffer_.write_var_uint36_small(value);
   }
 
   /// write raw bytes to buffer.
   FORY_ALWAYS_INLINE void write_bytes(const void *data, uint32_t length) {
-    buffer().write_bytes(data, length);
+    buffer_.write_bytes(data, length);
   }
 
   /// write TypeMeta inline using streaming protocol.
@@ -329,6 +370,7 @@ private:
   std::unique_ptr<TypeResolver> type_resolver_;
   RefWriter ref_writer_;
   uint32_t current_dyn_depth_;
+  OutputStream *output_stream_ = nullptr;
 
   // Meta sharing state (for streaming inline TypeMeta)
   // Maps TypeInfo* to index for reference tracking - uses map size as counter
diff --git a/cpp/fory/serialization/fory.h b/cpp/fory/serialization/fory.h
index 09a0bfa48..1c5f19522 100644
--- a/cpp/fory/serialization/fory.h
+++ b/cpp/fory/serialization/fory.h
@@ -41,6 +41,7 @@
 #include <cstring>
 #include <memory>
 #include <mutex>
+#include <ostream>
 #include <string>
 #include <utility>
 #include <vector>
@@ -501,6 +502,52 @@ public:
     return result;
   }
 
+  /// Serialize an object to an output stream.
+  ///
+  /// @tparam T The type of object to serialize.
+  /// @param output_stream The output stream.
+  /// @param obj The object to serialize.
+  /// @return Number of bytes written, or error.
+  template <typename T>
+  Result<size_t, Error> serialize(OutputStream &output_stream, const T &obj) {
+    if (FORY_PREDICT_FALSE(!finalized_)) {
+      ensure_finalized();
+    }
+    WriteContextGuard guard(*write_ctx_);
+    output_stream.reset();
+    write_ctx_->set_output_stream(&output_stream);
+    Buffer &buffer = write_ctx_->buffer();
+    buffer.bind_output_stream(&output_stream);
+    auto serialize_result = serialize_impl(obj, buffer);
+    if (FORY_PREDICT_FALSE(!serialize_result.ok())) {
+      buffer.clear_output_stream();
+      write_ctx_->set_output_stream(nullptr);
+      return Unexpected(std::move(serialize_result).error());
+    }
+    output_stream.force_flush();
+    buffer.clear_output_stream();
+    write_ctx_->set_output_stream(nullptr);
+    if (FORY_PREDICT_FALSE(output_stream.has_error())) {
+      return Unexpected(output_stream.error());
+    }
+    if (FORY_PREDICT_FALSE(write_ctx_->has_error())) {
+      return Unexpected(write_ctx_->take_error());
+    }
+    return output_stream.flushed_bytes();
+  }
+
+  /// Serialize an object to a std::ostream.
+  ///
+  /// @tparam T The type of object to serialize.
+  /// @param ostream The output stream.
+  /// @param obj The object to serialize.
+  /// @return Number of bytes written, or error.
+  template <typename T>
+  Result<size_t, Error> serialize(std::ostream &ostream, const T &obj) {
+    StdOutputStream output_stream(ostream);
+    return serialize(output_stream, obj);
+  }
+
   /// Serialize an object to an existing Buffer (fastest path).
   ///
   /// @tparam T The type of object to serialize.
@@ -627,36 +674,36 @@ public:
     return deserialize_impl<T>(buffer);
   }
 
-  /// Deserialize an object from a stream reader.
+  /// Deserialize an object from an input stream.
   ///
   /// This overload obtains the reader-owned Buffer via get_buffer() and
   /// continues deserialization on that buffer.
   ///
   /// @tparam T The type of object to deserialize.
-  /// @param stream_reader Stream reader to read from.
+  /// @param input_stream Input stream to read from.
   /// @return Deserialized object, or error.
   template <typename T>
-  Result<T, Error> deserialize(StreamReader &stream_reader) {
+  Result<T, Error> deserialize(InputStream &input_stream) {
     struct StreamShrinkGuard {
-      StreamReader *stream_reader = nullptr;
+      InputStream *input_stream = nullptr;
       ~StreamShrinkGuard() {
-        if (stream_reader != nullptr) {
-          stream_reader->shrink_buffer();
+        if (input_stream != nullptr) {
+          input_stream->shrink_buffer();
         }
       }
     };
-    StreamShrinkGuard shrink_guard{&stream_reader};
-    Buffer &buffer = stream_reader.get_buffer();
+    StreamShrinkGuard shrink_guard{&input_stream};
+    Buffer &buffer = input_stream.get_buffer();
     return deserialize<T>(buffer);
   }
 
-  /// Deserialize an object from ForyInputStream.
+  /// Deserialize an object from StdInputStream.
   ///
   /// @tparam T The type of object to deserialize.
   /// @param stream Input stream wrapper to read from.
   /// @return Deserialized object, or error.
-  template <typename T> Result<T, Error> deserialize(ForyInputStream &stream) {
-    return deserialize<T>(static_cast<StreamReader &>(stream));
+  template <typename T> Result<T, Error> deserialize(StdInputStream &stream) {
+    return deserialize<T>(static_cast<InputStream &>(stream));
   }
 
   // ==========================================================================
@@ -805,6 +852,18 @@ public:
     return fory_handle->serialize(obj);
   }
 
+  template <typename T>
+  Result<size_t, Error> serialize(OutputStream &output_stream, const T &obj) {
+    auto fory_handle = fory_pool_.acquire();
+    return fory_handle->serialize(output_stream, obj);
+  }
+
+  template <typename T>
+  Result<size_t, Error> serialize(std::ostream &ostream, const T &obj) {
+    auto fory_handle = fory_pool_.acquire();
+    return fory_handle->serialize(ostream, obj);
+  }
+
   template <typename T>
   Result<size_t, Error> serialize_to(Buffer &buffer, const T &obj) {
     auto fory_handle = fory_pool_.acquire();
@@ -830,12 +889,12 @@ public:
   }
 
   template <typename T>
-  Result<T, Error> deserialize(StreamReader &stream_reader) {
+  Result<T, Error> deserialize(InputStream &input_stream) {
     auto fory_handle = fory_pool_.acquire();
-    return fory_handle->template deserialize<T>(stream_reader);
+    return fory_handle->template deserialize<T>(input_stream);
   }
 
-  template <typename T> Result<T, Error> deserialize(ForyInputStream &stream) {
+  template <typename T> Result<T, Error> deserialize(StdInputStream &stream) {
     auto fory_handle = fory_pool_.acquire();
     return fory_handle->template deserialize<T>(stream);
   }
diff --git a/cpp/fory/serialization/map_serializer.h 
b/cpp/fory/serialization/map_serializer.h
index 5bd9bea51..dd2952da9 100644
--- a/cpp/fory/serialization/map_serializer.h
+++ b/cpp/fory/serialization/map_serializer.h
@@ -138,6 +138,7 @@ inline void write_map_data_fast(const MapType &map, 
WriteContext &ctx,
     // If nullability is needed, use the slow path
 
     if (need_write_header) {
+      ctx.enter_flush_barrier();
       // reserve space for header (1 byte) + chunk size (1 byte)
       header_offset = ctx.buffer().writer_index();
       ctx.write_uint16(0); // Placeholder for header and chunk size
@@ -174,6 +175,8 @@ inline void write_map_data_fast(const MapType &map, 
WriteContext &ctx,
     pair_counter++;
     if (pair_counter == MAX_CHUNK_SIZE) {
       write_chunk_size(ctx, header_offset, pair_counter);
+      ctx.exit_flush_barrier();
+      ctx.try_flush();
       pair_counter = 0;
       need_write_header = true;
     }
@@ -182,6 +185,8 @@ inline void write_map_data_fast(const MapType &map, 
WriteContext &ctx,
   // write final chunk size
   if (pair_counter > 0) {
     write_chunk_size(ctx, header_offset, pair_counter);
+    ctx.exit_flush_barrier();
+    ctx.try_flush();
   }
 }
 
@@ -238,6 +243,7 @@ inline void write_map_data_slow(const MapType &map, 
WriteContext &ctx,
       // Finish current chunk if any
       if (pair_counter > 0) {
         write_chunk_size(ctx, header_offset, pair_counter);
+        ctx.exit_flush_barrier();
         pair_counter = 0;
         need_write_header = true;
       }
@@ -394,9 +400,12 @@ inline void write_map_data_slow(const MapType &map, 
WriteContext &ctx,
       // Finish previous chunk if types changed
       if (types_changed && pair_counter > 0) {
         write_chunk_size(ctx, header_offset, pair_counter);
+        ctx.exit_flush_barrier();
+        ctx.try_flush();
         pair_counter = 0;
       }
 
+      ctx.enter_flush_barrier();
       // write new chunk header
       header_offset = ctx.buffer().writer_index();
       ctx.write_uint16(0); // Placeholder for header and chunk size
@@ -513,6 +522,8 @@ inline void write_map_data_slow(const MapType &map, 
WriteContext &ctx,
     pair_counter++;
     if (pair_counter == MAX_CHUNK_SIZE) {
       write_chunk_size(ctx, header_offset, pair_counter);
+      ctx.exit_flush_barrier();
+      ctx.try_flush();
       pair_counter = 0;
       need_write_header = true;
       current_key_type_info = nullptr;
@@ -523,6 +534,8 @@ inline void write_map_data_slow(const MapType &map, 
WriteContext &ctx,
   // write final chunk size
   if (pair_counter > 0) {
     write_chunk_size(ctx, header_offset, pair_counter);
+    ctx.exit_flush_barrier();
+    ctx.try_flush();
   }
 }
 
diff --git a/cpp/fory/serialization/stream_test.cc 
b/cpp/fory/serialization/stream_test.cc
index edbefd63d..98783a37e 100644
--- a/cpp/fory/serialization/stream_test.cc
+++ b/cpp/fory/serialization/stream_test.cc
@@ -110,6 +110,44 @@ private:
   OneByteStreamBuf buf_;
 };
 
+class OneByteOutputStreamBuf final : public std::streambuf {
+public:
+  OneByteOutputStreamBuf() = default;
+
+  const std::vector<uint8_t> &data() const { return data_; }
+
+protected:
+  std::streamsize xsputn(const char *s, std::streamsize count) override {
+    if (count <= 0) {
+      return 0;
+    }
+    data_.insert(data_.end(), reinterpret_cast<const uint8_t *>(s),
+                 reinterpret_cast<const uint8_t *>(s + count));
+    return count;
+  }
+
+  int_type overflow(int_type ch) override {
+    if (traits_type::eq_int_type(ch, traits_type::eof())) {
+      return traits_type::not_eof(ch);
+    }
+    data_.push_back(static_cast<uint8_t>(traits_type::to_char_type(ch)));
+    return ch;
+  }
+
+private:
+  std::vector<uint8_t> data_;
+};
+
+class OneByteOStream final : public std::ostream {
+public:
+  OneByteOStream() : std::ostream(nullptr) { rdbuf(&buf_); }
+
+  std::vector<uint8_t> data() const { return buf_.data(); }
+
+private:
+  OneByteOutputStreamBuf buf_;
+};
+
 static inline void register_stream_types(Fory &fory) {
   uint32_t type_id = 1;
   fory.register_struct<StreamPoint>(type_id++);
@@ -124,7 +162,7 @@ TEST(StreamSerializationTest, PrimitiveAndStringRoundTrip) {
   ASSERT_TRUE(number_bytes_result.ok())
       << number_bytes_result.error().to_string();
   OneByteIStream number_source(std::move(number_bytes_result).value());
-  ForyInputStream number_stream(number_source, 8);
+  StdInputStream number_stream(number_source, 8);
   auto number_result = fory.deserialize<int64_t>(number_stream);
   ASSERT_TRUE(number_result.ok()) << number_result.error().to_string();
   EXPECT_EQ(number_result.value(), -9876543212345LL);
@@ -133,7 +171,7 @@ TEST(StreamSerializationTest, PrimitiveAndStringRoundTrip) {
   ASSERT_TRUE(string_bytes_result.ok())
       << string_bytes_result.error().to_string();
   OneByteIStream string_source(std::move(string_bytes_result).value());
-  ForyInputStream string_stream(string_source, 8);
+  StdInputStream string_stream(string_source, 8);
   auto string_result = fory.deserialize<std::string>(string_stream);
   ASSERT_TRUE(string_result.ok()) << string_result.error().to_string();
   EXPECT_EQ(string_result.value(), "stream-hello-世界");
@@ -155,7 +193,7 @@ TEST(StreamSerializationTest, StructRoundTrip) {
   ASSERT_TRUE(bytes_result.ok()) << bytes_result.error().to_string();
 
   OneByteIStream source(std::move(bytes_result).value());
-  ForyInputStream stream(source, 4);
+  StdInputStream stream(source, 4);
   auto result = fory.deserialize<StreamEnvelope>(stream);
   ASSERT_TRUE(result.ok()) << result.error().to_string();
   EXPECT_EQ(result.value(), original);
@@ -175,7 +213,7 @@ TEST(StreamSerializationTest, 
SequentialDeserializeFromSingleStream) {
   ASSERT_TRUE(fory.serialize_to(bytes, envelope).ok());
 
   OneByteIStream source(bytes);
-  ForyInputStream stream(source, 3);
+  StdInputStream stream(source, 3);
 
   auto first = fory.deserialize<int32_t>(stream);
   ASSERT_TRUE(first.ok()) << first.error().to_string();
@@ -206,7 +244,7 @@ TEST(StreamSerializationTest, 
SharedPointerIdentityRoundTrip) {
   ASSERT_TRUE(bytes_result.ok()) << bytes_result.error().to_string();
 
   OneByteIStream source(std::move(bytes_result).value());
-  ForyInputStream stream(source, 2);
+  StdInputStream stream(source, 2);
   auto result = fory.deserialize<SharedIntPair>(stream);
   ASSERT_TRUE(result.ok()) << result.error().to_string();
   ASSERT_NE(result.value().first, nullptr);
@@ -230,11 +268,48 @@ TEST(StreamSerializationTest, 
TruncatedStreamReturnsError) {
   truncated.pop_back();
 
   OneByteIStream source(truncated);
-  ForyInputStream stream(source, 4);
+  StdInputStream stream(source, 4);
   auto result = fory.deserialize<StreamEnvelope>(stream);
   EXPECT_FALSE(result.ok());
 }
 
+TEST(StreamSerializationTest, SerializeToOutputStreamRoundTrip) {
+  auto fory = Fory::builder().xlang(true).track_ref(true).build();
+  register_stream_types(fory);
+
+  StreamEnvelope original{
+      "writer-roundtrip", {2, 4, 6, 8}, {{"x", 1}, {"y", 2}}, {5, -9}, true,
+  };
+
+  OneByteOStream out;
+  StdOutputStream writer(out);
+  auto write_result = fory.serialize(writer, original);
+  ASSERT_TRUE(write_result.ok()) << write_result.error().to_string();
+  ASSERT_GT(write_result.value(), 0U);
+
+  auto bytes = out.data();
+  auto roundtrip = fory.deserialize<StreamEnvelope>(bytes);
+  ASSERT_TRUE(roundtrip.ok()) << roundtrip.error().to_string();
+  EXPECT_EQ(roundtrip.value(), original);
+}
+
+TEST(StreamSerializationTest, SerializeToOStreamOverloadParity) {
+  auto fory = Fory::builder().xlang(true).track_ref(true).build();
+  register_stream_types(fory);
+
+  StreamEnvelope original{
+      "ostream-overload", {11, 22, 33}, {{"k", 99}}, {1, 2}, false,
+  };
+
+  auto expected = fory.serialize(original);
+  ASSERT_TRUE(expected.ok()) << expected.error().to_string();
+
+  OneByteOStream out;
+  auto write_result = fory.serialize(out, original);
+  ASSERT_TRUE(write_result.ok()) << write_result.error().to_string();
+  EXPECT_EQ(out.data(), expected.value());
+}
+
 } // namespace test
 } // namespace serialization
 } // namespace fory
diff --git a/cpp/fory/serialization/struct_serializer.h 
b/cpp/fory/serialization/struct_serializer.h
index 812586114..ccde0e480 100644
--- a/cpp/fory/serialization/struct_serializer.h
+++ b/cpp/fory/serialization/struct_serializer.h
@@ -2997,6 +2997,10 @@ struct Serializer<T, 
std::enable_if_t<is_fory_serializable_v<T>>> {
     constexpr size_t field_count = FieldDescriptor::Size;
     detail::write_struct_fields_impl(
         obj, ctx, std::make_index_sequence<field_count>{}, false);
+    if (FORY_PREDICT_FALSE(ctx.has_error())) {
+      return;
+    }
+    ctx.try_flush();
   }
 
   static void write_data_generic(const T &obj, WriteContext &ctx,
@@ -3025,6 +3029,10 @@ struct Serializer<T, 
std::enable_if_t<is_fory_serializable_v<T>>> {
     constexpr size_t field_count = FieldDescriptor::Size;
     detail::write_struct_fields_impl(
         obj, ctx, std::make_index_sequence<field_count>{}, has_generics);
+    if (FORY_PREDICT_FALSE(ctx.has_error())) {
+      return;
+    }
+    ctx.try_flush();
   }
 
   static T read(ReadContext &ctx, RefMode ref_mode, bool read_type) {
diff --git a/cpp/fory/util/buffer.cc b/cpp/fory/util/buffer.cc
index e7006ae38..eea383354 100644
--- a/cpp/fory/util/buffer.cc
+++ b/cpp/fory/util/buffer.cc
@@ -31,20 +31,25 @@ Buffer::Buffer() {
   writer_index_ = 0;
   reader_index_ = 0;
   wrapped_vector_ = nullptr;
-  stream_reader_ = nullptr;
+  input_stream_ = nullptr;
+  output_stream_ = nullptr;
 }
 
 Buffer::Buffer(Buffer &&buffer) noexcept {
+  FORY_CHECK(buffer.output_stream_ == nullptr)
+      << "Cannot move stream-writer-owned Buffer";
   data_ = buffer.data_;
   size_ = buffer.size_;
   own_data_ = buffer.own_data_;
   writer_index_ = buffer.writer_index_;
   reader_index_ = buffer.reader_index_;
   wrapped_vector_ = buffer.wrapped_vector_;
-  stream_reader_ = buffer.stream_reader_;
-  stream_reader_owner_ = std::move(buffer.stream_reader_owner_);
-  rebind_stream_reader_to_this();
-  buffer.stream_reader_ = nullptr;
+  input_stream_ = buffer.input_stream_;
+  input_stream_owner_ = std::move(buffer.input_stream_owner_);
+  output_stream_ = buffer.output_stream_;
+  rebind_input_stream_to_this();
+  buffer.input_stream_ = nullptr;
+  buffer.output_stream_ = nullptr;
   buffer.data_ = nullptr;
   buffer.size_ = 0;
   buffer.own_data_ = false;
@@ -52,7 +57,11 @@ Buffer::Buffer(Buffer &&buffer) noexcept {
 }
 
 Buffer &Buffer::operator=(Buffer &&buffer) noexcept {
-  detach_stream_reader_from_this();
+  FORY_CHECK(buffer.output_stream_ == nullptr)
+      << "Cannot move stream-writer-owned Buffer";
+  FORY_CHECK(output_stream_ == nullptr)
+      << "Cannot assign to stream-writer-owned Buffer";
+  detach_input_stream_from_this();
   if (own_data_) {
     free(data_);
     data_ = nullptr;
@@ -63,10 +72,12 @@ Buffer &Buffer::operator=(Buffer &&buffer) noexcept {
   writer_index_ = buffer.writer_index_;
   reader_index_ = buffer.reader_index_;
   wrapped_vector_ = buffer.wrapped_vector_;
-  stream_reader_ = buffer.stream_reader_;
-  stream_reader_owner_ = std::move(buffer.stream_reader_owner_);
-  rebind_stream_reader_to_this();
-  buffer.stream_reader_ = nullptr;
+  input_stream_ = buffer.input_stream_;
+  input_stream_owner_ = std::move(buffer.input_stream_owner_);
+  output_stream_ = buffer.output_stream_;
+  rebind_input_stream_to_this();
+  buffer.input_stream_ = nullptr;
+  buffer.output_stream_ = nullptr;
   buffer.data_ = nullptr;
   buffer.size_ = 0;
   buffer.own_data_ = false;
@@ -75,7 +86,8 @@ Buffer &Buffer::operator=(Buffer &&buffer) noexcept {
 }
 
 Buffer::~Buffer() {
-  detach_stream_reader_from_this();
+  clear_output_stream();
+  detach_input_stream_from_this();
   if (own_data_) {
     free(data_);
     data_ = nullptr;
diff --git a/cpp/fory/util/buffer.h b/cpp/fory/util/buffer.h
index 857902fc4..a44de2d75 100644
--- a/cpp/fory/util/buffer.h
+++ b/cpp/fory/util/buffer.h
@@ -35,8 +35,8 @@
 
 namespace fory {
 
-class ForyInputStream;
-class PythonStreamReader;
+class StdInputStream;
+class PyInputStream;
 
 // A buffer class for storing raw bytes with various methods for reading and
 // writing the bytes.
@@ -46,7 +46,7 @@ public:
 
   Buffer(uint8_t *data, uint32_t size, bool own_data = true)
       : data_(data), size_(size), own_data_(own_data), 
wrapped_vector_(nullptr),
-        stream_reader_(nullptr) {
+        input_stream_(nullptr), output_stream_(nullptr) {
     writer_index_ = 0;
     reader_index_ = 0;
   }
@@ -59,16 +59,17 @@ public:
   explicit Buffer(std::vector<uint8_t> &vec)
       : data_(vec.data()), size_(static_cast<uint32_t>(vec.size())),
         own_data_(false), writer_index_(static_cast<uint32_t>(vec.size())),
-        reader_index_(0), wrapped_vector_(&vec), stream_reader_(nullptr) {}
+        reader_index_(0), wrapped_vector_(&vec), input_stream_(nullptr),
+        output_stream_(nullptr) {}
 
-  explicit Buffer(StreamReader &stream_reader)
+  explicit Buffer(InputStream &input_stream)
       : data_(nullptr), size_(0), own_data_(false), writer_index_(0),
         reader_index_(0), wrapped_vector_(nullptr),
-        stream_reader_(&stream_reader) {
-    stream_reader_->bind_buffer(this);
-    stream_reader_owner_ = stream_reader_->weak_from_this().lock();
-    FORY_CHECK(&stream_reader_->get_buffer() == this)
-        << "StreamReader must hold and return the same Buffer instance";
+        input_stream_(&input_stream), output_stream_(nullptr) {
+    input_stream_->bind_buffer(this);
+    input_stream_owner_ = input_stream_->weak_from_this().lock();
+    FORY_CHECK(&input_stream_->get_buffer() == this)
+        << "InputStream must hold and return the same Buffer instance";
   }
 
   Buffer(Buffer &&buffer) noexcept;
@@ -81,6 +82,8 @@ public:
     if (this == &other) {
       return;
     }
+    FORY_CHECK(output_stream_ == nullptr && other.output_stream_ == nullptr)
+        << "Cannot swap stream-writer-owned Buffer";
     using std::swap;
     swap(data_, other.data_);
     swap(size_, other.size_);
@@ -88,10 +91,11 @@ public:
     swap(writer_index_, other.writer_index_);
     swap(reader_index_, other.reader_index_);
     swap(wrapped_vector_, other.wrapped_vector_);
-    swap(stream_reader_, other.stream_reader_);
-    swap(stream_reader_owner_, other.stream_reader_owner_);
-    rebind_stream_reader_to_this();
-    other.rebind_stream_reader_to_this();
+    swap(input_stream_, other.input_stream_);
+    swap(input_stream_owner_, other.input_stream_owner_);
+    swap(output_stream_, other.output_stream_);
+    rebind_input_stream_to_this();
+    other.rebind_input_stream_to_this();
   }
 
   /// \brief Return a pointer to the buffer's data
@@ -103,12 +107,37 @@ public:
   FORY_ALWAYS_INLINE bool own_data() const { return own_data_; }
 
   FORY_ALWAYS_INLINE bool is_stream_backed() const {
-    return stream_reader_ != nullptr;
+    return input_stream_ != nullptr;
+  }
+
+  FORY_ALWAYS_INLINE bool is_output_stream_backed() const {
+    return output_stream_ != nullptr;
+  }
+
+  FORY_ALWAYS_INLINE void bind_output_stream(OutputStream *output_stream) {
+    if (output_stream_ == output_stream) {
+      return;
+    }
+    if (output_stream_ != nullptr) {
+      output_stream_->unbind_buffer(this);
+    }
+    output_stream_ = output_stream;
+    if (output_stream_ != nullptr) {
+      output_stream_->bind_buffer(this);
+    }
+  }
+
+  FORY_ALWAYS_INLINE void clear_output_stream() {
+    if (output_stream_ == nullptr) {
+      return;
+    }
+    output_stream_->unbind_buffer(this);
+    output_stream_ = nullptr;
   }
 
   FORY_ALWAYS_INLINE void shrink_stream_buffer() {
-    if (stream_reader_ != nullptr) {
-      stream_reader_->shrink_buffer();
+    if (input_stream_ != nullptr) {
+      input_stream_->shrink_buffer();
     }
   }
 
@@ -152,7 +181,7 @@ public:
   }
 
   FORY_ALWAYS_INLINE bool reader_index(uint32_t reader_index, Error &error) {
-    if (FORY_PREDICT_FALSE(reader_index > size_ && stream_reader_ != nullptr)) 
{
+    if (FORY_PREDICT_FALSE(reader_index > size_ && input_stream_ != nullptr)) {
       if (FORY_PREDICT_FALSE(
               !fill_buffer(reader_index - reader_index_, error))) {
         return false;
@@ -806,6 +835,9 @@ public:
     grow(length);
     unsafe_put(writer_index_, data, length);
     increase_writer_index(length);
+    if (FORY_PREDICT_FALSE(output_stream_ != nullptr && writer_index_ > 4096)) 
{
+      output_stream_->try_flush();
+    }
   }
 
   // 
===========================================================================
@@ -1223,24 +1255,25 @@ public:
   std::string hex() const;
 
 private:
-  friend class ForyInputStream;
-  friend class PythonStreamReader;
+  friend class StdInputStream;
+  friend class PyInputStream;
+  friend class OutputStream;
 
-  FORY_ALWAYS_INLINE void rebind_stream_reader_to_this() {
-    if (stream_reader_ == nullptr) {
+  FORY_ALWAYS_INLINE void rebind_input_stream_to_this() {
+    if (input_stream_ == nullptr) {
       return;
     }
-    stream_reader_->bind_buffer(this);
-    FORY_CHECK(&stream_reader_->get_buffer() == this)
-        << "StreamReader must hold and return the same Buffer instance";
+    input_stream_->bind_buffer(this);
+    FORY_CHECK(&input_stream_->get_buffer() == this)
+        << "InputStream must hold and return the same Buffer instance";
   }
 
-  FORY_ALWAYS_INLINE void detach_stream_reader_from_this() {
-    if (stream_reader_ == nullptr) {
+  FORY_ALWAYS_INLINE void detach_input_stream_from_this() {
+    if (input_stream_ == nullptr) {
       return;
     }
-    if (&stream_reader_->get_buffer() == this) {
-      stream_reader_->bind_buffer(nullptr);
+    if (&input_stream_->get_buffer() == this) {
+      input_stream_->bind_buffer(nullptr);
     }
   }
 
@@ -1248,11 +1281,11 @@ private:
     if (FORY_PREDICT_TRUE(min_fill_size <= size_ - reader_index_)) {
       return true;
     }
-    if (FORY_PREDICT_TRUE(stream_reader_ == nullptr)) {
+    if (FORY_PREDICT_TRUE(input_stream_ == nullptr)) {
       error.set_buffer_out_of_bound(reader_index_, min_fill_size, size_);
       return false;
     }
-    auto fill_result = stream_reader_->fill_buffer(min_fill_size);
+    auto fill_result = input_stream_->fill_buffer(min_fill_size);
     if (FORY_PREDICT_FALSE(!fill_result.ok())) {
       error = std::move(fill_result).error();
       return false;
@@ -1362,8 +1395,9 @@ private:
   uint32_t writer_index_;
   uint32_t reader_index_;
   std::vector<uint8_t> *wrapped_vector_ = nullptr;
-  StreamReader *stream_reader_ = nullptr;
-  std::shared_ptr<StreamReader> stream_reader_owner_;
+  InputStream *input_stream_ = nullptr;
+  std::shared_ptr<InputStream> input_stream_owner_;
+  OutputStream *output_stream_ = nullptr;
 };
 
 /// \brief Allocate a fixed-size mutable buffer from the default memory pool
diff --git a/cpp/fory/util/buffer_test.cc b/cpp/fory/util/buffer_test.cc
index d06a65ba9..7f32c0685 100644
--- a/cpp/fory/util/buffer_test.cc
+++ b/cpp/fory/util/buffer_test.cc
@@ -70,6 +70,33 @@ private:
   OneByteStreamBuf buf_;
 };
 
+class CountingOutputStream final : public OutputStream {
+public:
+  Result<void, Error> write_to_stream(const uint8_t *src,
+                                      uint32_t length) override {
+    write_calls_++;
+    if (length == 0) {
+      return Result<void, Error>();
+    }
+    data_.insert(data_.end(), src, src + length);
+    return Result<void, Error>();
+  }
+
+  Result<void, Error> flush_stream() override {
+    flush_calls_++;
+    return Result<void, Error>();
+  }
+
+  const std::vector<uint8_t> &data() const { return data_; }
+  uint32_t write_calls() const { return write_calls_; }
+  uint32_t flush_calls() const { return flush_calls_; }
+
+private:
+  std::vector<uint8_t> data_;
+  uint32_t write_calls_ = 0;
+  uint32_t flush_calls_ = 0;
+};
+
 TEST(Buffer, to_string) {
   std::shared_ptr<Buffer> buffer;
   allocate_buffer(16, &buffer);
@@ -226,7 +253,7 @@ TEST(Buffer, StreamReadFromOneByteSource) {
 
   raw.resize(writer.writer_index());
   OneByteIStream one_byte_stream(raw);
-  ForyInputStream stream(one_byte_stream, 8);
+  StdInputStream stream(one_byte_stream, 8);
   Buffer reader(stream);
   Error error;
 
@@ -247,7 +274,7 @@ TEST(Buffer, StreamReadFromOneByteSource) {
 TEST(Buffer, StreamGetAndReaderIndexFromOneByteSource) {
   std::vector<uint8_t> raw{0x11, 0x22, 0x33, 0x44, 0x55};
   OneByteIStream one_byte_stream(raw);
-  ForyInputStream stream(one_byte_stream, 2);
+  StdInputStream stream(one_byte_stream, 2);
   Buffer reader(stream);
   Error error;
   ASSERT_TRUE(reader.ensure_readable(4, error)) << error.to_string();
@@ -261,7 +288,7 @@ TEST(Buffer, StreamGetAndReaderIndexFromOneByteSource) {
 TEST(Buffer, StreamReadBytesAndSkipAdvanceReaderIndex) {
   std::vector<uint8_t> raw{0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
   OneByteIStream one_byte_stream(raw);
-  ForyInputStream stream(one_byte_stream, 2);
+  StdInputStream stream(one_byte_stream, 2);
   Buffer reader(stream);
   Error error;
   uint8_t out[5] = {0};
@@ -282,7 +309,7 @@ TEST(Buffer, StreamReadBytesAndSkipAdvanceReaderIndex) {
 TEST(Buffer, StreamSkipAndUnread) {
   std::vector<uint8_t> raw{0x01, 0x02, 0x03, 0x04, 0x05};
   OneByteIStream one_byte_stream(raw);
-  ForyInputStream stream(one_byte_stream, 2);
+  StdInputStream stream(one_byte_stream, 2);
   auto fill_result = stream.fill_buffer(4);
   ASSERT_TRUE(fill_result.ok()) << fill_result.error().to_string();
 
@@ -306,13 +333,90 @@ TEST(Buffer, StreamSkipAndUnread) {
 TEST(Buffer, StreamReadErrorWhenInsufficientData) {
   std::vector<uint8_t> raw{0x01, 0x02, 0x03};
   OneByteIStream one_byte_stream(raw);
-  ForyInputStream stream(one_byte_stream, 2);
+  StdInputStream stream(one_byte_stream, 2);
   Buffer reader(stream);
   Error error;
   EXPECT_EQ(reader.read_uint32(error), 0U);
   EXPECT_FALSE(error.ok());
   EXPECT_EQ(error.code(), ErrorCode::BufferOutOfBound);
 }
+
+TEST(Buffer, OutputStreamThresholdFlushOnWriteBytes) {
+  CountingOutputStream writer;
+  Buffer *buffer = writer.get_buffer();
+  ASSERT_NE(buffer, nullptr);
+
+  std::vector<uint8_t> payload(5000, 7);
+  buffer->write_bytes(payload.data(), static_cast<uint32_t>(payload.size()));
+
+  EXPECT_EQ(buffer->writer_index(), 0U);
+  EXPECT_EQ(writer.data().size(), payload.size());
+  EXPECT_GE(writer.write_calls(), 1U);
+}
+
+TEST(Buffer, OutputStreamThresholdFlushCanBeTemporarilyDisabled) {
+  CountingOutputStream writer;
+  Buffer *buffer = writer.get_buffer();
+  ASSERT_NE(buffer, nullptr);
+  writer.enter_flush_barrier();
+
+  std::vector<uint8_t> payload(5000, 7);
+  buffer->write_bytes(payload.data(), static_cast<uint32_t>(payload.size()));
+
+  EXPECT_EQ(buffer->writer_index(), payload.size());
+  EXPECT_EQ(writer.data().size(), 0U);
+
+  writer.exit_flush_barrier();
+  writer.try_flush();
+  ASSERT_FALSE(writer.has_error()) << writer.error().to_string();
+  EXPECT_EQ(buffer->writer_index(), 0U);
+  EXPECT_EQ(writer.data().size(), payload.size());
+}
+
+TEST(Buffer, OutputStreamForceFlush) {
+  CountingOutputStream writer;
+  Buffer *buffer = writer.get_buffer();
+  ASSERT_NE(buffer, nullptr);
+
+  std::vector<uint8_t> payload{1, 2, 3, 4, 5};
+  buffer->write_bytes(payload.data(), static_cast<uint32_t>(payload.size()));
+  EXPECT_EQ(buffer->writer_index(), payload.size());
+
+  writer.force_flush();
+  ASSERT_FALSE(writer.has_error()) << writer.error().to_string();
+  EXPECT_EQ(buffer->writer_index(), 0U);
+  EXPECT_EQ(writer.data(), payload);
+  EXPECT_EQ(writer.flush_calls(), 1U);
+}
+
+TEST(Buffer, OutputStreamRebindDetachesPreviousBufferBacklink) {
+  CountingOutputStream writer;
+  std::shared_ptr<Buffer> first;
+  std::shared_ptr<Buffer> second;
+  allocate_buffer(16, &first);
+  allocate_buffer(16, &second);
+
+  first->bind_output_stream(&writer);
+  second->bind_output_stream(&writer);
+  EXPECT_FALSE(first->is_output_stream_backed());
+  EXPECT_TRUE(second->is_output_stream_backed());
+
+  writer.enter_flush_barrier();
+  std::vector<uint8_t> second_payload(5000, 7);
+  second->write_bytes(second_payload.data(),
+                      static_cast<uint32_t>(second_payload.size()));
+  EXPECT_EQ(second->writer_index(), second_payload.size());
+  writer.exit_flush_barrier();
+
+  std::vector<uint8_t> first_payload(5000, 3);
+  first->write_bytes(first_payload.data(),
+                     static_cast<uint32_t>(first_payload.size()));
+
+  // A stale backlink on `first` would call try_flush() and flush `second`
+  // because `second` is still the stream's active buffer.
+  EXPECT_EQ(second->writer_index(), second_payload.size());
+  EXPECT_EQ(writer.data().size(), 0U);
+}
 } // namespace fory
 
 int main(int argc, char **argv) {
diff --git a/cpp/fory/util/stream.cc b/cpp/fory/util/stream.cc
index a8466ab83..0d06ec7de 100644
--- a/cpp/fory/util/stream.cc
+++ b/cpp/fory/util/stream.cc
@@ -28,7 +28,78 @@
 
 namespace fory {
 
-ForyInputStream::ForyInputStream(std::istream &stream, uint32_t buffer_size)
+OutputStream::OutputStream(uint32_t buffer_size)
+    : buffer_(std::make_unique<Buffer>()) {
+  const uint32_t actual_size = std::max<uint32_t>(buffer_size, 1U);
+  buffer_->reserve(actual_size);
+  buffer_->writer_index(0);
+  buffer_->reader_index(0);
+  buffer_->bind_output_stream(this);
+  active_buffer_ = buffer_.get();
+}
+
+OutputStream::~OutputStream() {
+  if (active_buffer_ != nullptr && active_buffer_ != buffer_.get()) {
+    active_buffer_->clear_output_stream();
+  }
+  if (buffer_ != nullptr) {
+    buffer_->clear_output_stream();
+  }
+  active_buffer_ = nullptr;
+}
+
+void OutputStream::reset() {
+  flushed_bytes_ = 0;
+  flush_barrier_depth_ = 0;
+  error_.reset();
+  Buffer *buffer = active_buffer();
+  if (buffer != nullptr) {
+    buffer->writer_index(0);
+    buffer->reader_index(0);
+  }
+}
+
+void OutputStream::bind_buffer(Buffer *buffer) {
+  Buffer *next = buffer == nullptr ? buffer_.get() : buffer;
+  if (active_buffer_ == next) {
+    return;
+  }
+  if (active_buffer_ != nullptr && active_buffer_ != buffer_.get()) {
+    // Rebinding must detach the previous external buffer to avoid stale
+    // backlinks that can trigger misdirected flushes and dangling pointers.
+    active_buffer_->output_stream_ = nullptr;
+  }
+  active_buffer_ = next;
+}
+
+void OutputStream::unbind_buffer(Buffer *buffer) {
+  if (active_buffer_ == buffer) {
+    active_buffer_ = buffer_.get();
+  }
+}
+
+uint32_t OutputStream::active_buffer_writer_index() {
+  Buffer *buffer = active_buffer();
+  return buffer == nullptr ? 0U : buffer->writer_index();
+}
+
+void OutputStream::flush_buffer_data() {
+  Buffer *buffer = active_buffer();
+  if (buffer == nullptr || buffer->writer_index() == 0) {
+    return;
+  }
+  const uint32_t bytes_to_flush = buffer->writer_index();
+  auto write_result = write_to_stream(buffer->data(), bytes_to_flush);
+  if (FORY_PREDICT_FALSE(!write_result.ok())) {
+    set_error(std::move(write_result).error());
+    return;
+  }
+  flushed_bytes_ += bytes_to_flush;
+  buffer->writer_index(0);
+  buffer->reader_index(0);
+}
+
+StdInputStream::StdInputStream(std::istream &stream, uint32_t buffer_size)
     : stream_(&stream),
       data_(std::max<uint32_t>(buffer_size, static_cast<uint32_t>(1))),
       initial_buffer_size_(
@@ -37,8 +108,8 @@ ForyInputStream::ForyInputStream(std::istream &stream, uint32_t buffer_size)
   bind_buffer(owned_buffer_.get());
 }
 
-ForyInputStream::ForyInputStream(std::shared_ptr<std::istream> stream,
-                                 uint32_t buffer_size)
+StdInputStream::StdInputStream(std::shared_ptr<std::istream> stream,
+                               uint32_t buffer_size)
     : stream_owner_(std::move(stream)), stream_(stream_owner_.get()),
       data_(std::max<uint32_t>(buffer_size, static_cast<uint32_t>(1))),
       initial_buffer_size_(
@@ -48,9 +119,9 @@ ForyInputStream::ForyInputStream(std::shared_ptr<std::istream> stream,
   bind_buffer(owned_buffer_.get());
 }
 
-ForyInputStream::~ForyInputStream() = default;
+StdInputStream::~StdInputStream() = default;
 
-Result<void, Error> ForyInputStream::fill_buffer(uint32_t min_fill_size) {
+Result<void, Error> StdInputStream::fill_buffer(uint32_t min_fill_size) {
   if (min_fill_size == 0 || remaining_size() >= min_fill_size) {
     return Result<void, Error>();
   }
@@ -92,7 +163,7 @@ Result<void, Error> ForyInputStream::fill_buffer(uint32_t min_fill_size) {
   return Result<void, Error>();
 }
 
-Result<void, Error> ForyInputStream::read_to(uint8_t *dst, uint32_t length) {
+Result<void, Error> StdInputStream::read_to(uint8_t *dst, uint32_t length) {
   if (length == 0) {
     return Result<void, Error>();
   }
@@ -106,7 +177,7 @@ Result<void, Error> ForyInputStream::read_to(uint8_t *dst, uint32_t length) {
   return Result<void, Error>();
 }
 
-Result<void, Error> ForyInputStream::skip(uint32_t size) {
+Result<void, Error> StdInputStream::skip(uint32_t size) {
   if (size == 0) {
     return Result<void, Error>();
   }
@@ -118,7 +189,7 @@ Result<void, Error> ForyInputStream::skip(uint32_t size) {
   return Result<void, Error>();
 }
 
-Result<void, Error> ForyInputStream::unread(uint32_t size) {
+Result<void, Error> StdInputStream::unread(uint32_t size) {
   if (FORY_PREDICT_FALSE(size > buffer_->reader_index_)) {
     return Unexpected(Error::buffer_out_of_bound(buffer_->reader_index_, size,
                                                  buffer_->size_));
@@ -127,7 +198,7 @@ Result<void, Error> ForyInputStream::unread(uint32_t size) {
   return Result<void, Error>();
 }
 
-void ForyInputStream::shrink_buffer() {
+void StdInputStream::shrink_buffer() {
   if (buffer_ == nullptr) {
     return;
   }
@@ -167,22 +238,22 @@ void ForyInputStream::shrink_buffer() {
   }
 }
 
-Buffer &ForyInputStream::get_buffer() { return *buffer_; }
+Buffer &StdInputStream::get_buffer() { return *buffer_; }
 
-uint32_t ForyInputStream::remaining_size() const {
+uint32_t StdInputStream::remaining_size() const {
   return buffer_->size_ - buffer_->reader_index_;
 }
 
-void ForyInputStream::reserve(uint32_t new_size) {
+void StdInputStream::reserve(uint32_t new_size) {
   data_.resize(new_size);
   buffer_->data_ = data_.data();
 }
 
-void ForyInputStream::bind_buffer(Buffer *buffer) {
+void StdInputStream::bind_buffer(Buffer *buffer) {
   Buffer *target = buffer == nullptr ? owned_buffer_.get() : buffer;
   if (target == nullptr) {
     if (buffer_ != nullptr) {
-      buffer_->stream_reader_ = nullptr;
+      buffer_->input_stream_ = nullptr;
     }
     buffer_ = nullptr;
     return;
@@ -192,7 +263,7 @@ void ForyInputStream::bind_buffer(Buffer *buffer) {
     buffer_->data_ = data_.data();
     buffer_->own_data_ = false;
     buffer_->wrapped_vector_ = nullptr;
-    buffer_->stream_reader_ = this;
+    buffer_->input_stream_ = this;
     return;
   }
 
@@ -201,7 +272,7 @@ void ForyInputStream::bind_buffer(Buffer *buffer) {
     target->size_ = source->size_;
     target->writer_index_ = source->writer_index_;
     target->reader_index_ = source->reader_index_;
-    source->stream_reader_ = nullptr;
+    source->input_stream_ = nullptr;
   } else {
     target->size_ = 0;
     target->writer_index_ = 0;
@@ -212,7 +283,46 @@ void ForyInputStream::bind_buffer(Buffer *buffer) {
   buffer_->data_ = data_.data();
   buffer_->own_data_ = false;
   buffer_->wrapped_vector_ = nullptr;
-  buffer_->stream_reader_ = this;
+  buffer_->input_stream_ = this;
+}
+
+StdOutputStream::StdOutputStream(std::ostream &stream) : stream_(&stream) {}
+
+StdOutputStream::StdOutputStream(std::shared_ptr<std::ostream> stream)
+    : stream_owner_(std::move(stream)), stream_(stream_owner_.get()) {
+  FORY_CHECK(stream_owner_ != nullptr) << "stream must not be null";
+}
+
+StdOutputStream::~StdOutputStream() = default;
+
+Result<void, Error> StdOutputStream::write_to_stream(const uint8_t *src,
+                                                     uint32_t length) {
+  if (length == 0) {
+    return Result<void, Error>();
+  }
+  if (src == nullptr) {
+    return Unexpected(Error::invalid("output source pointer is null"));
+  }
+  if (stream_ == nullptr) {
+    return Unexpected(Error::io_error("output stream is null"));
+  }
+  stream_->write(reinterpret_cast<const char *>(src),
+                 static_cast<std::streamsize>(length));
+  if (!(*stream_)) {
+    return Unexpected(Error::io_error("failed to write to output stream"));
+  }
+  return Result<void, Error>();
+}
+
+Result<void, Error> StdOutputStream::flush_stream() {
+  if (stream_ == nullptr) {
+    return Unexpected(Error::io_error("output stream is null"));
+  }
+  stream_->flush();
+  if (!(*stream_)) {
+    return Unexpected(Error::io_error("failed to flush output stream"));
+  }
+  return Result<void, Error>();
 }
 
 } // namespace fory
diff --git a/cpp/fory/util/stream.h b/cpp/fory/util/stream.h
index be37e66b8..c97ceafe0 100644
--- a/cpp/fory/util/stream.h
+++ b/cpp/fory/util/stream.h
@@ -22,6 +22,7 @@
 #include <cstdint>
 #include <istream>
 #include <memory>
+#include <ostream>
 #include <vector>
 
 #include "fory/util/error.h"
@@ -31,9 +32,98 @@ namespace fory {
 
 class Buffer;
 
-class StreamReader : public std::enable_shared_from_this<StreamReader> {
+class OutputStream {
 public:
-  virtual ~StreamReader() = default;
+  explicit OutputStream(uint32_t buffer_size = 4096);
+
+  virtual ~OutputStream();
+
+  FORY_ALWAYS_INLINE Buffer *get_buffer() { return buffer_.get(); }
+
+  FORY_ALWAYS_INLINE const Buffer *get_buffer() const { return buffer_.get(); }
+
+  FORY_ALWAYS_INLINE void enter_flush_barrier() { flush_barrier_depth_++; }
+
+  FORY_ALWAYS_INLINE void exit_flush_barrier() { flush_barrier_depth_--; }
+
+  FORY_ALWAYS_INLINE bool try_flush() {
+    if (FORY_PREDICT_FALSE(flush_barrier_depth_ != 0)) {
+      return false;
+    }
+    const uint32_t bytes_before_flush = active_buffer_writer_index();
+    if (FORY_PREDICT_FALSE(bytes_before_flush <= 4096)) {
+      return false;
+    }
+    flush_buffer_data();
+    if (FORY_PREDICT_FALSE(!error_.ok())) {
+      return false;
+    }
+    return bytes_before_flush != 0;
+  }
+
+  FORY_ALWAYS_INLINE void force_flush() {
+    if (FORY_PREDICT_FALSE(!error_.ok())) {
+      return;
+    }
+    flush_buffer_data();
+    if (FORY_PREDICT_FALSE(!error_.ok())) {
+      return;
+    }
+    auto flush_result = flush_stream();
+    if (FORY_PREDICT_FALSE(!flush_result.ok())) {
+      set_error(std::move(flush_result).error());
+    }
+  }
+
+  FORY_ALWAYS_INLINE uint32_t flush_barrier_depth() const {
+    return flush_barrier_depth_;
+  }
+
+  FORY_ALWAYS_INLINE size_t flushed_bytes() const { return flushed_bytes_; }
+
+  void reset();
+
+  FORY_ALWAYS_INLINE bool has_error() const { return !error_.ok(); }
+
+  FORY_ALWAYS_INLINE const Error &error() const { return error_; }
+
+protected:
+  virtual Result<void, Error> write_to_stream(const uint8_t *src,
+                                              uint32_t length) = 0;
+
+  virtual Result<void, Error> flush_stream() = 0;
+
+private:
+  void bind_buffer(Buffer *buffer);
+
+  void unbind_buffer(Buffer *buffer);
+
+  FORY_ALWAYS_INLINE Buffer *active_buffer() {
+    return active_buffer_ == nullptr ? buffer_.get() : active_buffer_;
+  }
+
+  void flush_buffer_data();
+
+  uint32_t active_buffer_writer_index();
+
+  FORY_ALWAYS_INLINE void set_error(Error error) {
+    if (error_.ok()) {
+      error_ = std::move(error);
+    }
+  }
+
+  std::unique_ptr<Buffer> buffer_;
+  Buffer *active_buffer_ = nullptr;
+  size_t flushed_bytes_ = 0;
+  uint32_t flush_barrier_depth_ = 0;
+  Error error_;
+
+  friend class Buffer;
+};
+
+class InputStream : public std::enable_shared_from_this<InputStream> {
+public:
+  virtual ~InputStream() = default;
 
   virtual Result<void, Error> fill_buffer(uint32_t min_fill_size) = 0;
 
@@ -52,14 +142,14 @@ public:
   virtual void bind_buffer(Buffer *buffer) = 0;
 };
 
-class ForyInputStream final : public StreamReader {
+class StdInputStream final : public InputStream {
 public:
-  explicit ForyInputStream(std::istream &stream, uint32_t buffer_size = 4096);
+  explicit StdInputStream(std::istream &stream, uint32_t buffer_size = 4096);
 
-  explicit ForyInputStream(std::shared_ptr<std::istream> stream,
-                           uint32_t buffer_size = 4096);
+  explicit StdInputStream(std::shared_ptr<std::istream> stream,
+                          uint32_t buffer_size = 4096);
 
-  ~ForyInputStream() override;
+  ~StdInputStream() override;
 
   Result<void, Error> fill_buffer(uint32_t min_fill_size) override;
 
@@ -88,4 +178,23 @@ private:
   std::unique_ptr<Buffer> owned_buffer_;
 };
 
+class StdOutputStream final : public OutputStream {
+public:
+  explicit StdOutputStream(std::ostream &stream);
+
+  explicit StdOutputStream(std::shared_ptr<std::ostream> stream);
+
+  ~StdOutputStream() override;
+
+protected:
+  Result<void, Error> write_to_stream(const uint8_t *src,
+                                      uint32_t length) override;
+
+  Result<void, Error> flush_stream() override;
+
+private:
+  std::shared_ptr<std::ostream> stream_owner_;
+  std::ostream *stream_ = nullptr;
+};
+
 } // namespace fory
diff --git a/python/pyfory/_fory.py b/python/pyfory/_fory.py
index 6a75abb03..2ccb8115b 100644
--- a/python/pyfory/_fory.py
+++ b/python/pyfory/_fory.py
@@ -157,6 +157,7 @@ class Fory:
         "is_peer_out_of_band_enabled",
         "max_depth",
         "depth",
+        "_output_stream",
         "field_nullable",
         "policy",
     )
@@ -242,6 +243,7 @@ class Fory:
         self.is_peer_out_of_band_enabled = False
         self.max_depth = max_depth
         self.depth = 0
+        self._output_stream = None
 
     def register(
         self,
@@ -394,6 +396,37 @@ class Fory:
         """
         return self.serialize(obj, buffer, buffer_callback, unsupported_callback)
 
+    def dump(self, obj, stream):
+        """
+        Serialize an object directly to a writable stream.
+
+        Args:
+            obj: The object to serialize
+            stream: Writable stream implementing write(...)
+
+        Notes:
+            The stream must be a non-retaining sink: ``write(data)`` must
+            synchronously consume ``data`` before returning. Fory may reuse or
+            modify the underlying buffer after ``write`` returns, so retaining
+            the passed object (or a view of it) is unsupported. If your sink
+            needs retention, copy bytes inside ``write``.
+        """
+        try:
+            self.buffer.set_writer_index(0)
+            self._output_stream = Buffer.wrap_output_stream(stream)
+            self.buffer.bind_output_stream(self._output_stream)
+            self._serialize(
+                obj,
+                self.buffer,
+                buffer_callback=None,
+                unsupported_callback=None,
+            )
+            self.force_flush()
+        finally:
+            self.buffer.bind_output_stream(None)
+            self._output_stream = None
+            self.reset_write()
+
     def loads(
         self,
         buffer: Union[Buffer, bytes],
@@ -435,12 +468,17 @@ class Fory:
             <class 'bytes'>
         """
         try:
-            return self._serialize(
+            write_buffer = self._serialize(
                 obj,
                 buffer,
                 buffer_callback=buffer_callback,
                 unsupported_callback=unsupported_callback,
             )
+            if write_buffer is not self.buffer:
+                return write_buffer
+            if write_buffer.get_output_stream() is not None:
+                return write_buffer
+            return write_buffer.to_bytes(0, write_buffer.get_writer_index())
         finally:
             self.reset_write()
 
@@ -450,7 +488,7 @@ class Fory:
         buffer: Buffer = None,
         buffer_callback=None,
         unsupported_callback=None,
-    ) -> Union[Buffer, bytes]:
+    ) -> Buffer:
         assert self.depth == 0, "Nested serialization should use write_ref/write_no_ref."
         self.depth += 1
         self.buffer_callback = buffer_callback
@@ -462,6 +500,7 @@ class Fory:
         # 1byte used for bit mask
         buffer.grow(1)
         buffer.set_writer_index(mask_index + 1)
+        buffer.put_int8(mask_index, 0)
         if obj is None:
             set_bit(buffer, mask_index, 0)
         else:
@@ -476,10 +515,29 @@ class Fory:
         # Type definitions are now written inline (streaming) instead of deferred to end
 
         self.write_ref(buffer, obj)
-        if buffer is not self.buffer:
-            return buffer
-        else:
-            return buffer.to_bytes(0, buffer.get_writer_index())
+        return buffer
+
+    def enter_flush_barrier(self):
+        output_stream = self._output_stream
+        if output_stream is not None:
+            output_stream.enter_flush_barrier()
+
+    def exit_flush_barrier(self):
+        output_stream = self._output_stream
+        if output_stream is not None:
+            output_stream.exit_flush_barrier()
+
+    def try_flush(self):
+        if self._output_stream is None or self.buffer.get_writer_index() <= 4096:
+            return
+        output_stream = self._output_stream
+        output_stream.try_flush()
+
+    def force_flush(self):
+        output_stream = self._output_stream
+        if output_stream is None:
+            return
+        output_stream.force_flush()
 
     def write_ref(self, buffer, obj, typeinfo=None, serializer=None):
         if serializer is None and typeinfo is not None:
@@ -687,6 +745,7 @@ class Fory:
         self.metastring_resolver.reset_write()
         self.buffer_callback = None
         self._unsupported_callback = None
+        self._output_stream = None
 
     def reset_read(self):
         """
@@ -907,3 +966,20 @@ class ThreadSafeFory:
         unsupported_objects: Iterable = None,
     ):
         return self.deserialize(buffer, buffers, unsupported_objects)
+
+    def dump(self, obj, stream):
+        """
+        Serialize an object directly to a writable stream.
+
+        Notes:
+            The stream must be a non-retaining sink: ``write(data)`` must
+            synchronously consume ``data`` before returning. Fory may reuse or
+            modify the underlying buffer after ``write`` returns, so retaining
+            the passed object (or a view of it) is unsupported. If your sink
+            needs retention, copy bytes inside ``write``.
+        """
+        fory = self._get_fory()
+        try:
+            return fory.dump(obj, stream)
+        finally:
+            self._return_fory(fory)
diff --git a/python/pyfory/buffer.pxi b/python/pyfory/buffer.pxi
index 74795d823..d9c66e2b5 100644
--- a/python/pyfory/buffer.pxi
+++ b/python/pyfory/buffer.pxi
@@ -14,17 +14,22 @@ from cpython.unicode cimport (
     PyUnicode_DecodeUTF8,
 )
 from cpython.bytes cimport PyBytes_AsString, PyBytes_FromStringAndSize, PyBytes_AS_STRING
-from libcpp.memory cimport shared_ptr
+from libcpp.memory cimport shared_ptr, unique_ptr
 from libcpp.utility cimport move
 from cython.operator cimport dereference as deref
 from libcpp.string cimport string as c_string
 from libc.stdint cimport *
 from libcpp cimport bool as c_bool
 from pyfory.includes.libutil cimport(
-    CBuffer, allocate_buffer, get_bit as c_get_bit, set_bit as c_set_bit, clear_bit as c_clear_bit,
+    CBuffer, COutputStream, allocate_buffer, get_bit as c_get_bit, set_bit as c_set_bit, clear_bit as c_clear_bit,
     set_bit_to as c_set_bit_to, CError, CErrorCode, CResultVoidError, utf16_has_surrogate_pairs
 )
-from pyfory.includes.libpyfory cimport Fory_PyCreateBufferFromStream
+from pyfory.includes.libpyfory cimport (
+    Fory_PyCreateBufferFromStream,
+    Fory_PyCreateOutputStream,
+    Fory_PyBindBufferToOutputStream,
+    Fory_PyClearBufferOutputStream
+)
 import os
 from pyfory.error import raise_fory_error
 
@@ -39,6 +44,76 @@ cdef class _SharedBufferOwner:
     cdef shared_ptr[CBuffer] buffer
 
 
+cdef class Buffer
+
+
+@cython.final
+cdef class PyOutputStream:
+    cdef object stream
+    cdef unique_ptr[COutputStream] c_output_stream
+
+    @staticmethod
+    cdef inline PyOutputStream from_stream(object stream):
+        cdef c_string stream_error
+        cdef COutputStream* raw_writer = NULL
+        if stream is None:
+            raise ValueError("stream must not be None")
+        if Fory_PyCreateOutputStream(
+            <PyObject*>stream, &raw_writer, &stream_error
+        ) != 0:
+            raise ValueError(stream_error.decode("UTF-8"))
+        cdef PyOutputStream writer = PyOutputStream.__new__(PyOutputStream)
+        writer.stream = stream
+        writer.c_output_stream.reset(raw_writer)
+        if raw_writer != NULL:
+            raw_writer.reset()
+        return writer
+
+    cdef inline COutputStream* get_c_output_stream(self):
+        return self.c_output_stream.get()
+
+    cpdef inline object get_output_stream(self):
+        return self.stream
+
+    cpdef inline void reset(self):
+        cdef COutputStream* output_stream = self.c_output_stream.get()
+        if output_stream == NULL:
+            raise ValueError("OutputStream is null")
+        output_stream.reset()
+
+    cpdef inline void enter_flush_barrier(self):
+        cdef COutputStream* output_stream = self.c_output_stream.get()
+        if output_stream == NULL:
+            raise ValueError("OutputStream is null")
+        output_stream.enter_flush_barrier()
+
+    cpdef inline void exit_flush_barrier(self):
+        cdef COutputStream* output_stream = self.c_output_stream.get()
+        if output_stream == NULL:
+            raise ValueError("OutputStream is null")
+        output_stream.exit_flush_barrier()
+
+    cpdef inline void try_flush(self):
+        cdef COutputStream* output_stream = self.c_output_stream.get()
+        if output_stream == NULL:
+            raise ValueError("OutputStream is null")
+        output_stream.try_flush()
+        if output_stream.has_error():
+            raise ValueError(output_stream.error().to_string().decode("UTF-8"))
+
+    cpdef inline void force_flush(self):
+        cdef COutputStream* output_stream = self.c_output_stream.get()
+        if output_stream == NULL:
+            raise ValueError("OutputStream is null")
+        output_stream.force_flush()
+        if output_stream.has_error():
+            raise ValueError(output_stream.error().to_string().decode("UTF-8"))
+
+
+cpdef inline PyOutputStream _wrap_output_stream(object stream):
+    return PyOutputStream.from_stream(stream)
+
+
 @cython.final
 cdef class Buffer:
     cdef:
@@ -46,6 +121,7 @@ cdef class Buffer:
         CError _error
         # hold python buffer reference count
         object data
+        object output_stream
         Py_ssize_t shape[1]
         Py_ssize_t stride[1]
 
@@ -67,6 +143,7 @@ cdef class Buffer:
         self.c_buffer = CBuffer(address, length_, False)
         self.c_buffer.reader_index(0)
         self.c_buffer.writer_index(0)
+        self.output_stream = None
 
     @classmethod
     def from_stream(cls, stream not None, uint32_t buffer_size=4096):
@@ -82,6 +159,7 @@ cdef class Buffer:
         buffer.c_buffer = move(deref(stream_buffer))
         del stream_buffer
         buffer.data = stream
+        buffer.output_stream = None
         buffer.c_buffer.reader_index(0)
         buffer.c_buffer.writer_index(0)
         return buffer
@@ -94,6 +172,7 @@ cdef class Buffer:
         cdef _SharedBufferOwner owner = _SharedBufferOwner.__new__(_SharedBufferOwner)
         owner.buffer = c_buffer
         buffer.data = owner
+        buffer.output_stream = None
         buffer.c_buffer.reader_index(0)
         buffer.c_buffer.writer_index(0)
         return buffer
@@ -107,10 +186,37 @@ cdef class Buffer:
         buffer.c_buffer = move(deref(buf))
         del buf
         buffer.data = None
+        buffer.output_stream = None
         buffer.c_buffer.reader_index(0)
         buffer.c_buffer.writer_index(0)
         return buffer
 
+    @staticmethod
+    def wrap_output_stream(stream):
+        return _wrap_output_stream(stream)
+
+    cpdef inline void bind_output_stream(self, object output):
+        cdef c_string stream_error
+        cdef PyOutputStream output_stream
+        if Fory_PyClearBufferOutputStream(&self.c_buffer, &stream_error) != 0:
+            raise ValueError(stream_error.decode("UTF-8"))
+        if output is None:
+            self.output_stream = None
+            return
+        if isinstance(output, PyOutputStream):
+            output_stream = <PyOutputStream>output
+        else:
+            output_stream = _wrap_output_stream(output)
+        output_stream.reset()
+        if Fory_PyBindBufferToOutputStream(
+            &self.c_buffer, output_stream.get_c_output_stream(), &stream_error
+        ) != 0:
+            raise ValueError(stream_error.decode("UTF-8"))
+        self.output_stream = output_stream
+
+    cpdef inline object get_output_stream(self):
+        return self.output_stream
+
     cdef inline void _raise_if_error(self):
         cdef CErrorCode code
         cdef c_string message
diff --git a/python/pyfory/collection.pxi b/python/pyfory/collection.pxi
index 48bd3efca..150f354e6 100644
--- a/python/pyfory/collection.pxi
+++ b/python/pyfory/collection.pxi
@@ -800,6 +800,7 @@ cdef class MapSerializer(Serializer):
                 break
             key_cls = type(key)
             value_cls = type(value)
+            fory.enter_flush_barrier()
             buffer.write_int16(-1)
             chunk_size_offset = buffer.get_writer_index() - 1
             chunk_header = 0
@@ -888,6 +889,8 @@ cdef class MapSerializer(Serializer):
             key_serializer = self.key_serializer
             value_serializer = self.value_serializer
             buffer.put_int8(chunk_size_offset, chunk_size)
+            fory.exit_flush_barrier()
+            fory.try_flush()
 
     cpdef inline read(self, Buffer buffer):
         cdef Fory fory = self.fory
diff --git a/python/pyfory/collection.py b/python/pyfory/collection.py
index c7d6b9c37..5bb88cbaa 100644
--- a/python/pyfory/collection.py
+++ b/python/pyfory/collection.py
@@ -420,6 +420,7 @@ class MapSerializer(Serializer):
 
             key_cls = type(key)
             value_cls = type(value)
+            fory.enter_flush_barrier()
             buffer.write_int16(-1)
             chunk_size_offset = buffer.get_writer_index() - 1
             chunk_header = 0
@@ -472,6 +473,8 @@ class MapSerializer(Serializer):
             key_serializer = self.key_serializer
             value_serializer = self.value_serializer
             buffer.put_uint8(chunk_size_offset, chunk_size)
+            fory.exit_flush_barrier()
+            fory.try_flush()
 
     def read(self, buffer):
         fory = self.fory
diff --git a/python/pyfory/cpp/pyfory.cc b/python/pyfory/cpp/pyfory.cc
index 62251b7f6..13c28103b 100644
--- a/python/pyfory/cpp/pyfory.cc
+++ b/python/pyfory/cpp/pyfory.cc
@@ -135,10 +135,145 @@ static bool resolve_python_stream_read_method(PyObject *stream,
   return false;
 }
 
-class PythonStreamReader final : public StreamReader {
+static bool resolve_python_stream_write_method(PyObject *stream,
+                                               std::string *error_message) {
+  const int has_write = PyObject_HasAttrString(stream, "write");
+  if (has_write < 0) {
+    *error_message = fetch_python_error_message();
+    return false;
+  }
+  if (has_write == 0) {
+    *error_message = "stream object must provide write(data) method";
+    return false;
+  }
+  PyObject *method_obj = PyObject_GetAttrString(stream, "write");
+  if (method_obj == nullptr) {
+    *error_message = fetch_python_error_message();
+    return false;
+  }
+  const bool is_callable = PyCallable_Check(method_obj) != 0;
+  Py_DECREF(method_obj);
+  if (!is_callable) {
+    *error_message = "stream.write must be callable";
+    return false;
+  }
+  return true;
+}
+
+class PyOutputStream final : public OutputStream {
 public:
-  explicit PythonStreamReader(PyObject *stream, uint32_t buffer_size,
-                              PythonStreamReadMethod read_method)
+  explicit PyOutputStream(PyObject *stream, uint32_t buffer_size = 4096)
+      : OutputStream(buffer_size), stream_(stream) {
+    FORY_CHECK(stream_ != nullptr) << "stream must not be null";
+    Py_INCREF(stream_);
+  }
+
+  ~PyOutputStream() override {
+    if (stream_ != nullptr) {
+      PyGILState_STATE gil_state = PyGILState_Ensure();
+      Py_DECREF(stream_);
+      PyGILState_Release(gil_state);
+      stream_ = nullptr;
+    }
+  }
+
+protected:
+  Result<void, Error> write_to_stream(const uint8_t *src,
+                                      uint32_t length) override {
+    if (length == 0) {
+      return Result<void, Error>();
+    }
+    if (src == nullptr) {
+      return Unexpected(Error::invalid("output source pointer is null"));
+    }
+    if (stream_ == nullptr) {
+      return Unexpected(Error::io_error("output stream is null"));
+    }
+    PyGILState_STATE gil_state = PyGILState_Ensure();
+    uint32_t total_written = 0;
+    while (total_written < length) {
+      const uint32_t remaining = length - total_written;
+      // Contract: stream.write must consume bytes synchronously before return.
+      // The memoryview below is a transient view over serializer-managed
+      // storage and is not safe to retain after write(...) returns.
+      PyObject *chunk = PyMemoryView_FromMemory(
+          reinterpret_cast<char *>(
+              const_cast<uint8_t *>(src + static_cast<size_t>(total_written))),
+          static_cast<Py_ssize_t>(remaining), PyBUF_READ);
+      if (chunk == nullptr) {
+        const std::string message = fetch_python_error_message();
+        PyGILState_Release(gil_state);
+        return Unexpected(Error::io_error(message));
+      }
+      PyObject *written_obj = PyObject_CallMethod(stream_, "write", "O", chunk);
+      Py_DECREF(chunk);
+      if (written_obj == nullptr) {
+        const std::string message = fetch_python_error_message();
+        PyGILState_Release(gil_state);
+        return Unexpected(Error::io_error(message));
+      }
+      if (written_obj == Py_None) {
+        Py_DECREF(written_obj);
+        total_written = length;
+        break;
+      }
+      const long long wrote_value = PyLong_AsLongLong(written_obj);
+      Py_DECREF(written_obj);
+      if (wrote_value == -1 && PyErr_Occurred() != nullptr) {
+        const std::string message = fetch_python_error_message();
+        PyGILState_Release(gil_state);
+        return Unexpected(Error::io_error(message));
+      }
+      if (wrote_value <= 0) {
+        PyGILState_Release(gil_state);
+        return Unexpected(
+            Error::io_error("stream write returned non-positive bytes"));
+      }
+      const uint64_t wrote_u64 = static_cast<uint64_t>(wrote_value);
+      if (wrote_u64 >= remaining) {
+        total_written = length;
+      } else {
+        total_written += static_cast<uint32_t>(wrote_u64);
+      }
+    }
+    PyGILState_Release(gil_state);
+    return Result<void, Error>();
+  }
+
+  Result<void, Error> flush_stream() override {
+    if (stream_ == nullptr) {
+      return Unexpected(Error::io_error("output stream is null"));
+    }
+    PyGILState_STATE gil_state = PyGILState_Ensure();
+    const int has_flush = PyObject_HasAttrString(stream_, "flush");
+    if (has_flush < 0) {
+      const std::string message = fetch_python_error_message();
+      PyGILState_Release(gil_state);
+      return Unexpected(Error::io_error(message));
+    }
+    if (has_flush == 0) {
+      PyGILState_Release(gil_state);
+      return Result<void, Error>();
+    }
+    PyObject *result = PyObject_CallMethod(stream_, "flush", nullptr);
+    if (result == nullptr) {
+      const std::string message = fetch_python_error_message();
+      PyGILState_Release(gil_state);
+      return Unexpected(Error::io_error(message));
+    }
+    Py_DECREF(result);
+    PyGILState_Release(gil_state);
+    return Result<void, Error>();
+  }
+
+private:
+  PyObject *stream_ = nullptr;
+};
+
+class PyInputStream final : public InputStream {
+public:
+  explicit PyInputStream(PyObject *stream, uint32_t buffer_size,
+                         PythonStreamReadMethod read_method)
       : stream_(stream), read_method_(read_method),
         read_method_name_(python_stream_read_method_name(read_method)),
         data_(std::max<uint32_t>(buffer_size, static_cast<uint32_t>(1))),
@@ -150,7 +285,7 @@ public:
     bind_buffer(owned_buffer_.get());
   }
 
-  ~PythonStreamReader() override {
+  ~PyInputStream() override {
     if (stream_ != nullptr) {
       PyGILState_STATE gil_state = PyGILState_Ensure();
       Py_DECREF(stream_);
@@ -279,7 +414,7 @@ public:
     Buffer *target = buffer == nullptr ? owned_buffer_.get() : buffer;
     if (target == nullptr) {
       if (buffer_ != nullptr) {
-        buffer_->stream_reader_ = nullptr;
+        buffer_->input_stream_ = nullptr;
       }
       buffer_ = nullptr;
       return;
@@ -289,7 +424,7 @@ public:
       buffer_->data_ = data_.data();
       buffer_->own_data_ = false;
       buffer_->wrapped_vector_ = nullptr;
-      buffer_->stream_reader_ = this;
+      buffer_->input_stream_ = this;
       return;
     }
 
@@ -298,7 +433,7 @@ public:
       target->size_ = source->size_;
       target->writer_index_ = source->writer_index_;
       target->reader_index_ = source->reader_index_;
-      source->stream_reader_ = nullptr;
+      source->input_stream_ = nullptr;
     } else {
       target->size_ = 0;
       target->writer_index_ = 0;
@@ -309,7 +444,7 @@ public:
     buffer_->data_ = data_.data();
     buffer_->own_data_ = false;
     buffer_->wrapped_vector_ = nullptr;
-    buffer_->stream_reader_ = this;
+    buffer_->input_stream_ = this;
   }
 
 private:
@@ -1409,13 +1544,56 @@ int Fory_PyCreateBufferFromStream(PyObject *stream, uint32_t buffer_size,
     return -1;
   }
   try {
-    auto stream_reader =
-        std::make_shared<PythonStreamReader>(stream, buffer_size, read_method);
-    *out = new Buffer(*stream_reader);
+    auto input_stream =
+        std::make_shared<PyInputStream>(stream, buffer_size, read_method);
+    *out = new Buffer(*input_stream);
     return 0;
   } catch (const std::exception &e) {
     *error_message = e.what();
     return -1;
   }
 }
+
+int Fory_PyCreateOutputStream(PyObject *stream, OutputStream **out,
+                              std::string *error_message) {
+  if (stream == nullptr) {
+    *error_message = "stream must not be null";
+    return -1;
+  }
+  // See PyOutputStream::write_to_stream contract: the provided sink must not
+  // retain passed write buffers after write(...) returns.
+  if (!resolve_python_stream_write_method(stream, error_message)) {
+    return -1;
+  }
+  try {
+    *out = new PyOutputStream(stream, 4096);
+    return 0;
+  } catch (const std::exception &e) {
+    *error_message = e.what();
+    return -1;
+  }
+}
+
+int Fory_PyBindBufferToOutputStream(Buffer *buffer, OutputStream *output_stream,
+                                    std::string *error_message) {
+  if (buffer == nullptr) {
+    *error_message = "buffer must not be null";
+    return -1;
+  }
+  if (output_stream == nullptr) {
+    *error_message = "output stream must not be null";
+    return -1;
+  }
+  buffer->bind_output_stream(output_stream);
+  return 0;
+}
+
+int Fory_PyClearBufferOutputStream(Buffer *buffer, std::string *error_message) {
+  if (buffer == nullptr) {
+    *error_message = "buffer must not be null";
+    return -1;
+  }
+  buffer->clear_output_stream();
+  return 0;
+}
 } // namespace fory
diff --git a/python/pyfory/cpp/pyfory.h b/python/pyfory/cpp/pyfory.h
index 1003733cc..3801929c9 100644
--- a/python/pyfory/cpp/pyfory.h
+++ b/python/pyfory/cpp/pyfory.h
@@ -24,6 +24,7 @@
 #include "Python.h"
 #include "fory/type/type.h"
 #include "fory/util/buffer.h"
+#include "fory/util/stream.h"
 
 namespace fory {
 inline constexpr bool Fory_IsInternalTypeId(uint8_t type_id) {
@@ -70,4 +71,9 @@ int Fory_PyWriteBasicFieldToBuffer(PyObject *value, Buffer *buffer,
 PyObject *Fory_PyReadBasicFieldFromBuffer(Buffer *buffer, uint8_t type_id);
 int Fory_PyCreateBufferFromStream(PyObject *stream, uint32_t buffer_size,
                                   Buffer **out, std::string *error_message);
+int Fory_PyCreateOutputStream(PyObject *stream, OutputStream **out,
+                              std::string *error_message);
+int Fory_PyBindBufferToOutputStream(Buffer *buffer, OutputStream *output_stream,
+                                    std::string *error_message);
+int Fory_PyClearBufferOutputStream(Buffer *buffer, std::string *error_message);
 } // namespace fory
diff --git a/python/pyfory/includes/libpyfory.pxd b/python/pyfory/includes/libpyfory.pxd
index c0e888c18..6e1d0542f 100644
--- a/python/pyfory/includes/libpyfory.pxd
+++ b/python/pyfory/includes/libpyfory.pxd
@@ -19,8 +19,13 @@ from cpython.object cimport PyObject
 from libc.stdint cimport uint32_t
 from libcpp.string cimport string as c_string
 
-from pyfory.includes.libutil cimport CBuffer
+from pyfory.includes.libutil cimport CBuffer, COutputStream
 
 cdef extern from "fory/python/pyfory.h" namespace "fory":
     int Fory_PyCreateBufferFromStream(PyObject* stream, uint32_t buffer_size,
                                       CBuffer** out, c_string* error_message)
+    int Fory_PyCreateOutputStream(PyObject* stream, COutputStream** out,
+                                  c_string* error_message)
+    int Fory_PyBindBufferToOutputStream(CBuffer* buffer, COutputStream* output_stream,
+                                        c_string* error_message)
+    int Fory_PyClearBufferOutputStream(CBuffer* buffer, c_string* error_message)
diff --git a/python/pyfory/includes/libutil.pxd b/python/pyfory/includes/libutil.pxd
index aeaa60a7d..b0d8914e3 100644
--- a/python/pyfory/includes/libutil.pxd
+++ b/python/pyfory/includes/libutil.pxd
@@ -16,6 +16,7 @@
 # under the License.
 
 from libc.stdint cimport *
+from libc.stddef cimport size_t
 from libcpp cimport bool as c_bool
 from libcpp.memory cimport shared_ptr
 from libcpp.string cimport string as c_string
@@ -212,6 +213,18 @@ cdef extern from "fory/util/buffer.h" namespace "fory" nogil:
     CBuffer* allocate_buffer(uint32_t size)
     c_bool allocate_buffer(uint32_t size, shared_ptr[CBuffer]* out)
 
+cdef extern from "fory/util/stream.h" namespace "fory" nogil:
+    cdef cppclass COutputStream "fory::OutputStream":
+        CBuffer* get_buffer()
+        void enter_flush_barrier()
+        void exit_flush_barrier()
+        c_bool try_flush()
+        void force_flush()
+        size_t flushed_bytes() const
+        void reset()
+        c_bool has_error() const
+        const CError& error() const
+
 
 cdef extern from "fory/util/bit_util.h" namespace "fory::util" nogil:
     c_bool get_bit(const uint8_t *bits, uint32_t i)
diff --git a/python/pyfory/serialization.pyx b/python/pyfory/serialization.pyx
index e28aa06da..4cfe64894 100644
--- a/python/pyfory/serialization.pyx
+++ b/python/pyfory/serialization.pyx
@@ -1078,6 +1078,7 @@ cdef class Fory:
     cdef public bint is_peer_out_of_band_enabled
     cdef int32_t max_depth
     cdef int32_t depth
+    cdef object _output_stream
 
     def __init__(
             self,
@@ -1156,6 +1157,7 @@ cdef class Fory:
         self.is_peer_out_of_band_enabled = False
         self.depth = 0
         self.max_depth = max_depth
+        self._output_stream = None
 
     def register_serializer(self, cls: Union[type, TypeVar], Serializer serializer):
         """
@@ -1289,6 +1291,37 @@ cdef class Fory:
         """
         return self.serialize(obj, buffer, buffer_callback, unsupported_callback)
 
+    def dump(self, obj, stream):
+        """
+        Serialize an object directly to a writable stream.
+
+        Args:
+            obj: The object to serialize
+            stream: Writable stream implementing write(...)
+
+        Notes:
+            The stream must be a non-retaining sink: ``write(data)`` must
+            synchronously consume ``data`` before returning. Fory may reuse or
+            modify the underlying buffer after ``write`` returns, so retaining
+            the passed object (or a view of it) is unsupported. If your sink
+            needs retention, copy bytes inside ``write``.
+        """
+        try:
+            self.buffer.set_writer_index(0)
+            self._output_stream = Buffer.wrap_output_stream(stream)
+            self.buffer.bind_output_stream(self._output_stream)
+            self._serialize(
+                obj,
+                self.buffer,
+                buffer_callback=None,
+                unsupported_callback=None,
+            )
+            self.force_flush()
+        finally:
+            self.buffer.bind_output_stream(None)
+            self._output_stream = None
+            self.reset_write()
+
     def loads(
         self,
         buffer: Union[Buffer, bytes],
@@ -1328,12 +1361,18 @@ cdef class Fory:
             >>> print(type(data))
             <class 'bytes'>
         """
+        cdef Buffer write_buffer
         try:
-            return self._serialize(
+            write_buffer = self._serialize(
                 obj,
                 buffer,
                 buffer_callback=buffer_callback,
                 unsupported_callback=unsupported_callback)
+            if write_buffer is not self.buffer:
+                return write_buffer
+            if write_buffer.get_output_stream() is not None:
+                return write_buffer
+            return write_buffer.to_bytes(0, write_buffer.get_writer_index())
         finally:
             self.reset_write()
 
@@ -1350,6 +1389,7 @@ cdef class Fory:
         # 1byte used for bit mask
         buffer.grow(1)
         buffer.set_writer_index(mask_index + 1)
+        buffer.put_int8(mask_index, 0)
         if obj is None:
             set_bit(buffer, mask_index, 0)
         else:
@@ -1362,11 +1402,35 @@ cdef class Fory:
         else:
             clear_bit(buffer, mask_index, 2)
         self.write_ref(buffer, obj)
+        return buffer
 
-        if buffer is not self.buffer:
-            return buffer
-        else:
-            return buffer.to_bytes(0, buffer.get_writer_index())
+    cpdef inline enter_flush_barrier(self):
+        cdef PyOutputStream output_stream
+        if self._output_stream is None:
+            return
+        output_stream = <PyOutputStream>self._output_stream
+        output_stream.enter_flush_barrier()
+
+    cpdef inline exit_flush_barrier(self):
+        cdef PyOutputStream output_stream
+        if self._output_stream is None:
+            return
+        output_stream = <PyOutputStream>self._output_stream
+        output_stream.exit_flush_barrier()
+
+    cpdef inline try_flush(self):
+        cdef PyOutputStream output_stream
+        if self._output_stream is None or self.buffer.get_writer_index() <= 4096:
+            return
+        output_stream = <PyOutputStream>self._output_stream
+        output_stream.try_flush()
+
+    cpdef inline force_flush(self):
+        cdef PyOutputStream output_stream
+        if self._output_stream is None:
+            return
+        output_stream = <PyOutputStream>self._output_stream
+        output_stream.force_flush()
 
     cpdef inline write_ref(
             self, Buffer buffer, obj, TypeInfo typeinfo=None, Serializer serializer=None):
@@ -1623,6 +1687,7 @@ cdef class Fory:
         self.metastring_resolver.reset_write()
         self.serialization_context.reset_write()
         self._unsupported_callback = None
+        self._output_stream = None
 
     cpdef inline reset_read(self):
         """
diff --git a/python/pyfory/struct.pxi b/python/pyfory/struct.pxi
index 2e024b3fa..659f28121 100644
--- a/python/pyfory/struct.pxi
+++ b/python/pyfory/struct.pxi
@@ -287,6 +287,7 @@ cdef class DataClassSerializer(Serializer):
             self._write_slots(buffer, value)
         else:
             self._write_dict(buffer, value)
+        self.fory.try_flush()
 
     cdef inline void _write_dict(self, Buffer buffer, object value):
         cdef dict value_dict = value.__dict__
diff --git a/python/pyfory/struct.py b/python/pyfory/struct.py
index 7923dd673..cb61db342 100644
--- a/python/pyfory/struct.py
+++ b/python/pyfory/struct.py
@@ -536,6 +536,7 @@ class DataClassSerializer(Serializer):
                     is_tracking_ref = self._ref_fields.get(field_name, False)
                     is_basic = self._basic_field_flags[index]
                     self._write_field_value(buffer, serializer, field_value, is_nullable, is_dynamic, is_basic, is_tracking_ref)
+        self.fory.try_flush()
 
     def read(self, buffer):
         if not self.fory.strict:
diff --git a/python/pyfory/tests/test_buffer.py b/python/pyfory/tests/test_buffer.py
index 6533ed94d..021412e57 100644
--- a/python/pyfory/tests/test_buffer.py
+++ b/python/pyfory/tests/test_buffer.py
@@ -65,6 +65,22 @@ class LegacyRecvIntoOnlyStream:
         return read_size
 
 
+class PartialWriteStream:
+    def __init__(self):
+        self._data = bytearray()
+
+    def write(self, payload):
+        if not payload:
+            return 0
+        view = memoryview(payload).cast("B")
+        wrote = min(2, len(view))
+        self._data.extend(view[:wrote])
+        return wrote
+
+    def to_bytes(self):
+        return bytes(self._data)
+
+
 def test_buffer():
     buffer = Buffer.allocate(8)
     buffer.write_bool(True)
@@ -253,15 +269,68 @@ def test_write_var_uint64():
 
 
 def check_varuint64(buf: Buffer, value: int, bytes_written: int):
-    reader_index = buf.get_reader_index()
     assert buf.get_writer_index() == buf.get_reader_index()
     actual_bytes_written = buf.write_var_uint64(value)
     assert actual_bytes_written == bytes_written
     varint = buf.read_var_uint64()
     assert buf.get_writer_index() == buf.get_reader_index()
     assert value == varint
-    # test slow read branch in `read_varint64`
-    assert buf.slice(reader_index, buf.get_reader_index() - reader_index).read_var_uint64() == value
+
+
+def test_buffer_flush_stream():
+    stream = PartialWriteStream()
+    buffer = Buffer.allocate(16)
+    output_stream = Buffer.wrap_output_stream(stream)
+    buffer.bind_output_stream(output_stream)
+    payload = b"stream-flush-buffer"
+    buffer.write_bytes(payload)
+    output_stream.force_flush()
+    assert stream.to_bytes() == payload
+    assert buffer.get_writer_index() == 0
+
+
+def test_wrap_output_stream_invalid_target_raises():
+    with pytest.raises(ValueError):
+        Buffer.wrap_output_stream(object())
+
+
+def test_output_stream_try_flush_preserves_bound_buffer_when_barrier_active():
+    stream = PartialWriteStream()
+    output_stream = Buffer.wrap_output_stream(stream)
+    buffer = Buffer.allocate(32)
+    buffer.bind_output_stream(output_stream)
+    payload = b"x" * 5000
+
+    output_stream.enter_flush_barrier()
+    buffer.write_bytes(payload)
+    output_stream.try_flush()
+    output_stream.try_flush()
+    assert buffer.get_writer_index() == len(payload)
+    assert stream.to_bytes() == b""
+
+    output_stream.exit_flush_barrier()
+    output_stream.try_flush()
+    assert buffer.get_writer_index() == 0
+
+    output_stream.force_flush()
+    assert stream.to_bytes() == payload
+
+
+def test_output_stream_try_flush_small_payload_needs_force_flush():
+    stream = PartialWriteStream()
+    output_stream = Buffer.wrap_output_stream(stream)
+    buffer = Buffer.allocate(32)
+    buffer.bind_output_stream(output_stream)
+    payload = b"small-payload"
+    buffer.write_bytes(payload)
+
+    output_stream.try_flush()
+    assert buffer.get_writer_index() == len(payload)
+    assert stream.to_bytes() == b""
+
+    output_stream.force_flush()
+    assert buffer.get_writer_index() == 0
+    assert stream.to_bytes() == payload
 
 
 def test_write_buffer():
diff --git a/python/pyfory/tests/test_stream.py b/python/pyfory/tests/test_stream.py
index c567420e4..3276c4054 100644
--- a/python/pyfory/tests/test_stream.py
+++ b/python/pyfory/tests/test_stream.py
@@ -15,6 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from dataclasses import dataclass
+
 import pytest
 
 import pyfory
@@ -68,6 +70,47 @@ class OneByteStream:
         return self.recv_into(buffer, size)
 
 
+class OneByteWriteStream:
+    def __init__(self):
+        self._data = bytearray()
+
+    def write(self, payload):
+        if not payload:
+            return 0
+        view = memoryview(payload).cast("B")
+        self._data.append(view[0])
+        return 1
+
+    def to_bytes(self):
+        return bytes(self._data)
+
+
+class CountingWriteStream:
+    def __init__(self):
+        self._data = bytearray()
+        self.write_calls = 0
+        self.flush_calls = 0
+
+    def write(self, payload):
+        view = memoryview(payload).cast("B")
+        self.write_calls += 1
+        self._data.extend(view)
+        return len(view)
+
+    def flush(self):
+        self.flush_calls += 1
+
+    def to_bytes(self):
+        return bytes(self._data)
+
+
+@dataclass
+class StreamStructValue:
+    idx: int
+    name: str
+    values: list
+
+
 @pytest.mark.parametrize("xlang", [False, True])
 def test_stream_roundtrip_primitives_and_strings(xlang):
     fory = pyfory.Fory(xlang=xlang, ref=True)
@@ -148,3 +191,62 @@ def test_stream_deserialize_truncated_error(xlang):
 
     with pytest.raises(Exception):
         fory.deserialize(Buffer.from_stream(OneByteStream(truncated)))
+
+
+@pytest.mark.parametrize("xlang", [False, True])
+def test_dump_matches_dumps_bytes(xlang):
+    fory = pyfory.Fory(xlang=xlang, ref=True)
+    value = {
+        "k": [1, 2, 3, 4],
+        "nested": {"x": True, "y": "hello"},
+        "f": 3.14,
+    }
+
+    sink = OneByteWriteStream()
+    fory.dump(value, sink)
+    expected = fory.dumps(value)
+    assert sink.to_bytes() == expected
+
+
+@pytest.mark.parametrize("xlang", [False, True])
+def test_dump_map_chunk_path_matches_dumps(xlang):
+    fory = pyfory.Fory(xlang=xlang, ref=True)
+    value = {f"k{i}": i for i in range(300)}
+
+    sink = OneByteWriteStream()
+    fory.dump(value, sink)
+    expected = fory.dumps(value)
+    assert sink.to_bytes() == expected
+
+    restored = fory.deserialize(Buffer.from_stream(OneByteStream(sink.to_bytes())))
+    assert restored == value
+
+
+def test_dump_large_list_of_structs_multiple_flushes_matches_dumps():
+    fory = pyfory.Fory(xlang=False, ref=True, strict=False)
+    fory.register(StreamStructValue)
+    value = [StreamStructValue(i, f"item-{i}-{'x' * 56}", [i, i + 1, i + 2, i + 3, i + 4]) for i in range(1800)]
+
+    sink = CountingWriteStream()
+    fory.dump(value, sink)
+    expected = fory.dumps(value)
+    assert sink.to_bytes() == expected
+    assert len(expected) > 4096 * 4
+    assert sink.write_calls >= 4
+
+    restored = fory.deserialize(Buffer.from_stream(OneByteStream(sink.to_bytes())))
+    assert restored == value
+
+
+def test_dump_large_map_with_struct_values_matches_dumps():
+    fory = pyfory.Fory(xlang=False, ref=True, strict=False)
+    fory.register(StreamStructValue)
+    value = {f"k{i}": StreamStructValue(i, "y" * 96, [i, i + 1, i + 2, i + 3]) for i in range(900)}
+
+    sink = OneByteWriteStream()
+    fory.dump(value, sink)
+    expected = fory.dumps(value)
+    assert sink.to_bytes() == expected
+
+    restored = fory.deserialize(Buffer.from_stream(OneByteStream(sink.to_bytes())))
+    assert restored == value


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to