This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new c1910b2136 fix: restore volatile reads/writes downgraded by the
VarHandle migration (#3008)
c1910b2136 is described below
commit c1910b21367c3ad2ffc8b26685f972140318d953
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Fri May 29 19:39:33 2026 +0800
fix: restore volatile reads/writes downgraded by the VarHandle migration
(#3008)
Motivation:
The migration from sun.misc.Unsafe to VarHandle (#1990, and #1894 for
Mailbox) replaced the producer writes correctly (compareAndSwapObject
-> compareAndSet, putOrderedObject -> setRelease) but silently
downgraded the corresponding reads and a couple of writes from
volatile to plain access: `Unsafe.getObjectVolatile`/`getIntVolatile`/
`getLongVolatile` became `VarHandle.get` (plain) and two
`putObjectVolatile`/`putIntVolatile` became `VarHandle.set` (plain).
`VarHandle.get`/`set` have plain memory semantics even when the field
is declared `volatile`, so these accesses lost their happens-before
guarantees with the concurrent compareAndSet/setRelease publications.
On weakly-ordered hardware (AArch64) a reader can observe a stale value,
and inside tight loops the plain read may be hoisted by the JIT. (On
x86-64 plain loads happen to have acquire semantics, which is why this
mostly went unnoticed.)
Modification — restore the original ordered semantics on every
downgraded access:
- Mailbox: `currentStatus`/`setStatus` and `systemQueueGet` (status and
system-message queue head).
- ActorCell: `mailbox` (Dispatch), `childrenRefs`/`functionRefs` reads
and the `setTerminated` write (Children).
- RepointableActorRef: `underlying`/`lookup` cell reads.
- CircuitBreaker: `currentState`/`currentResetTimeout` reads.
- PromiseActorRef (AskSupport): `state`/`watchedBy` reads and the
`setState` write.
- MessageDispatcher: `inhabitants`/`shutdownSchedule` reads.
- Artery Association: `associationState` read.
CAS-published fields use `getVolatile`/`setVolatile` (restoring the
exact getObjectVolatile/putObjectVolatile semantics). For a load,
`getVolatile` compiles to the same instruction as a plain load on
x86-64 (MOV) and to LDAR on AArch64 — i.e. exactly what the original
Unsafe code emitted — so this restores the prior semantics at no extra
cost on the read side. The two writes restored to `setVolatile` carry a
StoreLoad fence as they did before the migration.
The node-queue spin reads (AbstractNodeQueue/AbstractBoundedNodeQueue)
are addressed separately, as they pair with release stores and use
getAcquire.
Result:
All concurrently-accessed fields that were volatile before the
Unsafe->VarHandle migration are volatile again, closing latent
visibility races (most importantly the mailbox status / system-message
queue and the actor cell/mailbox references). Method signatures are
unchanged, so there is no binary-compatibility impact.
References: https://github.com/apache/pekko/issues/2870
---
.../scala/org/apache/pekko/actor/RepointableActorRef.scala | 7 +++++--
.../scala/org/apache/pekko/actor/dungeon/Children.scala | 12 +++++++++---
.../scala/org/apache/pekko/actor/dungeon/Dispatch.scala | 5 ++++-
.../org/apache/pekko/dispatch/AbstractDispatcher.scala | 6 ++++--
.../src/main/scala/org/apache/pekko/dispatch/Mailbox.scala | 13 ++++++++++---
.../main/scala/org/apache/pekko/pattern/AskSupport.scala | 10 +++++++---
.../scala/org/apache/pekko/pattern/CircuitBreaker.scala | 7 +++++--
.../scala/org/apache/pekko/remote/artery/Association.scala | 4 +++-
8 files changed, 47 insertions(+), 17 deletions(-)
diff --git
a/actor/src/main/scala/org/apache/pekko/actor/RepointableActorRef.scala
b/actor/src/main/scala/org/apache/pekko/actor/RepointableActorRef.scala
index 433e7492ac..f9d300d9b7 100644
--- a/actor/src/main/scala/org/apache/pekko/actor/RepointableActorRef.scala
+++ b/actor/src/main/scala/org/apache/pekko/actor/RepointableActorRef.scala
@@ -64,8 +64,11 @@ private[pekko] class RepointableActorRef(
_lookupDoNotCallMeDirectly
}
- def underlying: Cell = cellHandle.get(this)
- def lookup: Cell = lookupHandle.get(this)
+ // volatile reads: cell/lookup are published across threads via
compareAndSet in
+ // swapCell/swapLookup; restores the getObjectVolatile semantics these had
before
+ // the VarHandle migration
+ def underlying: Cell = cellHandle.getVolatile(this)
+ def lookup: Cell = lookupHandle.getVolatile(this)
@tailrec
final def swapCell(next: Cell): Cell = {
diff --git a/actor/src/main/scala/org/apache/pekko/actor/dungeon/Children.scala
b/actor/src/main/scala/org/apache/pekko/actor/dungeon/Children.scala
index b450ce5fe6..d6845ecf4e 100644
--- a/actor/src/main/scala/org/apache/pekko/actor/dungeon/Children.scala
+++ b/actor/src/main/scala/org/apache/pekko/actor/dungeon/Children.scala
@@ -39,7 +39,9 @@ private[pekko] trait Children { this: ActorCell =>
private var _childrenRefsDoNotCallMeDirectly: ChildrenContainer =
EmptyChildrenContainer
def childrenRefs: ChildrenContainer =
- AbstractActorCell.childrenHandle.get(this)
+ // volatile read: children is published across threads via
compareAndSet/setVolatile;
+ // restores the getObjectVolatile semantics this had before the VarHandle
migration
+ AbstractActorCell.childrenHandle.getVolatile(this)
final def children: immutable.Iterable[ActorRef] = childrenRefs.children
final def getChildren(): java.lang.Iterable[ActorRef] = {
@@ -65,7 +67,8 @@ private[pekko] trait Children { this: ActorCell =>
@nowarn @volatile private var _functionRefsDoNotCallMeDirectly =
immutable.Map.empty[String, FunctionRef]
private def functionRefs: Map[String, FunctionRef] =
- AbstractActorCell.functionRefsHandle.get(this)
+ // volatile read: published across threads via compareAndSet in
addFunctionRef/removeFunctionRef
+ AbstractActorCell.functionRefsHandle.getVolatile(this)
private[pekko] def getFunctionRefOrNobody(name: String, uid: Int =
ActorCell.undefinedUid): InternalActorRef =
functionRefs.getOrElse(name, Children.GetNobody()) match {
@@ -184,7 +187,10 @@ private[pekko] trait Children { this: ActorCell =>
}
final protected def setTerminated(): Unit =
- AbstractActorCell.childrenHandle.set(this, TerminatedChildrenContainer)
+ // volatile write: this publishes the terminal container to concurrent
readers of
+ // childrenRefs; restores the putObjectVolatile semantics this had before
the
+ // VarHandle migration (a plain set would not be ordered against those
reads)
+ AbstractActorCell.childrenHandle.setVolatile(this,
TerminatedChildrenContainer)
/*
* ActorCell-internal API
diff --git a/actor/src/main/scala/org/apache/pekko/actor/dungeon/Dispatch.scala
b/actor/src/main/scala/org/apache/pekko/actor/dungeon/Dispatch.scala
index 99ef72718d..cfdd058062 100644
--- a/actor/src/main/scala/org/apache/pekko/actor/dungeon/Dispatch.scala
+++ b/actor/src/main/scala/org/apache/pekko/actor/dungeon/Dispatch.scala
@@ -51,7 +51,10 @@ private[pekko] trait Dispatch { this: ActorCell =>
}
final def mailbox: Mailbox =
- AbstractActorCell.mailboxHandle.get(this)
+ // volatile read: the mailbox is published across threads via the
compareAndSet in
+ // swapMailbox; a plain VarHandle.get could observe a stale mailbox
(restores the
+ // getObjectVolatile semantics this had before the VarHandle migration)
+ AbstractActorCell.mailboxHandle.getVolatile(this)
@tailrec
final def swapMailbox(newMailbox: Mailbox): Mailbox = {
diff --git
a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala
b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala
index 142ac836fe..a0ed462f10 100644
--- a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala
+++ b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala
@@ -136,10 +136,12 @@ abstract class MessageDispatcher(val configurator:
MessageDispatcherConfigurator
ret
}
- final def inhabitants: Long = inhabitantsHandle.get(this)
+ // volatile read: published across threads via getAndAdd in addInhabitants
+ final def inhabitants: Long = inhabitantsHandle.getVolatile(this)
private final def shutdownSchedule: Int =
- shutdownScheduleHandle.get(this)
+ // volatile read: published across threads via compareAndSet in
updateShutdownSchedule
+ shutdownScheduleHandle.getVolatile(this)
private final def updateShutdownSchedule(expect: Int, update: Int): Boolean =
shutdownScheduleHandle.compareAndSet(this, expect, update)
diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/Mailbox.scala
b/actor/src/main/scala/org/apache/pekko/dispatch/Mailbox.scala
index 8114dd3cda..6dc048430e 100644
--- a/actor/src/main/scala/org/apache/pekko/dispatch/Mailbox.scala
+++ b/actor/src/main/scala/org/apache/pekko/dispatch/Mailbox.scala
@@ -121,7 +121,10 @@ private[pekko] abstract class Mailbox(val messageQueue:
MessageQueue)
@volatile
protected var _systemQueueDoNotCallMeDirectly: SystemMessage = _ // null by
default
- final def currentStatus: Mailbox.Status =
AbstractMailbox.mailboxStatusHandle.get(this)
+ // volatile read: the status is published across threads via
compareAndSet/setVolatile
+ // below; a plain VarHandle.get could observe a stale status (e.g. miss a
Scheduled or
+ // Closed transition). Restores the getIntVolatile semantics from before the
VarHandle migration.
+ final def currentStatus: Mailbox.Status =
AbstractMailbox.mailboxStatusHandle.getVolatile(this)
final def shouldProcessMessage: Boolean = (currentStatus &
shouldNotProcessMask) == 0
@@ -137,7 +140,9 @@ private[pekko] abstract class Mailbox(val messageQueue:
MessageQueue)
AbstractMailbox.mailboxStatusHandle.compareAndSet(this, oldStatus,
newStatus)
protected final def setStatus(newStatus: Status): Unit =
- AbstractMailbox.mailboxStatusHandle.set(this, newStatus)
+ // volatile write: ordered against the concurrent reads in currentStatus;
restores the
+ // putIntVolatile semantics from before the VarHandle migration
+ AbstractMailbox.mailboxStatusHandle.setVolatile(this, newStatus)
/**
* Reduce the suspend count by one. Caller does not need to worry about
whether
@@ -209,7 +214,9 @@ private[pekko] abstract class Mailbox(val messageQueue:
MessageQueue)
protected final def systemQueueGet: LatestFirstSystemMessageList =
// Note: contrary how it looks, there is no allocation here, as
SystemMessageList is a value class and as such
// it just exists as a typed view during compile-time. The actual return
type is still SystemMessage.
- new
LatestFirstSystemMessageList(AbstractMailbox.systemMessageHandle.get(this))
+ // volatile read: the system message queue head is published across
threads via compareAndSet
+ // in systemQueuePut; restores the getObjectVolatile semantics from before
the VarHandle migration.
+ new
LatestFirstSystemMessageList(AbstractMailbox.systemMessageHandle.getVolatile(this))
protected final def systemQueuePut(_old: LatestFirstSystemMessageList, _new:
LatestFirstSystemMessageList): Boolean =
(_old.head eq _new.head) ||
diff --git a/actor/src/main/scala/org/apache/pekko/pattern/AskSupport.scala
b/actor/src/main/scala/org/apache/pekko/pattern/AskSupport.scala
index 98c717dcfe..6de334f1f4 100644
--- a/actor/src/main/scala/org/apache/pekko/pattern/AskSupport.scala
+++ b/actor/src/main/scala/org/apache/pekko/pattern/AskSupport.scala
@@ -547,7 +547,8 @@ private[pekko] final class PromiseActorRef(
_watchedByDoNotCallMeDirectly
}
- private[this] def watchedBy: Set[ActorRef] = watchedByHandle.get(this)
+ // volatile read: published across threads via compareAndSet in
updateWatchedBy
+ private[this] def watchedBy: Set[ActorRef] =
watchedByHandle.getVolatile(this)
private[this] def updateWatchedBy(oldWatchedBy: Set[ActorRef], newWatchedBy:
Set[ActorRef]): Boolean =
watchedByHandle.compareAndSet(this, oldWatchedBy, newWatchedBy)
@@ -570,12 +571,15 @@ private[pekko] final class PromiseActorRef(
case other => if (!updateWatchedBy(other, null)) clearWatchers() else other
}
- private[this] def state: AnyRef = stateHandle.get(this)
+ // volatile read: published across threads via compareAndSet/setVolatile
below
+ private[this] def state: AnyRef = stateHandle.getVolatile(this)
private[this] def updateState(oldState: AnyRef, newState: AnyRef): Boolean =
stateHandle.compareAndSet(this, oldState, newState)
- private[this] def setState(newState: AnyRef): Unit = stateHandle.set(this,
newState)
+ // volatile write: ordered against the concurrent reads in state; restores
the
+ // putObjectVolatile semantics this had before the VarHandle migration
+ private[this] def setState(newState: AnyRef): Unit =
stateHandle.setVolatile(this, newState)
override def getParent: InternalActorRef = provider.tempContainer
diff --git a/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala
b/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala
index 75d4a0feca..25c296f06f 100644
--- a/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala
+++ b/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala
@@ -291,7 +291,9 @@ class CircuitBreaker(
* @return Reference to current state
*/
private[this] def currentState: State =
- AbstractCircuitBreaker.stateHandle.get(this)
+ // volatile read: state is published across threads via compareAndSet in
swapState;
+ // restores the getObjectVolatile semantics this had before the VarHandle
migration
+ AbstractCircuitBreaker.stateHandle.getVolatile(this)
/**
* Helper method for updating the underlying resetTimeout via VarHandle
@@ -303,7 +305,8 @@ class CircuitBreaker(
* Helper method for accessing to the underlying resetTimeout via VarHandle
*/
private[this] def currentResetTimeout: FiniteDuration =
- AbstractCircuitBreaker.resetTimeoutHandle.get(this)
+ // volatile read: see currentState; published via compareAndSet in
swapResetTimeout
+ AbstractCircuitBreaker.resetTimeoutHandle.getVolatile(this)
/**
* Wraps invocations of asynchronous calls that need to be protected.
diff --git
a/remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala
b/remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala
index 0df2d90350..d2e543414d 100644
--- a/remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala
+++ b/remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala
@@ -274,7 +274,9 @@ private[remote] class Association(
* @return Reference to current shared state
*/
def associationState: AssociationState =
- AbstractAssociation.sharedStateHandle.get(this)
+ // volatile read: published across threads via compareAndSet in swapState;
restores the
+ // getObjectVolatile semantics this had before the VarHandle migration
+ AbstractAssociation.sharedStateHandle.getVolatile(this)
def setControlIdleKillSwitch(killSwitch: OptionVal[SharedKillSwitch]): Unit
= {
val current = associationState
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]