This is an automated email from the ASF dual-hosted git repository.

chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fory.git


The following commit(s) were added to refs/heads/main by this push:
     new a0f3fe448 fix(java): fix race condition in blocking queue serializers 
(#2956)
a0f3fe448 is described below

commit a0f3fe448f552d7f67a7ad165da59122c80d87c0
Author: zhan7236 <[email protected]>
AuthorDate: Mon Dec 1 23:32:24 2025 +0800

    fix(java): fix race condition in blocking queue serializers (#2956)
    
    ## What does this PR do?
    This PR addresses the race-condition concern raised by maintainer
    @chaokunyang in the review of PR #2955.
    
    ## Problem
    The original implementation calculated capacity using:
    ```java
    int capacity = abq.remainingCapacity() + abq.size();
    ```
    The two calls are not performed atomically, so if the queue is modified
    concurrently between them, the computed capacity can be incorrect.
    
    ## Solution
    Use reflection to read the capacity directly from the queue's internal
    state:
    - For `ArrayBlockingQueue`: read the length of the `items` array (the
    array length is the capacity)
    - For `LinkedBlockingQueue`: read the `capacity` field directly
    
    This follows the same pattern used by `ArraysAsListSerializer` which
    uses `Platform.objectFieldOffset()` for safe field access.
    
    ## Related issues
    Follow-up to #2955 addressing:
    https://github.com/apache/fory/pull/2955#pullrequestreview-2508169408
    
    ## Does this PR introduce any user-facing change?
    No - this is an internal implementation fix.
    
    ## Benchmark
    N/A - no performance impact expected; both approaches are O(1).
---
 .../collection/CollectionSerializers.java          | 49 ++++++++++++++++++----
 1 file changed, 41 insertions(+), 8 deletions(-)

diff --git 
a/java/fory-core/src/main/java/org/apache/fory/serializer/collection/CollectionSerializers.java
 
b/java/fory-core/src/main/java/org/apache/fory/serializer/collection/CollectionSerializers.java
index 9169334a6..709ea6474 100644
--- 
a/java/fory-core/src/main/java/org/apache/fory/serializer/collection/CollectionSerializers.java
+++ 
b/java/fory-core/src/main/java/org/apache/fory/serializer/collection/CollectionSerializers.java
@@ -702,15 +702,33 @@ public class CollectionSerializers {
   public static class ArrayBlockingQueueSerializer
       extends ConcurrentCollectionSerializer<ArrayBlockingQueue> {
 
+    // Use reflection to get the items array length which represents the 
capacity.
+    // This avoids race conditions when reading remainingCapacity() and size() 
separately.
+    private static final long ITEMS_OFFSET;
+
+    static {
+      try {
+        Field itemsField = ArrayBlockingQueue.class.getDeclaredField("items");
+        ITEMS_OFFSET = Platform.objectFieldOffset(itemsField);
+      } catch (NoSuchFieldException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
     public ArrayBlockingQueueSerializer(Fory fory, Class<ArrayBlockingQueue> 
cls) {
       super(fory, cls, true);
     }
 
+    private static int getCapacity(ArrayBlockingQueue queue) {
+      Object[] items = (Object[]) Platform.getObject(queue, ITEMS_OFFSET);
+      return items.length;
+    }
+
     @Override
     public CollectionSnapshot onCollectionWrite(MemoryBuffer buffer, 
ArrayBlockingQueue value) {
+      // Read capacity before creating snapshot to ensure consistency
+      int capacity = getCapacity(value);
       CollectionSnapshot snapshot = super.onCollectionWrite(buffer, value);
-      // Write the capacity (remaining capacity + current size = total 
capacity)
-      int capacity = value.remainingCapacity() + value.size();
       buffer.writeVarUint32Small7(capacity);
       return snapshot;
     }
@@ -728,8 +746,7 @@ public class CollectionSerializers {
     @Override
     public Collection newCollection(Collection collection) {
       ArrayBlockingQueue abq = (ArrayBlockingQueue) collection;
-      // Capacity is remaining + current size
-      int capacity = abq.remainingCapacity() + abq.size();
+      int capacity = getCapacity(abq);
       return new ArrayBlockingQueue<>(capacity);
     }
   }
@@ -743,15 +760,32 @@ public class CollectionSerializers {
   public static class LinkedBlockingQueueSerializer
       extends ConcurrentCollectionSerializer<LinkedBlockingQueue> {
 
+    // Use reflection to get the capacity field directly.
+    // This avoids race conditions when reading remainingCapacity() and size() 
separately.
+    private static final long CAPACITY_OFFSET;
+
+    static {
+      try {
+        Field capacityField = 
LinkedBlockingQueue.class.getDeclaredField("capacity");
+        CAPACITY_OFFSET = Platform.objectFieldOffset(capacityField);
+      } catch (NoSuchFieldException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
     public LinkedBlockingQueueSerializer(Fory fory, Class<LinkedBlockingQueue> 
cls) {
       super(fory, cls, true);
     }
 
+    private static int getCapacity(LinkedBlockingQueue queue) {
+      return Platform.getInt(queue, CAPACITY_OFFSET);
+    }
+
     @Override
     public CollectionSnapshot onCollectionWrite(MemoryBuffer buffer, 
LinkedBlockingQueue value) {
+      // Read capacity before creating snapshot to ensure consistency
+      int capacity = getCapacity(value);
       CollectionSnapshot snapshot = super.onCollectionWrite(buffer, value);
-      // Write the capacity (remaining capacity + current size = total 
capacity)
-      int capacity = value.remainingCapacity() + value.size();
       buffer.writeVarUint32Small7(capacity);
       return snapshot;
     }
@@ -769,8 +803,7 @@ public class CollectionSerializers {
     @Override
     public Collection newCollection(Collection collection) {
       LinkedBlockingQueue lbq = (LinkedBlockingQueue) collection;
-      // Capacity is remaining + current size
-      int capacity = lbq.remainingCapacity() + lbq.size();
+      int capacity = getCapacity(lbq);
       return new LinkedBlockingQueue<>(capacity);
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to