This is an automated email from the ASF dual-hosted git repository.

chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fury.git


The following commit(s) were added to refs/heads/main by this push:
     new 3af277ab feat(python):  chunk based map serialization in pure python (#2037)
3af277ab is described below

commit 3af277abc6f35aad57c5ebc308056bfcf228e3e5
Author: PAN <[email protected]>
AuthorDate: Thu Feb 13 10:37:30 2025 +0800

    feat(python):  chunk based map serialization in pure python (#2037)
    
    <!--
    **Thanks for contributing to Fury.**
    
    **If this is your first time opening a PR on fury, you can refer to
    [CONTRIBUTING.md](https://github.com/apache/fury/blob/main/CONTRIBUTING.md).**
    
    Contribution Checklist
    
    - The **Apache Fury (incubating)** community has restrictions on the
    naming of PR titles. You can also find instructions in
    [CONTRIBUTING.md](https://github.com/apache/fury/blob/main/CONTRIBUTING.md).
    
    - Fury has a strong focus on performance. If the PR you submit will have
    an impact on performance, please benchmark it first and provide the
    benchmark result here.
    -->
    
    ## What does this PR do?
    
    I implemented chunk-based map serialization in pure Python. It differs
    somewhat from the Cython implementation: because Cython has strict type
    detection, here I added put_uint8 and other methods, and removed some
    fast paths.
    
    <!-- Describe the purpose of this PR. -->
    
    ## Related issues
    #1935
    
    <!--
    Is there any related issue? Please attach here.
    
    - #xxxx0
    - #xxxx1
    - #xxxx2
    -->
    
    ## Does this PR introduce any user-facing change?
    
    <!--
    If any user-facing interface changes, please [open an
    issue](https://github.com/apache/fury/issues/new/choose) describing the
    need to do so and update the document if necessary.
    -->
    
    - [ ] Does this PR introduce any public API change?
    - [ ] Does this PR introduce any binary protocol compatibility change?
    
    ## Benchmark
    
    <!--
    When the PR has an impact on performance (if you don't know whether the
    PR will have an impact on performance, you can submit the PR first, and
    if it will have impact on performance, the code reviewer will explain
    it), be sure to attach benchmark data here.
    -->
    
    ---------
    
    Co-authored-by: chaokunyang <[email protected]>
---
 ci/run_ci.sh                           |   6 +
 python/pyfury/_fury.py                 |   2 +-
 python/pyfury/_serializer.py           | 299 +++++++++++++++++++++++++++++----
 python/pyfury/_util.pxd                |   2 +
 python/pyfury/_util.pyx                |   4 +
 python/pyfury/includes/libutil.pxd     |   2 +
 python/pyfury/tests/test_serializer.py |  28 +++
 7 files changed, 305 insertions(+), 38 deletions(-)

diff --git a/ci/run_ci.sh b/ci/run_ci.sh
index 1c96cafb..c5126240 100755
--- a/ci/run_ci.sh
+++ b/ci/run_ci.sh
@@ -259,6 +259,12 @@ case $1 in
         exit $testcode
       fi
       echo "Executing fury python tests succeeds"
+      ENABLE_FURY_CYTHON_SERIALIZATION=0 pytest -v -s --durations=60 
pyfury/tests
+      testcode=$?
+      if [[ $testcode -ne 0 ]]; then
+        exit $testcode
+      fi
+      echo "Executing fury python tests succeeds"
     ;;
     go)
       echo "Executing fury go tests for go"
diff --git a/python/pyfury/_fury.py b/python/pyfury/_fury.py
index 4a243948..b7ff6f04 100644
--- a/python/pyfury/_fury.py
+++ b/python/pyfury/_fury.py
@@ -428,7 +428,7 @@ class Fury:
     def read_buffer_object(self, buffer) -> Buffer:
         in_band = buffer.read_bool()
         if in_band:
-            size = buffer.read_varint32()
+            size = buffer.read_varuint32()
             buf = buffer.slice(buffer.reader_index, size)
             buffer.reader_index += size
             return buf
diff --git a/python/pyfury/_serializer.py b/python/pyfury/_serializer.py
index b0f4379f..5bb1badd 100644
--- a/python/pyfury/_serializer.py
+++ b/python/pyfury/_serializer.py
@@ -21,9 +21,9 @@ from abc import ABC, abstractmethod
 from typing import Dict, Iterable, Any
 
 from pyfury._fury import (
-    NOT_NULL_STRING_FLAG,
     NOT_NULL_INT64_FLAG,
     NOT_NULL_BOOL_FLAG,
+    NOT_NULL_STRING_FLAG,
 )
 from pyfury.resolver import NOT_NULL_VALUE_FLAG, NULL_FLAG
 from pyfury.type import is_primitive_type
@@ -35,6 +35,36 @@ except ImportError:
 
 logger = logging.getLogger(__name__)
 
+MAX_CHUNK_SIZE = 255
+# Whether track key ref.
+TRACKING_KEY_REF = 0b1
+# Whether key has null.
+KEY_HAS_NULL = 0b10
+# Whether key is not declare type.
+KEY_DECL_TYPE = 0b100
+# Whether track value ref.
+TRACKING_VALUE_REF = 0b1000
+# Whether value has null.
+VALUE_HAS_NULL = 0b10000
+# Whether value is not declare type.
+VALUE_DECL_TYPE = 0b100000
+# When key or value is null that entry will be serialized as a new chunk with 
size 1.
+# In such cases, chunk size will be skipped writing.
+# Both key and value are null.
+KV_NULL = KEY_HAS_NULL | VALUE_HAS_NULL
+# Key is null, value type is declared type, and ref tracking for value is 
disabled.
+NULL_KEY_VALUE_DECL_TYPE = KEY_HAS_NULL | VALUE_DECL_TYPE
+# Key is null, value type is declared type, and ref tracking for value is 
enabled.
+NULL_KEY_VALUE_DECL_TYPE_TRACKING_REF = (
+    KEY_HAS_NULL | VALUE_DECL_TYPE | TRACKING_VALUE_REF
+)
+# Value is null, key type is declared type, and ref tracking for key is 
disabled.
+NULL_VALUE_KEY_DECL_TYPE = VALUE_HAS_NULL | KEY_DECL_TYPE
+# Value is null, key type is declared type, and ref tracking for key is 
enabled.
+NULL_VALUE_KEY_DECL_TYPE_TRACKING_REF = (
+    VALUE_HAS_NULL | KEY_DECL_TYPE | TRACKING_VALUE_REF
+)
+
 
 class Serializer(ABC):
     __slots__ = "fury", "type_", "need_to_write_ref"
@@ -280,13 +310,6 @@ class SetSerializer(CollectionSerializer):
 
 
 class MapSerializer(Serializer):
-    __slots__ = (
-        "class_resolver",
-        "ref_resolver",
-        "key_serializer",
-        "value_serializer",
-    )
-
     def __init__(self, fury, type_, key_serializer=None, 
value_serializer=None):
         super().__init__(fury, type_)
         self.class_resolver = fury.class_resolver
@@ -294,39 +317,241 @@ class MapSerializer(Serializer):
         self.key_serializer = key_serializer
         self.value_serializer = value_serializer
 
-    def write(self, buffer, value: Dict):
-        buffer.write_varuint32(len(value))
-        for k, v in value.items():
-            key_cls = type(k)
-            if key_cls is str:
-                buffer.write_int16(NOT_NULL_STRING_FLAG)
-                buffer.write_string(k)
+    def write(self, buffer, o):
+        obj = o
+        length = len(obj)
+        buffer.write_varuint32(length)
+        if length == 0:
+            return
+        fury = self.fury
+        class_resolver = fury.class_resolver
+        ref_resolver = fury.ref_resolver
+        key_serializer = self.key_serializer
+        value_serializer = self.value_serializer
+
+        items_iter = iter(obj.items())
+        key, value = next(items_iter)
+        has_next = True
+        while has_next:
+
+            while True:
+                if key is not None:
+                    if value is not None:
+                        break
+                    if key_serializer is not None:
+                        if key_serializer.need_to_write_ref:
+                            
buffer.write_int8(NULL_VALUE_KEY_DECL_TYPE_TRACKING_REF)
+                            if not ref_resolver.write_ref_or_null(buffer, key):
+                                key_serializer.write(buffer, key)
+                        else:
+                            buffer.write_int8(NULL_VALUE_KEY_DECL_TYPE)
+                            key_serializer.write(buffer, key)
+                    else:
+                        buffer.write_int8(VALUE_HAS_NULL | TRACKING_KEY_REF)
+                        fury.serialize_ref(buffer, key)
+                else:
+                    if value is not None:
+                        if value_serializer is not None:
+                            if value_serializer.need_to_write_ref:
+                                
buffer.write_int8(NULL_KEY_VALUE_DECL_TYPE_TRACKING_REF)
+                                if not ref_resolver.write_ref_or_null(buffer, 
key):
+                                    value_serializer.write(buffer, key)
+                                if not ref_resolver.write_ref_or_null(buffer, 
value):
+                                    value_serializer.write(buffer, value)
+                            else:
+                                buffer.write_int8(NULL_KEY_VALUE_DECL_TYPE)
+                                value_serializer.write(buffer, value)
+                        else:
+                            buffer.write_int8(KEY_HAS_NULL | 
TRACKING_VALUE_REF)
+                            fury.serialize_ref(buffer, value)
+                    else:
+                        buffer.write_int8(KV_NULL)
+                try:
+                    key, value = next(items_iter)
+                except StopIteration:
+                    has_next = False
+                    break
+
+            if not has_next:
+                break
+
+            key_cls = type(key)
+            value_cls = type(value)
+            buffer.write_int16(-1)
+            chunk_size_offset = buffer.writer_index - 1
+            chunk_header = 0
+
+            if key_serializer is not None:
+                chunk_header |= KEY_DECL_TYPE
             else:
-                if not self.ref_resolver.write_ref_or_null(buffer, k):
-                    classinfo = self.class_resolver.get_classinfo(key_cls)
-                    self.class_resolver.write_typeinfo(buffer, classinfo)
-                    classinfo.serializer.write(buffer, k)
-            value_cls = type(v)
-            if value_cls is str:
-                buffer.write_int16(NOT_NULL_STRING_FLAG)
-                buffer.write_string(v)
-            elif value_cls is int:
-                buffer.write_int16(NOT_NULL_INT64_FLAG)
-                buffer.write_varint64(v)
+                key_classinfo = self.class_resolver.get_classinfo(key_cls)
+                class_resolver.write_typeinfo(buffer, key_classinfo)
+                key_serializer = key_classinfo.serializer
+
+            if value_serializer is not None:
+                chunk_header |= VALUE_DECL_TYPE
             else:
-                if not self.ref_resolver.write_ref_or_null(buffer, v):
-                    classinfo = self.class_resolver.get_classinfo(value_cls)
-                    self.class_resolver.write_typeinfo(buffer, classinfo)
-                    classinfo.serializer.write(buffer, v)
+                value_classinfo = self.class_resolver.get_classinfo(value_cls)
+                class_resolver.write_typeinfo(buffer, value_classinfo)
+                value_serializer = value_classinfo.serializer
+
+            key_write_ref = (
+                key_serializer.need_to_write_ref if key_serializer else False
+            )
+            value_write_ref = (
+                value_serializer.need_to_write_ref if value_serializer else 
False
+            )
+            if key_write_ref:
+                chunk_header |= TRACKING_KEY_REF
+            if value_write_ref:
+                chunk_header |= TRACKING_VALUE_REF
+
+            buffer.put_uint8(chunk_size_offset - 1, chunk_header)
+            chunk_size = 0
+
+            while chunk_size < MAX_CHUNK_SIZE:
+
+                if (
+                    key is None
+                    or value is None
+                    or type(key) is not key_cls
+                    or type(value) is not value_cls
+                ):
+                    break
+                if not key_write_ref or not 
ref_resolver.write_ref_or_null(buffer, key):
+                    key_serializer.write(buffer, key)
+                if not value_write_ref or not ref_resolver.write_ref_or_null(
+                    buffer, value
+                ):
+                    value_serializer.write(buffer, value)
+
+                chunk_size += 1
+
+                try:
+                    key, value = next(items_iter)
+                except StopIteration:
+                    has_next = False
+                    break
+
+            key_serializer = self.key_serializer
+            value_serializer = self.value_serializer
+            buffer.put_uint8(chunk_size_offset, chunk_size)
 
     def read(self, buffer):
-        len_ = buffer.read_varuint32()
-        map_ = self.type_()
-        self.fury.ref_resolver.reference(map_)
-        for i in range(len_):
-            k = self.fury.deserialize_ref(buffer)
-            v = self.fury.deserialize_ref(buffer)
-            map_[k] = v
+        fury = self.fury
+        ref_resolver = self.ref_resolver
+        class_resolver = self.class_resolver
+        size = buffer.read_varuint32()
+        map_ = {}
+        ref_resolver.reference(map_)
+        chunk_header = 0
+        if size != 0:
+            chunk_header = buffer.read_uint8()
+        key_serializer, value_serializer = None, None
+
+        while size > 0:
+            while True:
+                key_has_null = (chunk_header & KEY_HAS_NULL) != 0
+                value_has_null = (chunk_header & VALUE_HAS_NULL) != 0
+                if not key_has_null:
+                    if not value_has_null:
+                        break
+                    else:
+                        track_key_ref = (chunk_header & TRACKING_KEY_REF) != 0
+                        if (chunk_header & KEY_DECL_TYPE) != 0:
+                            if track_key_ref:
+                                ref_id = 
ref_resolver.try_preserve_ref_id(buffer)
+                                if ref_id < NOT_NULL_VALUE_FLAG:
+                                    key = ref_resolver.get_read_object()
+                                else:
+                                    key = key_serializer.read(buffer)
+                                    ref_resolver.set_read_object(ref_id, key)
+                            else:
+                                key = key_serializer.read(buffer)
+                        else:
+                            key = fury.deserialize_ref(buffer)
+                        map_[key] = None
+                else:
+                    if not value_has_null:
+                        track_value_ref = (chunk_header & TRACKING_VALUE_REF) 
!= 0
+                        if (chunk_header & VALUE_DECL_TYPE) != 0:
+                            if track_value_ref:
+                                ref_id = 
ref_resolver.try_preserve_ref_id(buffer)
+                                if ref_id < NOT_NULL_VALUE_FLAG:
+                                    value = ref_resolver.get_read_object()
+                                else:
+                                    value = value_serializer.read(buffer)
+                                    ref_resolver.set_read_object(ref_id, value)
+                        else:
+                            value = fury.deserialize_ref(buffer)
+                        map_[None] = value
+                    else:
+                        map_[None] = None
+                size -= 1
+                if size == 0:
+                    return map_
+                else:
+                    chunk_header = buffer.read_uint8()
+
+            track_key_ref = (chunk_header & TRACKING_KEY_REF) != 0
+            track_value_ref = (chunk_header & TRACKING_VALUE_REF) != 0
+            key_is_declared_type = (chunk_header & KEY_DECL_TYPE) != 0
+            value_is_declared_type = (chunk_header & VALUE_DECL_TYPE) != 0
+            chunk_size = buffer.read_uint8()
+            if not key_is_declared_type:
+                key_serializer = 
class_resolver.read_typeinfo(buffer).serializer
+            if not value_is_declared_type:
+                value_serializer = 
class_resolver.read_typeinfo(buffer).serializer
+            key_serializer_type = type(key_serializer)
+            value_serializer_type = type(value_serializer)
+            for i in range(chunk_size):
+                if track_key_ref:
+                    ref_id = ref_resolver.try_preserve_ref_id(buffer)
+                    if ref_id < NOT_NULL_VALUE_FLAG:
+                        key = ref_resolver.get_read_object()
+                    else:
+                        key = key_serializer.read(buffer)
+                        ref_resolver.set_read_object(ref_id, key)
+                else:
+                    if key_serializer_type is StringSerializer:
+                        key = buffer.read_string()
+                    elif key_serializer_type is Int64Serializer:
+                        key = buffer.read_varint64()
+                    elif key_serializer_type is Float64Serializer:
+                        key = buffer.read_double()
+                    elif key_serializer_type is Int32Serializer:
+                        key = buffer.read_varint32()
+                    elif key_serializer_type is Float32Serializer:
+                        key = buffer.read_float()
+                    else:
+                        key = key_serializer.read(buffer)
+                if track_value_ref:
+                    ref_id = ref_resolver.try_preserve_ref_id(buffer)
+                    if ref_id < NOT_NULL_VALUE_FLAG:
+                        value = ref_resolver.get_read_object()
+                    else:
+                        value = value_serializer.read(buffer)
+                        ref_resolver.set_read_object(ref_id, value)
+                else:
+                    if value_serializer_type is StringSerializer:
+                        value = buffer.read_string()
+                    elif value_serializer_type is Int64Serializer:
+                        value = buffer.read_varint64()
+                    elif value_serializer_type is Float64Serializer:
+                        value = buffer.read_double()
+                    elif value_serializer_type is Int32Serializer:
+                        value = buffer.read_varint32()
+                    elif value_serializer_type is Float32Serializer:
+                        value = buffer.read_float()
+                    elif value_serializer_type is BooleanSerializer:
+                        value = buffer.read_bool()
+                    else:
+                        value = value_serializer.read(buffer)
+                map_[key] = value
+                size -= 1
+            if size != 0:
+                chunk_header = buffer.read_uint8()
+
         return map_
 
     def xwrite(self, buffer, value: Dict):
diff --git a/python/pyfury/_util.pxd b/python/pyfury/_util.pxd
index 89facbb0..6facd236 100644
--- a/python/pyfury/_util.pxd
+++ b/python/pyfury/_util.pxd
@@ -60,6 +60,8 @@ cdef class Buffer:
 
     cpdef inline put_bool(self, uint32_t offset, c_bool v)
 
+    cpdef inline put_uint8(self, uint32_t offset, uint8_t v)
+
     cpdef inline put_int8(self, uint32_t offset, int8_t v)
 
     cpdef inline put_int16(self, uint32_t offset, int16_t v)
diff --git a/python/pyfury/_util.pyx b/python/pyfury/_util.pyx
index 73c9fb31..ca9bf707 100644
--- a/python/pyfury/_util.pyx
+++ b/python/pyfury/_util.pyx
@@ -87,6 +87,10 @@ cdef class Buffer:
         self.check_bound(offset, <int32_t>1)
         self.c_buffer.get().UnsafePutByte(offset, v)
 
+    cpdef inline put_uint8(self, uint32_t offset, uint8_t v):
+        self.check_bound(offset, <int32_t>1)
+        self.c_buffer.get().UnsafePutByte(offset, v)
+
     cpdef inline put_int8(self, uint32_t offset, int8_t v):
         self.check_bound(offset, <int32_t>1)
         self.c_buffer.get().UnsafePutByte(offset, v)
diff --git a/python/pyfury/includes/libutil.pxd 
b/python/pyfury/includes/libutil.pxd
index 72a64003..c837cf5b 100644
--- a/python/pyfury/includes/libutil.pxd
+++ b/python/pyfury/includes/libutil.pxd
@@ -53,6 +53,8 @@ cdef extern from "fury/util/buffer.h" namespace "fury" nogil:
 
         inline void UnsafePutByte(uint32_t offset, c_bool)
 
+        inline void UnsafePutByte(uint32_t offset, uint8_t)
+
         inline void UnsafePutByte(uint32_t offset, int8_t)
 
         inline void UnsafePut(uint32_t offset, int16_t)
diff --git a/python/pyfury/tests/test_serializer.py 
b/python/pyfury/tests/test_serializer.py
index 7b02674f..07538e60 100644
--- a/python/pyfury/tests/test_serializer.py
+++ b/python/pyfury/tests/test_serializer.py
@@ -613,5 +613,33 @@ def test_function():
     assert df_sum().equals(df.sum())
 
 
+@dataclass(unsafe_hash=True)
+class MapFields:
+    simple_dict: dict = None
+    empty_dict: dict = None
+    large_dict: dict = None
+
+
+def test_map_fields_chunk_serializer():
+    fury = Fury(
+        language=Language.PYTHON, ref_tracking=True, 
require_class_registration=False
+    )
+
+    simple_dict = {"a": 1, "b": 2, "c": 3}
+    empty_dict = {}
+    large_dict = {f"key{i}": i for i in range(1000)}
+
+    # MapSerializer test
+    map_fields_object = MapFields(
+        simple_dict=simple_dict, empty_dict=empty_dict, large_dict=large_dict
+    )
+
+    serialized = fury.serialize(map_fields_object)
+    deserialized = fury.deserialize(serialized)
+    assert map_fields_object.simple_dict == deserialized.simple_dict
+    assert map_fields_object.empty_dict == deserialized.empty_dict
+    assert map_fields_object.large_dict == deserialized.large_dict
+
+
 if __name__ == "__main__":
     test_string()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to