This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch fix/jdk25-restore-volatile-reads
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 07f6e3ba488fbde04242ee7f0a91378ec99bac7e
Author: He-Pin <[email protected]>
AuthorDate: Fri May 29 19:00:22 2026 +0800

    fix: restore volatile reads/writes downgraded by the VarHandle migration
    
    Motivation:
    The migration from sun.misc.Unsafe to VarHandle (#1990, and #1894 for
    Mailbox) replaced the producer writes correctly (compareAndSwapObject
    -> compareAndSet, putOrderedObject -> setRelease) but silently
    downgraded the corresponding reads and three writes from
    volatile to plain access: `Unsafe.getObjectVolatile`/`getIntVolatile`/
    `getLongVolatile` became `VarHandle.get` (plain) and three
    `putObjectVolatile`/`putIntVolatile` writes became `VarHandle.set` (plain).
    
    `VarHandle.get`/`set` have plain memory semantics even when the field
    is declared `volatile`, so these accesses lost their happens-before
    guarantees with the concurrent compareAndSet/setRelease publications.
    On weakly-ordered hardware (AArch64) a reader can observe a stale value,
    and inside tight loops the plain read may be hoisted by the JIT. (On
    x86-64 plain loads happen to have acquire semantics, which is why this
    mostly went unnoticed.)
    
    Modification — restore the original ordered semantics on every
    downgraded access:
    - Mailbox: `currentStatus`/`setStatus` and `systemQueueGet` (status and
      system-message queue head).
    - ActorCell: `mailbox` (Dispatch), `childrenRefs`/`functionRefs` reads
      and the `setTerminated` write (Children).
    - RepointableActorRef: `underlying`/`lookup` cell reads.
    - CircuitBreaker: `currentState`/`currentResetTimeout` reads.
    - PromiseActorRef (AskSupport): `state`/`watchedBy` reads and the
      `setState` write.
    - MessageDispatcher: `inhabitants`/`shutdownSchedule` reads.
    - Artery Association: `associationState` read.
    
    CAS-published fields use `getVolatile`/`setVolatile` (restoring the
    exact getObjectVolatile/putObjectVolatile semantics). For a load,
    `getVolatile` compiles to the same instruction as a plain load on
    x86-64 (MOV) and to LDAR on AArch64 — i.e. exactly what the original
    Unsafe code emitted — so this restores the prior semantics at no extra
    cost on the read side. The three writes restored to `setVolatile`
    (`setTerminated`, `setStatus` and `setState`) carry a StoreLoad fence
    as they did before the migration.
    
    The node-queue spin reads (AbstractNodeQueue/AbstractBoundedNodeQueue)
    are addressed separately, as they pair with release stores and use
    getAcquire.
    
    Result:
    All the accesses listed above that were volatile before the
    Unsafe->VarHandle migration are volatile again, closing latent
    visibility races (most importantly the mailbox status / system-message
    queue and the actor cell/mailbox references). Method signatures are
    unchanged, so there is no binary-compatibility impact.
    
    References: https://github.com/apache/pekko/issues/2870
---
 .../scala/org/apache/pekko/actor/RepointableActorRef.scala  |  7 +++++--
 .../scala/org/apache/pekko/actor/dungeon/Children.scala     | 12 +++++++++---
 .../scala/org/apache/pekko/actor/dungeon/Dispatch.scala     |  5 ++++-
 .../org/apache/pekko/dispatch/AbstractDispatcher.scala      |  6 ++++--
 .../src/main/scala/org/apache/pekko/dispatch/Mailbox.scala  | 13 ++++++++++---
 .../main/scala/org/apache/pekko/pattern/AskSupport.scala    | 10 +++++++---
 .../scala/org/apache/pekko/pattern/CircuitBreaker.scala     |  7 +++++--
 .../scala/org/apache/pekko/remote/artery/Association.scala  |  4 +++-
 8 files changed, 47 insertions(+), 17 deletions(-)

