This is an automated email from the ASF dual-hosted git repository.
chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fory.git
The following commit(s) were added to refs/heads/main by this push:
new 15ff23ddc feat(python): make fory out-of-band serialization compatible with pickle5 (#2732)
15ff23ddc is described below
commit 15ff23ddc973dd76a8e264af92b2375e56cbbeff
Author: Shawn Yang <[email protected]>
AuthorDate: Fri Oct 10 00:41:11 2025 +0800
feat(python): make fory out-of-band serialization compatible with pickle5 (#2732)
## Why?
Pickle protocol 5, introduced in Python 3.8 (and available via a backport
for earlier versions), supports out-of-band buffers, which significantly
improve the efficiency of serializing and deserializing objects with large
memory footprints, such as NumPy arrays and dataframes.
pyfory has similar support; this PR makes it compatible with pickle5.
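For context, the out-of-band contract that pickle protocol 5 defines, and that this PR mirrors in pyfory, can be sketched with only the standard library (the `payload` data below is illustrative):

```python
import pickle

# Illustrative large binary payload we want to move out-of-band.
payload = bytearray(b"x" * 1024)

# With protocol 5, buffers are handed to buffer_callback instead of
# being copied into the pickle stream, so the stream itself stays small.
buffers = []
data = pickle.dumps(
    pickle.PickleBuffer(payload), protocol=5, buffer_callback=buffers.append
)

# Deserialization consumes the out-of-band buffers in the same order.
restored = pickle.loads(data, buffers=buffers)
assert bytes(restored) == bytes(payload)
```

pyfory's `serialize(obj, buffer_callback=...)` and `deserialize(data, buffers=...)` follow the same contract.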
## What does this PR do?
Make fory's out-of-band serialization compatible with pickle5.
## Related issues
<!--
Is there any related issue? If this PR closes them, you can say
fix/closes:
- #xxxx0
- #xxxx1
- Fixes #xxxx2
-->
## Does this PR introduce any user-facing change?
<!--
If any user-facing interface changes, please [open an
issue](https://github.com/apache/fory/issues/new/choose) describing the
need to do so and update the document if necessary.
Delete section if not applicable.
-->
- [ ] Does this PR introduce any public API change?
- [ ] Does this PR introduce any binary protocol compatibility change?
## Benchmark
<!--
When the PR has an impact on performance (if you don't know whether it
will, you can submit the PR first, and the code reviewer will explain
whether it does), be sure to attach benchmark data here.
Delete section if not applicable.
-->
---
python/README.md | 154 ++++++++++++++
python/pyfory/_fory.py | 22 +-
python/pyfory/_registry.py | 3 +
python/pyfory/_serialization.pyx | 37 ++--
python/pyfory/format/serializer.py | 35 +++-
python/pyfory/serializer.py | 92 +++++++--
python/pyfory/tests/test_cross_language.py | 6 +-
python/pyfory/tests/test_pickle_buffer.py | 320 +++++++++++++++++++++++++++++
python/pyfory/tests/test_serializer.py | 2 +-
9 files changed, 622 insertions(+), 49 deletions(-)
diff --git a/python/README.md b/python/README.md
index cf4e46a7f..77b54f036 100644
--- a/python/README.md
+++ b/python/README.md
@@ -24,6 +24,7 @@
- **Shared/circular reference support** for complex object graphs in both Python-native and cross-language modes
- **Polymorphism support** for customized types with automatic type dispatching
- **Schema evolution** support for backward/forward compatibility when using dataclasses in cross-language mode
+- **Out-of-band buffer support** for zero-copy serialization of large data structures like NumPy arrays and Pandas DataFrames, compatible with pickle protocol 5
### ⚡ **Blazing Fast Performance**
@@ -279,6 +280,159 @@ data = fory.dumps(create_local_class().h)
print(fory.loads(data)(10)) # 100
```
+### Out-of-Band Buffer Serialization
+
+Fory supports pickle5-compatible out-of-band buffer serialization for efficient zero-copy handling of large data structures. This is particularly useful for NumPy arrays, Pandas DataFrames, and other objects with large memory footprints.
+
+Out-of-band serialization separates metadata from the actual data buffers, allowing for:
+
+- **Zero-copy transfers** when sending data over networks or IPC using `memoryview`
+- **Improved performance** for large datasets
+- **Pickle5 compatibility** using `pickle.PickleBuffer`
+- **Flexible stream support** - write to any writable object (files, BytesIO, sockets, etc.)
+
+#### Basic Out-of-Band Serialization
+
+```python
+import pyfory
+import numpy as np
+
+fory = pyfory.Fory(xlang=False, ref=False, strict=False)
+
+# Large numpy array
+array = np.arange(10000, dtype=np.float64)
+
+# Serialize with out-of-band buffers
+buffer_objects = []
+serialized_data = fory.serialize(array, buffer_callback=buffer_objects.append)
+
+# Convert buffer objects to memoryview for zero-copy transmission
+# For contiguous buffers (bytes, numpy arrays), this is zero-copy
+# For non-contiguous data, a copy may be created to ensure contiguity
+buffers = [obj.getbuffer() for obj in buffer_objects]
+
+# Deserialize with out-of-band buffers (accepts memoryview, bytes, or Buffer)
+deserialized_array = fory.deserialize(serialized_data, buffers=buffers)
+
+assert np.array_equal(array, deserialized_array)
+```
+
+#### Out-of-Band with Pandas DataFrames
+
+```python
+import pyfory
+import pandas as pd
+import numpy as np
+
+fory = pyfory.Fory(xlang=False, ref=False, strict=False)
+
+# Create a DataFrame with numeric columns
+df = pd.DataFrame({
+ 'a': np.arange(1000, dtype=np.float64),
+ 'b': np.arange(1000, dtype=np.int64),
+ 'c': ['text'] * 1000
+})
+
+# Serialize with out-of-band buffers
+buffer_objects = []
+serialized_data = fory.serialize(df, buffer_callback=buffer_objects.append)
+buffers = [obj.getbuffer() for obj in buffer_objects]
+
+# Deserialize
+deserialized_df = fory.deserialize(serialized_data, buffers=buffers)
+
+assert df.equals(deserialized_df)
+```
+
+#### Selective Out-of-Band Serialization
+
+You can control which buffers go out-of-band: the callback returns `True` to keep a buffer in-band, or collects the buffer (e.g. by appending it to a list) and returns `False` to send it out-of-band:
+
+```python
+import pyfory
+import numpy as np
+
+fory = pyfory.Fory(xlang=False, ref=True, strict=False)
+
+arr1 = np.arange(1000, dtype=np.float64)
+arr2 = np.arange(2000, dtype=np.float64)
+data = [arr1, arr2]
+
+buffer_objects = []
+counter = 0
+
+def selective_callback(buffer_object):
+ global counter
+ counter += 1
+ # Only send even-numbered buffers out-of-band
+ if counter % 2 == 0:
+ buffer_objects.append(buffer_object)
+ return False # Out-of-band
+ return True # In-band
+
+serialized = fory.serialize(data, buffer_callback=selective_callback)
+buffers = [obj.getbuffer() for obj in buffer_objects]
+deserialized = fory.deserialize(serialized, buffers=buffers)
+```
+
+#### Pickle5 Compatibility
+
+Fory's out-of-band serialization is fully compatible with pickle protocol 5. When objects implement `__reduce_ex__(protocol)`, Fory automatically uses protocol 5 to enable `pickle.PickleBuffer` support:
+
+```python
+import pyfory
+import pickle
+
+fory = pyfory.Fory(xlang=False, ref=False, strict=False)
+
+# PickleBuffer objects are automatically supported
+data = b"Large binary data"
+pickle_buffer = pickle.PickleBuffer(data)
+
+# Serialize with buffer callback for out-of-band handling
+buffer_objects = []
+serialized = fory.serialize(pickle_buffer, buffer_callback=buffer_objects.append)
+buffers = [obj.getbuffer() for obj in buffer_objects]
+
+# Deserialize with buffers
+deserialized = fory.deserialize(serialized, buffers=buffers)
+assert bytes(deserialized.raw()) == data
+```
+
+#### Writing Buffers to Different Streams
+
+The `BufferObject.write_to()` method accepts any writable stream object, making it flexible for various use cases:
+
+```python
+import pyfory
+import numpy as np
+import io
+
+fory = pyfory.Fory(xlang=False, ref=False, strict=False)
+
+array = np.arange(1000, dtype=np.float64)
+
+# Collect out-of-band buffers
+buffer_objects = []
+serialized = fory.serialize(array, buffer_callback=buffer_objects.append)
+
+# Write to different stream types
+for buffer_obj in buffer_objects:
+ # Write to BytesIO (in-memory stream)
+ bytes_stream = io.BytesIO()
+ buffer_obj.write_to(bytes_stream)
+
+ # Write to file
+ with open('/tmp/buffer_data.bin', 'wb') as f:
+ buffer_obj.write_to(f)
+
+ # Get zero-copy memoryview (for contiguous buffers)
+ mv = buffer_obj.getbuffer()
+ assert isinstance(mv, memoryview)
+```
+
+**Note**: For contiguous memory buffers (like bytes, numpy arrays), `getbuffer()` returns a zero-copy `memoryview`. For non-contiguous data, a copy may be created to ensure contiguity.
+
## 🏃‍♂️ Cross-Language Object Graph Serialization
`pyfory` supports cross-language object graph serialization, allowing you to serialize data in Python and deserialize it in Java, Go, Rust, or other supported languages.
diff --git a/python/pyfory/_fory.py b/python/pyfory/_fory.py
index 0df532355..170bce9a2 100644
--- a/python/pyfory/_fory.py
+++ b/python/pyfory/_fory.py
@@ -79,15 +79,27 @@ class BufferObject(ABC):
@abstractmethod
def total_bytes(self) -> int:
- """total size for serialized bytes of an object"""
+ """Total size for serialized bytes of an object."""
@abstractmethod
- def write_to(self, buffer: "Buffer"):
- """Write serialized object to a buffer."""
+ def write_to(self, stream):
+ """
+ Write serialized object to a writable stream.
+
+ Args:
stream: Any writable object with write() method (Buffer, file, BytesIO, etc.)
+ """
@abstractmethod
- def to_buffer(self) -> "Buffer":
- """Write serialized data as Buffer."""
+ def getbuffer(self) -> memoryview:
+ """
+ Return serialized data as memoryview for zero-copy access.
+
+ Returns:
+ memoryview: A memoryview of the serialized data. For objects with
+ contiguous memory (bytes, bytearray, numpy arrays), this is zero-copy.
+ For non-contiguous data, a copy may be created to ensure contiguity.
+ """
class Fory:
diff --git a/python/pyfory/_registry.py b/python/pyfory/_registry.py
index 7605c2606..5cbf09c06 100644
--- a/python/pyfory/_registry.py
+++ b/python/pyfory/_registry.py
@@ -21,6 +21,7 @@ import datetime
import enum
import functools
import logging
+import pickle
import types
from typing import TypeVar, Union
from enum import Enum
@@ -65,6 +66,7 @@ from pyfory.serializer import (
MethodSerializer,
UnsupportedSerializer,
NativeFuncMethodSerializer,
+ PickleBufferSerializer,
)
from pyfory.meta.metastring import MetaStringEncoder, MetaStringDecoder
from pyfory.meta.meta_compressor import DeflaterMetaCompressor
@@ -219,6 +221,7 @@ class TypeResolver:
register(np.ndarray, serializer=NDArraySerializer)
register(array.array, serializer=DynamicPyArraySerializer)
register(types.MappingProxyType, serializer=MappingProxySerializer)
+ register(pickle.PickleBuffer, serializer=PickleBufferSerializer)
if not self.require_registration:
register(types.ModuleType, serializer=ModuleSerializer)
self._internal_py_serializer_map = {
diff --git a/python/pyfory/_serialization.pyx b/python/pyfory/_serialization.pyx
index 54b0d275a..8339ae2ba 100644
--- a/python/pyfory/_serialization.pyx
+++ b/python/pyfory/_serialization.pyx
@@ -809,7 +809,7 @@ cdef class Fory:
cdef readonly MetaStringResolver metastring_resolver
cdef readonly SerializationContext serialization_context
cdef Buffer buffer
- cdef object _buffer_callback
+ cdef public object buffer_callback
cdef object _buffers # iterator
cdef object _unsupported_callback
cdef object _unsupported_objects # iterator
@@ -877,7 +877,7 @@ cdef class Fory:
self.serialization_context = SerializationContext(fory=self, scoped_meta_share_enabled=compatible)
self.type_resolver.initialize()
self.buffer = Buffer.allocate(32)
- self._buffer_callback = None
+ self.buffer_callback = None
self._buffers = None
self._unsupported_callback = None
self._unsupported_objects = None
@@ -952,7 +952,7 @@ cdef class Fory:
cpdef inline _serialize(
self, obj, Buffer buffer, buffer_callback=None, unsupported_callback=None):
- self._buffer_callback = buffer_callback
+ self.buffer_callback = buffer_callback
self._unsupported_callback = unsupported_callback
if buffer is None:
self.buffer.writer_index = 0
@@ -981,7 +981,7 @@ cdef class Fory:
else:
# set reader as native.
clear_bit(buffer, mask_index, 2)
- if self._buffer_callback is not None:
+ if self.buffer_callback is not None:
set_bit(buffer, mask_index, 3)
else:
clear_bit(buffer, mask_index, 3)
@@ -1237,20 +1237,23 @@ cdef class Fory:
)
cpdef inline write_buffer_object(self, Buffer buffer, buffer_object):
- if self._buffer_callback is not None and self._buffer_callback(buffer_object):
+ cdef int32_t size
+ cdef int32_t writer_index
+ cdef Buffer buf
+ if self.buffer_callback is None or self.buffer_callback(buffer_object):
+ buffer.write_bool(True)
+ size = buffer_object.total_bytes()
+ # writer length.
+ buffer.write_varuint32(size)
+ writer_index = buffer.writer_index
+ buffer.ensure(writer_index + size)
+ buf = buffer.slice(buffer.writer_index, size)
+ buffer_object.write_to(buf)
+ buffer.writer_index += size
+ else:
buffer.write_bool(False)
- return
- buffer.write_bool(True)
- cdef int32_t size = buffer_object.total_bytes()
- # writer length.
- buffer.write_varuint32(size)
- cdef int32_t writer_index = buffer.writer_index
- buffer.ensure(writer_index + size)
- cdef Buffer buf = buffer.slice(buffer.writer_index, size)
- buffer_object.write_to(buf)
- buffer.writer_index += size
-
- cpdef inline Buffer read_buffer_object(self, Buffer buffer):
+
+ cpdef inline object read_buffer_object(self, Buffer buffer):
cdef c_bool in_band = buffer.read_bool()
if not in_band:
assert self._buffers is not None
diff --git a/python/pyfory/format/serializer.py b/python/pyfory/format/serializer.py
index b3e2b8066..f176e5756 100644
--- a/python/pyfory/format/serializer.py
+++ b/python/pyfory/format/serializer.py
@@ -45,16 +45,22 @@ class ArrowRecordBatchBufferObject(BufferObject):
def total_bytes(self) -> int:
return self.nbytes
- def write_to(self, buffer: Buffer):
- assert isinstance(buffer, Buffer)
-
- sink = pa.FixedSizeBufferWriter(pa.py_buffer(buffer))
+ def write_to(self, stream):
+ if isinstance(stream, Buffer):
+ sink = pa.FixedSizeBufferWriter(pa.py_buffer(stream))
+ else:
+ sink = pa.BufferOutputStream()
self._write(self.batch, sink)
+ if not isinstance(stream, Buffer):
+ data = sink.getvalue()
+ if hasattr(stream, "write"):
+ stream.write(data.to_pybytes())
- def to_buffer(self) -> Buffer:
+ def getbuffer(self) -> memoryview:
sink = pa.BufferOutputStream()
ArrowRecordBatchBufferObject._write(self.batch, sink)
- return Buffer(sink.getvalue())
+ arrow_buffer = sink.getvalue()
+ return memoryview(arrow_buffer)
@staticmethod
def _write(batch, sink):
@@ -88,15 +94,22 @@ class ArrowTableBufferObject(BufferObject):
def total_bytes(self) -> int:
return self.nbytes
- def write_to(self, buffer: Buffer):
- assert isinstance(buffer, Buffer)
- sink = pa.FixedSizeBufferWriter(pa.py_buffer(buffer))
+ def write_to(self, stream):
+ if isinstance(stream, Buffer):
+ sink = pa.FixedSizeBufferWriter(pa.py_buffer(stream))
+ else:
+ sink = pa.BufferOutputStream()
ArrowTableBufferObject._write(self.table, sink)
+ if not isinstance(stream, Buffer):
+ data = sink.getvalue()
+ if hasattr(stream, "write"):
+ stream.write(data.to_pybytes())
- def to_buffer(self) -> Buffer:
+ def getbuffer(self) -> memoryview:
sink = pa.BufferOutputStream()
self._write(self.table, sink)
- return Buffer(sink.getvalue())
+ arrow_buffer = sink.getvalue()
+ return memoryview(arrow_buffer)
@staticmethod
def _write(table, sink):
diff --git a/python/pyfory/serializer.py b/python/pyfory/serializer.py
index bea983862..a1732d2b4 100644
--- a/python/pyfory/serializer.py
+++ b/python/pyfory/serializer.py
@@ -24,6 +24,7 @@ import itertools
import marshal
import logging
import os
+import pickle
import types
import typing
from typing import List
@@ -886,9 +887,8 @@ class NDArraySerializer(Serializer):
raise NotImplementedError("Multi-dimensional array not supported currently")
def write(self, buffer, value):
- # Serialize numpy ND array using native format
- dtype = value.dtype
fory = self.fory
+ dtype = value.dtype
fory.serialize_ref(buffer, dtype)
buffer.write_varuint32(len(value.shape))
for dim in value.shape:
@@ -898,8 +898,7 @@ class NDArraySerializer(Serializer):
for item in value:
fory.serialize_ref(buffer, item)
else:
- data = value.tobytes()
- buffer.write_bytes_and_size(data)
+ fory.write_buffer_object(buffer, NDArrayBufferObject(value))
def read(self, buffer):
fory = self.fory
@@ -910,8 +909,12 @@ class NDArraySerializer(Serializer):
length = buffer.read_varint32()
items = [fory.deserialize_ref(buffer) for _ in range(length)]
return np.array(items, dtype=object)
- data = buffer.read_bytes_and_size()
- return np.frombuffer(data, dtype=dtype).reshape(shape)
+ fory_buf = fory.read_buffer_object(buffer)
+ if isinstance(fory_buf, memoryview):
+ return np.frombuffer(fory_buf, dtype=dtype).reshape(shape)
+ elif isinstance(fory_buf, bytes):
+ return np.frombuffer(fory_buf, dtype=dtype).reshape(shape)
+ return np.frombuffer(fory_buf.to_pybytes(), dtype=dtype).reshape(shape)
class BytesSerializer(CrossLanguageCompatibleSerializer):
@@ -920,6 +923,10 @@ class BytesSerializer(CrossLanguageCompatibleSerializer):
def read(self, buffer):
fory_buf = self.fory.read_buffer_object(buffer)
+ if isinstance(fory_buf, memoryview):
+ return bytes(fory_buf)
+ elif isinstance(fory_buf, bytes):
+ return fory_buf
return fory_buf.to_pybytes()
@@ -932,11 +939,72 @@ class BytesBufferObject(BufferObject):
def total_bytes(self) -> int:
return len(self.binary)
- def write_to(self, buffer: "Buffer"):
- buffer.write_bytes(self.binary)
+ def write_to(self, stream):
+ if hasattr(stream, "write_bytes"):
+ stream.write_bytes(self.binary)
+ else:
+ stream.write(self.binary)
+
+ def getbuffer(self) -> memoryview:
+ return memoryview(self.binary)
+
+
+class PickleBufferSerializer(CrossLanguageCompatibleSerializer):
+ def write(self, buffer, value):
+ self.fory.write_buffer_object(buffer, PickleBufferObject(value))
+
+ def read(self, buffer):
+ fory_buf = self.fory.read_buffer_object(buffer)
+ if isinstance(fory_buf, (bytes, memoryview, bytearray, Buffer)):
+ return pickle.PickleBuffer(fory_buf)
+ return pickle.PickleBuffer(fory_buf.to_pybytes())
+
+
+class PickleBufferObject(BufferObject):
+ __slots__ = ("pickle_buffer",)
+
+ def __init__(self, pickle_buffer):
+ self.pickle_buffer = pickle_buffer
+
+ def total_bytes(self) -> int:
+ return len(self.pickle_buffer.raw())
+
+ def write_to(self, stream):
+ raw = self.pickle_buffer.raw()
+ if hasattr(stream, "write_buffer"):
+ stream.write_buffer(raw)
+ else:
+ stream.write(bytes(raw) if isinstance(raw, memoryview) else raw)
+
+ def getbuffer(self) -> memoryview:
+ raw = self.pickle_buffer.raw()
+ if isinstance(raw, memoryview):
+ return raw
+ return memoryview(bytes(raw))
+
+
+class NDArrayBufferObject(BufferObject):
+ __slots__ = ("array", "dtype", "shape")
+
+ def __init__(self, array):
+ self.array = array
+ self.dtype = array.dtype
+ self.shape = array.shape
+
+ def total_bytes(self) -> int:
+ return self.array.nbytes
+
+ def write_to(self, stream):
+ data = self.array.tobytes()
+ if hasattr(stream, "write_buffer"):
+ stream.write_buffer(data)
+ else:
+ stream.write(data)
- def to_buffer(self) -> "Buffer":
- return Buffer(self.binary)
+ def getbuffer(self) -> memoryview:
+ if self.array.flags.c_contiguous:
+ return memoryview(self.array.data)
+ return memoryview(self.array.tobytes())
class StatefulSerializer(CrossLanguageCompatibleSerializer):
@@ -1002,11 +1070,11 @@ class ReduceSerializer(CrossLanguageCompatibleSerializer):
self._getnewargs = getattr(cls, "__getnewargs__", None)
def write(self, buffer, value):
- # Try __reduce_ex__ first (with protocol 2), then __reduce__
+ # Try __reduce_ex__ first (with protocol 5 for pickle5 out-of-band buffer support), then __reduce__
# Check if the object has a custom __reduce_ex__ method (not just the default from object)
if hasattr(value, "__reduce_ex__") and value.__class__.__reduce_ex__ is not object.__reduce_ex__:
try:
- reduce_result = value.__reduce_ex__(2)
+ reduce_result = value.__reduce_ex__(5)
except TypeError:
# Some objects don't support protocol argument
reduce_result = value.__reduce_ex__()
diff --git a/python/pyfory/tests/test_cross_language.py b/python/pyfory/tests/test_cross_language.py
index 4523bdf17..33cd2f18a 100644
--- a/python/pyfory/tests/test_cross_language.py
+++ b/python/pyfory/tests/test_cross_language.py
@@ -403,7 +403,7 @@ def test_serialize_arrow_out_of_band(int_band_file, out_of_band_file):
assert objects == [batch, table]
buffer_objects = []
in_band_buffer = fory.serialize([batch, table], buffer_callback=buffer_objects.append)
- buffers = [o.to_buffer() for o in buffer_objects]
+ buffers = [o.getbuffer() for o in buffer_objects]
with open(int_band_file, "wb+") as f:
f.write(in_band_buffer)
with open(out_of_band_file, "wb+") as f:
@@ -676,8 +676,8 @@ def test_oob_buffer(in_band_file_path, out_of_band_file_path):
# in_band_bytes size may be different because it may contain language-specific meta.
debug_print(f"{len(serialized), len(in_band_bytes)}")
debug_print(f"deserialized from other language {new_obj}")
- debug_print(f"deserialized from python {fory.deserialize(serialized, [o.to_buffer() for o in buffer_objects])}")
- fory.deserialize(serialized, [o.to_buffer() for o in buffer_objects])
+ debug_print(f"deserialized from python {fory.deserialize(serialized, [o.getbuffer() for o in buffer_objects])}")
+ fory.deserialize(serialized, [o.getbuffer() for o in buffer_objects])
with open(in_band_file_path, "wb+") as f:
f.write(serialized)
out_of_band_buffer.write_int32(len(buffer_objects))
diff --git a/python/pyfory/tests/test_pickle_buffer.py b/python/pyfory/tests/test_pickle_buffer.py
new file mode 100644
index 000000000..b19082ba3
--- /dev/null
+++ b/python/pyfory/tests/test_pickle_buffer.py
@@ -0,0 +1,320 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pickle
+import pytest
+from pyfory import Fory
+
+
+try:
+ import numpy as np
+except ImportError:
+ np = None
+
+try:
+ import pandas as pd
+except ImportError:
+ pd = None
+
+
+def test_pickle_buffer_serialization():
+ fory = Fory(xlang=False, ref=False, strict=False)
+
+ data = b"Hello, PickleBuffer!"
+ pickle_buffer = pickle.PickleBuffer(data)
+
+ serialized = fory.serialize(pickle_buffer)
+ deserialized = fory.deserialize(serialized)
+
+ assert isinstance(deserialized, pickle.PickleBuffer)
+ assert bytes(deserialized.raw()) == data
+
+
+@pytest.mark.skipif(np is None, reason="Requires numpy")
+def test_numpy_out_of_band_serialization():
+ fory = Fory(xlang=False, ref=False, strict=False)
+
+ arr = np.arange(10000, dtype=np.float64)
+
+ buffer_objects = []
+ serialized = fory.serialize(arr, buffer_callback=buffer_objects.append)
+
+ buffers = [o.getbuffer() for o in buffer_objects]
+
+ deserialized = fory.deserialize(serialized, buffers=buffers)
+
+ np.testing.assert_array_equal(arr, deserialized)
+
+
+@pytest.mark.skipif(pd is None, reason="Requires pandas")
+def test_pandas_out_of_band_serialization():
+ fory = Fory(xlang=False, ref=False, strict=False)
+
+ df = pd.DataFrame(
+ {
+ "a": np.arange(1000, dtype=np.float64),
+ "b": np.arange(1000, dtype=np.int64),
+ "c": ["text"] * 1000,
+ }
+ )
+
+ buffer_objects = []
+ serialized = fory.serialize(df, buffer_callback=buffer_objects.append)
+
+ buffers = [o.getbuffer() for o in buffer_objects]
+
+ deserialized = fory.deserialize(serialized, buffers=buffers)
+
+ pd.testing.assert_frame_equal(df, deserialized)
+
+
+@pytest.mark.skipif(np is None, reason="Requires numpy")
+def test_numpy_multiple_arrays_out_of_band():
+ fory = Fory(xlang=False, ref=True, strict=False)
+
+ arr1 = np.arange(5000, dtype=np.float32)
+ arr2 = np.arange(3000, dtype=np.int32)
+ arr3 = np.arange(2000, dtype=np.float64)
+
+ data = [arr1, arr2, arr3]
+
+ buffer_objects = []
+ serialized = fory.serialize(data, buffer_callback=buffer_objects.append)
+
+ buffers = [o.getbuffer() for o in buffer_objects]
+
+ deserialized = fory.deserialize(serialized, buffers=buffers)
+
+ assert len(deserialized) == 3
+ np.testing.assert_array_equal(arr1, deserialized[0])
+ np.testing.assert_array_equal(arr2, deserialized[1])
+ np.testing.assert_array_equal(arr3, deserialized[2])
+
+
+@pytest.mark.skipif(np is None, reason="Requires numpy")
+def test_numpy_with_mixed_types():
+ fory = Fory(xlang=False, ref=True, strict=False)
+
+ arr = np.arange(1000, dtype=np.float64)
+ text = "some text"
+ number = 42
+
+ data = {"array": arr, "text": text, "number": number}
+
+ buffer_objects = []
+ serialized = fory.serialize(data, buffer_callback=buffer_objects.append)
+
+ buffers = [o.getbuffer() for o in buffer_objects]
+
+ deserialized = fory.deserialize(serialized, buffers=buffers)
+
+ assert deserialized["text"] == text
+ assert deserialized["number"] == number
+ np.testing.assert_array_equal(arr, deserialized["array"])
+
+
+@pytest.mark.skipif(pd is None or np is None, reason="Requires numpy and pandas")
+def test_mixed_numpy_pandas_out_of_band():
+ fory = Fory(xlang=False, ref=True, strict=False)
+
+ arr = np.arange(500, dtype=np.float64)
+ df = pd.DataFrame({"x": np.arange(500, dtype=np.int64), "y": np.arange(500, dtype=np.float32)})
+
+ data = {"array": arr, "dataframe": df}
+
+ buffer_objects = []
+ serialized = fory.serialize(data, buffer_callback=buffer_objects.append)
+
+ buffers = [o.getbuffer() for o in buffer_objects]
+
+ deserialized = fory.deserialize(serialized, buffers=buffers)
+
+ np.testing.assert_array_equal(arr, deserialized["array"])
+ pd.testing.assert_frame_equal(df, deserialized["dataframe"])
+
+
+@pytest.mark.skipif(np is None, reason="Requires numpy")
+def test_selective_out_of_band_serialization():
+ fory = Fory(xlang=False, ref=True, strict=False)
+
+ arr1 = np.arange(1000, dtype=np.float64)
+ arr2 = np.arange(1000, dtype=np.float64)
+
+ data = [arr1, arr2]
+
+ buffer_objects = []
+ counter = 0
+
+ def selective_buffer_callback(buffer_object):
+ nonlocal counter
+ counter += 1
+ if counter % 2 == 0:
+ buffer_objects.append(buffer_object)
+ return False
+ else:
+ return True
+
+ serialized = fory.serialize(data, buffer_callback=selective_buffer_callback)
+
+ buffers = [o.getbuffer() for o in buffer_objects]
+
+ deserialized = fory.deserialize(serialized, buffers=buffers)
+
+ assert len(deserialized) == 2
+ np.testing.assert_array_equal(arr1, deserialized[0])
+ np.testing.assert_array_equal(arr2, deserialized[1])
+
+
+@pytest.mark.skipif(np is None, reason="Requires numpy")
+def test_buffer_object_write_to_stream():
+ """Test BufferObject.write_to() with different stream types"""
+ import io
+ from pyfory.serializer import NDArrayBufferObject
+
+ fory = Fory(xlang=False, ref=False, strict=False)
+
+ arr = np.arange(100).reshape(10, 10).astype(np.float64)
+
+ buffer_objects = []
+ serialized = fory.serialize(arr, buffer_callback=buffer_objects.append)
+
+ assert len(buffer_objects) > 0, "Should have collected out-of-band buffers"
+
+ for buffer_obj in buffer_objects:
+ assert isinstance(buffer_obj, NDArrayBufferObject), f"Expected NDArrayBufferObject, got {type(buffer_obj)}"
+
+ for buffer_obj in buffer_objects:
+ stream = io.BytesIO()
+ buffer_obj.write_to(stream)
+ stream.seek(0)
+ data = stream.read()
+ assert len(data) == buffer_obj.total_bytes()
+
+ for buffer_obj in buffer_objects:
+ mv = buffer_obj.getbuffer()
+ assert isinstance(mv, memoryview)
+ assert mv.nbytes == buffer_obj.total_bytes()
+
+ buffers = [obj.getbuffer() for obj in buffer_objects]
+ deserialized = fory.deserialize(serialized, buffers=buffers)
+ np.testing.assert_array_equal(arr, deserialized)
+
+
+@pytest.mark.skipif(np is None, reason="Requires numpy")
+def test_multidimensional_numpy_array_out_of_band():
+ """Test out-of-band serialization with multi-dimensional numpy arrays"""
+ from pyfory.serializer import NDArrayBufferObject
+
+ fory = Fory(xlang=False, ref=False, strict=False)
+
+ arr_2d = np.arange(100).reshape(10, 10).astype(np.float64)
+ arr_3d = np.arange(1000).reshape(10, 10, 10).astype(np.int64)
+ arr_4d = np.arange(256).reshape(4, 4, 4, 4).astype(np.float32)
+
+ data = [arr_2d, arr_3d, arr_4d]
+
+ buffer_objects = []
+ serialized = fory.serialize(data, buffer_callback=buffer_objects.append)
+
+ assert len(buffer_objects) > 0, "Should have collected out-of-band buffers"
+
+ for buffer_obj in buffer_objects:
+ assert isinstance(buffer_obj, NDArrayBufferObject), f"Expected NDArrayBufferObject, got {type(buffer_obj)}"
+ mv = buffer_obj.getbuffer()
+ assert isinstance(mv, memoryview), "getbuffer() should return memoryview"
+ assert len(mv) > 0, "Buffer should contain data"
+
+ buffers = [obj.getbuffer() for obj in buffer_objects]
+ deserialized = fory.deserialize(serialized, buffers=buffers)
+
+ assert len(deserialized) == 3
+ assert deserialized[0].shape == (10, 10)
+ assert deserialized[1].shape == (10, 10, 10)
+ assert deserialized[2].shape == (4, 4, 4, 4)
+
+ np.testing.assert_array_equal(arr_2d, deserialized[0])
+ np.testing.assert_array_equal(arr_3d, deserialized[1])
+ np.testing.assert_array_equal(arr_4d, deserialized[2])
+
+
+@pytest.mark.skipif(np is None, reason="Requires numpy")
+def test_numpy_array_different_dtypes_out_of_band():
+ """Test out-of-band serialization preserves various numpy dtypes"""
+ from pyfory.serializer import NDArrayBufferObject
+
+ fory = Fory(xlang=False, ref=False, strict=False)
+
+ arrays = {
+ "float32": np.arange(100).reshape(10, 10).astype(np.float32),
+ "float64": np.arange(100).reshape(10, 10).astype(np.float64),
+ "int8": np.arange(100).reshape(10, 10).astype(np.int8),
+ "int16": np.arange(100).reshape(10, 10).astype(np.int16),
+ "int32": np.arange(100).reshape(10, 10).astype(np.int32),
+ "int64": np.arange(100).reshape(10, 10).astype(np.int64),
+ "uint8": np.arange(100).reshape(10, 10).astype(np.uint8),
+ "bool": np.array([True, False] * 50, dtype=np.bool_).reshape(10, 10),
+ }
+
+ buffer_objects = []
+ serialized = fory.serialize(arrays, buffer_callback=buffer_objects.append)
+
+ assert len(buffer_objects) > 0, "Should have collected out-of-band buffers"
+
+ for buffer_obj in buffer_objects:
+ assert isinstance(buffer_obj, NDArrayBufferObject), f"Expected NDArrayBufferObject, got {type(buffer_obj)}"
+ mv = buffer_obj.getbuffer()
+ assert isinstance(mv, memoryview), "getbuffer() should return memoryview"
+
+ buffers = [obj.getbuffer() for obj in buffer_objects]
+ deserialized = fory.deserialize(serialized, buffers=buffers)
+
+ for key, original_array in arrays.items():
+ np.testing.assert_array_equal(original_array, deserialized[key])
+ assert original_array.dtype == deserialized[key].dtype, f"dtype mismatch for {key}: {original_array.dtype} != {deserialized[key].dtype}"
+
+
+@pytest.mark.skipif(np is None, reason="Requires numpy")
+def test_large_numpy_arrays_verify_buffer_collection():
+ """Verify that large numpy arrays properly use out-of-band buffers"""
+ from pyfory.serializer import NDArrayBufferObject
+
+ fory = Fory(xlang=False, ref=False, strict=False)
+
+ large_2d = np.arange(10000).reshape(100, 100).astype(np.int64)
+ large_3d = np.arange(27000).reshape(30, 30, 30).astype(np.float32)
+ large_4d = np.arange(10000).reshape(10, 10, 10, 10).astype(np.float64)
+
+ arrays = [large_2d, large_3d, large_4d]
+
+ buffer_objects = []
+ serialized = fory.serialize(arrays, buffer_callback=buffer_objects.append)
+
+ assert len(buffer_objects) > 0, "Should have collected out-of-band buffers"
+
+ for i, buffer_obj in enumerate(buffer_objects):
+ assert isinstance(buffer_obj, NDArrayBufferObject), f"Buffer {i}: Expected NDArrayBufferObject, got {type(buffer_obj)}"
+ mv = buffer_obj.getbuffer()
+ assert isinstance(mv, memoryview), "getbuffer() should return memoryview"
+ assert len(mv) > 0, f"Buffer {i} should contain data"
+
+ buffers = [obj.getbuffer() for obj in buffer_objects]
+ deserialized = fory.deserialize(serialized, buffers=buffers)
+
+ assert len(deserialized) == 3
+ np.testing.assert_array_equal(large_2d, deserialized[0])
+ np.testing.assert_array_equal(large_3d, deserialized[1])
+ np.testing.assert_array_equal(large_4d, deserialized[2])
diff --git a/python/pyfory/tests/test_serializer.py b/python/pyfory/tests/test_serializer.py
index ee91a09a8..bcff60272 100644
--- a/python/pyfory/tests/test_serializer.py
+++ b/python/pyfory/tests/test_serializer.py
@@ -362,7 +362,7 @@ def test_serialize_arrow_zero_copy():
serialized_data = Buffer.allocate(32)
fory.serialize(record_batch, buffer=serialized_data, buffer_callback=buffer_objects.append)
fory.serialize(table, buffer=serialized_data, buffer_callback=buffer_objects.append)
- buffers = [o.to_buffer() for o in buffer_objects]
+ buffers = [o.getbuffer() for o in buffer_objects]
new_batch = fory.deserialize(serialized_data, buffers=buffers[:1])
new_table = fory.deserialize(serialized_data, buffers=buffers[1:])
buffer_objects.clear()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]