This is an automated email from the ASF dual-hosted git repository.
chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fory.git
The following commit(s) were added to refs/heads/main by this push:
new 1c741362c feat(python): add thread safe fory (#2735)
1c741362c is described below
commit 1c741362c633e029941df079c26067345782d7f0
Author: Shawn Yang <[email protected]>
AuthorDate: Fri Oct 10 10:09:49 2025 +0800
feat(python): add thread safe fory (#2735)
<!--
**Thanks for contributing to Apache Fory™.**
**If this is your first time opening a PR on fory, you can refer to
[CONTRIBUTING.md](https://github.com/apache/fory/blob/main/CONTRIBUTING.md).**
Contribution Checklist
- The **Apache Fory™** community has requirements on the naming of pr
titles. You can also find instructions in
[CONTRIBUTING.md](https://github.com/apache/fory/blob/main/CONTRIBUTING.md).
- Apache Fory™ has a strong focus on performance. If the PR you submit
will have an impact on performance, please benchmark it first and
provide the benchmark result here.
-->
## Why?
<!-- Describe the purpose of this PR. -->
## What does this PR do?
add thread safe fory
## Related issues
<!--
Is there any related issue? If this PR closes them you can say
fix/closes:
- #xxxx0
- #xxxx1
- Fixes #xxxx2
-->
## Does this PR introduce any user-facing change?
<!--
If any user-facing interface changes, please [open an
issue](https://github.com/apache/fory/issues/new/choose) describing the
need to do so and update the document if necessary.
Delete section if not applicable.
-->
- [ ] Does this PR introduce any public API change?
- [ ] Does this PR introduce any binary protocol compatibility change?
## Benchmark
<!--
When the PR has an impact on performance (if you don't know whether the
PR will have an impact on performance, you can submit the PR first, and
if it will have impact on performance, the code reviewer will explain
it), be sure to attach a benchmark data here.
Delete section if not applicable.
-->
---
python/README.md | 59 +++++++++++
python/pyfory/__init__.py | 1 +
python/pyfory/_fory.py | 164 +++++++++++++++++++++++++++++
python/pyfory/tests/test_thread_safe.py | 177 ++++++++++++++++++++++++++++++++
4 files changed, 401 insertions(+)
diff --git a/python/README.md b/python/README.md
index 77b54f036..db50ba40a 100644
--- a/python/README.md
+++ b/python/README.md
@@ -671,6 +671,65 @@ class Fory:
)
```
+### ThreadSafeFory Class
+
+Thread-safe serialization interface backed by a lock-protected pool of `Fory` instances:
+
+```python
+class ThreadSafeFory:
+ def __init__(
+ self,
+ xlang: bool = False,
+ ref: bool = False,
+ strict: bool = True,
+ compatible: bool = False,
+ max_depth: int = 50
+ )
+```
+
+`ThreadSafeFory` provides thread-safe serialization by maintaining a pool of
`Fory` instances protected by a lock. When a thread needs to
serialize/deserialize, it borrows an instance from the pool (creating a new one
if none is idle), uses it, and returns it. All type registrations must be done
before the first serialize/deserialize call so every pooled instance is
configured identically; registering afterwards raises `RuntimeError`.
+
+**Thread Safety Example:**
+
+```python
+import pyfory
+import threading
+from dataclasses import dataclass
+
+@dataclass
+class Person:
+ name: str
+ age: int
+
+# Create thread-safe Fory instance
+fory = pyfory.ThreadSafeFory(xlang=False, ref=True)
+fory.register(Person)
+
+# Use in multiple threads safely
+def serialize_in_thread(thread_id):
+ person = Person(name=f"User{thread_id}", age=25 + thread_id)
+ data = fory.serialize(person)
+ result = fory.deserialize(data)
+ print(f"Thread {thread_id}: {result}")
+
+threads = [threading.Thread(target=serialize_in_thread, args=(i,)) for i in
range(10)]
+for t in threads: t.start()
+for t in threads: t.join()
+```
+
+**Key Features:**
+
+- **Instance Pool**: Maintains a pool of `Fory` instances protected by a lock
for thread safety
+- **Shared Configuration**: All registrations must be done upfront and are
applied to all instances
+- **Same API**: Drop-in replacement for `Fory` for the `register`, `register_type`, `register_serializer`, `serialize`, `deserialize`, `dumps`, and `loads` methods
+- **Registration Safety**: Prevents registration after first use to ensure
consistency
+
+**When to Use:**
+
+- **Multi-threaded Applications**: Web servers, concurrent workers, parallel
processing
+- **Shared Fory Instances**: When multiple threads need to
serialize/deserialize data
+- **Thread Pools**: Applications using thread pools or concurrent.futures
+
**Parameters:**
- **`xlang`** (`bool`, default=`False`): Enable cross-language serialization.
When `False`, enables Python-native mode supporting all Python objects. When
`True`, enables cross-language mode compatible with Java, Go, Rust, etc.
diff --git a/python/pyfory/__init__.py b/python/pyfory/__init__.py
index 7cfdf62d5..2abeda2a7 100644
--- a/python/pyfory/__init__.py
+++ b/python/pyfory/__init__.py
@@ -19,6 +19,7 @@ from pyfory import lib # noqa: F401 # pylint:
disable=unused-import
from pyfory._fory import ( # noqa: F401 # pylint: disable=unused-import
Fory,
Language,
+ ThreadSafeFory,
)
PYTHON = Language.PYTHON
diff --git a/python/pyfory/_fory.py b/python/pyfory/_fory.py
index 170bce9a2..dec22fe61 100644
--- a/python/pyfory/_fory.py
+++ b/python/pyfory/_fory.py
@@ -593,3 +593,167 @@ _ENABLE_TYPE_REGISTRATION_FORCIBLY =
os.getenv("ENABLE_TYPE_REGISTRATION_FORCIBL
"1",
"true",
}
+
+
class ThreadSafeFory:
    """
    Thread-safe wrapper for Fory using instance pooling.

    ThreadSafeFory maintains a pool of Fory instances protected by a lock to enable
    safe concurrent serialization/deserialization across multiple threads. When a thread
    needs to serialize or deserialize data, it borrows an instance from the pool (creating
    a new one if none is idle), uses it, and returns it for reuse by other threads.

    All type registrations must be performed before any serialization operations to ensure
    consistency across all pooled instances. Attempting to register types after the first
    serialization will raise a RuntimeError.

    Args:
        **kwargs: Forwarded unchanged to every pooled ``Fory`` instance. Commonly used:
            xlang (bool): Whether to enable cross-language serialization. Defaults to False.
            ref (bool): Whether to enable reference tracking. Defaults to False.
            strict (bool): Whether to require type registration. Defaults to True.
            compatible (bool): Whether to enable compatible mode. Defaults to False.
            max_depth (int): Maximum depth for deserialization. Defaults to 50.

    Example:
        >>> import pyfory
        >>> import threading
        >>> from dataclasses import dataclass
        >>> @dataclass
        ... class Person:
        ...     name: str
        ...     age: int
        >>> # Create thread-safe instance and register types up front
        >>> fory = pyfory.ThreadSafeFory()
        >>> fory.register(Person)
        >>> # Use safely from multiple threads
        >>> def worker(thread_id):
        ...     person = Person(f"User{thread_id}", 25)
        ...     data = fory.serialize(person)
        ...     result = fory.deserialize(data)
        ...     print(f"Thread {thread_id}: {result}")
        >>> threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
        >>> for t in threads:
        ...     t.start()
        >>> for t in threads:
        ...     t.join()

    Note:
        - Register all types before calling serialize/deserialize
        - The pool grows dynamically as needed based on thread contention
        - Instances are automatically returned to the pool after use
        - Both Python and Cython modes are supported automatically
    """

    def __init__(self, **kwargs):
        import threading

        # Configuration forwarded verbatim to every Fory instance the pool creates.
        self._config = kwargs
        # Registration actions, replayed in order on each newly created instance.
        self._callbacks = []
        # Guards _pool, _callbacks and _instances_created.
        self._lock = threading.Lock()
        # Idle instances available for reuse (used as a LIFO stack).
        self._pool = []
        self._fory_class = self._get_fory_class()
        # Set when the first instance is built; registration is rejected afterwards.
        self._instances_created = False

    def _get_fory_class(self):
        """Return the Cython ``Fory`` if it is importable and enabled, else the Python ``Fory``."""
        try:
            from pyfory._serialization import ENABLE_FORY_CYTHON_SERIALIZATION

            if ENABLE_FORY_CYTHON_SERIALIZATION:
                from pyfory._serialization import Fory as CythonFory

                return CythonFory
        except ImportError:
            pass
        return Fory

    def _get_fory(self):
        """Borrow an idle instance from the pool, creating a new one if none is idle.

        Only the pool access and the registration freeze happen under the lock.
        Constructing the instance and replaying registrations happens outside it,
        so other threads can keep borrowing/returning instances in the meantime.
        """
        with self._lock:
            if self._pool:
                return self._pool.pop()
            # Freeze registrations: once this flag is set, _register_callback
            # rejects new entries, so this snapshot is final for every instance.
            self._instances_created = True
            callbacks = tuple(self._callbacks)
        fory = self._fory_class(**self._config)
        for callback in callbacks:
            callback(fory)
        return fory

    def _return_fory(self, fory):
        """Put a borrowed instance back into the pool for reuse."""
        with self._lock:
            self._pool.append(fory)

    def _register_callback(self, callback):
        """Queue a registration action to be applied to every pooled instance.

        Raises:
            RuntimeError: If any Fory instance has already been created.
        """
        with self._lock:
            if self._instances_created:
                raise RuntimeError(
                    "Cannot register types after Fory instances have been created. Please register all types before calling serialize/deserialize."
                )
            self._callbacks.append(callback)

    def register(
        self,
        cls: Union[type, TypeVar],
        *,
        type_id: int = None,
        namespace: str = None,
        typename: str = None,
        serializer=None,
    ):
        """Record a ``Fory.register`` call to replay on every pooled instance.

        Raises:
            RuntimeError: If any Fory instance has already been created.
        """
        self._register_callback(
            lambda f: f.register(
                cls,
                type_id=type_id,
                namespace=namespace,
                typename=typename,
                serializer=serializer,
            )
        )

    def register_type(
        self,
        cls: Union[type, TypeVar],
        *,
        type_id: int = None,
        namespace: str = None,
        typename: str = None,
        serializer=None,
    ):
        """Record a ``Fory.register_type`` call to replay on every pooled instance.

        Raises:
            RuntimeError: If any Fory instance has already been created.
        """
        self._register_callback(
            lambda f: f.register_type(
                cls,
                type_id=type_id,
                namespace=namespace,
                typename=typename,
                serializer=serializer,
            )
        )

    def register_serializer(self, cls: type, serializer):
        """Record a ``Fory.register_serializer`` call to replay on every pooled instance.

        Raises:
            RuntimeError: If any Fory instance has already been created.
        """
        self._register_callback(lambda f: f.register_serializer(cls, serializer))

    def serialize(
        self,
        obj,
        buffer: Buffer = None,
        buffer_callback=None,
        unsupported_callback=None,
    ) -> Union[Buffer, bytes]:
        """Serialize ``obj`` with a borrowed pooled instance; see ``Fory.serialize``."""
        fory = self._get_fory()
        try:
            return fory.serialize(obj, buffer, buffer_callback, unsupported_callback)
        finally:
            self._return_fory(fory)

    def deserialize(
        self,
        buffer: Union[Buffer, bytes],
        buffers: Iterable = None,
        unsupported_objects: Iterable = None,
    ):
        """Deserialize ``buffer`` with a borrowed pooled instance; see ``Fory.deserialize``."""
        fory = self._get_fory()
        try:
            return fory.deserialize(buffer, buffers, unsupported_objects)
        finally:
            self._return_fory(fory)

    def dumps(
        self,
        obj,
        buffer: Buffer = None,
        buffer_callback=None,
        unsupported_callback=None,
    ) -> Union[Buffer, bytes]:
        """Alias of :meth:`serialize`."""
        return self.serialize(obj, buffer, buffer_callback, unsupported_callback)

    def loads(
        self,
        buffer: Union[Buffer, bytes],
        buffers: Iterable = None,
        unsupported_objects: Iterable = None,
    ):
        """Alias of :meth:`deserialize`."""
        return self.deserialize(buffer, buffers, unsupported_objects)
diff --git a/python/pyfory/tests/test_thread_safe.py
b/python/pyfory/tests/test_thread_safe.py
new file mode 100644
index 000000000..5b49738ed
--- /dev/null
+++ b/python/pyfory/tests/test_thread_safe.py
@@ -0,0 +1,177 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import threading
+from dataclasses import dataclass
+
+
+from pyfory import ThreadSafeFory
+
+
@dataclass
class Person:
    """Two-field record used as the registered payload type in these tests."""

    name: str
    age: int
+
@dataclass
class Address:
    """Second registered record type, used to exercise multiple registrations."""

    city: str
    country: str
+
+
def test_thread_safe_fory_basic_serialization():
    """A registered dataclass survives a single-threaded serialize/deserialize round trip."""
    fory = ThreadSafeFory()
    fory.register(Person)

    original = Person(name="Alice", age=30)
    restored = fory.deserialize(fory.serialize(original))

    assert (restored.name, restored.age) == (original.name, original.age)
+
+
def test_thread_safe_fory_multiple_threads():
    """Ten threads share one ThreadSafeFory and each round-trips its own object."""
    fory = ThreadSafeFory()
    fory.register(Person)

    thread_count = 10
    results = []
    errors = []

    def round_trip(idx):
        try:
            payload = Person(name=f"Person{idx}", age=20 + idx)
            results.append((idx, fory.deserialize(fory.serialize(payload))))
        except Exception as exc:
            errors.append((idx, exc))

    workers = [threading.Thread(target=round_trip, args=(idx,)) for idx in range(thread_count)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    assert not errors, f"Errors occurred: {errors}"
    assert len(results) == thread_count

    for idx, restored in results:
        assert restored.name == f"Person{idx}"
        assert restored.age == 20 + idx
+
+
def test_thread_safe_fory_registration():
    """Types registered by id and by namespace/typename both round-trip intact."""
    fory = ThreadSafeFory()
    fory.register(Person, type_id=100)
    fory.register(Address, namespace="test", typename="Address")

    person = Person(name="Bob", age=25)
    result = fory.deserialize(fory.serialize(person))
    # Dataclass equality checks the class and every field, not just one of them.
    assert result == person

    address = Address(city="NYC", country="USA")
    result = fory.deserialize(fory.serialize(address))
    assert result == address
+
+
def test_thread_safe_fory_xlang_mode():
    """Round-trip a registered dataclass with xlang mode and ref tracking enabled."""
    fory = ThreadSafeFory(xlang=True, ref=True)
    fory.register(Person)

    original = Person(name="Charlie", age=35)
    restored = fory.deserialize(fory.serialize(original))

    for field_name in ("name", "age"):
        assert getattr(restored, field_name) == getattr(original, field_name)
+
+
def test_thread_safe_fory_dumps_loads():
    """``dumps``/``loads`` round-trip a registered dataclass like serialize/deserialize."""
    fory = ThreadSafeFory()
    fory.register(Person)

    original = Person(name="Dave", age=40)
    restored = fory.loads(fory.dumps(original))

    assert (restored.name, restored.age) == (original.name, original.age)
+
+
def test_thread_safe_fory_ref_tracking():
    """With ref=True, a list holding the same object twice keeps that sharing."""
    fory = ThreadSafeFory(ref=True)
    fory.register(Person)

    person = Person(name="Eve", age=28)
    serialized = fory.serialize([person, person])
    result = fory.deserialize(serialized)

    assert len(result) == 2
    assert result[0] == person
    # The point of reference tracking: both slots must be the *same* object,
    # not two equal copies.
    assert result[0] is result[1]
+
+
def test_thread_safe_fory_cross_thread_registration():
    """Types registered once on the main thread are usable from worker threads."""
    fory = ThreadSafeFory()
    fory.register(Person)
    fory.register(Address)

    results = []
    errors = []

    def serialize_data(thread_id):
        try:
            person = Person(name=f"User{thread_id}", age=25)
            address = Address(city=f"City{thread_id}", country="USA")
            # Exercise both registered types, not only Person.
            results.append((person, fory.deserialize(fory.serialize(person))))
            results.append((address, fory.deserialize(fory.serialize(address))))
        except Exception as e:
            errors.append((thread_id, e))

    threads = [threading.Thread(target=serialize_data, args=(i,)) for i in range(5)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    assert not errors, f"Errors occurred: {errors}"
    assert len(results) == 10
    for expected, actual in results:
        assert actual == expected
+
+
def test_thread_safe_fory_register_after_use():
    """Registering a type after the first serialize call raises RuntimeError."""
    fory = ThreadSafeFory()
    fory.register(Person)

    person = Person(name="Alice", age=30)
    fory.serialize(person)

    try:
        fory.register(Address)
    except RuntimeError as e:
        assert "Cannot register types after Fory instances have been created" in str(e)
    else:
        # Explicit raise (not `assert False`) so the check survives `python -O`.
        raise AssertionError("Should raise RuntimeError")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]