This is an automated email from the ASF dual-hosted git repository.
chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fory.git
The following commit(s) were added to refs/heads/main by this push:
new a32710544 feat(java): add optimized serializers for blocking queues
(#2955)
a32710544 is described below
commit a32710544d092dd68e250e3f6574292eaa5d37d4
Author: zhan7236 <[email protected]>
AuthorDate: Mon Dec 1 22:00:53 2025 +0800
feat(java): add optimized serializers for blocking queues (#2955)
## Why?
Fory has no dedicated serializers for `ArrayBlockingQueue` and
`LinkedBlockingQueue`, so it falls back to its JDK-compatible
serialization path, which is inefficient and logs warnings like:
```text
WARN ObjectStreamSerializer - class java.util.concurrent.ArrayBlockingQueue
customized jdk serialization, which is inefficient.
```
These blocking queues are commonly used in concurrent applications, and
optimized serializers will improve performance.
## What does this PR do?
Add dedicated serializers for `java.util.concurrent.ArrayBlockingQueue`
and `java.util.concurrent.LinkedBlockingQueue`:
- `ArrayBlockingQueueSerializer`: Extends
`ConcurrentCollectionSerializer` to handle thread-safe serialization
with capacity preservation.
- `LinkedBlockingQueueSerializer`: Extends
`ConcurrentCollectionSerializer` to handle thread-safe serialization
with capacity preservation.
Both serializers:
- Use `CollectionSnapshot` for safe concurrent iteration.
- Preserve queue capacity during serialization/deserialization.
- Support both serialization and copy operations.
- Are registered as default serializers in
`registerDefaultSerializers()`.
## Related issues
- Partially addresses #2663 (blocking queue serializers only)
## Does this PR introduce any user-facing change?
- [ ] Does this PR introduce any public API change?
- [ ] Does this PR introduce any binary protocol compatibility change?
No public API changes. Users will automatically benefit from optimized
serialization for `ArrayBlockingQueue` and `LinkedBlockingQueue`.
## Benchmark
Not applicable - This PR replaces the slow JDK-compatible
`ObjectStreamSerializer` fallback with a specialized binary serializer,
which is a well-known performance improvement pattern already used for
other collection types in this codebase.
---
.../collection/CollectionSerializers.java | 90 ++++++++++++++++++++++
.../fory-core/native-image.properties | 2 +
.../apache/fory/resolver/ClassResolverTest.java | 12 +--
.../collection/CollectionSerializersTest.java | 21 ++---
4 files changed, 107 insertions(+), 18 deletions(-)
diff --git
a/java/fory-core/src/main/java/org/apache/fory/serializer/collection/CollectionSerializers.java
b/java/fory-core/src/main/java/org/apache/fory/serializer/collection/CollectionSerializers.java
index 8265b2084..9169334a6 100644
---
a/java/fory-core/src/main/java/org/apache/fory/serializer/collection/CollectionSerializers.java
+++
b/java/fory-core/src/main/java/org/apache/fory/serializer/collection/CollectionSerializers.java
@@ -42,9 +42,11 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.Vector;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.fory.Fory;
import org.apache.fory.collection.CollectionSnapshot;
import org.apache.fory.exception.ForyException;
@@ -690,6 +692,89 @@ public class CollectionSerializers {
}
}
+ /**
+ * Serializer for {@link ArrayBlockingQueue}.
+ *
+ * <p>This serializer handles the capacity field which is essential for
ArrayBlockingQueue since
+ * it's a bounded queue. The capacity is stored in the items array length
and needs to be
+ * preserved during serialization.
+ */
+ public static class ArrayBlockingQueueSerializer
+ extends ConcurrentCollectionSerializer<ArrayBlockingQueue> {
+
+ public ArrayBlockingQueueSerializer(Fory fory, Class<ArrayBlockingQueue>
cls) {
+ super(fory, cls, true);
+ }
+
+ @Override
+ public CollectionSnapshot onCollectionWrite(MemoryBuffer buffer,
ArrayBlockingQueue value) {
+ CollectionSnapshot snapshot = super.onCollectionWrite(buffer, value);
+ // Write the capacity (remaining capacity + current size = total
capacity)
+ int capacity = value.remainingCapacity() + value.size();
+ buffer.writeVarUint32Small7(capacity);
+ return snapshot;
+ }
+
+ @Override
+ public ArrayBlockingQueue newCollection(MemoryBuffer buffer) {
+ int numElements = buffer.readVarUint32Small7();
+ setNumElements(numElements);
+ int capacity = buffer.readVarUint32Small7();
+ ArrayBlockingQueue queue = new ArrayBlockingQueue<>(capacity);
+ fory.getRefResolver().reference(queue);
+ return queue;
+ }
+
+ @Override
+ public Collection newCollection(Collection collection) {
+ ArrayBlockingQueue abq = (ArrayBlockingQueue) collection;
+ // Capacity is remaining + current size
+ int capacity = abq.remainingCapacity() + abq.size();
+ return new ArrayBlockingQueue<>(capacity);
+ }
+ }
+
+ /**
+ * Serializer for {@link LinkedBlockingQueue}.
+ *
+ * <p>This serializer handles the capacity field which is essential for
LinkedBlockingQueue since
+ * it can be a bounded queue. The capacity needs to be preserved during
serialization.
+ */
+ public static class LinkedBlockingQueueSerializer
+ extends ConcurrentCollectionSerializer<LinkedBlockingQueue> {
+
+ public LinkedBlockingQueueSerializer(Fory fory, Class<LinkedBlockingQueue>
cls) {
+ super(fory, cls, true);
+ }
+
+ @Override
+ public CollectionSnapshot onCollectionWrite(MemoryBuffer buffer,
LinkedBlockingQueue value) {
+ CollectionSnapshot snapshot = super.onCollectionWrite(buffer, value);
+ // Write the capacity (remaining capacity + current size = total
capacity)
+ int capacity = value.remainingCapacity() + value.size();
+ buffer.writeVarUint32Small7(capacity);
+ return snapshot;
+ }
+
+ @Override
+ public LinkedBlockingQueue newCollection(MemoryBuffer buffer) {
+ int numElements = buffer.readVarUint32Small7();
+ setNumElements(numElements);
+ int capacity = buffer.readVarUint32Small7();
+ LinkedBlockingQueue queue = new LinkedBlockingQueue<>(capacity);
+ fory.getRefResolver().reference(queue);
+ return queue;
+ }
+
+ @Override
+ public Collection newCollection(Collection collection) {
+ LinkedBlockingQueue lbq = (LinkedBlockingQueue) collection;
+ // Capacity is remaining + current size
+ int capacity = lbq.remainingCapacity() + lbq.size();
+ return new LinkedBlockingQueue<>(capacity);
+ }
+ }
+
/**
* Java serializer to serialize all fields of a collection implementation.
Note that this
* serializer won't use element generics and doesn't support JIT,
performance won't be the best,
@@ -873,6 +958,11 @@ public class CollectionSerializers {
resolver.registerSerializer(BitSet.class, new BitSetSerializer(fory,
BitSet.class));
resolver.registerSerializer(
PriorityQueue.class, new PriorityQueueSerializer(fory,
PriorityQueue.class));
+ resolver.registerSerializer(
+ ArrayBlockingQueue.class, new ArrayBlockingQueueSerializer(fory,
ArrayBlockingQueue.class));
+ resolver.registerSerializer(
+ LinkedBlockingQueue.class,
+ new LinkedBlockingQueueSerializer(fory, LinkedBlockingQueue.class));
resolver.registerSerializer(
CopyOnWriteArrayList.class,
new CopyOnWriteArrayListSerializer(fory, CopyOnWriteArrayList.class));
diff --git
a/java/fory-core/src/main/resources/META-INF/native-image/org.apache.fory/fory-core/native-image.properties
b/java/fory-core/src/main/resources/META-INF/native-image/org.apache.fory/fory-core/native-image.properties
index e65de39ac..6994105a2 100644
---
a/java/fory-core/src/main/resources/META-INF/native-image/org.apache.fory/fory-core/native-image.properties
+++
b/java/fory-core/src/main/resources/META-INF/native-image/org.apache.fory/fory-core/native-image.properties
@@ -387,6 +387,7 @@
Args=--initialize-at-build-time=org.apache.fory.memory.MemoryBuffer,\
org.apache.fory.serializer.TimeSerializers$ZoneOffsetSerializer,\
org.apache.fory.serializer.TimeSerializers$ZonedDateTimeSerializer,\
org.apache.fory.serializer.collection.CollectionSerializer,\
+
org.apache.fory.serializer.collection.CollectionSerializers$ArrayBlockingQueueSerializer,\
org.apache.fory.serializer.collection.CollectionSerializers$ArrayDequeSerializer,\
org.apache.fory.serializer.collection.CollectionSerializers$ArrayListSerializer,\
org.apache.fory.serializer.collection.CollectionSerializers$ArraysAsListSerializer,\
@@ -402,6 +403,7 @@
Args=--initialize-at-build-time=org.apache.fory.memory.MemoryBuffer,\
org.apache.fory.serializer.collection.CollectionSerializers$EnumSetSerializer,\
org.apache.fory.serializer.collection.CollectionSerializers$HashSetSerializer,\
org.apache.fory.serializer.collection.CollectionSerializers$JDKCompatibleCollectionSerializer,\
+
org.apache.fory.serializer.collection.CollectionSerializers$LinkedBlockingQueueSerializer,\
org.apache.fory.serializer.collection.CollectionSerializers$LinkedHashSetSerializer,\
org.apache.fory.serializer.collection.CollectionSerializers$PriorityQueueSerializer,\
org.apache.fory.serializer.collection.CollectionSerializers$SetFromMapSerializer,\
diff --git
a/java/fory-core/src/test/java/org/apache/fory/resolver/ClassResolverTest.java
b/java/fory-core/src/test/java/org/apache/fory/resolver/ClassResolverTest.java
index d0790e836..7888318f6 100644
---
a/java/fory-core/src/test/java/org/apache/fory/resolver/ClassResolverTest.java
+++
b/java/fory-core/src/test/java/org/apache/fory/resolver/ClassResolverTest.java
@@ -165,15 +165,9 @@ public class ClassResolverTest extends ForyTestBase {
assertEquals(
classResolver.getSerializerClass(TreeMap.class),
MapSerializers.SortedMapSerializer.class);
- if (ClassResolver.requireJavaSerialization(ArrayBlockingQueue.class)) {
- assertEquals(
- classResolver.getSerializerClass(ArrayBlockingQueue.class),
- CollectionSerializers.JDKCompatibleCollectionSerializer.class);
- } else {
- assertEquals(
- classResolver.getSerializerClass(ArrayBlockingQueue.class),
- CollectionSerializers.DefaultJavaCollectionSerializer.class);
- }
+ assertEquals(
+ classResolver.getSerializerClass(ArrayBlockingQueue.class),
+ CollectionSerializers.ArrayBlockingQueueSerializer.class);
assertEquals(
classResolver.getSerializerClass(ConcurrentHashMap.class),
MapSerializers.ConcurrentHashMapSerializer.class);
diff --git
a/java/fory-core/src/test/java/org/apache/fory/serializer/collection/CollectionSerializersTest.java
b/java/fory-core/src/test/java/org/apache/fory/serializer/collection/CollectionSerializersTest.java
index fd517e22d..552f631a7 100644
---
a/java/fory-core/src/test/java/org/apache/fory/serializer/collection/CollectionSerializersTest.java
+++
b/java/fory-core/src/test/java/org/apache/fory/serializer/collection/CollectionSerializersTest.java
@@ -474,23 +474,25 @@ public class CollectionSerializersTest extends
ForyTestBase {
.withRefTracking(true)
.requireClassRegistration(false)
.build();
- // TODO(chaokunyang) add optimized serializers for blocking queue.
{
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
queue.add(1);
queue.add(2);
queue.add(3);
- assertEquals(new ArrayList<>(serDe(fory, queue)), new
ArrayList<>(queue));
+ ArrayBlockingQueue<Integer> deserialized = serDe(fory, queue);
+ assertEquals(new ArrayList<>(deserialized), new ArrayList<>(queue));
+ // Verify capacity is preserved
+ assertEquals(deserialized.remainingCapacity() + deserialized.size(), 10);
}
{
- // If reference tracking is off, deserialization will throw
- // `java.lang.IllegalMonitorStateException`
- // when using fory `ObjectStreamSerializer`, maybe some internal state
are shared.
LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
queue.add(1);
queue.add(2);
queue.add(3);
- assertEquals(new ArrayList<>(serDe(fory, queue)), new
ArrayList<>(queue));
+ LinkedBlockingQueue<Integer> deserialized = serDe(fory, queue);
+ assertEquals(new ArrayList<>(deserialized), new ArrayList<>(queue));
+ // Verify capacity is preserved
+ assertEquals(deserialized.remainingCapacity() + deserialized.size(), 10);
}
}
@@ -503,17 +505,18 @@ public class CollectionSerializersTest extends
ForyTestBase {
queue.add(3);
ArrayBlockingQueue<Integer> copy = fory.copy(queue);
Assert.assertEquals(Arrays.toString(copy.toArray()), "[1, 2, 3]");
+ // Verify capacity is preserved
+ Assert.assertEquals(copy.remainingCapacity() + copy.size(), 10);
}
{
- // If reference tracking is off, deserialization will throw
- // `java.lang.IllegalMonitorStateException`
- // when using fory `ObjectStreamSerializer`, maybe some internal state
are shared.
LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
queue.add(1);
queue.add(2);
queue.add(3);
LinkedBlockingQueue<Integer> copy = fory.copy(queue);
Assert.assertEquals(Arrays.toString(copy.toArray()), "[1, 2, 3]");
+ // Verify capacity is preserved
+ Assert.assertEquals(copy.remainingCapacity() + copy.size(), 10);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]