This is an automated email from the ASF dual-hosted git repository.
chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fory.git
The following commit(s) were added to refs/heads/main by this push:
new a0f3fe448 fix(java): fix race condition in blocking queue serializers
(#2956)
a0f3fe448 is described below
commit a0f3fe448f552d7f67a7ad165da59122c80d87c0
Author: zhan7236 <[email protected]>
AuthorDate: Mon Dec 1 23:32:24 2025 +0800
fix(java): fix race condition in blocking queue serializers (#2956)
## What does this PR do?
This PR addresses the race-condition concern raised by maintainer
@chaokunyang in the review of PR #2955.
## Problem
The original implementation calculated capacity using:
```java
int capacity = abq.remainingCapacity() + abq.size();
```
The two calls are not atomic: if another thread adds or removes an element
between them, the computed sum can differ from the queue's actual capacity.
## Solution
Use reflection to read the capacity directly from each queue's internal state:
- For `ArrayBlockingQueue`: Read `items` array length (capacity is array
length)
- For `LinkedBlockingQueue`: Read `capacity` field directly
This follows the same pattern used by `ArraysAsListSerializer` which
uses `Platform.objectFieldOffset()` for safe field access.
## Related issues
Follow-up to #2955 addressing:
https://github.com/apache/fory/pull/2955#pullrequestreview-2508169408
## Does this PR introduce any user-facing change?
No - this is an internal implementation fix.
## Benchmark
N/A - no performance impact expected; both approaches are O(1).
---
.../collection/CollectionSerializers.java | 49 ++++++++++++++++++----
1 file changed, 41 insertions(+), 8 deletions(-)
diff --git
a/java/fory-core/src/main/java/org/apache/fory/serializer/collection/CollectionSerializers.java
b/java/fory-core/src/main/java/org/apache/fory/serializer/collection/CollectionSerializers.java
index 9169334a6..709ea6474 100644
---
a/java/fory-core/src/main/java/org/apache/fory/serializer/collection/CollectionSerializers.java
+++
b/java/fory-core/src/main/java/org/apache/fory/serializer/collection/CollectionSerializers.java
@@ -702,15 +702,33 @@ public class CollectionSerializers {
public static class ArrayBlockingQueueSerializer
extends ConcurrentCollectionSerializer<ArrayBlockingQueue> {
+ // Use reflection to get the items array length which represents the
capacity.
+ // This avoids race conditions when reading remainingCapacity() and size()
separately.
+ private static final long ITEMS_OFFSET;
+
+ static {
+ try {
+ Field itemsField = ArrayBlockingQueue.class.getDeclaredField("items");
+ ITEMS_OFFSET = Platform.objectFieldOffset(itemsField);
+ } catch (NoSuchFieldException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
public ArrayBlockingQueueSerializer(Fory fory, Class<ArrayBlockingQueue>
cls) {
super(fory, cls, true);
}
+ private static int getCapacity(ArrayBlockingQueue queue) {
+ Object[] items = (Object[]) Platform.getObject(queue, ITEMS_OFFSET);
+ return items.length;
+ }
+
@Override
public CollectionSnapshot onCollectionWrite(MemoryBuffer buffer,
ArrayBlockingQueue value) {
+ // Read capacity before creating snapshot to ensure consistency
+ int capacity = getCapacity(value);
CollectionSnapshot snapshot = super.onCollectionWrite(buffer, value);
- // Write the capacity (remaining capacity + current size = total
capacity)
- int capacity = value.remainingCapacity() + value.size();
buffer.writeVarUint32Small7(capacity);
return snapshot;
}
@@ -728,8 +746,7 @@ public class CollectionSerializers {
@Override
public Collection newCollection(Collection collection) {
ArrayBlockingQueue abq = (ArrayBlockingQueue) collection;
- // Capacity is remaining + current size
- int capacity = abq.remainingCapacity() + abq.size();
+ int capacity = getCapacity(abq);
return new ArrayBlockingQueue<>(capacity);
}
}
@@ -743,15 +760,32 @@ public class CollectionSerializers {
public static class LinkedBlockingQueueSerializer
extends ConcurrentCollectionSerializer<LinkedBlockingQueue> {
+ // Use reflection to get the capacity field directly.
+ // This avoids race conditions when reading remainingCapacity() and size()
separately.
+ private static final long CAPACITY_OFFSET;
+
+ static {
+ try {
+ Field capacityField =
LinkedBlockingQueue.class.getDeclaredField("capacity");
+ CAPACITY_OFFSET = Platform.objectFieldOffset(capacityField);
+ } catch (NoSuchFieldException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
public LinkedBlockingQueueSerializer(Fory fory, Class<LinkedBlockingQueue>
cls) {
super(fory, cls, true);
}
+ private static int getCapacity(LinkedBlockingQueue queue) {
+ return Platform.getInt(queue, CAPACITY_OFFSET);
+ }
+
@Override
public CollectionSnapshot onCollectionWrite(MemoryBuffer buffer,
LinkedBlockingQueue value) {
+ // Read capacity before creating snapshot to ensure consistency
+ int capacity = getCapacity(value);
CollectionSnapshot snapshot = super.onCollectionWrite(buffer, value);
- // Write the capacity (remaining capacity + current size = total
capacity)
- int capacity = value.remainingCapacity() + value.size();
buffer.writeVarUint32Small7(capacity);
return snapshot;
}
@@ -769,8 +803,7 @@ public class CollectionSerializers {
@Override
public Collection newCollection(Collection collection) {
LinkedBlockingQueue lbq = (LinkedBlockingQueue) collection;
- // Capacity is remaining + current size
- int capacity = lbq.remainingCapacity() + lbq.size();
+ int capacity = getCapacity(lbq);
return new LinkedBlockingQueue<>(capacity);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]