This is an automated email from the ASF dual-hosted git repository.
chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fory.git
The following commit(s) were added to refs/heads/main by this push:
new 812ade8bb feat(c++/python): support stream deserialization for c++ and
python (#3307)
812ade8bb is described below
commit 812ade8bbbcb50fb26e9a795a7f2c21b2b7732bb
Author: Shawn Yang <[email protected]>
AuthorDate: Fri Feb 27 00:30:04 2026 +0800
feat(c++/python): support stream deserialization for c++ and python (#3307)
## Why?
C++ and Python deserialization currently assumes data is already
materialized in memory-backed buffers. This PR adds stream-backed
deserialization support so payloads can be read incrementally from input
streams while preserving existing serialization behavior and error
handling.
## What does this PR do?
- Adds stream infrastructure in C++
(`ForyInputStreamBuf`/`ForyInputStream`) and integrates it with `Buffer`
so reads can request more bytes on demand.
- Adds Python-to-C++ stream bridge (`Fory_PyCreateBufferFromStream`) so
`pyfory.buffer.Buffer` can be constructed from Python objects that
implement `read(size)`.
- Updates deserialization paths to be stream-safe by using `ensure_size`
checks for header/string/fixed-field reads and falling back from batched
varint reads on stream-backed buffers.
- Extends build rules to compile/link new stream components and adds
stream-focused C++ tests (`stream_test.cc`, buffer stream tests).
- Adds Python stream tests (`python/pyfory/tests/test_stream.py`) and
buffer stream coverage in `python/pyfory/tests/test_buffer.py`.
## Related issues
N/A
## Does this PR introduce any user-facing change?
- [x] Does this PR introduce any public API change?
- [ ] Does this PR introduce any binary protocol compatibility change?
## Benchmark
N/A
---
AGENTS.md | 1 +
BUILD | 1 +
cpp/fory/python/pyfory.cc | 359 ++++++++++++++++++++
cpp/fory/python/pyfory.h | 6 +-
cpp/fory/serialization/BUILD | 10 +
cpp/fory/serialization/CMakeLists.txt | 4 +
cpp/fory/serialization/collection_serializer.h | 5 +-
cpp/fory/serialization/fory.h | 54 ++-
cpp/fory/serialization/skip.cc | 37 +-
cpp/fory/serialization/stream_test.cc | 240 +++++++++++++
cpp/fory/serialization/string_serializer.h | 43 +--
cpp/fory/serialization/struct_serializer.h | 33 +-
cpp/fory/util/CMakeLists.txt | 2 +
cpp/fory/util/buffer.cc | 11 +
cpp/fory/util/buffer.h | 450 +++++++++++++++++--------
cpp/fory/util/buffer_test.cc | 143 ++++++++
cpp/fory/util/stream.cc | 218 ++++++++++++
cpp/fory/util/stream.h | 91 +++++
python/pyfory/buffer.pxd | 2 -
python/pyfory/buffer.pyx | 65 +++-
python/pyfory/includes/libpyfory.pxd | 26 ++
python/pyfory/includes/libutil.pxd | 6 +-
python/pyfory/tests/test_buffer.py | 104 ++++++
python/pyfory/tests/test_stream.py | 150 +++++++++
24 files changed, 1838 insertions(+), 223 deletions(-)
diff --git a/AGENTS.md b/AGENTS.md
index 398237fd0..bcc882131 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -6,6 +6,7 @@ This file provides comprehensive guidance to AI coding agents
when working with
While working on Fory, please remember:
+- **Do not preserve any legacy code/docs unless clearly requested**.
- **Performance First**: Performance is the top priority. Never introduce code
that reduces performance without explicit justification.
- **English Only**: Always use English in code, comments, and documentation.
- **Meaningful Comments**: Only add comments when the code's behavior is
difficult to understand or when documenting complex algorithms.
diff --git a/BUILD b/BUILD
index 809727199..51bdffc8d 100644
--- a/BUILD
+++ b/BUILD
@@ -31,6 +31,7 @@ pyx_library(
linkstatic = 1,
),
deps = [
+ "//cpp/fory/python:_pyfory",
"//cpp/fory/util:fory_util",
],
)
diff --git a/cpp/fory/python/pyfory.cc b/cpp/fory/python/pyfory.cc
index 04117a7b5..fb6abd184 100644
--- a/cpp/fory/python/pyfory.cc
+++ b/cpp/fory/python/pyfory.cc
@@ -19,6 +19,15 @@
#include "fory/python/pyfory.h"
+#include <algorithm>
+#include <cstring>
+#include <exception>
+#include <limits>
+#include <memory>
+#include <vector>
+
+#include "fory/util/stream.h"
+
static PyObject **py_sequence_get_items(PyObject *collection) {
if (PyList_CheckExact(collection)) {
return ((PyListObject *)collection)->ob_item;
@@ -29,6 +38,335 @@ static PyObject **py_sequence_get_items(PyObject
*collection) {
}
namespace fory {
+
+/// Fetches and clears the pending Python exception and returns its str() as
+/// an error message. Falls back to "python stream read failed" when no
+/// exception is pending or its text cannot be obtained. Uses the Python C API,
+/// so the caller must hold the GIL. On return no Python error is left set.
+static std::string fetch_python_error_message() {
+  PyObject *type = nullptr;
+  PyObject *value = nullptr;
+  PyObject *traceback = nullptr;
+  PyErr_Fetch(&type, &value, &traceback);
+  PyErr_NormalizeException(&type, &value, &traceback);
+  std::string message = "python stream read failed";
+  if (value != nullptr) {
+    PyObject *value_str = PyObject_Str(value);
+    if (value_str != nullptr) {
+      const char *c_str = PyUnicode_AsUTF8(value_str);
+      if (c_str != nullptr) {
+        message = c_str;
+      } else {
+        // PyUnicode_AsUTF8 raises on failure (e.g. lone surrogates); clear it
+        // so no stale exception leaks out of this helper.
+        PyErr_Clear();
+      }
+      Py_DECREF(value_str);
+    } else {
+      PyErr_Clear();
+    }
+  }
+  Py_XDECREF(type);
+  Py_XDECREF(value);
+  Py_XDECREF(traceback);
+  return message;
+}
+
+/// The Python method PythonStreamReader calls to pull bytes from a stream.
+enum class PythonStreamReadMethod {
+  ReadInto,
+  RecvInto,
+  RecvIntoUnderscore,
+};
+
+/// Returns the Python attribute name invoked for `method`; any unrecognised
+/// value falls back to "readinto".
+static const char *
+python_stream_read_method_name(PythonStreamReadMethod method) {
+  const char *attribute = "readinto";
+  if (method == PythonStreamReadMethod::RecvInto) {
+    attribute = "recvinto";
+  } else if (method == PythonStreamReadMethod::RecvIntoUnderscore) {
+    attribute = "recv_into";
+  }
+  return attribute;
+}
+
+/// Picks the first callable read method on `stream`, checking in order
+/// readinto, recv_into, recvinto.
+///
+/// @param stream Python object to inspect (GIL must be held).
+/// @param method Receives the selected method on success.
+/// @param error_message Receives a description on failure.
+/// @return true if a callable method was found; false otherwise, including
+///         when attribute lookup raises anything other than AttributeError.
+static bool resolve_python_stream_read_method(PyObject *stream,
+                                              PythonStreamReadMethod *method,
+                                              std::string *error_message) {
+  struct MethodCandidate {
+    const char *name;
+    PythonStreamReadMethod method;
+  };
+  constexpr MethodCandidate k_candidates[] = {
+      {"readinto", PythonStreamReadMethod::ReadInto},
+      {"recv_into", PythonStreamReadMethod::RecvIntoUnderscore},
+      {"recvinto", PythonStreamReadMethod::RecvInto},
+  };
+  for (const auto &candidate : k_candidates) {
+    // Single lookup per candidate. PyObject_HasAttrString cannot report
+    // errors (it swallows every exception), so only a missing attribute is
+    // treated as "not provided"; any other exception is surfaced.
+    PyObject *method_obj = PyObject_GetAttrString(stream, candidate.name);
+    if (method_obj == nullptr) {
+      if (PyErr_ExceptionMatches(PyExc_AttributeError)) {
+        PyErr_Clear();
+        continue;
+      }
+      *error_message = fetch_python_error_message();
+      return false;
+    }
+    const bool is_callable = PyCallable_Check(method_obj) != 0;
+    Py_DECREF(method_obj);
+    if (is_callable) {
+      *method = candidate.method;
+      return true;
+    }
+  }
+  *error_message = "stream object must provide readinto(buffer), "
+                   "recv_into(buffer, size) or recvinto(buffer, size) method";
+  return false;
+}
+
+/// StreamReader that pulls bytes from a Python object through its readinto,
+/// recv_into or recvinto method (selected by resolve_python_stream_read_method).
+///
+/// Bytes are staged in `data_`, which this reader owns. The bound Buffer's
+/// data_ pointer aliases that storage (own_data_ = false), so every
+/// reallocation of `data_` re-points buffer_->data_. The Buffer's size_ marks
+/// the end of the bytes received so far; reader_index_ is the next unread byte.
+class PythonStreamReader final : public StreamReader {
+public:
+ // Takes a new strong reference to `stream`. A buffer_size of 0 is clamped to
+ // 1 for both the initial staging capacity and the shrink_buffer() floor.
+ explicit PythonStreamReader(PyObject *stream, uint32_t buffer_size,
+ PythonStreamReadMethod read_method)
+ : stream_(stream), read_method_(read_method),
+ read_method_name_(python_stream_read_method_name(read_method)),
+ data_(std::max<uint32_t>(buffer_size, static_cast<uint32_t>(1))),
+ initial_buffer_size_(
+ std::max<uint32_t>(buffer_size, static_cast<uint32_t>(1))),
+ owned_buffer_(std::make_unique<Buffer>()) {
+ FORY_CHECK(stream_ != nullptr) << "stream must not be null";
+ Py_INCREF(stream_);
+ bind_buffer(owned_buffer_.get());
+ }
+
+ // Drops the stream reference inside PyGILState_Ensure/Release, so the GIL
+ // is held for Py_DECREF whatever the destroying thread's state.
+ ~PythonStreamReader() override {
+ if (stream_ != nullptr) {
+ PyGILState_STATE gil_state = PyGILState_Ensure();
+ Py_DECREF(stream_);
+ PyGILState_Release(gil_state);
+ stream_ = nullptr;
+ }
+ }
+
+ // Ensures at least `min_fill_size` unread bytes are staged by calling the
+ // Python read method repeatedly. If the required end position exceeds the
+ // staging capacity, capacity grows to max(required, 2 * capacity), capped at
+ // UINT32_MAX. A read returning 0 bytes is treated as end of stream and
+ // reported as buffer_out_of_bound.
+ // NOTE(review): only size_ is advanced here (writer_index_ is not); confirm
+ // Buffer never consults writer_index_ on stream-backed reads.
+ Result<void, Error> fill_buffer(uint32_t min_fill_size) override {
+ if (min_fill_size == 0 || remaining_size() >= min_fill_size) {
+ return Result<void, Error>();
+ }
+
+ const uint32_t read_pos = buffer_->reader_index_;
+ const uint32_t deficit = min_fill_size - remaining_size();
+ constexpr uint64_t k_max_u32 = std::numeric_limits<uint32_t>::max();
+ const uint64_t required = static_cast<uint64_t>(buffer_->size_) + deficit;
+ if (required > k_max_u32) {
+ return Unexpected(
+ Error::out_of_bound("stream buffer size exceeds uint32 range"));
+ }
+ if (required > data_.size()) {
+ uint64_t new_size =
+ std::max<uint64_t>(required, static_cast<uint64_t>(data_.size()) *
2);
+ if (new_size > k_max_u32) {
+ new_size = k_max_u32;
+ }
+ reserve(static_cast<uint32_t>(new_size));
+ }
+
+ // Each read may return fewer bytes than requested; loop until satisfied.
+ uint32_t write_pos = buffer_->size_;
+ while (remaining_size() < min_fill_size) {
+ uint32_t writable = static_cast<uint32_t>(data_.size()) - write_pos;
+ auto read_result = recv_into(data_.data() + write_pos, writable);
+ if (FORY_PREDICT_FALSE(!read_result.ok())) {
+ return Unexpected(std::move(read_result).error());
+ }
+ uint32_t read_bytes = std::move(read_result).value();
+ if (read_bytes == 0) {
+ return Unexpected(Error::buffer_out_of_bound(read_pos, min_fill_size,
+ remaining_size()));
+ }
+ write_pos += read_bytes;
+ buffer_->size_ = write_pos;
+ }
+ return Result<void, Error>();
+ }
+
+ // Copies `length` bytes into `dst` and advances reader_index_. Relies on
+ // Buffer::ensure_readable to make them available (presumably by calling
+ // back into fill_buffer via stream_reader_; confirm in buffer.h).
+ Result<void, Error> read_to(uint8_t *dst, uint32_t length) override {
+ if (length == 0) {
+ return Result<void, Error>();
+ }
+ Error error;
+ if (FORY_PREDICT_FALSE(!buffer_->ensure_readable(length, error))) {
+ return Unexpected(std::move(error));
+ }
+ std::memcpy(dst, buffer_->data_ + buffer_->reader_index_,
+ static_cast<size_t>(length));
+ buffer_->reader_index_ += length;
+ return Result<void, Error>();
+ }
+
+ // Advances the read position by `size` through
+ // Buffer::increase_reader_index, turning a reported error into Unexpected.
+ Result<void, Error> skip(uint32_t size) override {
+ if (size == 0) {
+ return Result<void, Error>();
+ }
+ Error error;
+ buffer_->increase_reader_index(size, error);
+ if (FORY_PREDICT_FALSE(!error.ok())) {
+ return Unexpected(std::move(error));
+ }
+ return Result<void, Error>();
+ }
+
+ // When reader_index_ > 0, moves the unread bytes to the front of `data_`
+ // and sets reader_index_ to 0 and size_/writer_index_ to the unread count.
+ // Then, if capacity exceeds the initial size, shrinks it: to the initial
+ // size when nothing is unread, otherwise to max(initial, 2 * unread) when at
+ // most a quarter of the capacity is in use.
+ void shrink_buffer() override {
+ if (buffer_ == nullptr) {
+ return;
+ }
+
+ const uint32_t read_pos = buffer_->reader_index_;
+ const uint32_t remaining = remaining_size();
+ if (read_pos > 0) {
+ if (remaining > 0) {
+ std::memmove(data_.data(), data_.data() + read_pos,
+ static_cast<size_t>(remaining));
+ }
+ buffer_->reader_index_ = 0;
+ buffer_->size_ = remaining;
+ buffer_->writer_index_ = remaining;
+ }
+
+ const uint32_t current_capacity = static_cast<uint32_t>(data_.size());
+ uint32_t target_capacity = current_capacity;
+ if (current_capacity > initial_buffer_size_) {
+ if (remaining == 0) {
+ target_capacity = initial_buffer_size_;
+ } else if (remaining <= current_capacity / 4) {
+ // Saturating 2 * remaining to avoid uint32 overflow.
+ const uint32_t doubled =
+ remaining > std::numeric_limits<uint32_t>::max() / 2
+ ? std::numeric_limits<uint32_t>::max()
+ : remaining * 2;
+ target_capacity = std::max<uint32_t>(
+ initial_buffer_size_,
+ std::max<uint32_t>(doubled, static_cast<uint32_t>(1)));
+ }
+ }
+ if (target_capacity < current_capacity) {
+ data_.resize(target_capacity);
+ data_.shrink_to_fit();
+ buffer_->data_ = data_.data();
+ }
+ }
+
+ // Steps reader_index_ back by `size`; fails if that would move before the
+ // start of the staged bytes (bytes removed by shrink_buffer are gone).
+ Result<void, Error> unread(uint32_t size) override {
+ if (FORY_PREDICT_FALSE(size > buffer_->reader_index_)) {
+ return Unexpected(Error::buffer_out_of_bound(buffer_->reader_index_,
size,
+ buffer_->size_));
+ }
+ buffer_->reader_index_ -= size;
+ return Result<void, Error>();
+ }
+
+ // Returns the currently bound Buffer.
+ Buffer &get_buffer() override { return *buffer_; }
+
+ // Binds `buffer` (or the internally owned Buffer when null) to this reader's
+ // staging storage. When switching away from a previously bound Buffer, its
+ // size_/writer_index_/reader_index_ carry over to the new one and its
+ // stream_reader_ link is cleared. Rebinding the same Buffer only refreshes
+ // its pointers.
+ void bind_buffer(Buffer *buffer) override {
+ Buffer *target = buffer == nullptr ? owned_buffer_.get() : buffer;
+ // Only reachable if owned_buffer_ is null: detach the current buffer.
+ if (target == nullptr) {
+ if (buffer_ != nullptr) {
+ buffer_->stream_reader_ = nullptr;
+ }
+ buffer_ = nullptr;
+ return;
+ }
+
+ if (buffer_ == target) {
+ buffer_->data_ = data_.data();
+ buffer_->own_data_ = false;
+ buffer_->wrapped_vector_ = nullptr;
+ buffer_->stream_reader_ = this;
+ return;
+ }
+
+ Buffer *source = buffer_;
+ if (source != nullptr) {
+ target->size_ = source->size_;
+ target->writer_index_ = source->writer_index_;
+ target->reader_index_ = source->reader_index_;
+ source->stream_reader_ = nullptr;
+ } else {
+ target->size_ = 0;
+ target->writer_index_ = 0;
+ target->reader_index_ = 0;
+ }
+
+ buffer_ = target;
+ buffer_->data_ = data_.data();
+ buffer_->own_data_ = false;
+ buffer_->wrapped_vector_ = nullptr;
+ buffer_->stream_reader_ = this;
+ }
+
+private:
+ // Calls the resolved Python read method on a writable memoryview over
+ // [dst, dst + length), holding the GIL for the call. readinto receives only
+ // the view; recv_into/recvinto also receive `length`. The result must be an
+ // int in [0, length]. A non-int result (including None) fails
+ // PyLong_AsSsize_t and is reported as io_error.
+ // NOTE(review): the memoryview aliases `data_`. If the Python callee keeps a
+ // reference to it, a later resize of `data_` leaves that view dangling.
+ // Consider calling its release() method after the read.
+ Result<uint32_t, Error> recv_into(void *dst, uint32_t length) {
+ if (length == 0) {
+ return 0U;
+ }
+ PyGILState_STATE gil_state = PyGILState_Ensure();
+ PyObject *memory_view =
+ PyMemoryView_FromMemory(reinterpret_cast<char *>(dst),
+ static_cast<Py_ssize_t>(length), PyBUF_WRITE);
+ if (memory_view == nullptr) {
+ std::string message = fetch_python_error_message();
+ PyGILState_Release(gil_state);
+ return Unexpected(Error::io_error(message));
+ }
+ PyObject *read_bytes_obj = nullptr;
+ switch (read_method_) {
+ case PythonStreamReadMethod::ReadInto:
+ read_bytes_obj =
+ PyObject_CallMethod(stream_, read_method_name_, "O", memory_view);
+ break;
+ case PythonStreamReadMethod::RecvInto:
+ case PythonStreamReadMethod::RecvIntoUnderscore:
+ read_bytes_obj =
+ PyObject_CallMethod(stream_, read_method_name_, "On", memory_view,
+ static_cast<Py_ssize_t>(length));
+ break;
+ }
+ Py_DECREF(memory_view);
+ if (read_bytes_obj == nullptr) {
+ std::string message = fetch_python_error_message();
+ PyGILState_Release(gil_state);
+ return Unexpected(Error::io_error(message));
+ }
+
+ Py_ssize_t read_bytes = PyLong_AsSsize_t(read_bytes_obj);
+ Py_DECREF(read_bytes_obj);
+ if (read_bytes == -1 && PyErr_Occurred()) {
+ std::string message = fetch_python_error_message();
+ PyGILState_Release(gil_state);
+ return Unexpected(Error::io_error(message));
+ }
+ PyGILState_Release(gil_state);
+ if (read_bytes < 0 ||
+ static_cast<uint64_t>(read_bytes) > static_cast<uint64_t>(length)) {
+ return Unexpected(Error::io_error("python stream " +
+ std::string(read_method_name_) +
+ " returned invalid length"));
+ }
+ return static_cast<uint32_t>(read_bytes);
+ }
+
+ // Number of staged bytes not yet consumed.
+ uint32_t remaining_size() const {
+ return buffer_->size_ - buffer_->reader_index_;
+ }
+
+ // Resizes the staging storage and re-points the bound Buffer at it.
+ void reserve(uint32_t new_size) {
+ data_.resize(new_size);
+ buffer_->data_ = data_.data();
+ }
+
+ // Strong reference taken in the constructor, released in the destructor.
+ PyObject *stream_ = nullptr;
+ PythonStreamReadMethod read_method_;
+ const char *read_method_name_ = nullptr;
+ // Staging storage aliased by buffer_->data_.
+ std::vector<uint8_t> data_;
+ uint32_t initial_buffer_size_ = 1;
+ // Currently bound Buffer: owned_buffer_ or one supplied to bind_buffer().
+ Buffer *buffer_ = nullptr;
+ std::unique_ptr<Buffer> owned_buffer_;
+};
+
int Fory_PyBooleanSequenceWriteToBuffer(PyObject *collection, Buffer *buffer,
Py_ssize_t start_index) {
PyObject **items = py_sequence_get_items(collection);
@@ -58,4 +396,25 @@ int Fory_PyFloatSequenceWriteToBuffer(PyObject *collection,
Buffer *buffer,
}
return 0;
}
+
+/// Creates a stream-backed Buffer that pulls bytes from a Python object.
+///
+/// @param stream Python object exposing readinto / recv_into / recvinto.
+/// @param buffer_size Initial staging capacity (0 is treated as 1 by the
+///        reader).
+/// @param out Receives a Buffer allocated with `new` on success.
+/// @param error_message Receives a description on failure.
+/// @return 0 on success, -1 on failure.
+int Fory_PyCreateBufferFromStream(PyObject *stream, uint32_t buffer_size,
+                                  Buffer **out, std::string *error_message) {
+  if (stream == nullptr) {
+    *error_message = "stream must not be null";
+    return -1;
+  }
+  auto read_method = PythonStreamReadMethod::ReadInto;
+  const bool resolved =
+      resolve_python_stream_read_method(stream, &read_method, error_message);
+  if (!resolved) {
+    return -1;
+  }
+  try {
+    std::shared_ptr<PythonStreamReader> reader =
+        std::make_shared<PythonStreamReader>(stream, buffer_size, read_method);
+    *out = new Buffer(*reader);
+  } catch (const std::exception &ex) {
+    *error_message = ex.what();
+    return -1;
+  }
+  return 0;
+}
} // namespace fory
diff --git a/cpp/fory/python/pyfory.h b/cpp/fory/python/pyfory.h
index 467a86970..780ef3534 100644
--- a/cpp/fory/python/pyfory.h
+++ b/cpp/fory/python/pyfory.h
@@ -18,6 +18,8 @@
*/
#pragma once
+#include <string>
+
#include "Python.h"
#include "fory/util/buffer.h"
@@ -26,4 +28,6 @@ int Fory_PyBooleanSequenceWriteToBuffer(PyObject *collection,
Buffer *buffer,
Py_ssize_t start_index);
int Fory_PyFloatSequenceWriteToBuffer(PyObject *collection, Buffer *buffer,
Py_ssize_t start_index);
-} // namespace fory
\ No newline at end of file
+int Fory_PyCreateBufferFromStream(PyObject *stream, uint32_t buffer_size,
+ Buffer **out, std::string *error_message);
+} // namespace fory
diff --git a/cpp/fory/serialization/BUILD b/cpp/fory/serialization/BUILD
index 5f737ae5f..c3dfe5bd5 100644
--- a/cpp/fory/serialization/BUILD
+++ b/cpp/fory/serialization/BUILD
@@ -174,6 +174,16 @@ cc_test(
],
)
+cc_test(
+ name = "stream_test",
+ srcs = ["stream_test.cc"],
+ deps = [
+ ":fory_serialization",
+ "@googletest//:gtest",
+ "@googletest//:gtest_main",
+ ],
+)
+
cc_binary(
name = "xlang_test_main",
srcs = ["xlang_test_main.cc"],
diff --git a/cpp/fory/serialization/CMakeLists.txt
b/cpp/fory/serialization/CMakeLists.txt
index d4852742e..0582554c7 100644
--- a/cpp/fory/serialization/CMakeLists.txt
+++ b/cpp/fory/serialization/CMakeLists.txt
@@ -113,6 +113,10 @@ if(FORY_BUILD_TESTS)
add_executable(fory_serialization_any_test any_serializer_test.cc)
target_link_libraries(fory_serialization_any_test fory_serialization
GTest::gtest GTest::gtest_main)
gtest_discover_tests(fory_serialization_any_test)
+
+ add_executable(fory_serialization_stream_test stream_test.cc)
+ target_link_libraries(fory_serialization_stream_test fory_serialization
GTest::gtest GTest::gtest_main)
+ gtest_discover_tests(fory_serialization_stream_test)
endif()
# xlang test binary
diff --git a/cpp/fory/serialization/collection_serializer.h
b/cpp/fory/serialization/collection_serializer.h
index a780d1c0c..3768275d1 100644
--- a/cpp/fory/serialization/collection_serializer.h
+++ b/cpp/fory/serialization/collection_serializer.h
@@ -905,7 +905,10 @@ template <typename Alloc> struct
Serializer<std::vector<bool, Alloc>> {
for (uint32_t i = 0; i < size; ++i) {
result[i] = (src[i] != 0);
}
- buffer.increase_reader_index(size);
+ buffer.increase_reader_index(size, ctx.error());
+ if (FORY_PREDICT_FALSE(ctx.has_error())) {
+ return std::vector<bool, Alloc>();
+ }
} else {
// Fallback: read byte-by-byte with bounds checking
for (uint32_t i = 0; i < size; ++i) {
diff --git a/cpp/fory/serialization/fory.h b/cpp/fory/serialization/fory.h
index 53735400b..09a0bfa48 100644
--- a/cpp/fory/serialization/fory.h
+++ b/cpp/fory/serialization/fory.h
@@ -37,6 +37,7 @@
#include "fory/util/error.h"
#include "fory/util/pool.h"
#include "fory/util/result.h"
+#include "fory/util/stream.h"
#include <cstring>
#include <memory>
#include <mutex>
@@ -513,9 +514,9 @@ public:
ensure_finalized();
}
// Swap in the caller's buffer so all writes go there.
- std::swap(buffer, write_ctx_->buffer());
+ buffer.swap(write_ctx_->buffer());
auto result = serialize_impl(obj, write_ctx_->buffer());
- std::swap(buffer, write_ctx_->buffer());
+ buffer.swap(write_ctx_->buffer());
// reset internal state after use without clobbering caller buffer.
write_ctx_->reset();
return result;
@@ -606,7 +607,11 @@ public:
if (FORY_PREDICT_FALSE(!finalized_)) {
ensure_finalized();
}
- FORY_TRY(header, read_header(buffer));
+ auto header_result = read_header(buffer);
+ if (FORY_PREDICT_FALSE(!header_result.ok())) {
+ return Unexpected(std::move(header_result).error());
+ }
+ auto header = std::move(header_result).value();
if (header.is_null) {
return Unexpected(Error::invalid_data("Cannot deserialize null object"));
}
@@ -622,6 +627,38 @@ public:
return deserialize_impl<T>(buffer);
}
+  /// Deserialize an object from a stream reader.
+  ///
+  /// Deserialization runs on the Buffer returned by
+  /// `stream_reader.get_buffer()`. Whether it succeeds or fails,
+  /// `stream_reader.shrink_buffer()` is called once the result is produced.
+  ///
+  /// @tparam T The type of object to deserialize.
+  /// @param stream_reader Stream reader to read from.
+  /// @return Deserialized object, or error.
+  template <typename T>
+  Result<T, Error> deserialize(StreamReader &stream_reader) {
+    // Scope guard: its destructor runs after the return value is built.
+    struct ShrinkOnExit {
+      StreamReader &reader;
+      ~ShrinkOnExit() { reader.shrink_buffer(); }
+    };
+    ShrinkOnExit on_exit{stream_reader};
+    return deserialize<T>(stream_reader.get_buffer());
+  }
+
+  /// Deserialize an object from ForyInputStream.
+  ///
+  /// Forwards to the StreamReader overload.
+  ///
+  /// @tparam T The type of object to deserialize.
+  /// @param stream Input stream wrapper to read from.
+  /// @return Deserialized object, or error.
+  template <typename T> Result<T, Error> deserialize(ForyInputStream &stream) {
+    StreamReader &reader = stream;
+    return deserialize<T>(reader);
+  }
+
// ==========================================================================
// Advanced Access
// ==========================================================================
@@ -792,6 +829,17 @@ public:
return deserialize<T>(data.data(), data.size());
}
+  /// Deserialize from a stream reader using a Fory instance from the pool.
+  template <typename T>
+  Result<T, Error> deserialize(StreamReader &stream_reader) {
+    // The pooled handle stays alive until the result has been produced.
+    auto pooled = fory_pool_.acquire();
+    return pooled->template deserialize<T>(stream_reader);
+  }
+
+  /// Deserialize from a ForyInputStream using a Fory instance from the pool.
+  template <typename T> Result<T, Error> deserialize(ForyInputStream &stream) {
+    auto pooled = fory_pool_.acquire();
+    return pooled->template deserialize<T>(stream);
+  }
+
private:
explicit ThreadSafeFory(const Config &config,
std::shared_ptr<TypeResolver> resolver)
diff --git a/cpp/fory/serialization/skip.cc b/cpp/fory/serialization/skip.cc
index 946d6fbf4..3a89e295a 100644
--- a/cpp/fory/serialization/skip.cc
+++ b/cpp/fory/serialization/skip.cc
@@ -48,7 +48,7 @@ void skip_string(ReadContext &ctx) {
uint64_t size = size_encoding >> 2;
// skip string data
- ctx.buffer().increase_reader_index(size);
+ ctx.buffer().increase_reader_index(size, ctx.error());
}
void skip_list(ReadContext &ctx, const FieldType &field_type) {
@@ -494,13 +494,13 @@ void skip_field_value(ReadContext &ctx, const FieldType
&field_type,
case TypeId::BOOL:
case TypeId::INT8:
case TypeId::FLOAT8:
- ctx.buffer().increase_reader_index(1);
+ ctx.buffer().increase_reader_index(1, ctx.error());
return;
case TypeId::INT16:
case TypeId::BFLOAT16:
case TypeId::FLOAT16:
- ctx.buffer().increase_reader_index(2);
+ ctx.buffer().increase_reader_index(2, ctx.error());
return;
case TypeId::INT32: {
@@ -512,7 +512,7 @@ void skip_field_value(ReadContext &ctx, const FieldType
&field_type,
}
case TypeId::FLOAT32:
- ctx.buffer().increase_reader_index(4);
+ ctx.buffer().increase_reader_index(4, ctx.error());
return;
case TypeId::INT64: {
@@ -524,7 +524,7 @@ void skip_field_value(ReadContext &ctx, const FieldType
&field_type,
}
case TypeId::FLOAT64:
- ctx.buffer().increase_reader_index(8);
+ ctx.buffer().increase_reader_index(8, ctx.error());
return;
case TypeId::VARINT32:
@@ -551,36 +551,21 @@ void skip_field_value(ReadContext &ctx, const FieldType
&field_type,
case TypeId::DURATION: {
// Duration is stored as fixed 8-byte nanosecond count.
constexpr uint32_t k_bytes = static_cast<uint32_t>(sizeof(int64_t));
- if (ctx.buffer().reader_index() + k_bytes > ctx.buffer().size()) {
- ctx.set_error(Error::buffer_out_of_bound(ctx.buffer().reader_index(),
- k_bytes, ctx.buffer().size()));
- return;
- }
- ctx.buffer().increase_reader_index(k_bytes);
+ ctx.buffer().increase_reader_index(k_bytes, ctx.error());
return;
}
case TypeId::TIMESTAMP: {
// Timestamp is stored as int64 seconds + uint32 nanoseconds.
constexpr uint32_t k_bytes =
static_cast<uint32_t>(sizeof(int64_t) + sizeof(uint32_t));
- if (ctx.buffer().reader_index() + k_bytes > ctx.buffer().size()) {
- ctx.set_error(Error::buffer_out_of_bound(ctx.buffer().reader_index(),
- k_bytes, ctx.buffer().size()));
- return;
- }
- ctx.buffer().increase_reader_index(k_bytes);
+ ctx.buffer().increase_reader_index(k_bytes, ctx.error());
return;
}
case TypeId::DATE: {
// Date is stored as fixed 4-byte day count.
constexpr uint32_t k_bytes = static_cast<uint32_t>(sizeof(int32_t));
- if (ctx.buffer().reader_index() + k_bytes > ctx.buffer().size()) {
- ctx.set_error(Error::buffer_out_of_bound(ctx.buffer().reader_index(),
- k_bytes, ctx.buffer().size()));
- return;
- }
- ctx.buffer().increase_reader_index(k_bytes);
+ ctx.buffer().increase_reader_index(k_bytes, ctx.error());
return;
}
@@ -603,7 +588,7 @@ void skip_field_value(ReadContext &ctx, const FieldType
&field_type,
case TypeId::FLOAT32_ARRAY:
case TypeId::FLOAT64_ARRAY: {
if (tid == TypeId::FLOAT8_ARRAY) {
- ctx.buffer().increase_reader_index(1);
+ ctx.buffer().increase_reader_index(1, ctx.error());
if (FORY_PREDICT_FALSE(ctx.has_error())) {
return;
}
@@ -634,7 +619,7 @@ void skip_field_value(ReadContext &ctx, const FieldType
&field_type,
break;
}
- ctx.buffer().increase_reader_index(len * elem_size);
+ ctx.buffer().increase_reader_index(len * elem_size, ctx.error());
return;
}
@@ -644,7 +629,7 @@ void skip_field_value(ReadContext &ctx, const FieldType
&field_type,
if (FORY_PREDICT_FALSE(ctx.has_error())) {
return;
}
- ctx.buffer().increase_reader_index(len);
+ ctx.buffer().increase_reader_index(len, ctx.error());
return;
}
diff --git a/cpp/fory/serialization/stream_test.cc
b/cpp/fory/serialization/stream_test.cc
new file mode 100644
index 000000000..edbefd63d
--- /dev/null
+++ b/cpp/fory/serialization/stream_test.cc
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdint>
+#include <istream>
+#include <map>
+#include <memory>
+#include <streambuf>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "gtest/gtest.h"
+
+#include "fory/serialization/fory.h"
+#include "fory/util/stream.h"
+
+namespace fory {
+namespace serialization {
+namespace test {
+
+/// Simple two-field struct nested inside StreamEnvelope.
+struct StreamPoint {
+  int32_t x;
+  int32_t y;
+
+  bool operator==(const StreamPoint &rhs) const {
+    if (x != rhs.x) {
+      return false;
+    }
+    return y == rhs.y;
+  }
+
+  FORY_STRUCT(StreamPoint, x, y);
+};
+
+/// Struct mixing string, vector, map, nested struct and bool fields.
+struct StreamEnvelope {
+  std::string name;
+  std::vector<int32_t> values;
+  std::map<std::string, int64_t> metrics;
+  StreamPoint point;
+  bool active;
+
+  // Compares every field in declaration order, stopping at the first mismatch.
+  bool operator==(const StreamEnvelope &rhs) const {
+    if (name != rhs.name || values != rhs.values) {
+      return false;
+    }
+    if (metrics != rhs.metrics) {
+      return false;
+    }
+    return point == rhs.point && active == rhs.active;
+  }
+
+  FORY_STRUCT(StreamEnvelope, name, values, metrics, point, active);
+};
+
+/// Two shared_ptr fields; the identity test points both at one object.
+struct SharedIntPair {
+  std::shared_ptr<int32_t> first;
+  std::shared_ptr<int32_t> second;
+
+  FORY_STRUCT(SharedIntPair, first, second);
+};
+
+/// std::streambuf that yields at most one byte per read call, forcing the
+/// stream-backed deserializer to refill repeatedly.
+///
+/// underflow() moves the next byte of `data_` into a one-char get area
+/// (`current_`), and xsgetn() drains that staged byte before pulling a new
+/// one. Bulk reads (sgetn) and character reads (sgetc/sbumpc) therefore see
+/// the same byte sequence with no duplicates or gaps.
+class OneByteStreamBuf final : public std::streambuf {
+public:
+  explicit OneByteStreamBuf(std::vector<uint8_t> data)
+      : data_(std::move(data)), pos_(0) {}
+
+protected:
+  std::streamsize xsgetn(char *s, std::streamsize count) override {
+    if (count <= 0) {
+      return 0;
+    }
+    // A byte staged by underflow() comes first.
+    if (gptr() < egptr()) {
+      s[0] = *gptr();
+      gbump(1);
+      return 1;
+    }
+    if (pos_ >= data_.size()) {
+      return 0;
+    }
+    s[0] = static_cast<char>(data_[pos_]);
+    ++pos_;
+    return 1;
+  }
+
+  int_type underflow() override {
+    if (gptr() < egptr()) {
+      return traits_type::to_int_type(*gptr());
+    }
+    if (pos_ >= data_.size()) {
+      return traits_type::eof();
+    }
+    // Consume the byte into the get area. pos_ must advance, otherwise the
+    // same byte is returned again once sbumpc() exhausts the area.
+    current_ = static_cast<char>(data_[pos_]);
+    ++pos_;
+    setg(&current_, &current_, &current_ + 1);
+    return traits_type::to_int_type(current_);
+  }
+
+private:
+  std::vector<uint8_t> data_;
+  size_t pos_;
+  char current_ = 0;
+};
+
+/// std::istream that owns a OneByteStreamBuf over `data`.
+class OneByteIStream final : public std::istream {
+public:
+  explicit OneByteIStream(std::vector<uint8_t> data)
+      : std::istream(nullptr), buf_(std::move(data)) {
+    // buf_ is constructed after the std::istream base, so it is attached
+    // here. rdbuf(sb) also clears the badbit set by the null initialisation.
+    this->rdbuf(&buf_);
+  }
+
+private:
+  OneByteStreamBuf buf_;
+};
+
+/// Registers the test structs with consecutive type ids 1, 2 and 3.
+static inline void register_stream_types(Fory &fory) {
+  constexpr uint32_t k_base_id = 1;
+  fory.register_struct<StreamPoint>(k_base_id);
+  fory.register_struct<StreamEnvelope>(k_base_id + 1);
+  fory.register_struct<SharedIntPair>(k_base_id + 2);
+}
+
+TEST(StreamSerializationTest, PrimitiveAndStringRoundTrip) {
+  auto fory = Fory::builder().xlang(true).track_ref(false).build();
+
+  // Signed 64-bit value, delivered one byte per underlying stream read.
+  auto encoded_number = fory.serialize<int64_t>(-9876543212345LL);
+  ASSERT_TRUE(encoded_number.ok()) << encoded_number.error().to_string();
+  OneByteIStream number_input(std::move(encoded_number).value());
+  ForyInputStream number_reader(number_input, 8);
+  auto decoded_number = fory.deserialize<int64_t>(number_reader);
+  ASSERT_TRUE(decoded_number.ok()) << decoded_number.error().to_string();
+  EXPECT_EQ(decoded_number.value(), -9876543212345LL);
+
+  // Mixed ASCII / non-ASCII string through the same one-byte path.
+  auto encoded_text = fory.serialize<std::string>("stream-hello-世界");
+  ASSERT_TRUE(encoded_text.ok()) << encoded_text.error().to_string();
+  OneByteIStream text_input(std::move(encoded_text).value());
+  ForyInputStream text_reader(text_input, 8);
+  auto decoded_text = fory.deserialize<std::string>(text_reader);
+  ASSERT_TRUE(decoded_text.ok()) << decoded_text.error().to_string();
+  EXPECT_EQ(decoded_text.value(), "stream-hello-世界");
+}
+
+TEST(StreamSerializationTest, StructRoundTrip) {
+  auto fory = Fory::builder().xlang(true).track_ref(true).build();
+  register_stream_types(fory);
+
+  StreamEnvelope expected{
+      "payload-name",
+      {1, 3, 5, 7, 9},
+      {{"count", 5}, {"sum", 25}, {"max", 9}},
+      {42, -7},
+      true,
+  };
+
+  auto encoded = fory.serialize(expected);
+  ASSERT_TRUE(encoded.ok()) << encoded.error().to_string();
+
+  // Buffer-size argument 4, fed one byte per read.
+  OneByteIStream input(std::move(encoded).value());
+  ForyInputStream reader(input, 4);
+  auto decoded = fory.deserialize<StreamEnvelope>(reader);
+  ASSERT_TRUE(decoded.ok()) << decoded.error().to_string();
+  EXPECT_EQ(decoded.value(), expected);
+}
+
+TEST(StreamSerializationTest, SequentialDeserializeFromSingleStream) {
+  auto fory = Fory::builder().xlang(true).track_ref(true).build();
+  register_stream_types(fory);
+
+  StreamEnvelope envelope{
+      "batch", {10, 20, 30}, {{"a", 1}, {"b", 2}}, {9, 8}, false,
+  };
+
+  // Three independent payloads concatenated into one byte sequence.
+  std::vector<uint8_t> payload;
+  ASSERT_TRUE(fory.serialize_to(payload, static_cast<int32_t>(12345)).ok());
+  ASSERT_TRUE(fory.serialize_to(payload, std::string("next-value")).ok());
+  ASSERT_TRUE(fory.serialize_to(payload, envelope).ok());
+
+  OneByteIStream input(payload);
+  ForyInputStream reader(input, 3);
+
+  // shrink_buffer() runs after every stream deserialize, so the test expects
+  // the read position back at 0 after each message.
+  auto number = fory.deserialize<int32_t>(reader);
+  ASSERT_TRUE(number.ok()) << number.error().to_string();
+  EXPECT_EQ(number.value(), 12345);
+  EXPECT_EQ(reader.get_buffer().reader_index(), 0U);
+
+  auto text = fory.deserialize<std::string>(reader);
+  ASSERT_TRUE(text.ok()) << text.error().to_string();
+  EXPECT_EQ(text.value(), "next-value");
+  EXPECT_EQ(reader.get_buffer().reader_index(), 0U);
+
+  auto decoded_envelope = fory.deserialize<StreamEnvelope>(reader);
+  ASSERT_TRUE(decoded_envelope.ok()) << decoded_envelope.error().to_string();
+  EXPECT_EQ(decoded_envelope.value(), envelope);
+  EXPECT_EQ(reader.get_buffer().reader_index(), 0U);
+
+  EXPECT_EQ(reader.get_buffer().remaining_size(), 0U);
+}
+
+TEST(StreamSerializationTest, SharedPointerIdentityRoundTrip) {
+  auto fory = Fory::builder().xlang(true).track_ref(true).build();
+  register_stream_types(fory);
+
+  // Both fields alias one int; ref tracking must restore a single object.
+  auto value = std::make_shared<int32_t>(2026);
+  SharedIntPair pair{value, value};
+
+  auto encoded = fory.serialize(pair);
+  ASSERT_TRUE(encoded.ok()) << encoded.error().to_string();
+
+  OneByteIStream input(std::move(encoded).value());
+  ForyInputStream reader(input, 2);
+  auto decoded = fory.deserialize<SharedIntPair>(reader);
+  ASSERT_TRUE(decoded.ok()) << decoded.error().to_string();
+  const SharedIntPair &restored = decoded.value();
+  ASSERT_NE(restored.first, nullptr);
+  ASSERT_NE(restored.second, nullptr);
+  EXPECT_EQ(*restored.first, 2026);
+  EXPECT_EQ(restored.first, restored.second);
+}
+
+TEST(StreamSerializationTest, TruncatedStreamReturnsError) {
+  auto fory = Fory::builder().xlang(true).track_ref(true).build();
+  register_stream_types(fory);
+
+  StreamEnvelope original{
+      "truncated", {1, 2, 3, 4}, {{"k", 99}}, {7, 7}, true,
+  };
+  auto encoded = fory.serialize(original);
+  ASSERT_TRUE(encoded.ok()) << encoded.error().to_string();
+
+  // Drop the final byte so the stream ends before the payload does.
+  std::vector<uint8_t> bytes = std::move(encoded).value();
+  ASSERT_GT(bytes.size(), 1u);
+  bytes.resize(bytes.size() - 1);
+
+  OneByteIStream input(bytes);
+  ForyInputStream reader(input, 4);
+  EXPECT_FALSE(fory.deserialize<StreamEnvelope>(reader).ok());
+}
+
+} // namespace test
+} // namespace serialization
+} // namespace fory
diff --git a/cpp/fory/serialization/string_serializer.h
b/cpp/fory/serialization/string_serializer.h
index 139564b93..c570e0c25 100644
--- a/cpp/fory/serialization/string_serializer.h
+++ b/cpp/fory/serialization/string_serializer.h
@@ -25,6 +25,7 @@
#include "fory/util/error.h"
#include "fory/util/string_util.h"
#include <cstdint>
+#include <limits>
#include <string>
#include <string_view>
#include <vector>
@@ -113,29 +114,29 @@ inline std::string read_string_data(ReadContext &ctx) {
return std::string();
}
- // Validate length against buffer remaining size
- if (length > ctx.buffer().remaining_size()) {
- ctx.set_error(Error::invalid_data("String length exceeds buffer size"));
+ if (FORY_PREDICT_FALSE(length > std::numeric_limits<uint32_t>::max())) {
+ ctx.set_error(Error::invalid_data("String length exceeds uint32 range"));
return std::string();
}
+ const uint32_t length_u32 = static_cast<uint32_t>(length);
// Handle different encodings
switch (encoding) {
case StringEncoding::LATIN1: {
- std::vector<uint8_t> bytes(length);
- ctx.read_bytes(bytes.data(), length, ctx.error());
+ std::vector<uint8_t> bytes(length_u32);
+ ctx.read_bytes(bytes.data(), length_u32, ctx.error());
if (FORY_PREDICT_FALSE(ctx.has_error())) {
return std::string();
}
- return latin1_to_utf8(bytes.data(), length);
+ return latin1_to_utf8(bytes.data(), length_u32);
}
case StringEncoding::UTF16: {
- if (length % 2 != 0) {
+ if ((length_u32 & 1) != 0) {
ctx.set_error(Error::invalid_data("UTF-16 length must be even"));
return std::string();
}
- std::vector<uint16_t> utf16_chars(length / 2);
- ctx.read_bytes(reinterpret_cast<uint8_t *>(utf16_chars.data()), length,
+ std::vector<uint16_t> utf16_chars(length_u32 / 2);
+ ctx.read_bytes(reinterpret_cast<uint8_t *>(utf16_chars.data()), length_u32,
ctx.error());
if (FORY_PREDICT_FALSE(ctx.has_error())) {
return std::string();
@@ -144,8 +145,8 @@ inline std::string read_string_data(ReadContext &ctx) {
}
case StringEncoding::UTF8: {
// UTF-8: read bytes directly
- std::string result(length, '\0');
- ctx.read_bytes(&result[0], length, ctx.error());
+ std::string result(length_u32, '\0');
+ ctx.read_bytes(&result[0], length_u32, ctx.error());
if (FORY_PREDICT_FALSE(ctx.has_error())) {
return std::string();
}
@@ -176,18 +177,18 @@ inline std::u16string read_u16string_data(ReadContext &ctx) {
return std::u16string();
}
- // Validate length against buffer remaining size
- if (length > ctx.buffer().remaining_size()) {
- ctx.set_error(Error::invalid_data("String length exceeds buffer size"));
+ if (FORY_PREDICT_FALSE(length > std::numeric_limits<uint32_t>::max())) {
+ ctx.set_error(Error::invalid_data("String length exceeds uint32 range"));
return std::u16string();
}
+ const uint32_t length_u32 = static_cast<uint32_t>(length);
// Handle different encodings
switch (encoding) {
case StringEncoding::LATIN1: {
// Latin1 bytes map directly to char16_t (codepoints 0-255)
- std::u16string result(length, u'\0');
- for (size_t i = 0; i < length; ++i) {
+ std::u16string result(length_u32, u'\0');
+ for (size_t i = 0; i < length_u32; ++i) {
result[i] = static_cast<char16_t>(ctx.read_uint8(ctx.error()));
if (FORY_PREDICT_FALSE(ctx.has_error())) {
return std::u16string();
@@ -196,12 +197,12 @@ inline std::u16string read_u16string_data(ReadContext &ctx) {
return result;
}
case StringEncoding::UTF16: {
- if (length % 2 != 0) {
+ if ((length_u32 & 1) != 0) {
ctx.set_error(Error::invalid_data("UTF-16 length must be even"));
return std::u16string();
}
- std::u16string result(length / 2, u'\0');
- ctx.read_bytes(reinterpret_cast<uint8_t *>(&result[0]), length,
+ std::u16string result(length_u32 / 2, u'\0');
+ ctx.read_bytes(reinterpret_cast<uint8_t *>(&result[0]), length_u32,
ctx.error());
if (FORY_PREDICT_FALSE(ctx.has_error())) {
return std::u16string();
@@ -210,8 +211,8 @@ inline std::u16string read_u16string_data(ReadContext &ctx) {
}
case StringEncoding::UTF8: {
// Read UTF-8 bytes and convert to UTF-16
- std::string utf8(length, '\0');
- ctx.read_bytes(&utf8[0], length, ctx.error());
+ std::string utf8(length_u32, '\0');
+ ctx.read_bytes(&utf8[0], length_u32, ctx.error());
if (FORY_PREDICT_FALSE(ctx.has_error())) {
return std::u16string();
}
diff --git a/cpp/fory/serialization/struct_serializer.h b/cpp/fory/serialization/struct_serializer.h
index bb4a24bb8..812586114 100644
--- a/cpp/fory/serialization/struct_serializer.h
+++ b/cpp/fory/serialization/struct_serializer.h
@@ -31,6 +31,7 @@
#include "fory/util/string_util.h"
#include <algorithm>
#include <array>
+#include <limits>
#include <memory>
#include <numeric>
#include <string_view>
@@ -831,6 +832,12 @@ template <typename T> struct CompileTimeFieldHelpers {
if constexpr (is_configurable_int_v<FieldType>) {
return configurable_int_max_varint_bytes<FieldType, T, Index>();
+ } else if constexpr (std::is_same_v<FieldType, int32_t> ||
+ std::is_same_v<FieldType, int>) {
+ return 5;
+ } else if constexpr (std::is_same_v<FieldType, int64_t> ||
+ std::is_same_v<FieldType, long long>) {
+ return 10;
}
return 0;
}
@@ -2753,11 +2760,8 @@ void read_struct_fields_impl(T &obj, ReadContext &ctx,
// Phase 1: Read leading fixed-size primitives if any
if constexpr (fixed_count > 0 && fixed_bytes > 0) {
- // Pre-check bounds for all fixed-size fields at once
- if (FORY_PREDICT_FALSE(buffer.reader_index() + fixed_bytes >
- buffer.size())) {
- ctx.set_error(Error::buffer_out_of_bound(buffer.reader_index(),
- fixed_bytes, buffer.size()));
+ if (FORY_PREDICT_FALSE(!buffer.ensure_readable(
+ static_cast<uint32_t>(fixed_bytes), ctx.error()))) {
return;
}
// Fast read fixed-size primitives
@@ -2769,6 +2773,12 @@ void read_struct_fields_impl(T &obj, ReadContext &ctx,
// Note: varint bounds checking is done per-byte during reading since
// varint lengths are variable (actual size << max possible size)
if constexpr (varint_count > 0) {
+ if (FORY_PREDICT_FALSE(buffer.is_stream_backed())) {
+ // Stream-backed buffers may not have all varint bytes materialized yet.
+ // Fall back to per-field readers that propagate stream read errors.
+ read_remaining_fields<T, fixed_count, total_count>(obj, ctx);
+ return;
+ }
// Track offset locally for batch varint reading
uint32_t offset = buffer.reader_index();
// Fast read varint primitives (bounds checking happens in
@@ -2807,11 +2817,8 @@ read_struct_fields_impl_fast(T &obj, ReadContext &ctx,
// Phase 1: Read leading fixed-size primitives if any
if constexpr (fixed_count > 0 && fixed_bytes > 0) {
- // Pre-check bounds for all fixed-size fields at once
- if (FORY_PREDICT_FALSE(buffer.reader_index() + fixed_bytes >
- buffer.size())) {
- ctx.set_error(Error::buffer_out_of_bound(buffer.reader_index(),
- fixed_bytes, buffer.size()));
+ if (FORY_PREDICT_FALSE(!buffer.ensure_readable(
+ static_cast<uint32_t>(fixed_bytes), ctx.error()))) {
return;
}
// Fast read fixed-size primitives
@@ -2821,6 +2828,12 @@ read_struct_fields_impl_fast(T &obj, ReadContext &ctx,
// Phase 2: Read consecutive varint primitives (int32, int64) if any
if constexpr (varint_count > 0) {
+ if (FORY_PREDICT_FALSE(buffer.is_stream_backed())) {
+ // Stream-backed buffers may not have all varint bytes materialized yet.
+ // Fall back to per-field readers that propagate stream read errors.
+ read_remaining_fields<T, fixed_count, total_count>(obj, ctx);
+ return;
+ }
// Track offset locally for batch varint reading
uint32_t offset = buffer.reader_index();
// Fast read varint primitives (bounds checking happens in
diff --git a/cpp/fory/util/CMakeLists.txt b/cpp/fory/util/CMakeLists.txt
index 2bc4f30c5..9b7718d67 100644
--- a/cpp/fory/util/CMakeLists.txt
+++ b/cpp/fory/util/CMakeLists.txt
@@ -19,6 +19,7 @@ set(FORY_UTIL_SOURCES
buffer.cc
error.cc
logging.cc
+ stream.cc
string_util.cc
time_util.cc
)
@@ -32,6 +33,7 @@ set(FORY_UTIL_HEADERS
platform.h
pool.h
result.h
+ stream.h
string_util.h
time_util.h
flat_int_map.h
diff --git a/cpp/fory/util/buffer.cc b/cpp/fory/util/buffer.cc
index 4c1e5f593..e7006ae38 100644
--- a/cpp/fory/util/buffer.cc
+++ b/cpp/fory/util/buffer.cc
@@ -31,6 +31,7 @@ Buffer::Buffer() {
writer_index_ = 0;
reader_index_ = 0;
wrapped_vector_ = nullptr;
+ stream_reader_ = nullptr;
}
Buffer::Buffer(Buffer &&buffer) noexcept {
@@ -40,6 +41,10 @@ Buffer::Buffer(Buffer &&buffer) noexcept {
writer_index_ = buffer.writer_index_;
reader_index_ = buffer.reader_index_;
wrapped_vector_ = buffer.wrapped_vector_;
+ stream_reader_ = buffer.stream_reader_;
+ stream_reader_owner_ = std::move(buffer.stream_reader_owner_);
+ rebind_stream_reader_to_this();
+ buffer.stream_reader_ = nullptr;
buffer.data_ = nullptr;
buffer.size_ = 0;
buffer.own_data_ = false;
@@ -47,6 +52,7 @@ Buffer::Buffer(Buffer &&buffer) noexcept {
}
Buffer &Buffer::operator=(Buffer &&buffer) noexcept {
+ detach_stream_reader_from_this();
if (own_data_) {
free(data_);
data_ = nullptr;
@@ -57,6 +63,10 @@ Buffer &Buffer::operator=(Buffer &&buffer) noexcept {
writer_index_ = buffer.writer_index_;
reader_index_ = buffer.reader_index_;
wrapped_vector_ = buffer.wrapped_vector_;
+ stream_reader_ = buffer.stream_reader_;
+ stream_reader_owner_ = std::move(buffer.stream_reader_owner_);
+ rebind_stream_reader_to_this();
+ buffer.stream_reader_ = nullptr;
buffer.data_ = nullptr;
buffer.size_ = 0;
buffer.own_data_ = false;
@@ -65,6 +75,7 @@ Buffer &Buffer::operator=(Buffer &&buffer) noexcept {
}
Buffer::~Buffer() {
+ detach_stream_reader_from_this();
if (own_data_) {
free(data_);
data_ = nullptr;
diff --git a/cpp/fory/util/buffer.h b/cpp/fory/util/buffer.h
index b626fb7d9..857902fc4 100644
--- a/cpp/fory/util/buffer.h
+++ b/cpp/fory/util/buffer.h
@@ -24,15 +24,20 @@
#include <limits>
#include <memory>
#include <string>
+#include <utility>
#include <vector>
#include "fory/util/bit_util.h"
#include "fory/util/error.h"
#include "fory/util/logging.h"
#include "fory/util/result.h"
+#include "fory/util/stream.h"
namespace fory {
+class ForyInputStream;
+class PythonStreamReader;
+
// A buffer class for storing raw bytes with various methods for reading and
// writing the bytes.
class Buffer {
@@ -40,8 +45,8 @@ public:
Buffer();
Buffer(uint8_t *data, uint32_t size, bool own_data = true)
- : data_(data), size_(size), own_data_(own_data),
- wrapped_vector_(nullptr) {
+ : data_(data), size_(size), own_data_(own_data),
wrapped_vector_(nullptr),
+ stream_reader_(nullptr) {
writer_index_ = 0;
reader_index_ = 0;
}
@@ -54,7 +59,17 @@ public:
explicit Buffer(std::vector<uint8_t> &vec)
: data_(vec.data()), size_(static_cast<uint32_t>(vec.size())),
own_data_(false), writer_index_(static_cast<uint32_t>(vec.size())),
- reader_index_(0), wrapped_vector_(&vec) {}
+ reader_index_(0), wrapped_vector_(&vec), stream_reader_(nullptr) {}
+
+  /// Build an initially empty Buffer whose bytes are supplied on demand by
+  /// `stream_reader`. The reader is bound to this Buffer and must report it
+  /// back from get_buffer(); otherwise FORY_CHECK fails.
+  explicit Buffer(StreamReader &stream_reader)
+      : data_(nullptr), size_(0), own_data_(false), writer_index_(0),
+        reader_index_(0), wrapped_vector_(nullptr),
+        stream_reader_(&stream_reader) {
+    stream_reader.bind_buffer(this);
+    // If the reader is managed by a std::shared_ptr, hold a strong reference
+    // so it outlives this Buffer; for a reader not owned by a shared_ptr,
+    // weak_from_this() is empty and the caller keeps ownership.
+    stream_reader_owner_ = stream_reader.weak_from_this().lock();
+    FORY_CHECK(&stream_reader.get_buffer() == this)
+        << "StreamReader must hold and return the same Buffer instance";
+  }
Buffer(Buffer &&buffer) noexcept;
@@ -62,6 +77,23 @@ public:
virtual ~Buffer();
+  /// Exchange every piece of state with `other`, including any attached
+  /// stream reader, then re-point each reader at the Buffer now holding it.
+  FORY_ALWAYS_INLINE void swap(Buffer &other) noexcept {
+    if (&other != this) {
+      std::swap(data_, other.data_);
+      std::swap(size_, other.size_);
+      std::swap(own_data_, other.own_data_);
+      std::swap(writer_index_, other.writer_index_);
+      std::swap(reader_index_, other.reader_index_);
+      std::swap(wrapped_vector_, other.wrapped_vector_);
+      std::swap(stream_reader_, other.stream_reader_);
+      stream_reader_owner_.swap(other.stream_reader_owner_);
+      rebind_stream_reader_to_this();
+      other.rebind_stream_reader_to_this();
+    }
+  }
+
/// \brief Return a pointer to the buffer's data
FORY_ALWAYS_INLINE uint8_t *data() const { return data_; }
@@ -70,6 +102,16 @@ public:
FORY_ALWAYS_INLINE bool own_data() const { return own_data_; }
+  /// True when a StreamReader is attached and supplies bytes on demand.
+  FORY_ALWAYS_INLINE bool is_stream_backed() const {
+    return nullptr != stream_reader_;
+  }
+
+  /// Forward a shrink request to the attached stream reader; no-op otherwise.
+  FORY_ALWAYS_INLINE void shrink_stream_buffer() {
+    if (stream_reader_ == nullptr) {
+      return;
+    }
+    stream_reader_->shrink_buffer();
+  }
+
FORY_ALWAYS_INLINE uint32_t writer_index() { return writer_index_; }
FORY_ALWAYS_INLINE uint32_t reader_index() { return reader_index_; }
@@ -79,6 +121,22 @@ public:
return size_ - reader_index_;
}
+  /// Guarantee at least `length` unread bytes starting at reader_index_,
+  /// pulling from the backing stream if needed. Returns false and sets
+  /// `error` when that is not possible.
+  FORY_ALWAYS_INLINE bool ensure_readable(uint32_t length, Error &error) {
+    const uint32_t available = size_ - reader_index_;
+    if (FORY_PREDICT_TRUE(available >= length)) {
+      return true;
+    }
+    // The end offset reader_index_ + length must still fit in a uint32.
+    const uint32_t headroom =
+        std::numeric_limits<uint32_t>::max() - reader_index_;
+    if (FORY_PREDICT_FALSE(headroom < length)) {
+      error.set_error(ErrorCode::OutOfBound,
+                      "reader index exceeds uint32 range");
+      return false;
+    }
+    return fill_buffer(length, error);
+  }
+
FORY_ALWAYS_INLINE void writer_index(uint32_t writer_index) {
FORY_CHECK(writer_index < std::numeric_limits<uint32_t>::max())
<< "Buffer overflow writer_index" << writer_index_
@@ -93,18 +151,36 @@ public:
writer_index_ = writer_index;
}
- FORY_ALWAYS_INLINE void reader_index(uint32_t reader_index) {
- FORY_CHECK(reader_index < std::numeric_limits<uint32_t>::max())
- << "Buffer overflow reader_index" << reader_index_
- << " target reader_index " << reader_index;
+  /// Move the reader index to the absolute position `reader_index`.
+  /// For a stream-backed buffer, a target past the materialized bytes first
+  /// asks the stream for more data. Returns false with `error` set when the
+  /// target is still beyond size_; the target is not assigned in that case.
+  FORY_ALWAYS_INLINE bool reader_index(uint32_t reader_index, Error &error) {
+    if (FORY_PREDICT_FALSE(reader_index > size_ && stream_reader_ != nullptr)) {
+      if (FORY_PREDICT_FALSE(
+              !fill_buffer(reader_index - reader_index_, error))) {
+        return false;
+      }
+    }
+    if (FORY_PREDICT_FALSE(reader_index > size_)) {
+      const uint32_t diff =
+          reader_index > reader_index_ ? reader_index - reader_index_ : 0;
+      error.set_buffer_out_of_bound(reader_index_, diff, size_);
+      return false;
+    }
     reader_index_ = reader_index;
+    return true;
+  }
- FORY_ALWAYS_INLINE void increase_reader_index(uint32_t diff) {
- uint64_t reader_index = reader_index_ + diff;
- FORY_CHECK(reader_index < std::numeric_limits<uint32_t>::max())
- << "Buffer overflow reader_index" << reader_index_ << " diff " << diff;
- reader_index_ = reader_index;
+  /// Checked seek: delegates to the Error-reporting overload and aborts via
+  /// FORY_CHECK if the target position cannot be reached.
+  FORY_ALWAYS_INLINE void reader_index(uint32_t reader_index) {
+    Error seek_error;
+    const bool seek_ok = this->reader_index(reader_index, seek_error);
+    FORY_CHECK(seek_ok) << "Buffer overflow reader_index " << reader_index_
+                        << " target reader_index " << reader_index << " size "
+                        << size_ << ", " << seek_error.to_string();
+  }
+
+  /// Advance reader_index_ by `diff`, refilling from a backing stream when
+  /// needed. On failure `error` is set and this call does not advance.
+  FORY_ALWAYS_INLINE void increase_reader_index(uint32_t diff, Error &error) {
+    if (FORY_PREDICT_TRUE(ensure_readable(diff, error))) {
+      reader_index_ += diff;
+    }
   }
// Unsafe methods don't check bound
@@ -143,8 +219,9 @@ public:
}
template <typename T> FORY_ALWAYS_INLINE T get(uint32_t relative_offset) {
- FORY_CHECK(relative_offset < size_) << "Out of range " << relative_offset
- << " should be less than " << size_;
+ FORY_CHECK(relative_offset + sizeof(T) <= size_)
+ << "Out of range " << relative_offset << " should be less than "
+ << size_;
T value = reinterpret_cast<const T *>(data_ + relative_offset)[0];
return value;
}
@@ -303,11 +380,11 @@ public:
return result;
}
// Slow path: byte-by-byte read
- return get_var_uint32_slow(offset, read_bytes_length);
+ return read_var_uint32_slow(offset, read_bytes_length);
}
- /// Slow path for get_var_uint32 when not enough bytes for bulk read.
- uint32_t get_var_uint32_slow(uint32_t offset, uint32_t *read_bytes_length) {
+ /// Slow path for varuint32 decode when not enough bytes for bulk read.
+ uint32_t read_var_uint32_slow(uint32_t offset, uint32_t *read_bytes_length) {
if (FORY_PREDICT_FALSE(offset >= size_)) {
*read_bytes_length = 0;
return 0;
@@ -739,8 +816,7 @@ public:
/// Read uint8_t value from buffer. Sets error on bounds violation.
FORY_ALWAYS_INLINE uint8_t read_uint8(Error &error) {
- if (FORY_PREDICT_FALSE(reader_index_ + 1 > size_)) {
- error.set_buffer_out_of_bound(reader_index_, 1, size_);
+ if (FORY_PREDICT_FALSE(!ensure_readable(1, error))) {
return 0;
}
uint8_t value = data_[reader_index_];
@@ -750,8 +826,7 @@ public:
/// Read int8_t value from buffer. Sets error on bounds violation.
FORY_ALWAYS_INLINE int8_t read_int8(Error &error) {
- if (FORY_PREDICT_FALSE(reader_index_ + 1 > size_)) {
- error.set_buffer_out_of_bound(reader_index_, 1, size_);
+ if (FORY_PREDICT_FALSE(!ensure_readable(1, error))) {
return 0;
}
int8_t value = static_cast<int8_t>(data_[reader_index_]);
@@ -761,8 +836,7 @@ public:
/// Read uint16_t value from buffer. Sets error on bounds violation.
FORY_ALWAYS_INLINE uint16_t read_uint16(Error &error) {
- if (FORY_PREDICT_FALSE(reader_index_ + 2 > size_)) {
- error.set_buffer_out_of_bound(reader_index_, 2, size_);
+ if (FORY_PREDICT_FALSE(!ensure_readable(2, error))) {
return 0;
}
uint16_t value =
@@ -773,8 +847,7 @@ public:
/// Read int16_t value from buffer. Sets error on bounds violation.
FORY_ALWAYS_INLINE int16_t read_int16(Error &error) {
- if (FORY_PREDICT_FALSE(reader_index_ + 2 > size_)) {
- error.set_buffer_out_of_bound(reader_index_, 2, size_);
+ if (FORY_PREDICT_FALSE(!ensure_readable(2, error))) {
return 0;
}
int16_t value = reinterpret_cast<const int16_t *>(data_ +
reader_index_)[0];
@@ -784,8 +857,7 @@ public:
/// Read int24 value from buffer. Sets error on bounds violation.
FORY_ALWAYS_INLINE int32_t read_int24(Error &error) {
- if (FORY_PREDICT_FALSE(reader_index_ + 3 > size_)) {
- error.set_buffer_out_of_bound(reader_index_, 3, size_);
+ if (FORY_PREDICT_FALSE(!ensure_readable(3, error))) {
return 0;
}
int32_t b0 = data_[reader_index_];
@@ -798,8 +870,7 @@ public:
/// Read uint32_t value from buffer (fixed 4 bytes). Sets error on bounds
/// violation.
FORY_ALWAYS_INLINE uint32_t read_uint32(Error &error) {
- if (FORY_PREDICT_FALSE(reader_index_ + 4 > size_)) {
- error.set_buffer_out_of_bound(reader_index_, 4, size_);
+ if (FORY_PREDICT_FALSE(!ensure_readable(4, error))) {
return 0;
}
uint32_t value =
@@ -811,8 +882,7 @@ public:
/// Read int32_t value from buffer (fixed 4 bytes). Sets error on bounds
/// violation.
FORY_ALWAYS_INLINE int32_t read_int32(Error &error) {
- if (FORY_PREDICT_FALSE(reader_index_ + 4 > size_)) {
- error.set_buffer_out_of_bound(reader_index_, 4, size_);
+ if (FORY_PREDICT_FALSE(!ensure_readable(4, error))) {
return 0;
}
int32_t value = reinterpret_cast<const int32_t *>(data_ +
reader_index_)[0];
@@ -823,8 +893,7 @@ public:
/// Read uint64_t value from buffer (fixed 8 bytes). Sets error on bounds
/// violation.
FORY_ALWAYS_INLINE uint64_t read_uint64(Error &error) {
- if (FORY_PREDICT_FALSE(reader_index_ + 8 > size_)) {
- error.set_buffer_out_of_bound(reader_index_, 8, size_);
+ if (FORY_PREDICT_FALSE(!ensure_readable(8, error))) {
return 0;
}
uint64_t value =
@@ -836,8 +905,7 @@ public:
/// Read int64_t value from buffer (fixed 8 bytes). Sets error on bounds
/// violation.
FORY_ALWAYS_INLINE int64_t read_int64(Error &error) {
- if (FORY_PREDICT_FALSE(reader_index_ + 8 > size_)) {
- error.set_buffer_out_of_bound(reader_index_, 8, size_);
+ if (FORY_PREDICT_FALSE(!ensure_readable(8, error))) {
return 0;
}
int64_t value = reinterpret_cast<const int64_t *>(data_ +
reader_index_)[0];
@@ -847,8 +915,7 @@ public:
/// Read float value from buffer. Sets error on bounds violation.
FORY_ALWAYS_INLINE float read_float(Error &error) {
- if (FORY_PREDICT_FALSE(reader_index_ + 4 > size_)) {
- error.set_buffer_out_of_bound(reader_index_, 4, size_);
+ if (FORY_PREDICT_FALSE(!ensure_readable(4, error))) {
return 0.0f;
}
float value = reinterpret_cast<const float *>(data_ + reader_index_)[0];
@@ -858,8 +925,7 @@ public:
/// Read double value from buffer. Sets error on bounds violation.
FORY_ALWAYS_INLINE double read_double(Error &error) {
- if (FORY_PREDICT_FALSE(reader_index_ + 8 > size_)) {
- error.set_buffer_out_of_bound(reader_index_, 8, size_);
+ if (FORY_PREDICT_FALSE(!ensure_readable(8, error))) {
return 0.0;
}
double value = reinterpret_cast<const double *>(data_ + reader_index_)[0];
@@ -869,50 +935,65 @@ public:
 /// Read uint32_t value as varint from buffer. Sets error on bounds violation.
FORY_ALWAYS_INLINE uint32_t read_var_uint32(Error &error) {
- if (FORY_PREDICT_FALSE(reader_index_ + 1 > size_)) {
- error.set_buffer_out_of_bound(reader_index_, 1, size_);
+ if (FORY_PREDICT_FALSE(!ensure_readable(1, error))) {
return 0;
}
- uint32_t read_bytes = 0;
- uint32_t value = get_var_uint32(reader_index_, &read_bytes);
- if (FORY_PREDICT_FALSE(read_bytes == 0)) {
- error.set_buffer_out_of_bound(reader_index_, 1, size_);
- return 0;
+ if (FORY_PREDICT_FALSE(size_ - reader_index_ < 5)) {
+ return read_var_uint32_slow(error);
}
- increase_reader_index(read_bytes);
- return value;
+ uint32_t offset = reader_index_;
+ uint32_t bulk = *reinterpret_cast<uint32_t *>(data_ + offset);
+
+ uint32_t result = bulk & 0x7F;
+ if ((bulk & 0x80) == 0) {
+ reader_index_ = offset + 1;
+ return result;
+ }
+ result |= (bulk >> 1) & 0x3F80;
+ if ((bulk & 0x8000) == 0) {
+ reader_index_ = offset + 2;
+ return result;
+ }
+ result |= (bulk >> 2) & 0x1FC000;
+ if ((bulk & 0x800000) == 0) {
+ reader_index_ = offset + 3;
+ return result;
+ }
+ result |= (bulk >> 3) & 0xFE00000;
+ if ((bulk & 0x80000000) == 0) {
+ reader_index_ = offset + 4;
+ return result;
+ }
+ result |= static_cast<uint32_t>(data_[offset + 4] & 0x7F) << 28;
+ reader_index_ = offset + 5;
+ return result;
}
/// Read int32_t value as varint (zigzag encoded). Sets error on bounds
/// violation.
FORY_ALWAYS_INLINE int32_t read_var_int32(Error &error) {
- if (FORY_PREDICT_FALSE(reader_index_ + 1 > size_)) {
- error.set_buffer_out_of_bound(reader_index_, 1, size_);
+ uint32_t raw = read_var_uint32(error);
+ if (FORY_PREDICT_FALSE(!error.ok())) {
return 0;
}
- uint32_t read_bytes = 0;
- uint32_t raw = get_var_uint32(reader_index_, &read_bytes);
- if (FORY_PREDICT_FALSE(read_bytes == 0)) {
- error.set_buffer_out_of_bound(reader_index_, 1, size_);
- return 0;
- }
- increase_reader_index(read_bytes);
return static_cast<int32_t>((raw >> 1) ^ (~(raw & 1) + 1));
}
 /// Read uint64_t value as varint from buffer. Sets error on bounds violation.
FORY_ALWAYS_INLINE uint64_t read_var_uint64(Error &error) {
- if (FORY_PREDICT_FALSE(reader_index_ + 1 > size_)) {
- error.set_buffer_out_of_bound(reader_index_, 1, size_);
+ if (FORY_PREDICT_FALSE(!ensure_readable(1, error))) {
return 0;
}
+ if (FORY_PREDICT_FALSE(size_ - reader_index_ < 9)) {
+ return read_var_uint64_slow(error);
+ }
uint32_t read_bytes = 0;
uint64_t value = get_var_uint64(reader_index_, &read_bytes);
if (FORY_PREDICT_FALSE(read_bytes == 0)) {
error.set_buffer_out_of_bound(reader_index_, 1, size_);
return 0;
}
- increase_reader_index(read_bytes);
+ reader_index_ += read_bytes;
return value;
}
@@ -943,8 +1024,7 @@ public:
/// If bit 0 is 0, return value >> 1 (arithmetic shift).
/// Otherwise, skip flag byte and read 8 bytes as int64.
FORY_ALWAYS_INLINE int64_t read_tagged_int64(Error &error) {
- if (FORY_PREDICT_FALSE(reader_index_ + 4 > size_)) {
- error.set_buffer_out_of_bound(reader_index_, 4, size_);
+ if (FORY_PREDICT_FALSE(!ensure_readable(4, error))) {
return 0;
}
int32_t i = reinterpret_cast<const int32_t *>(data_ + reader_index_)[0];
@@ -952,8 +1032,7 @@ public:
reader_index_ += 4;
return static_cast<int64_t>(i >> 1); // arithmetic right shift
} else {
- if (FORY_PREDICT_FALSE(reader_index_ + 9 > size_)) {
- error.set_buffer_out_of_bound(reader_index_, 9, size_);
+ if (FORY_PREDICT_FALSE(!ensure_readable(9, error))) {
return 0;
}
int64_t value =
@@ -982,8 +1061,7 @@ public:
/// If bit 0 is 0, return value >> 1.
/// Otherwise, skip flag byte and read 8 bytes as uint64.
FORY_ALWAYS_INLINE uint64_t read_tagged_uint64(Error &error) {
- if (FORY_PREDICT_FALSE(reader_index_ + 4 > size_)) {
- error.set_buffer_out_of_bound(reader_index_, 4, size_);
+ if (FORY_PREDICT_FALSE(!ensure_readable(4, error))) {
return 0;
}
uint32_t i = reinterpret_cast<const uint32_t *>(data_ + reader_index_)[0];
@@ -991,8 +1069,7 @@ public:
reader_index_ += 4;
return static_cast<uint64_t>(i >> 1);
} else {
- if (FORY_PREDICT_FALSE(reader_index_ + 9 > size_)) {
- error.set_buffer_out_of_bound(reader_index_, 9, size_);
+ if (FORY_PREDICT_FALSE(!ensure_readable(9, error))) {
return 0;
}
uint64_t value =
@@ -1004,98 +1081,54 @@ public:
/// Read uint64_t value as varuint36small. Sets error on bounds violation.
FORY_ALWAYS_INLINE uint64_t read_var_uint36_small(Error &error) {
- if (FORY_PREDICT_FALSE(reader_index_ + 1 > size_)) {
- error.set_buffer_out_of_bound(reader_index_, 1, size_);
+ if (FORY_PREDICT_FALSE(!ensure_readable(1, error))) {
return 0;
}
uint32_t offset = reader_index_;
- // Fast path: need at least 8 bytes for safe bulk read
- if (FORY_PREDICT_TRUE(size_ - offset >= 8)) {
- uint64_t bulk = *reinterpret_cast<uint64_t *>(data_ + offset);
-
- uint64_t result = bulk & 0x7F;
- if ((bulk & 0x80) == 0) {
- increase_reader_index(1);
- return result;
- }
- result |= (bulk >> 1) & 0x3F80;
- if ((bulk & 0x8000) == 0) {
- increase_reader_index(2);
- return result;
- }
- result |= (bulk >> 2) & 0x1FC000;
- if ((bulk & 0x800000) == 0) {
- increase_reader_index(3);
- return result;
- }
- result |= (bulk >> 3) & 0xFE00000;
- if ((bulk & 0x80000000) == 0) {
- increase_reader_index(4);
- return result;
- }
- // 5th byte for bits 28-35 (up to 36 bits)
- result |= (bulk >> 4) & 0xFF0000000ULL;
- increase_reader_index(5);
+ if (FORY_PREDICT_FALSE(size_ - offset < 8)) {
+ return read_var_uint36_small_slow(error);
+ }
+ // Fast path: need at least 8 bytes for safe bulk read.
+ uint64_t bulk = *reinterpret_cast<uint64_t *>(data_ + offset);
+ uint64_t result = bulk & 0x7F;
+ if ((bulk & 0x80) == 0) {
+ reader_index_ = offset + 1;
return result;
}
- // Slow path: byte-by-byte read
- uint32_t position = offset;
- uint8_t b = data_[position++];
- uint64_t result = b & 0x7F;
- if ((b & 0x80) != 0) {
- if (FORY_PREDICT_FALSE(position >= size_)) {
- error.set_buffer_out_of_bound(position, 1, size_);
- return 0;
- }
- b = data_[position++];
- result |= static_cast<uint64_t>(b & 0x7F) << 7;
- if ((b & 0x80) != 0) {
- if (FORY_PREDICT_FALSE(position >= size_)) {
- error.set_buffer_out_of_bound(position, 1, size_);
- return 0;
- }
- b = data_[position++];
- result |= static_cast<uint64_t>(b & 0x7F) << 14;
- if ((b & 0x80) != 0) {
- if (FORY_PREDICT_FALSE(position >= size_)) {
- error.set_buffer_out_of_bound(position, 1, size_);
- return 0;
- }
- b = data_[position++];
- result |= static_cast<uint64_t>(b & 0x7F) << 21;
- if ((b & 0x80) != 0) {
- if (FORY_PREDICT_FALSE(position >= size_)) {
- error.set_buffer_out_of_bound(position, 1, size_);
- return 0;
- }
- b = data_[position++];
- result |= static_cast<uint64_t>(b & 0xFF) << 28;
- }
- }
- }
+ result |= (bulk >> 1) & 0x3F80;
+ if ((bulk & 0x8000) == 0) {
+ reader_index_ = offset + 2;
+ return result;
+ }
+ result |= (bulk >> 2) & 0x1FC000;
+ if ((bulk & 0x800000) == 0) {
+ reader_index_ = offset + 3;
+ return result;
}
- increase_reader_index(position - offset);
+ result |= (bulk >> 3) & 0xFE00000;
+ if ((bulk & 0x80000000) == 0) {
+ reader_index_ = offset + 4;
+ return result;
+ }
+ // 5th byte for bits 28-35 (up to 36 bits)
+ result |= (bulk >> 4) & 0xFF0000000ULL;
+ reader_index_ = offset + 5;
return result;
}
/// Read raw bytes from buffer. Sets error on bounds violation.
FORY_ALWAYS_INLINE void read_bytes(void *data, uint32_t length,
Error &error) {
- if (FORY_PREDICT_FALSE(reader_index_ + length > size_)) {
- error.set_buffer_out_of_bound(reader_index_, length, size_);
+ if (FORY_PREDICT_FALSE(!ensure_readable(length, error))) {
return;
}
copy(reader_index_, length, static_cast<uint8_t *>(data));
- increase_reader_index(length);
+ reader_index_ += length;
}
/// skip bytes in buffer. Sets error on bounds violation.
FORY_ALWAYS_INLINE void skip(uint32_t length, Error &error) {
- if (FORY_PREDICT_FALSE(reader_index_ + length > size_)) {
- error.set_buffer_out_of_bound(reader_index_, length, size_);
- return;
- }
- increase_reader_index(length);
+ increase_reader_index(length, error);
}
/// Return true if both buffers are the same size and contain the same bytes
@@ -1190,12 +1223,147 @@ public:
std::string hex() const;
private:
+ friend class ForyInputStream;
+ friend class PythonStreamReader;
+
+  /// Point the attached stream reader (if any) at this Buffer, e.g. after this
+  /// object took over another Buffer's state, and verify the reader now
+  /// reports this instance.
+  FORY_ALWAYS_INLINE void rebind_stream_reader_to_this() {
+    if (stream_reader_ != nullptr) {
+      stream_reader_->bind_buffer(this);
+      FORY_CHECK(&stream_reader_->get_buffer() == this)
+          << "StreamReader must hold and return the same Buffer instance";
+    }
+  }
+
+  /// Unbind the stream reader, but only while it is still bound to this
+  /// Buffer; a reader already re-bound elsewhere is left untouched.
+  FORY_ALWAYS_INLINE void detach_stream_reader_from_this() {
+    if (stream_reader_ != nullptr && &stream_reader_->get_buffer() == this) {
+      stream_reader_->bind_buffer(nullptr);
+    }
+  }
+
+  /// Make `min_fill_size` unread bytes available. Buffers without a stream
+  /// reader fail with an out-of-bound error. Stream-backed buffers ask the
+  /// reader to fill and then re-check, in case the reader reports success
+  /// without supplying enough bytes.
+  FORY_ALWAYS_INLINE bool fill_buffer(uint32_t min_fill_size, Error &error) {
+    if (FORY_PREDICT_TRUE(size_ - reader_index_ >= min_fill_size)) {
+      return true;
+    }
+    if (FORY_PREDICT_TRUE(stream_reader_ == nullptr)) {
+      error.set_buffer_out_of_bound(reader_index_, min_fill_size, size_);
+      return false;
+    }
+    auto fill_result = stream_reader_->fill_buffer(min_fill_size);
+    if (FORY_PREDICT_FALSE(!fill_result.ok())) {
+      error = std::move(fill_result).error();
+      return false;
+    }
+    const bool satisfied = min_fill_size <= size_ - reader_index_;
+    if (FORY_PREDICT_FALSE(!satisfied)) {
+      error.set_buffer_out_of_bound(reader_index_, min_fill_size, size_);
+    }
+    return satisfied;
+  }
+
+  /// Byte-at-a-time varuint32 decode, used when fewer than 5 bytes are
+  /// materialized. Each step asks ensure_readable() for one more byte and then
+  /// addresses it relative to reader_index_, so the decode stays valid even if
+  /// a stream refill rewrites data_/reader_index_ (the stream readers are
+  /// friends of this class). reader_index_ is advanced only on success; a
+  /// fifth byte that still has the continuation bit set is InvalidData.
+  FORY_ALWAYS_INLINE uint32_t read_var_uint32_slow(Error &error) {
+    uint32_t result = 0;
+    for (uint32_t i = 0; i < 5; ++i) {
+      if (FORY_PREDICT_FALSE(!ensure_readable(i + 1, error))) {
+        return 0;
+      }
+      const uint8_t b = data_[reader_index_ + i];
+      result |= static_cast<uint32_t>(b & 0x7F) << (i * 7);
+      if ((b & 0x80) == 0) {
+        reader_index_ += i + 1;
+        return result;
+      }
+    }
+    error.set_error(ErrorCode::InvalidData, "Invalid var_uint32 encoding");
+    return 0;
+  }
+
+  /// Byte-at-a-time varuint64 decode, used when fewer than 9 bytes are
+  /// materialized. The first eight bytes carry 7 payload bits plus a
+  /// continuation flag; a ninth byte, if reached, supplies the top 8 bits
+  /// verbatim. Bytes are addressed relative to reader_index_ after each
+  /// ensure_readable() call, so a stream refill that rewrites
+  /// data_/reader_index_ cannot desynchronize the decode. reader_index_ is
+  /// advanced only on success.
+  FORY_ALWAYS_INLINE uint64_t read_var_uint64_slow(Error &error) {
+    uint64_t result = 0;
+    for (uint32_t i = 0; i < 8; ++i) {
+      if (FORY_PREDICT_FALSE(!ensure_readable(i + 1, error))) {
+        return 0;
+      }
+      const uint8_t b = data_[reader_index_ + i];
+      result |= static_cast<uint64_t>(b & 0x7F) << (i * 7);
+      if ((b & 0x80) == 0) {
+        reader_index_ += i + 1;
+        return result;
+      }
+    }
+    if (FORY_PREDICT_FALSE(!ensure_readable(9, error))) {
+      return 0;
+    }
+    result |= static_cast<uint64_t>(data_[reader_index_ + 8]) << 56;
+    reader_index_ += 9;
+    return result;
+  }
+
+  /// Byte-at-a-time varuint36small decode, used when fewer than 8 bytes are
+  /// materialized. The first four bytes carry 7 payload bits plus a
+  /// continuation flag; a fifth byte, if reached, contributes all 8 bits at
+  /// bit 28. Bytes are addressed relative to reader_index_ after each
+  /// ensure_readable() call, so a stream refill that rewrites
+  /// data_/reader_index_ cannot desynchronize the decode. reader_index_ is
+  /// advanced only on success.
+  FORY_ALWAYS_INLINE uint64_t read_var_uint36_small_slow(Error &error) {
+    uint64_t result = 0;
+    for (uint32_t i = 0; i < 4; ++i) {
+      if (FORY_PREDICT_FALSE(!ensure_readable(i + 1, error))) {
+        return 0;
+      }
+      const uint8_t b = data_[reader_index_ + i];
+      result |= static_cast<uint64_t>(b & 0x7F) << (7 * i);
+      if ((b & 0x80) == 0) {
+        reader_index_ += i + 1;
+        return result;
+      }
+    }
+    if (FORY_PREDICT_FALSE(!ensure_readable(5, error))) {
+      return 0;
+    }
+    result |= static_cast<uint64_t>(data_[reader_index_ + 4]) << 28;
+    reader_index_ += 5;
+    return result;
+  }
+
uint8_t *data_;
uint32_t size_;
bool own_data_;
uint32_t writer_index_;
uint32_t reader_index_;
std::vector<uint8_t> *wrapped_vector_ = nullptr;
+ StreamReader *stream_reader_ = nullptr;
+ std::shared_ptr<StreamReader> stream_reader_owner_;
};
/// \brief Allocate a fixed-size mutable buffer from the default memory pool
diff --git a/cpp/fory/util/buffer_test.cc b/cpp/fory/util/buffer_test.cc
index 1a49f02a5..d06a65ba9 100644
--- a/cpp/fory/util/buffer_test.cc
+++ b/cpp/fory/util/buffer_test.cc
@@ -25,9 +25,51 @@
#include "gtest/gtest.h"
#include "fory/util/buffer.h"
+#include "fory/util/stream.h"
namespace fory {
+// std::streambuf over an in-memory byte vector that hands out at most one byte
+// per request. It exercises short-read handling in stream consumers:
+// xsgetn() (reached through sgetn()) returns at most one byte, and
+// underflow() exposes at most one byte through the get area.
+// Both paths advance the same cursor (pos_), and a byte already exposed by
+// underflow() is drained before xsgetn() touches data_ again. Mixing
+// character-level reads (sgetc/sbumpc, e.g. std::istream::get) with bulk
+// sgetn() reads therefore never duplicates or skips bytes.
+class OneByteStreamBuf : public std::streambuf {
+public:
+  explicit OneByteStreamBuf(std::vector<uint8_t> data)
+      : data_(std::move(data)), pos_(0) {}
+
+protected:
+  // Copies at most one byte into `s`; returns 0 at end of data.
+  std::streamsize xsgetn(char *s, std::streamsize count) override {
+    if (count <= 0) {
+      return 0;
+    }
+    // A byte exposed by underflow() has already been taken from data_
+    // (pos_ was advanced), so it must be handed out first.
+    if (gptr() != nullptr && gptr() < egptr()) {
+      s[0] = *gptr();
+      gbump(1);
+      return 1;
+    }
+    if (pos_ >= data_.size()) {
+      return 0;
+    }
+    s[0] = static_cast<char>(data_[pos_]);
+    ++pos_;
+    return 1;
+  }
+
+  // Exposes the next single byte through the get area, or eof.
+  int_type underflow() override {
+    if (gptr() != nullptr && gptr() < egptr()) {
+      return traits_type::to_int_type(*gptr());
+    }
+    if (pos_ >= data_.size()) {
+      return traits_type::eof();
+    }
+    current_ = static_cast<char>(data_[pos_]);
+    // Advance the cursor now; otherwise the default uflow()/sbumpc() would
+    // consume this byte while the next underflow() re-reads it.
+    ++pos_;
+    setg(&current_, &current_, &current_ + 1);
+    return traits_type::to_int_type(current_);
+  }
+
+private:
+  std::vector<uint8_t> data_;
+  size_t pos_;
+  char current_ = 0;
+};
+
+// std::istream whose data is delivered by an owned OneByteStreamBuf, i.e. at
+// most one byte per underlying read.
+class OneByteIStream : public std::istream {
+public:
+  explicit OneByteIStream(std::vector<uint8_t> bytes)
+      : std::istream(nullptr), buf_(std::move(bytes)) {
+    // buf_ is constructed only after the std::istream base, so the stream
+    // buffer is attached here rather than passed to the base constructor.
+    this->rdbuf(&buf_);
+  }
+
+private:
+  OneByteStreamBuf buf_;
+};
+
TEST(Buffer, to_string) {
std::shared_ptr<Buffer> buffer;
allocate_buffer(16, &buffer);
@@ -170,6 +212,107 @@ TEST(Buffer, TestReadVarUint36SmallTruncated) {
EXPECT_FALSE(error.ok());
EXPECT_EQ(buffer.reader_index(), 0U);
}
+
+// Writes fixed-width, varint, tagged and var_uint36 values into a memory
+// Buffer, then replays the bytes through a stream that yields one byte per
+// read (initial stream buffer size 8). Multi-byte values must therefore be
+// assembled across several refills.
+TEST(Buffer, StreamReadFromOneByteSource) {
+ std::vector<uint8_t> raw;
+ raw.reserve(64);
+ Buffer writer(raw);
+ writer.write_uint32(0x01020304U);
+ writer.write_int64(-1234567890LL);
+ writer.write_var_uint32(300);
+ writer.write_var_int64(-4567890123LL);
+ writer.write_tagged_uint64(0x123456789ULL);
+ writer.write_var_uint36_small(0x1FFFFULL);
+
+ // NOTE(review): assumes Buffer(raw) writes through into `raw`; trim it to
+ // the number of bytes actually written before replaying it.
+ raw.resize(writer.writer_index());
+ OneByteIStream one_byte_stream(raw);
+ ForyInputStream stream(one_byte_stream, 8);
+ Buffer reader(stream);
+ Error error;
+
+ // Values must come back in write order, each read without error.
+ EXPECT_EQ(reader.read_uint32(error), 0x01020304U);
+ ASSERT_TRUE(error.ok()) << error.to_string();
+ EXPECT_EQ(reader.read_int64(error), -1234567890LL);
+ ASSERT_TRUE(error.ok()) << error.to_string();
+ EXPECT_EQ(reader.read_var_uint32(error), 300U);
+ ASSERT_TRUE(error.ok()) << error.to_string();
+ EXPECT_EQ(reader.read_var_int64(error), -4567890123LL);
+ ASSERT_TRUE(error.ok()) << error.to_string();
+ EXPECT_EQ(reader.read_tagged_uint64(error), 0x123456789ULL);
+ ASSERT_TRUE(error.ok()) << error.to_string();
+ EXPECT_EQ(reader.read_var_uint36_small(error), 0x1FFFFULL);
+ ASSERT_TRUE(error.ok()) << error.to_string();
+}
+
+// ensure_readable(4) buffers the first four bytes. With an initial stream
+// buffer of 2 bytes, capacity grows to exactly 4, so byte 0x55 is not yet
+// buffered. get<T>() reads at an absolute offset; the reader index is then
+// moved explicitly, and the last read must refill from the stream.
+TEST(Buffer, StreamGetAndReaderIndexFromOneByteSource) {
+ std::vector<uint8_t> raw{0x11, 0x22, 0x33, 0x44, 0x55};
+ OneByteIStream one_byte_stream(raw);
+ ForyInputStream stream(one_byte_stream, 2);
+ Buffer reader(stream);
+ Error error;
+ ASSERT_TRUE(reader.ensure_readable(4, error)) << error.to_string();
+
+ EXPECT_EQ(reader.get<uint32_t>(0), 0x44332211U);
+ reader.reader_index(4);
+ EXPECT_EQ(reader.read_uint8(error), 0x55);
+ ASSERT_TRUE(error.ok()) << error.to_string();
+}
+
+// read_bytes() and skip() on a stream-backed Buffer must both advance
+// reader_index, pulling bytes one at a time from the source as needed.
+TEST(Buffer, StreamReadBytesAndSkipAdvanceReaderIndex) {
+ std::vector<uint8_t> raw{0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+ OneByteIStream one_byte_stream(raw);
+ ForyInputStream stream(one_byte_stream, 2);
+ Buffer reader(stream);
+ Error error;
+ uint8_t out[5] = {0};
+
+ reader.read_bytes(out, 5, error);
+ ASSERT_TRUE(error.ok()) << error.to_string();
+ EXPECT_EQ(reader.reader_index(), 5U);
+ EXPECT_EQ(out[0], 0U);
+ EXPECT_EQ(out[4], 4U);
+
+ reader.skip(3, error);
+ ASSERT_TRUE(error.ok()) << error.to_string();
+ EXPECT_EQ(reader.reader_index(), 8U);
+ EXPECT_EQ(reader.read_uint8(error), 8U);
+ ASSERT_TRUE(error.ok()) << error.to_string();
+}
+
+// Drives ForyInputStream directly. fill_buffer(4) grows capacity from 2 to
+// exactly 4, and the source yields one byte per call, so exactly 4 bytes are
+// buffered. skip()/unread() then move the bound buffer's reader_index within
+// those buffered bytes.
+TEST(Buffer, StreamSkipAndUnread) {
+ std::vector<uint8_t> raw{0x01, 0x02, 0x03, 0x04, 0x05};
+ OneByteIStream one_byte_stream(raw);
+ ForyInputStream stream(one_byte_stream, 2);
+ auto fill_result = stream.fill_buffer(4);
+ ASSERT_TRUE(fill_result.ok()) << fill_result.error().to_string();
+
+ Buffer &view = stream.get_buffer();
+ EXPECT_EQ(view.size(), 4U);
+ EXPECT_EQ(view.reader_index(), 0U);
+
+ auto skip_result = stream.skip(3);
+ ASSERT_TRUE(skip_result.ok()) << skip_result.error().to_string();
+ EXPECT_EQ(view.reader_index(), 3U);
+
+ // unread() rewinds over bytes that are still buffered.
+ auto unread_result = stream.unread(2);
+ ASSERT_TRUE(unread_result.ok()) << unread_result.error().to_string();
+ EXPECT_EQ(view.reader_index(), 1U);
+
+ skip_result = stream.skip(1);
+ ASSERT_TRUE(skip_result.ok()) << skip_result.error().to_string();
+ EXPECT_EQ(view.reader_index(), 2U);
+}
+
+// Only 3 bytes exist, so a 4-byte read must fail with BufferOutOfBound and
+// return 0 instead of reading past the end of the source.
+TEST(Buffer, StreamReadErrorWhenInsufficientData) {
+ std::vector<uint8_t> raw{0x01, 0x02, 0x03};
+ OneByteIStream one_byte_stream(raw);
+ ForyInputStream stream(one_byte_stream, 2);
+ Buffer reader(stream);
+ Error error;
+ EXPECT_EQ(reader.read_uint32(error), 0U);
+ EXPECT_FALSE(error.ok());
+ EXPECT_EQ(error.code(), ErrorCode::BufferOutOfBound);
+}
} // namespace fory
int main(int argc, char **argv) {
diff --git a/cpp/fory/util/stream.cc b/cpp/fory/util/stream.cc
new file mode 100644
index 000000000..a8466ab83
--- /dev/null
+++ b/cpp/fory/util/stream.cc
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "fory/util/stream.h"
+
+#include <algorithm>
+#include <cstring>
+#include <limits>
+
+#include "fory/util/buffer.h"
+#include "fory/util/logging.h"
+
+namespace fory {
+
+namespace {
+
+// The stream window always has room for at least one byte.
+uint32_t at_least_one(uint32_t requested) {
+  return requested == 0 ? static_cast<uint32_t>(1) : requested;
+}
+
+} // namespace
+
+// Non-owning variant: `stream` is stored by pointer and must outlive *this.
+ForyInputStream::ForyInputStream(std::istream &stream, uint32_t buffer_size)
+    : stream_(&stream), data_(at_least_one(buffer_size)),
+      initial_buffer_size_(at_least_one(buffer_size)),
+      owned_buffer_(std::make_unique<Buffer>()) {
+  // Start out bound to the internally owned Buffer.
+  bind_buffer(owned_buffer_.get());
+}
+
+// Shared-ownership variant: keeps `stream` alive for the reader's lifetime.
+ForyInputStream::ForyInputStream(std::shared_ptr<std::istream> stream,
+                                 uint32_t buffer_size)
+    : stream_owner_(std::move(stream)), stream_(stream_owner_.get()),
+      data_(at_least_one(buffer_size)),
+      initial_buffer_size_(at_least_one(buffer_size)),
+      owned_buffer_(std::make_unique<Buffer>()) {
+  FORY_CHECK(stream_owner_ != nullptr) << "stream must not be null";
+  bind_buffer(owned_buffer_.get());
+}
+
+ForyInputStream::~ForyInputStream() = default;
+
+// Ensures at least `min_fill_size` unread bytes are buffered, pulling more
+// from the underlying istream's stream buffer as needed. Consumed bytes are
+// not compacted here (see shrink_buffer()), so the window grows to
+// size_ + deficit. Fails with out_of_bound if that would exceed the uint32
+// range, io_error if the stream has no rdbuf(), and buffer_out_of_bound if
+// the stream ends early.
+Result<void, Error> ForyInputStream::fill_buffer(uint32_t min_fill_size) {
+  const bool satisfied =
+      min_fill_size == 0 || remaining_size() >= min_fill_size;
+  if (satisfied) {
+    return Result<void, Error>();
+  }
+
+  const uint32_t start_index = buffer_->reader_index_;
+  constexpr uint64_t limit = std::numeric_limits<uint32_t>::max();
+  const uint64_t needed_size = static_cast<uint64_t>(buffer_->size_) +
+                               (min_fill_size - remaining_size());
+  if (needed_size > limit) {
+    return Unexpected(
+        Error::out_of_bound("stream buffer size exceeds uint32 range"));
+  }
+  const uint64_t capacity = static_cast<uint64_t>(data_.size());
+  if (needed_size > capacity) {
+    // Grow geometrically, clamped to the uint32 addressable range.
+    const uint64_t grown = std::max<uint64_t>(needed_size, capacity * 2);
+    reserve(static_cast<uint32_t>(std::min<uint64_t>(grown, limit)));
+  }
+
+  std::streambuf *const source = stream_->rdbuf();
+  if (source == nullptr) {
+    return Unexpected(Error::io_error("input stream has no stream buffer"));
+  }
+  // sgetn() may deliver fewer bytes than requested; a result <= 0 means the
+  // stream has no more data.
+  for (uint32_t end = buffer_->size_; remaining_size() < min_fill_size;) {
+    const uint32_t room = static_cast<uint32_t>(data_.size()) - end;
+    const std::streamsize got =
+        source->sgetn(reinterpret_cast<char *>(data_.data() + end),
+                      static_cast<std::streamsize>(room));
+    if (got <= 0) {
+      return Unexpected(Error::buffer_out_of_bound(start_index, min_fill_size,
+                                                   remaining_size()));
+    }
+    end += static_cast<uint32_t>(got);
+    buffer_->size_ = end;
+  }
+  return Result<void, Error>();
+}
+
+// Copies `length` unread bytes into `dst` and advances the reader index.
+// Fails with the error reported by Buffer::ensure_readable().
+Result<void, Error> ForyInputStream::read_to(uint8_t *dst, uint32_t length) {
+  if (length != 0) {
+    Error error;
+    if (FORY_PREDICT_FALSE(!buffer_->ensure_readable(length, error))) {
+      return Unexpected(std::move(error));
+    }
+    // Read data_ only after ensure_readable(): a refill can reallocate the
+    // backing storage (reserve() re-points buffer_->data_).
+    const uint8_t *src = buffer_->data_ + buffer_->reader_index_;
+    std::memcpy(dst, src, static_cast<size_t>(length));
+    buffer_->reader_index_ += length;
+  }
+  return Result<void, Error>();
+}
+
+// Advances the reader index by `size` bytes via
+// Buffer::increase_reader_index(), propagating its error if any.
+Result<void, Error> ForyInputStream::skip(uint32_t size) {
+  if (size > 0) {
+    Error error;
+    buffer_->increase_reader_index(size, error);
+    if (FORY_PREDICT_FALSE(!error.ok())) {
+      return Unexpected(std::move(error));
+    }
+  }
+  return Result<void, Error>();
+}
+
+// Moves the reader index back by `size` bytes. Only bytes in front of the
+// current reader index can be rewound; otherwise buffer_out_of_bound.
+Result<void, Error> ForyInputStream::unread(uint32_t size) {
+  const uint32_t cursor = buffer_->reader_index_;
+  if (FORY_PREDICT_FALSE(cursor < size)) {
+    return Unexpected(Error::buffer_out_of_bound(cursor, size, buffer_->size_));
+  }
+  buffer_->reader_index_ = cursor - size;
+  return Result<void, Error>();
+}
+
+// Drops already-consumed bytes by sliding the unread tail to the front
+// (reader_index_ becomes 0). Then, if the window has grown beyond its
+// initial size, releases capacity:
+// - back to the initial size when nothing is pending, or
+// - to max(initial, 2 * pending) when at most a quarter is still in use.
+void ForyInputStream::shrink_buffer() {
+  if (buffer_ == nullptr) {
+    return;
+  }
+
+  const uint32_t consumed = buffer_->reader_index_;
+  const uint32_t pending = remaining_size();
+  if (consumed != 0) {
+    if (pending != 0) {
+      std::memmove(data_.data(), data_.data() + consumed,
+                   static_cast<size_t>(pending));
+    }
+    buffer_->reader_index_ = 0;
+    buffer_->size_ = pending;
+    buffer_->writer_index_ = pending;
+  }
+
+  const uint32_t capacity = static_cast<uint32_t>(data_.size());
+  if (capacity <= initial_buffer_size_) {
+    return;
+  }
+  uint32_t wanted = capacity;
+  if (pending == 0) {
+    wanted = initial_buffer_size_;
+  } else if (pending <= capacity / 4) {
+    constexpr uint32_t k_u32_max = std::numeric_limits<uint32_t>::max();
+    const uint32_t twice = pending > k_u32_max / 2 ? k_u32_max : pending * 2;
+    wanted = std::max({initial_buffer_size_, twice, static_cast<uint32_t>(1)});
+  }
+  if (wanted < capacity) {
+    data_.resize(wanted);
+    data_.shrink_to_fit();
+    // shrink_to_fit() may reallocate; re-point the bound Buffer.
+    buffer_->data_ = data_.data();
+  }
+}
+
+// Returns the Buffer currently bound to this reader.
+Buffer &ForyInputStream::get_buffer() {
+  return *buffer_;
+}
+
+// Number of bytes pulled from the stream but not yet consumed.
+uint32_t ForyInputStream::remaining_size() const {
+  const Buffer &view = *buffer_;
+  return view.size_ - view.reader_index_;
+}
+
+// Resizes the backing storage to `new_size` bytes and re-points the bound
+// Buffer, since resize() may reallocate.
+void ForyInputStream::reserve(uint32_t new_size) {
+  data_.resize(new_size);
+  buffer_->data_ = data_.data();
+}
+
+// Makes `buffer` (or, when nullptr, the reader-owned buffer) the Buffer that
+// reads from this stream. When switching buffers, the cursor state
+// (size/writer/reader indices) is carried over from the previous buffer, or
+// reset to 0 if none was bound. The previous buffer is detached from this
+// reader. The bound buffer always ends up pointing at the shared data_
+// storage without owning it.
+// NOTE(review): any storage the target Buffer previously owned is not
+// released here; confirm callers only bind fresh/non-owning Buffers.
+void ForyInputStream::bind_buffer(Buffer *buffer) {
+  Buffer *const target = buffer != nullptr ? buffer : owned_buffer_.get();
+  if (target == nullptr) {
+    // Nothing to bind to: detach from the current buffer, if any.
+    if (buffer_ != nullptr) {
+      buffer_->stream_reader_ = nullptr;
+    }
+    buffer_ = nullptr;
+    return;
+  }
+
+  if (target != buffer_) {
+    Buffer *const previous = buffer_;
+    target->size_ = previous != nullptr ? previous->size_ : 0;
+    target->writer_index_ = previous != nullptr ? previous->writer_index_ : 0;
+    target->reader_index_ = previous != nullptr ? previous->reader_index_ : 0;
+    if (previous != nullptr) {
+      previous->stream_reader_ = nullptr;
+    }
+    buffer_ = target;
+  }
+
+  buffer_->data_ = data_.data();
+  buffer_->own_data_ = false;
+  buffer_->wrapped_vector_ = nullptr;
+  buffer_->stream_reader_ = this;
+}
+
+} // namespace fory
diff --git a/cpp/fory/util/stream.h b/cpp/fory/util/stream.h
new file mode 100644
index 000000000..be37e66b8
--- /dev/null
+++ b/cpp/fory/util/stream.h
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <istream>
+#include <memory>
+#include <vector>
+
+#include "fory/util/error.h"
+#include "fory/util/result.h"
+
+namespace fory {
+
+class Buffer;
+
+// Abstract source of bytes behind a stream-backed Buffer. An implementation
+// keeps a byte window that the bound Buffer reads from directly and refills
+// it on demand (see ForyInputStream in stream.cc for the concrete contract).
+// NOTE(review): enable_shared_from_this presumably supports shared ownership
+// via Buffer::stream_reader_owner_; shared_from_this() is only valid when the
+// reader is actually owned by a std::shared_ptr.
+class StreamReader : public std::enable_shared_from_this<StreamReader> {
+public:
+ virtual ~StreamReader() = default;
+
+ // Ensure at least `min_fill_size` unread bytes are available in the bound
+ // buffer; returns an error if the source cannot supply them.
+ virtual Result<void, Error> fill_buffer(uint32_t min_fill_size) = 0;
+
+ // Copy `length` unread bytes into `dst` and advance the reader index.
+ virtual Result<void, Error> read_to(uint8_t *dst, uint32_t length) = 0;
+
+ // Advance the reader index by `size` bytes.
+ virtual Result<void, Error> skip(uint32_t size) = 0;
+
+ // Move the reader index back by `size` bytes (within buffered data).
+ virtual Result<void, Error> unread(uint32_t size) = 0;
+
+ // Discard consumed bytes and possibly release excess capacity.
+ virtual void shrink_buffer() = 0;
+
+ // The Buffer this reader currently feeds.
+ virtual Buffer &get_buffer() = 0;
+
+ // Bind the reader to an external Buffer. Passing nullptr rebinds to the
+ // reader-owned internal buffer.
+ virtual void bind_buffer(Buffer *buffer) = 0;
+};
+
+// StreamReader over a std::istream. Bytes are pulled with rdbuf()->sgetn()
+// (bypassing the istream's state flags) into a growable vector (data_). The
+// bound Buffer reads that vector in place. fill_buffer() never compacts, so
+// call shrink_buffer() to drop consumed bytes on long streams. Not copyable
+// (owns a std::unique_ptr).
+class ForyInputStream final : public StreamReader {
+public:
+ // Does not take ownership: `stream` is stored by pointer and must outlive
+ // this object. `buffer_size` is the initial window size (clamped to >= 1).
+ explicit ForyInputStream(std::istream &stream, uint32_t buffer_size = 4096);
+
+ // Shares ownership of `stream`, which must not be null.
+ explicit ForyInputStream(std::shared_ptr<std::istream> stream,
+ uint32_t buffer_size = 4096);
+
+ ~ForyInputStream() override;
+
+ Result<void, Error> fill_buffer(uint32_t min_fill_size) override;
+
+ Result<void, Error> read_to(uint8_t *dst, uint32_t length) override;
+
+ Result<void, Error> skip(uint32_t size) override;
+
+ Result<void, Error> unread(uint32_t size) override;
+
+ void shrink_buffer() override;
+
+ Buffer &get_buffer() override;
+
+ void bind_buffer(Buffer *buffer) override;
+
+private:
+ // Bytes buffered but not yet consumed by the bound Buffer.
+ uint32_t remaining_size() const;
+
+ // Resize data_ and re-point the bound Buffer at the new storage.
+ void reserve(uint32_t new_size);
+
+ // Declaration order matters: stream_ is initialized from stream_owner_.
+ std::shared_ptr<std::istream> stream_owner_;
+ std::istream *stream_ = nullptr;
+ // Backing storage shared with the bound Buffer (which does not own it).
+ std::vector<uint8_t> data_;
+ // Capacity shrink_buffer() shrinks back towards.
+ uint32_t initial_buffer_size_ = 1;
+ // Currently bound Buffer (non-owning); defaults to owned_buffer_.
+ Buffer *buffer_ = nullptr;
+ std::unique_ptr<Buffer> owned_buffer_;
+};
+
+} // namespace fory
diff --git a/python/pyfory/buffer.pxd b/python/pyfory/buffer.pxd
index d6f02f133..a4a33667c 100644
--- a/python/pyfory/buffer.pxd
+++ b/python/pyfory/buffer.pxd
@@ -186,8 +186,6 @@ cdef class Buffer:
cdef inline write_c_buffer(self, const uint8_t* value, int32_t length)
- cdef inline int32_t read_c_buffer(self, uint8_t** buf)
-
cpdef inline write_bytes_and_size(self, bytes value)
cpdef inline bytes read_bytes_and_size(self)
diff --git a/python/pyfory/buffer.pyx b/python/pyfory/buffer.pyx
index 3f8e0935c..e0a3a0bf3 100644
--- a/python/pyfory/buffer.pyx
+++ b/python/pyfory/buffer.pyx
@@ -34,6 +34,7 @@ from pyfory.includes.libutil cimport(
CBuffer, allocate_buffer, get_bit as c_get_bit, set_bit as c_set_bit,
clear_bit as c_clear_bit,
set_bit_to as c_set_bit_to, CError, CErrorCode, CResultVoidError,
utf16_has_surrogate_pairs
)
+from pyfory.includes.libpyfory cimport Fory_PyCreateBufferFromStream
import os
from pyfory.error import raise_fory_error
@@ -69,6 +70,24 @@ cdef class Buffer:
self.c_buffer.reader_index(0)
self.c_buffer.writer_index(0)
+ @classmethod
+ def from_stream(cls, stream not None, uint32_t buffer_size=4096):
+ cdef CBuffer* stream_buffer
+ cdef c_string stream_error
+ if Fory_PyCreateBufferFromStream(
+ <PyObject*>stream, buffer_size, &stream_buffer, &stream_error
+ ) != 0:
+ raise ValueError(stream_error.decode("UTF-8"))
+ if stream_buffer == NULL:
+ raise ValueError("failed to create stream buffer")
+ cdef Buffer buffer = Buffer.__new__(Buffer)
+ buffer.c_buffer = move(deref(stream_buffer))
+ del stream_buffer
+ buffer.data = stream
+ buffer.c_buffer.reader_index(0)
+ buffer.c_buffer.writer_index(0)
+ return buffer
+
@staticmethod
cdef Buffer wrap(shared_ptr[CBuffer] c_buffer):
cdef Buffer buffer = Buffer.__new__(Buffer)
@@ -109,7 +128,8 @@ cdef class Buffer:
cpdef inline void set_reader_index(self, int32_t value):
if value < 0:
raise ValueError("reader_index must be >= 0")
- self.c_buffer.reader_index(<uint32_t>value)
+ # Checked setter: on failure it records the error in self._error, which is
+ # then raised. Per the stream tests, a stream-backed buffer can seek to
+ # not-yet-buffered bytes, and an unreachable index raises "Buffer out of bound".
+ if not self.c_buffer.reader_index(<uint32_t>value, self._error):
+ self._raise_if_error()
cpdef inline int32_t get_writer_index(self):
return <int32_t>self.c_buffer.writer_index()
@@ -287,11 +307,23 @@ cdef class Buffer:
cpdef inline int64_t read_bytes_as_int64(self, int32_t length):
cdef int64_t result = 0
- cdef uint32_t offset = self.c_buffer.reader_index()
- cdef CResultVoidError res = self.c_buffer.get_bytes_as_int64(offset,
length, &result)
+ cdef uint32_t offset
+ cdef CResultVoidError res
+ if length == 0:
+ return 0
+ if length < 0 or length > 8:
+ raise_fory_error(
+ CErrorCode.InvalidData,
+ f"get_bytes_as_int64 length should be in range [0, 8], but got
{length}",
+ )
+ if not self.c_buffer.ensure_readable(<uint32_t>length, self._error):
+ self._raise_if_error()
+ offset = self.c_buffer.reader_index()
+ res = self.c_buffer.get_bytes_as_int64(offset, <uint32_t>length,
&result)
if not res.ok():
raise_fory_error(res.error().code(), res.error().message())
- self.c_buffer.increase_reader_index(length)
+ self.c_buffer.increase_reader_index(<uint32_t>length, self._error)
+ self._raise_if_error()
return result
cpdef inline put_bytes(self, uint32_t offset, bytes value):
@@ -502,15 +534,6 @@ cdef class Buffer:
self.c_buffer.copy_from(offset, value, 0, length)
self.c_buffer.increase_writer_index(length)
- cdef inline int32_t read_c_buffer(self, uint8_t** buf):
- cdef int32_t length = self.read_var_uint32()
- cdef uint8_t* binary_data = self.c_buffer.data()
- cdef uint32_t offset = self.c_buffer.reader_index()
- self.check_bound(offset, length)
- buf[0] = binary_data + offset
- self.c_buffer.increase_reader_index(length)
- return length
-
cpdef inline write_string(self, str value):
cdef Py_ssize_t length = PyUnicode_GET_LENGTH(value)
cdef int32_t kind = PyUnicode_KIND(value)
@@ -540,11 +563,19 @@ cdef class Buffer:
cpdef inline str read_string(self):
cdef uint64_t header = self.read_var_uint64()
cdef uint32_t size = header >> 2
- cdef uint32_t offset = self.c_buffer.reader_index()
- self.check_bound(offset, size)
- cdef const char * buf = <const char *>(self.c_buffer.data() + offset)
- self.c_buffer.increase_reader_index(size)
cdef uint32_t encoding = header & <uint32_t>0b11
+ if size == 0:
+ return ""
+ cdef uint32_t offset = self.c_buffer.reader_index()
+ cdef uint32_t available = self.c_buffer.size() - offset
+ cdef const char * buf
+ cdef bytes py_bytes
+ if available >= size:
+ buf = <const char *>(self.c_buffer.data() + offset)
+ self.c_buffer.reader_index(offset + size)
+ else:
+ py_bytes = self.read_bytes(<int32_t>size)
+ buf = <const char *>PyBytes_AS_STRING(py_bytes)
if encoding == 0:
# PyUnicode_FromASCII
return PyUnicode_DecodeLatin1(buf, size, "strict")
diff --git a/python/pyfory/includes/libpyfory.pxd b/python/pyfory/includes/libpyfory.pxd
new file mode 100644
index 000000000..c0e888c18
--- /dev/null
+++ b/python/pyfory/includes/libpyfory.pxd
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from cpython.object cimport PyObject
+from libc.stdint cimport uint32_t
+from libcpp.string cimport string as c_string
+
+from pyfory.includes.libutil cimport CBuffer
+
+# Bridge from a Python stream object to a stream-backed C++ Buffer.
+# As used by Buffer.from_stream in buffer.pyx: a non-zero return signals
+# failure, with a message written to `error_message`. On success `*out`
+# points to a heap-allocated CBuffer that the caller must `del`.
+cdef extern from "fory/python/pyfory.h" namespace "fory":
+ int Fory_PyCreateBufferFromStream(PyObject* stream, uint32_t buffer_size,
+ CBuffer** out, c_string* error_message)
diff --git a/python/pyfory/includes/libutil.pxd b/python/pyfory/includes/libutil.pxd
index 3328889b0..50373e19c 100644
--- a/python/pyfory/includes/libutil.pxd
+++ b/python/pyfory/includes/libutil.pxd
@@ -71,13 +71,17 @@ cdef extern from "fory/util/buffer.h" namespace "fory" nogil:
inline uint32_t reader_index()
+ inline c_bool ensure_readable(uint32_t length, CError& error)
+
inline void writer_index(uint32_t writer_index)
inline void increase_writer_index(uint32_t diff)
inline void reader_index(uint32_t reader_index)
- inline void increase_reader_index(uint32_t diff)
+ inline c_bool reader_index(uint32_t reader_index, CError& error)
+
+ inline void increase_reader_index(uint32_t diff, CError& error)
void grow(uint32_t min_capacity)
diff --git a/python/pyfory/tests/test_buffer.py b/python/pyfory/tests/test_buffer.py
index 9080d966b..c5e0c9007 100644
--- a/python/pyfory/tests/test_buffer.py
+++ b/python/pyfory/tests/test_buffer.py
@@ -15,13 +15,56 @@
# specific language governing permissions and limitations
# under the License.
+import pytest
+
from pyfory.buffer import Buffer
from pyfory.tests.core import require_pyarrow
+from pyfory.tests.test_stream import OneByteStream
from pyfory.utils import lazy_import
pa = lazy_import("pyarrow")
+class RecvIntoOnlyStream:
+ def __init__(self, data: bytes):
+ self._data = data
+ self._offset = 0
+
+ def recv_into(self, buffer, size=-1):
+ if self._offset >= len(self._data):
+ return 0
+ view = memoryview(buffer).cast("B")
+ if size < 0 or size > len(view):
+ size = len(view)
+ if size == 0:
+ return 0
+ read_size = min(1, size, len(self._data) - self._offset)
+ start = self._offset
+ self._offset += read_size
+ view[:read_size] = self._data[start : start + read_size]
+ return read_size
+
+
+class LegacyRecvIntoOnlyStream:
+ def __init__(self, data: bytes):
+ self._data = data
+ self._offset = 0
+
+ def recvinto(self, buffer, size=-1):
+ if self._offset >= len(self._data):
+ return 0
+ view = memoryview(buffer).cast("B")
+ if size < 0 or size > len(view):
+ size = len(view)
+ if size == 0:
+ return 0
+ read_size = min(1, size, len(self._data) - self._offset)
+ start = self._offset
+ self._offset += read_size
+ view[:read_size] = self._data[start : start + read_size]
+ return read_size
+
+
def test_buffer():
buffer = Buffer.allocate(8)
buffer.write_bool(True)
@@ -248,5 +291,66 @@ def test_read_bytes_as_int64():
buf.read_bytes_as_int64(8)
+def test_stream_buffer_read():
+ writer = Buffer.allocate(32)
+ writer.write_uint32(0x01020304)
+ writer.write_int64(-1234567890)
+ writer.write_var_uint32(300)
+ writer.write_varint64(-4567890123)
+ writer.write_tagged_uint64(0x123456789)
+ writer.write_var_uint64(0x1FFFF)
+ writer.write_bytes_and_size(b"stream-data")
+ writer.write_string("hello-stream")
+
+ data = writer.get_bytes(0, writer.get_writer_index())
+ stream = OneByteStream(data)
+ reader = Buffer.from_stream(stream)
+
+ assert reader.read_uint32() == 0x01020304
+ assert reader.read_int64() == -1234567890
+ assert reader.read_var_uint32() == 300
+ assert reader.read_varint64() == -4567890123
+ assert reader.read_tagged_uint64() == 0x123456789
+ assert reader.read_var_uint64() == 0x1FFFF
+ assert reader.read_bytes_and_size() == b"stream-data"
+ assert reader.read_string() == "hello-stream"
+
+
+def test_stream_buffer_read_with_recv_into():
+ reader = Buffer.from_stream(RecvIntoOnlyStream(bytes([0x11, 0x22, 0x33,
0x44])))
+ assert reader.read_uint32() == 0x44332211
+
+
+def test_stream_buffer_read_with_legacy_recvinto():
+ reader = Buffer.from_stream(LegacyRecvIntoOnlyStream(bytes([0x11, 0x22,
0x33, 0x44])))
+ assert reader.read_uint32() == 0x44332211
+
+
+def test_stream_buffer_set_reader_index():
+ reader = Buffer.from_stream(OneByteStream(bytes([0x11, 0x22, 0x33, 0x44,
0x55])))
+ reader.set_reader_index(4)
+ assert reader.read_uint8() == 0x55
+
+
+def test_stream_buffer_set_reader_index_out_of_bound():
+ reader = Buffer.from_stream(OneByteStream(b"\x11\x22\x33"))
+ with pytest.raises(Exception, match="Buffer out of bound"):
+ reader.set_reader_index(10)
+
+
+def test_stream_buffer_read_bytes_and_skip_update_reader_index():
+ reader = Buffer.from_stream(OneByteStream(bytes(range(20))), buffer_size=2)
+ assert reader.read_bytes(5) == bytes([0, 1, 2, 3, 4])
+ assert reader.get_reader_index() == 5
+ reader.skip(5)
+ assert reader.get_reader_index() == 10
+
+
+def test_stream_buffer_short_read_error():
+ reader = Buffer.from_stream(OneByteStream(b"\x01\x02\x03"))
+ with pytest.raises(Exception, match="Buffer out of bound"):
+ reader.read_uint32()
+
+
if __name__ == "__main__":
test_grow()
diff --git a/python/pyfory/tests/test_stream.py b/python/pyfory/tests/test_stream.py
new file mode 100644
index 000000000..a8d6e6e68
--- /dev/null
+++ b/python/pyfory/tests/test_stream.py
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+import pyfory
+from pyfory.buffer import Buffer
+
+
+class OneByteStream:
+ def __init__(self, data: bytes):
+ self._data = data
+ self._offset = 0
+
+ def read(self, size=-1):
+ if self._offset >= len(self._data):
+ return b""
+ if size < 0:
+ size = len(self._data) - self._offset
+ if size == 0:
+ return b""
+ read_size = min(1, size, len(self._data) - self._offset)
+ start = self._offset
+ self._offset += read_size
+ return self._data[start : start + read_size]
+
+ def readinto(self, buffer):
+ if self._offset >= len(self._data):
+ return 0
+ view = memoryview(buffer).cast("B")
+ if len(view) == 0:
+ return 0
+ read_size = min(1, len(view), len(self._data) - self._offset)
+ start = self._offset
+ self._offset += read_size
+ view[:read_size] = self._data[start : start + read_size]
+ return read_size
+
+ def recv_into(self, buffer, size=-1):
+ if self._offset >= len(self._data):
+ return 0
+ view = memoryview(buffer).cast("B")
+ if size < 0 or size > len(view):
+ size = len(view)
+ if size == 0:
+ return 0
+ read_size = min(1, size, len(self._data) - self._offset)
+ start = self._offset
+ self._offset += read_size
+ view[:read_size] = self._data[start : start + read_size]
+ return read_size
+
+ def recvinto(self, buffer, size=-1):
+ return self.recv_into(buffer, size)
+
+
[email protected]("xlang", [False, True])
+def test_stream_roundtrip_primitives_and_strings(xlang):
+ fory = pyfory.Fory(xlang=xlang, ref=True)
+ values = [
+ 0,
+ -123456789,
+ 3.1415926,
+ "stream-hello",
+ "stream-你好-😀",
+ b"binary-data" * 8,
+ [1, 2, 3, 5, 8],
+ ]
+
+ for value in values:
+ data = fory.serialize(value)
+ restored = fory.deserialize(Buffer.from_stream(OneByteStream(data)))
+ assert restored == value
+
+
[email protected]("xlang", [False, True])
+def test_stream_roundtrip_nested_collections(xlang):
+ fory = pyfory.Fory(xlang=xlang, ref=True)
+ value = {
+ "name": "stream-object",
+ "items": [1, 2, {"k1": "v1", "k2": [3, 4, 5]}],
+ "scores": {"a": 10, "b": 20},
+ "flags": [True, False, True],
+ }
+
+ data = fory.serialize(value)
+ restored = fory.deserialize(Buffer.from_stream(OneByteStream(data)))
+ assert restored == value
+
+
+def test_stream_roundtrip_reference_graph_python_mode():
+ fory = pyfory.Fory(xlang=False, ref=True)
+ shared = ["x", 1, 2]
+ value = {"a": shared, "b": shared}
+ cycle = []
+ cycle.append(cycle)
+
+ data_ref = fory.serialize(value)
+ restored_ref =
fory.deserialize(Buffer.from_stream(OneByteStream(data_ref)))
+ assert restored_ref["a"] == shared
+ assert restored_ref["a"] is restored_ref["b"]
+
+ data_cycle = fory.serialize(cycle)
+ restored_cycle =
fory.deserialize(Buffer.from_stream(OneByteStream(data_cycle)))
+ assert restored_cycle[0] is restored_cycle
+
+
[email protected]("xlang", [False, True])
+def test_stream_deserialize_multiple_objects_from_single_stream(xlang):
+ fory = pyfory.Fory(xlang=xlang, ref=True)
+ expected = [
+ 2026,
+ "multi-object-stream",
+ {"k": [1, 2, 3], "nested": {"x": True}},
+ [10, 20, 30, 40],
+ ]
+
+ write_buffer = Buffer.allocate(1024)
+ for obj in expected:
+ fory.serialize(obj, write_buffer)
+
+ reader = Buffer.from_stream(OneByteStream(write_buffer.get_bytes(0,
write_buffer.get_writer_index())))
+ for obj in expected:
+ assert fory.deserialize(reader) == obj
+
+ assert reader.get_reader_index() == reader.size()
+
+
[email protected]("xlang", [False, True])
+def test_stream_deserialize_truncated_error(xlang):
+ fory = pyfory.Fory(xlang=xlang, ref=True)
+ data = fory.serialize({"k": "value", "numbers": [1, 2, 3, 4]})
+ truncated = data[:-1]
+
+ with pytest.raises(Exception):
+ fory.deserialize(Buffer.from_stream(OneByteStream(truncated)))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]