This is an automated email from the ASF dual-hosted git repository. He-Pin pushed a commit to branch fix/jdk25-restore-volatile-reads in repository https://gitbox.apache.org/repos/asf/pekko.git
commit 07f6e3ba488fbde04242ee7f0a91378ec99bac7e Author: He-Pin <[email protected]> AuthorDate: Fri May 29 19:00:22 2026 +0800 fix: restore volatile reads/writes downgraded by the VarHandle migration Motivation: The migration from sun.misc.Unsafe to VarHandle (#1990, and #1894 for Mailbox) replaced the producer writes correctly (compareAndSwapObject -> compareAndSet, putOrderedObject -> setRelease) but silently downgraded the corresponding reads and a couple of writes from volatile to plain access: `Unsafe.getObjectVolatile`/`getIntVolatile`/ `getLongVolatile` became `VarHandle.get` (plain) and two `putObjectVolatile`/`putIntVolatile` became `VarHandle.set` (plain). `VarHandle.get`/`set` have plain memory semantics even when the field is declared `volatile`, so these accesses lost their happens-before guarantees with the concurrent compareAndSet/setRelease publications. On weakly-ordered hardware (AArch64) a reader can observe a stale value, and inside tight loops the plain read may be hoisted by the JIT. (On x86-64 plain loads happen to have acquire semantics, which is why this mostly went unnoticed.) Modification — restore the original ordered semantics on every downgraded access: - Mailbox: `currentStatus`/`setStatus` and `systemQueueGet` (status and system-message queue head). - ActorCell: `mailbox` (Dispatch), `childrenRefs`/`functionRefs` reads and the `setTerminated` write (Children). - RepointableActorRef: `underlying`/`lookup` cell reads. - CircuitBreaker: `currentState`/`currentResetTimeout` reads. - PromiseActorRef (AskSupport): `state`/`watchedBy` reads and the `setState` write. - MessageDispatcher: `inhabitants`/`shutdownSchedule` reads. - Artery Association: `associationState` read. CAS-published fields use `getVolatile`/`setVolatile` (restoring the exact getObjectVolatile/putObjectVolatile semantics). For a load, `getVolatile` compiles to the same instruction as a plain load on x86-64 (MOV) and to LDAR on AArch64 — i.e. exactly what the original Unsafe code emitted — so this restores the prior semantics at no extra cost on the read side. The two writes restored to `setVolatile` carry a StoreLoad fence as they did before the migration. The node-queue spin reads (AbstractNodeQueue/AbstractBoundedNodeQueue) are addressed separately, as they pair with release stores and use getAcquire. Result: All concurrently-accessed fields that were volatile before the Unsafe->VarHandle migration are volatile again, closing latent visibility races (most importantly the mailbox status / system-message queue and the actor cell/mailbox references). Method signatures are unchanged, so there is no binary-compatibility impact. References: https://github.com/apache/pekko/issues/2870 --- .../scala/org/apache/pekko/actor/RepointableActorRef.scala | 7 +++++-- .../scala/org/apache/pekko/actor/dungeon/Children.scala | 12 +++++++++--- .../scala/org/apache/pekko/actor/dungeon/Dispatch.scala | 5 ++++- .../org/apache/pekko/dispatch/AbstractDispatcher.scala | 6 ++++-- .../src/main/scala/org/apache/pekko/dispatch/Mailbox.scala | 13 ++++++++++--- .../main/scala/org/apache/pekko/pattern/AskSupport.scala | 10 +++++++--- .../scala/org/apache/pekko/pattern/CircuitBreaker.scala | 7 +++++-- .../scala/org/apache/pekko/remote/artery/Association.scala | 4 +++- 8 files changed, 47 insertions(+), 17 deletions(-) diff --git a/actor/src/main/scala/org/apache/pekko/actor/RepointableActorRef.scala b/actor/src/main/scala/org/apache/pekko/actor/RepointableActorRef.scala index 433e7492ac..f9d300d9b7 100644 --- a/actor/src/main/scala/org/apache/pekko/actor/RepointableActorRef.scala +++ b/actor/src/main/scala/org/apache/pekko/actor/RepointableActorRef.scala @@ -64,8 +64,11 @@ private[pekko] class RepointableActorRef( _lookupDoNotCallMeDirectly } - def underlying: Cell = cellHandle.get(this) - def lookup: Cell = lookupHandle.get(this) + // volatile reads: cell/lookup are published across threads via compareAndSet in + // swapCell/swapLookup; restores the getObjectVolatile semantics these had before + // the VarHandle migration + def underlying: Cell = cellHandle.getVolatile(this) + def lookup: Cell = lookupHandle.getVolatile(this) @tailrec final def swapCell(next: Cell): Cell = { diff --git a/actor/src/main/scala/org/apache/pekko/actor/dungeon/Children.scala b/actor/src/main/scala/org/apache/pekko/actor/dungeon/Children.scala index b450ce5fe6..d6845ecf4e 100644 --- a/actor/src/main/scala/org/apache/pekko/actor/dungeon/Children.scala +++ b/actor/src/main/scala/org/apache/pekko/actor/dungeon/Children.scala @@ -39,7 +39,9 @@ private[pekko] trait Children { this: ActorCell => private var _childrenRefsDoNotCallMeDirectly: ChildrenContainer = EmptyChildrenContainer def childrenRefs: ChildrenContainer = - AbstractActorCell.childrenHandle.get(this) + // volatile read: children is published across threads via compareAndSet/setVolatile; + // restores the getObjectVolatile semantics this had before the VarHandle migration + AbstractActorCell.childrenHandle.getVolatile(this) final def children: immutable.Iterable[ActorRef] = childrenRefs.children final def getChildren(): java.lang.Iterable[ActorRef] = { @@ -65,7 +67,8 @@ private[pekko] trait Children { this: ActorCell => @nowarn @volatile private var _functionRefsDoNotCallMeDirectly = immutable.Map.empty[String, FunctionRef] private def functionRefs: Map[String, FunctionRef] = - AbstractActorCell.functionRefsHandle.get(this) + // volatile read: published across threads via compareAndSet in addFunctionRef/removeFunctionRef + AbstractActorCell.functionRefsHandle.getVolatile(this) private[pekko] def getFunctionRefOrNobody(name: String, uid: Int = ActorCell.undefinedUid): InternalActorRef = functionRefs.getOrElse(name, Children.GetNobody()) match { @@ -184,7 +187,10 @@ private[pekko] trait Children { this: ActorCell => } final protected def setTerminated(): Unit = - AbstractActorCell.childrenHandle.set(this, TerminatedChildrenContainer) + // volatile write: this publishes the terminal container to concurrent readers of + // childrenRefs; restores the putObjectVolatile semantics this had before the + // VarHandle migration (a plain set would not be ordered against those reads) + AbstractActorCell.childrenHandle.setVolatile(this, TerminatedChildrenContainer) /* * ActorCell-internal API diff --git a/actor/src/main/scala/org/apache/pekko/actor/dungeon/Dispatch.scala b/actor/src/main/scala/org/apache/pekko/actor/dungeon/Dispatch.scala index 99ef72718d..cfdd058062 100644 --- a/actor/src/main/scala/org/apache/pekko/actor/dungeon/Dispatch.scala +++ b/actor/src/main/scala/org/apache/pekko/actor/dungeon/Dispatch.scala @@ -51,7 +51,10 @@ private[pekko] trait Dispatch { this: ActorCell => } final def mailbox: Mailbox = - AbstractActorCell.mailboxHandle.get(this) + // volatile read: the mailbox is published across threads via the compareAndSet in + // swapMailbox; a plain VarHandle.get could observe a stale mailbox (restores the + // getObjectVolatile semantics this had before the VarHandle migration) + AbstractActorCell.mailboxHandle.getVolatile(this) @tailrec final def swapMailbox(newMailbox: Mailbox): Mailbox = { diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala index 142ac836fe..a0ed462f10 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala @@ -136,10 +136,12 @@ abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator ret } - final def inhabitants: Long = inhabitantsHandle.get(this) + // volatile read: published across threads via getAndAdd in addInhabitants + final def inhabitants: Long = inhabitantsHandle.getVolatile(this) private final def shutdownSchedule: Int = - shutdownScheduleHandle.get(this) + // volatile read: published across threads via compareAndSet in updateShutdownSchedule + shutdownScheduleHandle.getVolatile(this) private final def updateShutdownSchedule(expect: Int, update: Int): Boolean = shutdownScheduleHandle.compareAndSet(this, expect, update) diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/Mailbox.scala b/actor/src/main/scala/org/apache/pekko/dispatch/Mailbox.scala index 8114dd3cda..6dc048430e 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/Mailbox.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/Mailbox.scala @@ -121,7 +121,10 @@ private[pekko] abstract class Mailbox(val messageQueue: MessageQueue) @volatile protected var _systemQueueDoNotCallMeDirectly: SystemMessage = _ // null by default - final def currentStatus: Mailbox.Status = AbstractMailbox.mailboxStatusHandle.get(this) + // volatile read: the status is published across threads via compareAndSet/setVolatile + // below; a plain VarHandle.get could observe a stale status (e.g. miss a Scheduled or + // Closed transition). Restores the getIntVolatile semantics from before the VarHandle migration. + final def currentStatus: Mailbox.Status = AbstractMailbox.mailboxStatusHandle.getVolatile(this) final def shouldProcessMessage: Boolean = (currentStatus & shouldNotProcessMask) == 0 @@ -137,7 +140,9 @@ private[pekko] abstract class Mailbox(val messageQueue: MessageQueue) AbstractMailbox.mailboxStatusHandle.compareAndSet(this, oldStatus, newStatus) protected final def setStatus(newStatus: Status): Unit = - AbstractMailbox.mailboxStatusHandle.set(this, newStatus) + // volatile write: ordered against the concurrent reads in currentStatus; restores the + // putIntVolatile semantics from before the VarHandle migration + AbstractMailbox.mailboxStatusHandle.setVolatile(this, newStatus) /** * Reduce the suspend count by one. Caller does not need to worry about whether @@ -209,7 +214,9 @@ private[pekko] abstract class Mailbox(val messageQueue: MessageQueue) protected final def systemQueueGet: LatestFirstSystemMessageList = // Note: contrary how it looks, there is no allocation here, as SystemMessageList is a value class and as such // it just exists as a typed view during compile-time. The actual return type is still SystemMessage. - new LatestFirstSystemMessageList(AbstractMailbox.systemMessageHandle.get(this)) + // volatile read: the system message queue head is published across threads via compareAndSet + // in systemQueuePut; restores the getObjectVolatile semantics from before the VarHandle migration. + new LatestFirstSystemMessageList(AbstractMailbox.systemMessageHandle.getVolatile(this)) protected final def systemQueuePut(_old: LatestFirstSystemMessageList, _new: LatestFirstSystemMessageList): Boolean = (_old.head eq _new.head) || diff --git a/actor/src/main/scala/org/apache/pekko/pattern/AskSupport.scala b/actor/src/main/scala/org/apache/pekko/pattern/AskSupport.scala index 98c717dcfe..6de334f1f4 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/AskSupport.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/AskSupport.scala @@ -547,7 +547,8 @@ private[pekko] final class PromiseActorRef( _watchedByDoNotCallMeDirectly } - private[this] def watchedBy: Set[ActorRef] = watchedByHandle.get(this) + // volatile read: published across threads via compareAndSet in updateWatchedBy + private[this] def watchedBy: Set[ActorRef] = watchedByHandle.getVolatile(this) private[this] def updateWatchedBy(oldWatchedBy: Set[ActorRef], newWatchedBy: Set[ActorRef]): Boolean = watchedByHandle.compareAndSet(this, oldWatchedBy, newWatchedBy) @@ -570,12 +571,15 @@ private[pekko] final class PromiseActorRef( case other => if (!updateWatchedBy(other, null)) clearWatchers() else other } - private[this] def state: AnyRef = stateHandle.get(this) + // volatile read: published across threads via compareAndSet/setVolatile below + private[this] def state: AnyRef = stateHandle.getVolatile(this) private[this] def updateState(oldState: AnyRef, newState: AnyRef): Boolean = stateHandle.compareAndSet(this, oldState, newState) - private[this] def setState(newState: AnyRef): Unit = stateHandle.set(this, newState) + // volatile write: ordered against the concurrent reads in state; restores the + // putObjectVolatile semantics this had before the VarHandle migration + private[this] def setState(newState: AnyRef): Unit = stateHandle.setVolatile(this, newState) override def getParent: InternalActorRef = provider.tempContainer diff --git a/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala b/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala index 75d4a0feca..25c296f06f 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala @@ -291,7 +291,9 @@ class CircuitBreaker( * @return Reference to current state */ private[this] def currentState: State = - AbstractCircuitBreaker.stateHandle.get(this) + // volatile read: state is published across threads via compareAndSet in swapState; + // restores the getObjectVolatile semantics this had before the VarHandle migration + AbstractCircuitBreaker.stateHandle.getVolatile(this) /** * Helper method for updating the underlying resetTimeout via VarHandle @@ -303,7 +305,8 @@ class CircuitBreaker( * Helper method for accessing to the underlying resetTimeout via VarHandle */ private[this] def currentResetTimeout: FiniteDuration = - AbstractCircuitBreaker.resetTimeoutHandle.get(this) + // volatile read: see currentState; published via compareAndSet in swapResetTimeout + AbstractCircuitBreaker.resetTimeoutHandle.getVolatile(this) /** * Wraps invocations of asynchronous calls that need to be protected. diff --git a/remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala b/remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala index 0df2d90350..d2e543414d 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala @@ -274,7 +274,9 @@ private[remote] class Association( * @return Reference to current shared state */ def associationState: AssociationState = - AbstractAssociation.sharedStateHandle.get(this) + // volatile read: published across threads via compareAndSet in swapState; restores the + // getObjectVolatile semantics this had before the VarHandle migration + AbstractAssociation.sharedStateHandle.getVolatile(this) def setControlIdleKillSwitch(killSwitch: OptionVal[SharedKillSwitch]): Unit = { val current = associationState --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