diff --git 
a/actor/src/main/scala/org/apache/pekko/actor/RepointableActorRef.scala 
b/actor/src/main/scala/org/apache/pekko/actor/RepointableActorRef.scala
index 433e7492ac..f9d300d9b7 100644
--- a/actor/src/main/scala/org/apache/pekko/actor/RepointableActorRef.scala
+++ b/actor/src/main/scala/org/apache/pekko/actor/RepointableActorRef.scala
@@ -64,8 +64,11 @@ private[pekko] class RepointableActorRef(
     _lookupDoNotCallMeDirectly
   }
 
-  def underlying: Cell = cellHandle.get(this)
-  def lookup: Cell = lookupHandle.get(this)
+  // volatile reads: cell/lookup are published across threads via 
compareAndSet in
+  // swapCell/swapLookup; restores the getObjectVolatile semantics these had 
before
+  // the VarHandle migration
+  def underlying: Cell = cellHandle.getVolatile(this)
+  def lookup: Cell = lookupHandle.getVolatile(this)
 
   @tailrec
   final def swapCell(next: Cell): Cell = {
diff --git a/actor/src/main/scala/org/apache/pekko/actor/dungeon/Children.scala 
b/actor/src/main/scala/org/apache/pekko/actor/dungeon/Children.scala
index b450ce5fe6..d6845ecf4e 100644
--- a/actor/src/main/scala/org/apache/pekko/actor/dungeon/Children.scala
+++ b/actor/src/main/scala/org/apache/pekko/actor/dungeon/Children.scala
@@ -39,7 +39,9 @@ private[pekko] trait Children { this: ActorCell =>
   private var _childrenRefsDoNotCallMeDirectly: ChildrenContainer = 
EmptyChildrenContainer
 
   def childrenRefs: ChildrenContainer =
-    AbstractActorCell.childrenHandle.get(this)
+    // volatile read: children is published across threads via 
compareAndSet/setVolatile;
+    // restores the getObjectVolatile semantics this had before the VarHandle 
migration
+    AbstractActorCell.childrenHandle.getVolatile(this)
 
   final def children: immutable.Iterable[ActorRef] = childrenRefs.children
   final def getChildren(): java.lang.Iterable[ActorRef] = {
@@ -65,7 +67,8 @@ private[pekko] trait Children { this: ActorCell =>
 
   @nowarn @volatile private var _functionRefsDoNotCallMeDirectly = 
immutable.Map.empty[String, FunctionRef]
   private def functionRefs: Map[String, FunctionRef] =
-    AbstractActorCell.functionRefsHandle.get(this)
+    // volatile read: published across threads via compareAndSet in 
addFunctionRef/removeFunctionRef
+    AbstractActorCell.functionRefsHandle.getVolatile(this)
 
   private[pekko] def getFunctionRefOrNobody(name: String, uid: Int = 
ActorCell.undefinedUid): InternalActorRef =
     functionRefs.getOrElse(name, Children.GetNobody()) match {
@@ -184,7 +187,10 @@ private[pekko] trait Children { this: ActorCell =>
   }
 
   final protected def setTerminated(): Unit =
-    AbstractActorCell.childrenHandle.set(this, TerminatedChildrenContainer)
+    // volatile write: this publishes the terminal container to concurrent 
readers of
+    // childrenRefs; restores the putObjectVolatile semantics this had before 
the
+    // VarHandle migration (a plain set would not be ordered against those 
reads)
+    AbstractActorCell.childrenHandle.setVolatile(this, 
TerminatedChildrenContainer)
 
   /*
    * ActorCell-internal API
diff --git a/actor/src/main/scala/org/apache/pekko/actor/dungeon/Dispatch.scala 
b/actor/src/main/scala/org/apache/pekko/actor/dungeon/Dispatch.scala
index 99ef72718d..cfdd058062 100644
--- a/actor/src/main/scala/org/apache/pekko/actor/dungeon/Dispatch.scala
+++ b/actor/src/main/scala/org/apache/pekko/actor/dungeon/Dispatch.scala
@@ -51,7 +51,10 @@ private[pekko] trait Dispatch { this: ActorCell =>
   }
 
   final def mailbox: Mailbox =
-    AbstractActorCell.mailboxHandle.get(this)
+    // volatile read: the mailbox is published across threads via the 
compareAndSet in
+    // swapMailbox; a plain VarHandle.get could observe a stale mailbox 
(restores the
+    // getObjectVolatile semantics this had before the VarHandle migration)
+    AbstractActorCell.mailboxHandle.getVolatile(this)
 
   @tailrec
   final def swapMailbox(newMailbox: Mailbox): Mailbox = {
diff --git 
a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala 
b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala
index 142ac836fe..a0ed462f10 100644
--- a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala
+++ b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala
@@ -136,10 +136,12 @@ abstract class MessageDispatcher(val configurator: 
MessageDispatcherConfigurator
     ret
   }
 
-  final def inhabitants: Long = inhabitantsHandle.get(this)
+  // volatile read: published across threads via getAndAdd in addInhabitants
+  final def inhabitants: Long = inhabitantsHandle.getVolatile(this)
 
   private final def shutdownSchedule: Int =
-    shutdownScheduleHandle.get(this)
+    // volatile read: published across threads via compareAndSet in 
updateShutdownSchedule
+    shutdownScheduleHandle.getVolatile(this)
   private final def updateShutdownSchedule(expect: Int, update: Int): Boolean =
     shutdownScheduleHandle.compareAndSet(this, expect, update)
 
diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/Mailbox.scala 
b/actor/src/main/scala/org/apache/pekko/dispatch/Mailbox.scala
index 8114dd3cda..6dc048430e 100644
--- a/actor/src/main/scala/org/apache/pekko/dispatch/Mailbox.scala
+++ b/actor/src/main/scala/org/apache/pekko/dispatch/Mailbox.scala
@@ -121,7 +121,10 @@ private[pekko] abstract class Mailbox(val messageQueue: 
MessageQueue)
   @volatile
   protected var _systemQueueDoNotCallMeDirectly: SystemMessage = _ // null by 
default
 
-  final def currentStatus: Mailbox.Status = 
AbstractMailbox.mailboxStatusHandle.get(this)
+  // volatile read: the status is published across threads via 
compareAndSet/setVolatile
+  // below; a plain VarHandle.get could observe a stale status (e.g. miss a 
Scheduled or
+  // Closed transition). Restores the getIntVolatile semantics from before the 
VarHandle migration.
+  final def currentStatus: Mailbox.Status = 
AbstractMailbox.mailboxStatusHandle.getVolatile(this)
 
   final def shouldProcessMessage: Boolean = (currentStatus & 
shouldNotProcessMask) == 0
 
@@ -137,7 +140,9 @@ private[pekko] abstract class Mailbox(val messageQueue: 
MessageQueue)
     AbstractMailbox.mailboxStatusHandle.compareAndSet(this, oldStatus, 
newStatus)
 
   protected final def setStatus(newStatus: Status): Unit =
-    AbstractMailbox.mailboxStatusHandle.set(this, newStatus)
+    // volatile write: ordered against the concurrent reads in currentStatus; 
restores the
+    // putIntVolatile semantics from before the VarHandle migration
+    AbstractMailbox.mailboxStatusHandle.setVolatile(this, newStatus)
 
   /**
    * Reduce the suspend count by one. Caller does not need to worry about 
whether
@@ -209,7 +214,9 @@ private[pekko] abstract class Mailbox(val messageQueue: 
MessageQueue)
   protected final def systemQueueGet: LatestFirstSystemMessageList =
     // Note: contrary how it looks, there is no allocation here, as 
SystemMessageList is a value class and as such
     // it just exists as a typed view during compile-time. The actual return 
type is still SystemMessage.
-    new 
LatestFirstSystemMessageList(AbstractMailbox.systemMessageHandle.get(this))
+    // volatile read: the system message queue head is published across 
threads via compareAndSet
+    // in systemQueuePut; restores the getObjectVolatile semantics from before 
the VarHandle migration.
+    new 
LatestFirstSystemMessageList(AbstractMailbox.systemMessageHandle.getVolatile(this))
 
   protected final def systemQueuePut(_old: LatestFirstSystemMessageList, _new: 
LatestFirstSystemMessageList): Boolean =
     (_old.head eq _new.head) ||
diff --git a/actor/src/main/scala/org/apache/pekko/pattern/AskSupport.scala 
b/actor/src/main/scala/org/apache/pekko/pattern/AskSupport.scala
index 98c717dcfe..6de334f1f4 100644
--- a/actor/src/main/scala/org/apache/pekko/pattern/AskSupport.scala
+++ b/actor/src/main/scala/org/apache/pekko/pattern/AskSupport.scala
@@ -547,7 +547,8 @@ private[pekko] final class PromiseActorRef(
     _watchedByDoNotCallMeDirectly
   }
 
-  private[this] def watchedBy: Set[ActorRef] = watchedByHandle.get(this)
+  // volatile read: published across threads via compareAndSet in 
updateWatchedBy
+  private[this] def watchedBy: Set[ActorRef] = 
watchedByHandle.getVolatile(this)
 
   private[this] def updateWatchedBy(oldWatchedBy: Set[ActorRef], newWatchedBy: 
Set[ActorRef]): Boolean =
     watchedByHandle.compareAndSet(this, oldWatchedBy, newWatchedBy)
@@ -570,12 +571,15 @@ private[pekko] final class PromiseActorRef(
     case other => if (!updateWatchedBy(other, null)) clearWatchers() else other
   }
 
-  private[this] def state: AnyRef = stateHandle.get(this)
+  // volatile read: published across threads via compareAndSet/setVolatile 
below
+  private[this] def state: AnyRef = stateHandle.getVolatile(this)
 
   private[this] def updateState(oldState: AnyRef, newState: AnyRef): Boolean =
     stateHandle.compareAndSet(this, oldState, newState)
 
-  private[this] def setState(newState: AnyRef): Unit = stateHandle.set(this, 
newState)
+  // volatile write: ordered against the concurrent reads in state; restores 
the
+  // putObjectVolatile semantics this had before the VarHandle migration
+  private[this] def setState(newState: AnyRef): Unit = 
stateHandle.setVolatile(this, newState)
 
   override def getParent: InternalActorRef = provider.tempContainer
 
diff --git a/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala 
b/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala
index 75d4a0feca..25c296f06f 100644
--- a/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala
+++ b/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala
@@ -291,7 +291,9 @@ class CircuitBreaker(
    * @return Reference to current state
    */
   private[this] def currentState: State =
-    AbstractCircuitBreaker.stateHandle.get(this)
+    // volatile read: state is published across threads via compareAndSet in 
swapState;
+    // restores the getObjectVolatile semantics this had before the VarHandle 
migration
+    AbstractCircuitBreaker.stateHandle.getVolatile(this)
 
   /**
    * Helper method for updating the underlying resetTimeout via VarHandle
@@ -303,7 +305,8 @@ class CircuitBreaker(
    * Helper method for accessing to the underlying resetTimeout via VarHandle
    */
   private[this] def currentResetTimeout: FiniteDuration =
-    AbstractCircuitBreaker.resetTimeoutHandle.get(this)
+    // volatile read: see currentState; published via compareAndSet in 
swapResetTimeout
+    AbstractCircuitBreaker.resetTimeoutHandle.getVolatile(this)
 
   /**
    * Wraps invocations of asynchronous calls that need to be protected.
diff --git 
a/remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala 
b/remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala
index 0df2d90350..d2e543414d 100644
--- a/remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala
+++ b/remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala
@@ -274,7 +274,9 @@ private[remote] class Association(
    * @return Reference to current shared state
    */
   def associationState: AssociationState =
-    AbstractAssociation.sharedStateHandle.get(this)
+    // volatile read: published across threads via compareAndSet in swapState; 
restores the
+    // getObjectVolatile semantics this had before the VarHandle migration
+    AbstractAssociation.sharedStateHandle.getVolatile(this)
 
   def setControlIdleKillSwitch(killSwitch: OptionVal[SharedKillSwitch]): Unit 
= {
     val current = associationState


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to