This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 6a52d0cfa1 fix: restore acquire semantics for AbstractNodeQueue node reads (#3007)
6a52d0cfa1 is described below
commit 6a52d0cfa19d5988fcd567bea088931becfb05b1
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Fri May 29 19:02:46 2026 +0800
fix: restore acquire semantics for AbstractNodeQueue node reads (#3007)
* fix: restore acquire semantics for AbstractNodeQueue node reads
Motivation:
JDK 25 nightly stream tests hang for the full test timeout (the
recurring failures behind #2573 / #2870). A local reproduction (the
full stream-tests run with the nightly virtualize=on + timefactor=4
options on JDK 25) pins it down: one
`...-pekko.test.stream-dispatcher-CarrierThread-N` consumes ~97% CPU
(cpu time approximately equal to elapsed time) stuck in
`AbstractNodeQueue.pollNode`, while every other carrier is idle in
`ForkJoinPool.awaitWork` and a full virtual-thread dump shows no
producer thread anywhere. The spinning consumer is a virtual thread,
so the unbounded CPU spin pins its carrier permanently; the stream
never progresses and the test's `futureValue` never completes. Every
affected test passes in isolation (~100ms) even with the full nightly
JVM options, because the hang only appears under sustained load: it is
a JIT-state-dependent data race.
Root cause: PR #1990 (avoid sun.misc.Unsafe by using VarHandles)
mapped the producer writes correctly (`Unsafe.putOrderedObject` ->
`VarHandle.setRelease`) but downgraded every consumer read from
`Unsafe.getObjectVolatile` (a volatile load) to `VarHandle.get` — a
plain load, since `VarHandle.get` has plain semantics even when the
field is declared `volatile`. A plain read is not ordered against the
producer's release store, so it establishes no happens-before with the
published node, and inside the busy-spin loops in `peekNode`/`pollNode`
(`do { next = tail.next(); } while (next == null);`) the JIT may hoist
the plain load out of the loop, producing an unbounded spin that never
observes the linked next node. JDK 25's C2 makes this manifest
reliably, and virtual-thread carriers turn the transient spin into a
permanent 100% CPU pin.
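As an illustration of the release/acquire pairing described above, here is a
minimal sketch. It is not the Pekko source: the class `AcquireReadSketch`, its
`Node` type, and the `payload` field are hypothetical names chosen for this
example. The producer publishes a node with `setRelease`; the consumer spins
with `getAcquire`, so the load is re-executed on every iteration and the
payload written before the release store is visible once the node is observed.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

public class AcquireReadSketch {
  // Hypothetical node type for this sketch only.
  static final class Node {
    int payload;          // plain field, published by the release store to `next`
    volatile Node next;   // declared volatile, yet VarHandle.get would still read it plainly
  }

  private static final VarHandle NEXT;

  static {
    try {
      NEXT = MethodHandles.lookup().findVarHandle(Node.class, "next", Node.class);
    } catch (ReflectiveOperationException e) {
      throw new ExceptionInInitializerError(e);
    }
  }

  // Spins until the producer thread has linked a node, then returns its payload.
  static int awaitPayload() {
    final Node tail = new Node();
    Thread producer = new Thread(() -> {
      Node n = new Node();
      n.payload = 42;            // written before the release store below
      NEXT.setRelease(tail, n);  // release store: publishes n and its payload
    });
    producer.start();

    Node next;
    do {
      // Acquire load: pairs with the release store above. With NEXT.get(tail)
      // (plain mode) the JIT would be allowed to hoist this load out of the loop.
      next = (Node) NEXT.getAcquire(tail);
    } while (next == null);
    return next.payload;
  }

  public static void main(String[] args) {
    System.out.println(awaitPayload()); // prints 42
  }
}
```

Replacing `getAcquire` with `get` in this sketch would not necessarily hang on
any given run; the point is only that the plain mode gives no guarantee that
the loop ever observes the store.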
Modification:
- `Node.next()` and the four `tailHandle` reads (`peekNode`,
`pollNode`, `isEmpty`, `count`) now use `getAcquire`, pairing with
the existing `setRelease` writes. This re-establishes the
happens-before and prevents the JIT from hoisting the loads out of
the spin loops. `getAcquire` (not `getVolatile`) is used because the
release/acquire pairing is exactly what is required here; for a load
it compiles to the same instruction as a volatile load on x86-64
(MOV) and AArch64 (LDAR), so the stronger sequential consistency
would add no value.
- Added `Thread.onSpinWait()` to both busy-spin loops: it hints the CPU
that we are busy-waiting, reducing spin power and pipeline cost and
yielding the core to an SMT sibling (which may be the producer
linking the next node).
Method signatures are unchanged, so there is no binary-compatibility
impact.
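The shape of the change can be sketched with a much-simplified
multi-producer/single-consumer queue. This is an assumption-laden illustration
rather than the actual `AbstractNodeQueue`: the class `SpinQueueSketch`, its
`add`/`poll` methods, and the separate `head` reference are hypothetical. It
shows the two ingredients named above, acquire loads for `tail` and `next`, and
`Thread.onSpinWait()` inside the spin loop.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.concurrent.atomic.AtomicReference;

public class SpinQueueSketch<T> {
  // Hypothetical node type for this sketch only.
  static final class Node<T> {
    T value;
    volatile Node<T> next;
    Node(T value) { this.value = value; }
  }

  private static final VarHandle NEXT;
  private static final VarHandle TAIL;

  static {
    try {
      MethodHandles.Lookup l = MethodHandles.lookup();
      NEXT = l.findVarHandle(Node.class, "next", Node.class);
      TAIL = l.findVarHandle(SpinQueueSketch.class, "tail", Node.class);
    } catch (ReflectiveOperationException e) {
      throw new ExceptionInInitializerError(e);
    }
  }

  private final AtomicReference<Node<T>> head; // producers swap in new nodes here
  private volatile Node<T> tail;               // advanced only by the single consumer

  public SpinQueueSketch() {
    Node<T> stub = new Node<>(null);
    head = new AtomicReference<>(stub);
    tail = stub;
  }

  // Producer side: claim the head slot, then link the previous node to the new one.
  public void add(T value) {
    Node<T> n = new Node<>(value);
    Node<T> prev = head.getAndSet(n);
    NEXT.setRelease(prev, n); // release store, paired with the acquire loads in poll
  }

  // Consumer side (single consumer assumed): returns null if the queue is empty.
  @SuppressWarnings("unchecked")
  public T poll() {
    Node<T> t = (Node<T>) TAIL.getAcquire(this);
    Node<T> next = (Node<T>) NEXT.getAcquire(t);
    if (next == null && head.get() != t) {
      // A producer has swapped head but not linked next yet: spin until it does.
      do {
        Thread.onSpinWait();                  // CPU hint that this is a busy-wait
        next = (Node<T>) NEXT.getAcquire(t);  // re-read with acquire semantics each time
      } while (next == null);
    }
    if (next == null) return null;
    T v = next.value;
    next.value = null;            // next becomes the new stub node
    TAIL.setRelease(this, next);
    return v;
  }

  public static void main(String[] args) {
    SpinQueueSketch<String> q = new SpinQueueSketch<>();
    q.add("a");
    q.add("b");
    System.out.println(q.poll() + q.poll() + q.poll()); // prints abnull
  }
}
```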
Result:
With the fix, the previously hanging
`HubSpec "work with long streams if one of the producers is slower"`
completes in ~2.7s (was stuck for the full timeout) and the full
stream-tests run proceeds past the point where it previously hung
(1800+ tests, no hang) under the same nightly virtualize=on +
timefactor=4 JVM options on JDK 25.
References: https://github.com/apache/pekko/issues/2870
* fix: restore acquire/volatile semantics for AbstractBoundedNodeQueue
Motivation:
AbstractBoundedNodeQueue is the bounded sibling of AbstractNodeQueue
and was downgraded by the same VarHandle migration (#1990): the
consumer reads `Node.next()`, `getEnq()` and `getDeq()` became plain
`VarHandle.get` loads, where the Unsafe original used
`getObjectVolatile`. A plain read carries no ordering even on a
volatile field, so it establishes no happens-before with the
producer's `setNext` (release) / `casEnq` publication and may be
hoisted out of the `for(;;)` spin loops in `peekNode`/`pollNode`,
producing an unbounded spin that never observes the linked node — the
same failure mode that hangs AbstractNodeQueue on JDK 25 virtual-thread
carriers.
Modification:
- `Node.next()` now uses `getAcquire`, pairing with the `setRelease`
write.
- `getEnq()`/`getDeq()` now use `getVolatile`, restoring the original
`getObjectVolatile` semantics for these CAS-published fields.
- Added `Thread.onSpinWait()` to the `peekNode` busy-wait.
For a load, `getAcquire`/`getVolatile` compile to the same instruction
as a volatile load on x86-64 (MOV) and AArch64 (LDAR), so this restores
the pre-#1990 memory semantics at no additional cost. Method signatures
are unchanged, so there is no binary-compatibility impact.
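For the CAS-published fields, the pattern can be sketched as follows. This is
an illustrative example only: `CasPublishedFieldSketch`, its `Node` type with
an `index` field, and the `advance`/`runConcurrently` helpers are hypothetical
and do not mirror the bounded queue's real logic. It shows a field that is
updated with `compareAndSet` and read back with `getVolatile` in a retry loop.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

public class CasPublishedFieldSketch {
  // Hypothetical node type for this sketch only.
  static final class Node {
    final int index;
    Node(int index) { this.index = index; }
  }

  private volatile Node enq = new Node(0);

  private static final VarHandle ENQ;

  static {
    try {
      ENQ = MethodHandles.lookup()
          .findVarHandle(CasPublishedFieldSketch.class, "enq", Node.class);
    } catch (ReflectiveOperationException e) {
      throw new ExceptionInInitializerError(e);
    }
  }

  // Volatile read: matches the volatile semantics of the CAS that publishes the field.
  Node getEnq() {
    return (Node) ENQ.getVolatile(this);
  }

  boolean casEnq(Node old, Node nju) {
    return ENQ.compareAndSet(this, old, nju);
  }

  // Retry loop: re-reads enq on every attempt until the CAS succeeds.
  int advance() {
    for (;;) {
      Node cur = getEnq();
      Node nju = new Node(cur.index + 1);
      if (casEnq(cur, nju)) return nju.index;
      Thread.onSpinWait();
    }
  }

  // Runs `threads` threads that each advance `perThread` times; returns the final index.
  static int runConcurrently(int threads, int perThread) {
    CasPublishedFieldSketch q = new CasPublishedFieldSketch();
    Thread[] ts = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      ts[i] = new Thread(() -> {
        for (int j = 0; j < perThread; j++) q.advance();
      });
      ts[i].start();
    }
    for (Thread t : ts) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return q.getEnq().index;
  }

  public static void main(String[] args) {
    System.out.println(runConcurrently(4, 1000)); // prints 4000
  }
}
```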
Result:
The bounded queue can no longer spin forever on a node that a producer
has appended but whose `next` link is not yet visible to the consumer.
References: https://github.com/apache/pekko/issues/2870
---
.../pekko/dispatch/AbstractBoundedNodeQueue.java | 19 +++++++++++++---
.../apache/pekko/dispatch/AbstractNodeQueue.java | 26 ++++++++++++++++------
2 files changed, 35 insertions(+), 10 deletions(-)
diff --git a/actor/src/main/java/org/apache/pekko/dispatch/AbstractBoundedNodeQueue.java b/actor/src/main/java/org/apache/pekko/dispatch/AbstractBoundedNodeQueue.java
index 361fa052e4..62f1ebdd13 100644
--- a/actor/src/main/java/org/apache/pekko/dispatch/AbstractBoundedNodeQueue.java
+++ b/actor/src/main/java/org/apache/pekko/dispatch/AbstractBoundedNodeQueue.java
@@ -50,7 +50,10 @@ public abstract class AbstractBoundedNodeQueue<T> {
}
private Node<T> getEnq() {
- return (Node<T>) enqHandle.get(this);
+ // Volatile read: this field is published across threads via casEnq; a plain read
+ // (VarHandle.get) carries no ordering even on a volatile field and may observe a
+ // stale node or be hoisted out of the spin loops in peekNode/pollNode.
+ return (Node<T>) enqHandle.getVolatile(this);
}
private boolean casEnq(Node<T> old, Node<T> nju) {
@@ -62,7 +65,8 @@ public abstract class AbstractBoundedNodeQueue<T> {
}
private Node<T> getDeq() {
- return (Node<T>) deqHandle.get(this);
+ // Volatile read: see getEnq. Published across threads via casDeq.
+ return (Node<T>) deqHandle.getVolatile(this);
}
private boolean casDeq(Node<T> old, Node<T> nju) {
@@ -75,6 +79,9 @@ public abstract class AbstractBoundedNodeQueue<T> {
final Node<T> next = deq.next();
if (next != null || getEnq() == deq)
return next;
+ // spinning until the producer that already advanced enq finishes linking next;
+ // onSpinWait reduces busy-wait power/pipeline cost and yields to an SMT sibling.
+ Thread.onSpinWait();
}
}
@@ -204,7 +211,13 @@ public abstract class AbstractBoundedNodeQueue<T> {
@SuppressWarnings("unchecked")
public final Node<T> next() {
- return (Node<T>) nextHandle.get(this);
+ // Acquire load to pair with the release store in setNext: it establishes the
+ // happens-before that publishes the node and prevents the read from being hoisted
+ // out of the spin loops in peekNode/pollNode (a plain VarHandle.get carries no
+ // ordering even on a volatile field and may be hoisted, yielding an endless spin).
+ // getAcquire is sufficient here (release/acquire pairing) and is no costlier than
+ // getVolatile for a load, so the stronger sequential consistency is not needed.
+ return (Node<T>) nextHandle.getAcquire(this);
}
protected final void setNext(final Node<T> newNext) {
diff --git a/actor/src/main/java/org/apache/pekko/dispatch/AbstractNodeQueue.java b/actor/src/main/java/org/apache/pekko/dispatch/AbstractNodeQueue.java
index ac8921e155..61d95204bd 100644
--- a/actor/src/main/java/org/apache/pekko/dispatch/AbstractNodeQueue.java
+++ b/actor/src/main/java/org/apache/pekko/dispatch/AbstractNodeQueue.java
@@ -54,12 +54,15 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
*/
@SuppressWarnings("unchecked")
protected final Node<T> peekNode() {
- final Node<T> tail = (Node<T>) tailHandle.get(this);
+ final Node<T> tail = (Node<T>) tailHandle.getAcquire(this);
Node<T> next = tail.next();
if (next == null && get() != tail) {
// if tail != head this is not going to change until producer makes progress
- // we can avoid reading the head and just spin on next until it shows up
+ // we can avoid reading the head and just spin on next until it shows up.
+ // onSpinWait hints the CPU we are busy-waiting: it cuts spin power/pipeline cost
+ // and yields the core to an SMT sibling (which may be the producer linking next).
do {
+ Thread.onSpinWait();
next = tail.next();
} while (next == null);
}
@@ -110,7 +113,7 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
* @return true if queue was empty at some point in the past
*/
public final boolean isEmpty() {
- return tailHandle.get(this) == get();
+ return tailHandle.getAcquire(this) == get();
}
/**
@@ -126,7 +129,7 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
public final int count() {
int count = 0;
final Node<T> head = get();
- for(Node<T> n = ((Node<T>) tailHandle.get(this)).next();
+ for(Node<T> n = ((Node<T>) tailHandle.getAcquire(this)).next();
n != null && count < Integer.MAX_VALUE;
n = n.next()) {
++count;
@@ -162,12 +165,15 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
*/
@SuppressWarnings("unchecked")
public final Node<T> pollNode() {
- final Node<T> tail = (Node<T>) tailHandle.get(this);
+ final Node<T> tail = (Node<T>) tailHandle.getAcquire(this);
Node<T> next = tail.next();
if (next == null && get() != tail) {
// if tail != head this is not going to change until producer makes progress
- // we can avoid reading the head and just spin on next until it shows up
+ // we can avoid reading the head and just spin on next until it shows up.
+ // onSpinWait hints the CPU we are busy-waiting: it cuts spin power/pipeline cost
+ // and yields the core to an SMT sibling (which may be the producer linking next).
do {
+ Thread.onSpinWait();
next = tail.next();
} while (next == null);
}
@@ -208,7 +214,13 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
}
public final Node<T> next() {
- return (Node<T>) nextHandle.get(this);
+ // Acquire load to pair with the release store in setNext: it establishes the
+ // happens-before that publishes the node and prevents the read from being hoisted
+ // out of the spin loops in peekNode/pollNode (a plain VarHandle.get carries no
+ // ordering even on a volatile field and may be hoisted, yielding an endless spin).
+ // getAcquire is sufficient here (release/acquire pairing) and is no costlier than
+ // getVolatile for a load, so the stronger sequential consistency is not needed.
+ return (Node<T>) nextHandle.getAcquire(this);
}
protected final void setNext(final Node<T> newNext) {