This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch fix/jdk25-nodequeue-acquire-spin
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 2c942654414dd878a53f4a9039ff9b128d6cf509
Author: He-Pin <[email protected]>
AuthorDate: Fri May 29 17:52:18 2026 +0800

    fix: restore acquire semantics for AbstractNodeQueue node reads
    
    Motivation:
    JDK 25 nightly stream tests hang for the full test timeout. A local
    reproduction (full stream-tests with the nightly virtualize=on +
    timefactor=4 options on JDK 25) shows one
    `...-pekko.test.stream-dispatcher-CarrierThread-N` consuming ~97% CPU
    (cpu time approximately equal to elapsed time) stuck in
    `AbstractNodeQueue.pollNode`, while every other carrier is idle in
    `ForkJoinPool.awaitWork` and a full virtual-thread dump shows no
    producer thread anywhere. The spinning consumer is a virtual thread,
    so the unbounded CPU spin pins its carrier permanently; the stream
    never progresses and the test's `futureValue` never completes.
    
    Root cause: PR #1990 (avoid sun.misc.Unsafe by using VarHandles)
    mapped the producer writes correctly (`Unsafe.putOrderedObject` ->
    `VarHandle.setRelease`) but downgraded every consumer read from
    `Unsafe.getObjectVolatile` (a volatile/acquire load) to
    `VarHandle.get` (a plain load — `VarHandle.get` has plain semantics
    even when the field is declared `volatile`). A plain read is not
    ordered against the producer's release store, so it establishes no
    happens-before with the published node, and inside the busy-spin loops
    in `peekNode`/`pollNode` (`do { next = tail.next(); } while (next ==
    null);`) the JIT may hoist the plain load out of the loop, producing
    an unbounded spin that never observes the linked next node. JDK 25's
    C2 makes this manifest reliably, and virtual-thread carriers turn the
    transient spin into a permanent 100% CPU pin.
    
    Modification:
    - `Node.next()` and the four `tailHandle` reads (`peekNode`,
      `pollNode`, `isEmpty`, `count`) now use `getAcquire`, restoring the
      acquire ordering that the pre-#1990 volatile reads provided and
      pairing with the existing `setRelease` writes. This re-establishes
      the happens-before edge and prevents the JIT from hoisting the
      loads out of the spin loops.
    - Added `Thread.onSpinWait()` to both busy-spin loops as standard
      spin-wait hygiene.
    
    Performance: this restores the pre-#1990 memory semantics rather than
    adding new cost. Acquire loads compile to a plain `MOV` on x86-64
    (all x86 loads already have acquire semantics) and to a single `LDAR`
    on AArch64 — exactly what `getObjectVolatile` emitted before #1990.
    The net effect versus the original Unsafe-based design is zero on
    x86-64 and negligible on AArch64; it only removes the broken plain-read
    micro-optimization the VarHandle migration introduced. Method
    signatures are unchanged, so there is no binary-compatibility impact.
    
    Result:
    With the fix, the previously hanging
    `HubSpec "work with long streams if one of the producers is slower"`
    completes in ~2.7s (was stuck for the full timeout) and the full
    stream-tests run proceeds past the point where it previously hung,
    under the same nightly virtualize=on + timefactor=4 JVM options on
    JDK 25.
    
    References: https://github.com/apache/pekko/issues/2870
---
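The release/acquire pairing and spin-wait hint described in the commit message can be sketched in isolation. This is only an illustrative sketch under stated assumptions, not the Pekko implementation: the class `AcquireReleaseSketch`, its simplified `Node`, and the helpers `awaitNextValue` and `demo` are hypothetical names introduced here. Only `VarHandle.setRelease`, `VarHandle.getAcquire`, and `Thread.onSpinWait` are taken from the change itself.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical illustration class; not part of Pekko.
final class AcquireReleaseSketch {

    // Simplified stand-in for a queue node whose link is accessed via a VarHandle.
    static final class Node {
        final int value;
        @SuppressWarnings("unused") // accessed only through NEXT
        private volatile Node next;

        private static final VarHandle NEXT;
        static {
            try {
                NEXT = MethodHandles.lookup().findVarHandle(Node.class, "next", Node.class);
            } catch (ReflectiveOperationException e) {
                throw new ExceptionInInitializerError(e);
            }
        }

        Node(int value) { this.value = value; }

        // Producer side: release store publishes the linked node.
        void setNext(Node n) { NEXT.setRelease(this, n); }

        // Consumer side: acquire load pairs with the release store above.
        // A plain NEXT.get(this) here would carry no ordering guarantee.
        Node next() { return (Node) NEXT.getAcquire(this); }
    }

    // Spins until a successor is linked, then returns its value.
    static int awaitNextValue(Node tail) {
        Node next = tail.next();
        while (next == null) {
            Thread.onSpinWait(); // hint to the runtime that this is a busy-wait
            next = tail.next();
        }
        return next.value;
    }

    // Links a successor from another thread and returns what the spinning consumer saw.
    static int demo() {
        Node tail = new Node(0);
        Thread producer = new Thread(() -> tail.setNext(new Node(42)));
        producer.start();
        int seen = awaitNextValue(tail);
        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because `next()` goes through `getAcquire`, every iteration of the loop in `awaitNextValue` has to re-read the field, so the consumer eventually observes the producer's `setRelease`. Whether a plain-read version of this small sketch would actually hang depends on the JIT and is not something the sketch demonstrates.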
 .../org/apache/pekko/dispatch/AbstractNodeQueue.java  | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/actor/src/main/java/org/apache/pekko/dispatch/AbstractNodeQueue.java b/actor/src/main/java/org/apache/pekko/dispatch/AbstractNodeQueue.java
index ac8921e155..0096d14704 100644
--- a/actor/src/main/java/org/apache/pekko/dispatch/AbstractNodeQueue.java
+++ b/actor/src/main/java/org/apache/pekko/dispatch/AbstractNodeQueue.java
@@ -54,12 +54,13 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
      */
     @SuppressWarnings("unchecked")
     protected final Node<T> peekNode() {
-        final Node<T> tail = (Node<T>) tailHandle.get(this);
+        final Node<T> tail = (Node<T>) tailHandle.getAcquire(this);
         Node<T> next = tail.next();
         if (next == null && get() != tail) {
             // if tail != head this is not going to change until producer makes progress
             // we can avoid reading the head and just spin on next until it shows up
             do {
+                Thread.onSpinWait();
                 next = tail.next();
             } while (next == null);
         }
@@ -110,7 +111,7 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
      * @return true if queue was empty at some point in the past
      */
     public final boolean isEmpty() {
-        return tailHandle.get(this) == get();
+        return tailHandle.getAcquire(this) == get();
     }
 
     /**
@@ -126,7 +127,7 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
     public final int count() {
         int count = 0;
         final Node<T> head = get();
-        for(Node<T> n = ((Node<T>) tailHandle.get(this)).next();
+        for(Node<T> n = ((Node<T>) tailHandle.getAcquire(this)).next();
             n != null && count < Integer.MAX_VALUE; 
             n = n.next()) {
           ++count;
@@ -162,12 +163,13 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
      */
     @SuppressWarnings("unchecked")
     public final Node<T> pollNode() {
-      final Node<T> tail = (Node<T>) tailHandle.get(this);
+      final Node<T> tail = (Node<T>) tailHandle.getAcquire(this);
       Node<T> next = tail.next();
       if (next == null && get() != tail) {
           // if tail != head this is not going to change until producer makes progress
           // we can avoid reading the head and just spin on next until it shows up
           do {
+              Thread.onSpinWait();
               next = tail.next();
           } while (next == null);
       }
@@ -208,7 +210,14 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
         }
 
         public final Node<T> next() {
-            return (Node<T>) nextHandle.get(this);
+            // Acquire load to pair with the release store in setNext. A plain read here
+            // (VarHandle.get has plain semantics even though the field is volatile) is not
+            // ordered against the producer's setRelease, so it establishes no happens-before
+            // with the published node and, inside the busy-spin loops in peekNode/pollNode,
+            // can be hoisted out of the loop by the JIT, producing an unbounded spin that
+            // never observes the linked next node. This was observed on JDK 25 where such a
+            // spin pinned a virtual-thread carrier at 100% CPU and stalled the stream.
+            return (Node<T>) nextHandle.getAcquire(this);
         }
 
         protected final void setNext(final Node<T> newNext) {


