This is an automated email from the ASF dual-hosted git repository.

chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fory.git


The following commit(s) were added to refs/heads/main by this push:
     new a32710544 feat(java): add optimized serializers for blocking queues (#2955)
a32710544 is described below

commit a32710544d092dd68e250e3f6574292eaa5d37d4
Author: zhan7236 <[email protected]>
AuthorDate: Mon Dec 1 22:00:53 2025 +0800

    feat(java): add optimized serializers for blocking queues (#2955)
    
    ## Why?
    
    The JDK default serialization for `ArrayBlockingQueue` and
    `LinkedBlockingQueue` is inefficient and produces warnings like:
    
    ```text
    WARN ObjectStreamSerializer - class java.util.concurrent.ArrayBlockingQueue customized jdk serialization, which is inefficient.
    ```
    
    These blocking queues are commonly used in concurrent applications, and
    optimized serializers will improve performance.
    
    ## What does this PR do?
    
    Add dedicated serializers for `java.util.concurrent.ArrayBlockingQueue`
    and `java.util.concurrent.LinkedBlockingQueue`:
    
    - `ArrayBlockingQueueSerializer`: Extends
    `ConcurrentCollectionSerializer` to handle thread-safe serialization
    with capacity preservation.
    - `LinkedBlockingQueueSerializer`: Extends
    `ConcurrentCollectionSerializer` to handle thread-safe serialization
    with capacity preservation.
    
    Both serializers:
    - Use `CollectionSnapshot` for safe concurrent iteration.
    - Preserve queue capacity during serialization/deserialization.
    - Support both serialization and copy operations.
    - Are registered as default serializers in
    `registerDefaultSerializers()`.
    
    ## Related issues
    
    - Fixes #2663 (partial - blocking queue serializers)
    
    ## Does this PR introduce any user-facing change?
    
    - [ ] Does this PR introduce any public API change?
    - [ ] Does this PR introduce any binary protocol compatibility change?
    
    No public API changes. Users will automatically benefit from optimized
    serialization for `ArrayBlockingQueue` and `LinkedBlockingQueue`.
    
    ## Benchmark
    
    Not applicable - this PR replaces Fory's slow, JDK-compatible
    `ObjectStreamSerializer` with specialized binary serializers for these
    two queue types, which is a well-known performance improvement pattern
    already used for other collection types in this codebase.
---
 .../collection/CollectionSerializers.java          | 90 ++++++++++++++++++++++
 .../fory-core/native-image.properties              |  2 +
 .../apache/fory/resolver/ClassResolverTest.java    | 12 +--
 .../collection/CollectionSerializersTest.java      | 21 ++---
 4 files changed, 107 insertions(+), 18 deletions(-)

diff --git 
a/java/fory-core/src/main/java/org/apache/fory/serializer/collection/CollectionSerializers.java
 
b/java/fory-core/src/main/java/org/apache/fory/serializer/collection/CollectionSerializers.java
index 8265b2084..9169334a6 100644
--- 
a/java/fory-core/src/main/java/org/apache/fory/serializer/collection/CollectionSerializers.java
+++ 
b/java/fory-core/src/main/java/org/apache/fory/serializer/collection/CollectionSerializers.java
@@ -42,9 +42,11 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.Vector;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.fory.Fory;
 import org.apache.fory.collection.CollectionSnapshot;
 import org.apache.fory.exception.ForyException;
@@ -690,6 +692,89 @@ public class CollectionSerializers {
     }
   }
 
+  /**
+   * Serializer for {@link ArrayBlockingQueue}.
+   *
+   * <p>This serializer handles the capacity field which is essential for 
ArrayBlockingQueue since
+   * it's a bounded queue. The capacity is stored in the items array length 
and needs to be
+   * preserved during serialization.
+   */
+  public static class ArrayBlockingQueueSerializer
+      extends ConcurrentCollectionSerializer<ArrayBlockingQueue> {
+
+    public ArrayBlockingQueueSerializer(Fory fory, Class<ArrayBlockingQueue> 
cls) {
+      super(fory, cls, true);
+    }
+
+    @Override
+    public CollectionSnapshot onCollectionWrite(MemoryBuffer buffer, 
ArrayBlockingQueue value) {
+      CollectionSnapshot snapshot = super.onCollectionWrite(buffer, value);
+      // Write the capacity (remaining capacity + current size = total 
capacity)
+      int capacity = value.remainingCapacity() + value.size();
+      buffer.writeVarUint32Small7(capacity);
+      return snapshot;
+    }
+
+    @Override
+    public ArrayBlockingQueue newCollection(MemoryBuffer buffer) {
+      int numElements = buffer.readVarUint32Small7();
+      setNumElements(numElements);
+      int capacity = buffer.readVarUint32Small7();
+      ArrayBlockingQueue queue = new ArrayBlockingQueue<>(capacity);
+      fory.getRefResolver().reference(queue);
+      return queue;
+    }
+
+    @Override
+    public Collection newCollection(Collection collection) {
+      ArrayBlockingQueue abq = (ArrayBlockingQueue) collection;
+      // Capacity is remaining + current size
+      int capacity = abq.remainingCapacity() + abq.size();
+      return new ArrayBlockingQueue<>(capacity);
+    }
+  }
+
+  /**
+   * Serializer for {@link LinkedBlockingQueue}.
+   *
+   * <p>This serializer handles the capacity field which is essential for 
LinkedBlockingQueue since
+   * it can be a bounded queue. The capacity needs to be preserved during 
serialization.
+   */
+  public static class LinkedBlockingQueueSerializer
+      extends ConcurrentCollectionSerializer<LinkedBlockingQueue> {
+
+    public LinkedBlockingQueueSerializer(Fory fory, Class<LinkedBlockingQueue> 
cls) {
+      super(fory, cls, true);
+    }
+
+    @Override
+    public CollectionSnapshot onCollectionWrite(MemoryBuffer buffer, 
LinkedBlockingQueue value) {
+      CollectionSnapshot snapshot = super.onCollectionWrite(buffer, value);
+      // Write the capacity (remaining capacity + current size = total 
capacity)
+      int capacity = value.remainingCapacity() + value.size();
+      buffer.writeVarUint32Small7(capacity);
+      return snapshot;
+    }
+
+    @Override
+    public LinkedBlockingQueue newCollection(MemoryBuffer buffer) {
+      int numElements = buffer.readVarUint32Small7();
+      setNumElements(numElements);
+      int capacity = buffer.readVarUint32Small7();
+      LinkedBlockingQueue queue = new LinkedBlockingQueue<>(capacity);
+      fory.getRefResolver().reference(queue);
+      return queue;
+    }
+
+    @Override
+    public Collection newCollection(Collection collection) {
+      LinkedBlockingQueue lbq = (LinkedBlockingQueue) collection;
+      // Capacity is remaining + current size
+      int capacity = lbq.remainingCapacity() + lbq.size();
+      return new LinkedBlockingQueue<>(capacity);
+    }
+  }
+
   /**
    * Java serializer to serialize all fields of a collection implementation. 
Note that this
    * serializer won't use element generics and doesn't support JIT, 
performance won't be the best,
@@ -873,6 +958,11 @@ public class CollectionSerializers {
     resolver.registerSerializer(BitSet.class, new BitSetSerializer(fory, 
BitSet.class));
     resolver.registerSerializer(
         PriorityQueue.class, new PriorityQueueSerializer(fory, 
PriorityQueue.class));
+    resolver.registerSerializer(
+        ArrayBlockingQueue.class, new ArrayBlockingQueueSerializer(fory, 
ArrayBlockingQueue.class));
+    resolver.registerSerializer(
+        LinkedBlockingQueue.class,
+        new LinkedBlockingQueueSerializer(fory, LinkedBlockingQueue.class));
     resolver.registerSerializer(
         CopyOnWriteArrayList.class,
         new CopyOnWriteArrayListSerializer(fory, CopyOnWriteArrayList.class));
diff --git 
a/java/fory-core/src/main/resources/META-INF/native-image/org.apache.fory/fory-core/native-image.properties
 
b/java/fory-core/src/main/resources/META-INF/native-image/org.apache.fory/fory-core/native-image.properties
index e65de39ac..6994105a2 100644
--- 
a/java/fory-core/src/main/resources/META-INF/native-image/org.apache.fory/fory-core/native-image.properties
+++ 
b/java/fory-core/src/main/resources/META-INF/native-image/org.apache.fory/fory-core/native-image.properties
@@ -387,6 +387,7 @@ 
Args=--initialize-at-build-time=org.apache.fory.memory.MemoryBuffer,\
     org.apache.fory.serializer.TimeSerializers$ZoneOffsetSerializer,\
     org.apache.fory.serializer.TimeSerializers$ZonedDateTimeSerializer,\
     org.apache.fory.serializer.collection.CollectionSerializer,\
+    
org.apache.fory.serializer.collection.CollectionSerializers$ArrayBlockingQueueSerializer,\
     
org.apache.fory.serializer.collection.CollectionSerializers$ArrayDequeSerializer,\
     
org.apache.fory.serializer.collection.CollectionSerializers$ArrayListSerializer,\
     
org.apache.fory.serializer.collection.CollectionSerializers$ArraysAsListSerializer,\
@@ -402,6 +403,7 @@ 
Args=--initialize-at-build-time=org.apache.fory.memory.MemoryBuffer,\
     
org.apache.fory.serializer.collection.CollectionSerializers$EnumSetSerializer,\
     
org.apache.fory.serializer.collection.CollectionSerializers$HashSetSerializer,\
     
org.apache.fory.serializer.collection.CollectionSerializers$JDKCompatibleCollectionSerializer,\
+    
org.apache.fory.serializer.collection.CollectionSerializers$LinkedBlockingQueueSerializer,\
     
org.apache.fory.serializer.collection.CollectionSerializers$LinkedHashSetSerializer,\
     
org.apache.fory.serializer.collection.CollectionSerializers$PriorityQueueSerializer,\
     
org.apache.fory.serializer.collection.CollectionSerializers$SetFromMapSerializer,\
diff --git 
a/java/fory-core/src/test/java/org/apache/fory/resolver/ClassResolverTest.java 
b/java/fory-core/src/test/java/org/apache/fory/resolver/ClassResolverTest.java
index d0790e836..7888318f6 100644
--- 
a/java/fory-core/src/test/java/org/apache/fory/resolver/ClassResolverTest.java
+++ 
b/java/fory-core/src/test/java/org/apache/fory/resolver/ClassResolverTest.java
@@ -165,15 +165,9 @@ public class ClassResolverTest extends ForyTestBase {
     assertEquals(
         classResolver.getSerializerClass(TreeMap.class), 
MapSerializers.SortedMapSerializer.class);
 
-    if (ClassResolver.requireJavaSerialization(ArrayBlockingQueue.class)) {
-      assertEquals(
-          classResolver.getSerializerClass(ArrayBlockingQueue.class),
-          CollectionSerializers.JDKCompatibleCollectionSerializer.class);
-    } else {
-      assertEquals(
-          classResolver.getSerializerClass(ArrayBlockingQueue.class),
-          CollectionSerializers.DefaultJavaCollectionSerializer.class);
-    }
+    assertEquals(
+        classResolver.getSerializerClass(ArrayBlockingQueue.class),
+        CollectionSerializers.ArrayBlockingQueueSerializer.class);
     assertEquals(
         classResolver.getSerializerClass(ConcurrentHashMap.class),
         MapSerializers.ConcurrentHashMapSerializer.class);
diff --git 
a/java/fory-core/src/test/java/org/apache/fory/serializer/collection/CollectionSerializersTest.java
 
b/java/fory-core/src/test/java/org/apache/fory/serializer/collection/CollectionSerializersTest.java
index fd517e22d..552f631a7 100644
--- 
a/java/fory-core/src/test/java/org/apache/fory/serializer/collection/CollectionSerializersTest.java
+++ 
b/java/fory-core/src/test/java/org/apache/fory/serializer/collection/CollectionSerializersTest.java
@@ -474,23 +474,25 @@ public class CollectionSerializersTest extends 
ForyTestBase {
             .withRefTracking(true)
             .requireClassRegistration(false)
             .build();
-    // TODO(chaokunyang) add optimized serializers for blocking queue.
     {
       ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
       queue.add(1);
       queue.add(2);
       queue.add(3);
-      assertEquals(new ArrayList<>(serDe(fory, queue)), new 
ArrayList<>(queue));
+      ArrayBlockingQueue<Integer> deserialized = serDe(fory, queue);
+      assertEquals(new ArrayList<>(deserialized), new ArrayList<>(queue));
+      // Verify capacity is preserved
+      assertEquals(deserialized.remainingCapacity() + deserialized.size(), 10);
     }
     {
-      // If reference tracking is off, deserialization will throw
-      // `java.lang.IllegalMonitorStateException`
-      // when using fory `ObjectStreamSerializer`, maybe some internal state 
are shared.
       LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
       queue.add(1);
       queue.add(2);
       queue.add(3);
-      assertEquals(new ArrayList<>(serDe(fory, queue)), new 
ArrayList<>(queue));
+      LinkedBlockingQueue<Integer> deserialized = serDe(fory, queue);
+      assertEquals(new ArrayList<>(deserialized), new ArrayList<>(queue));
+      // Verify capacity is preserved
+      assertEquals(deserialized.remainingCapacity() + deserialized.size(), 10);
     }
   }
 
@@ -503,17 +505,18 @@ public class CollectionSerializersTest extends 
ForyTestBase {
       queue.add(3);
       ArrayBlockingQueue<Integer> copy = fory.copy(queue);
       Assert.assertEquals(Arrays.toString(copy.toArray()), "[1, 2, 3]");
+      // Verify capacity is preserved
+      Assert.assertEquals(copy.remainingCapacity() + copy.size(), 10);
     }
     {
-      // If reference tracking is off, deserialization will throw
-      // `java.lang.IllegalMonitorStateException`
-      // when using fory `ObjectStreamSerializer`, maybe some internal state 
are shared.
       LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
       queue.add(1);
       queue.add(2);
       queue.add(3);
       LinkedBlockingQueue<Integer> copy = fory.copy(queue);
       Assert.assertEquals(Arrays.toString(copy.toArray()), "[1, 2, 3]");
+      // Verify capacity is preserved
+      Assert.assertEquals(copy.remainingCapacity() + copy.size(), 10);
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to