This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch optimize-stage-actor-ref-lazy-dispatch
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 3619719aa619a4355c0deda3856f22352b76626e
Author: He-Pin <[email protected]>
AuthorDate: Mon Jun 1 13:11:36 2026 +0800

    fix: optimize lazy stage actor dispatch via MPSC drain coalescing
    
    Motivation:
    
    Lazy `getStageActor` refs paid one actor mailbox enqueue per external tell:
    sender -> FunctionRef -> ConcurrentAsyncCallback.invokeWithPromise ->
    interpreter self ! AsyncInput. Under a high tell rate to a single stage
    actor, the bottleneck is mailbox traffic (envelope allocation, cross-thread
    wakeup, dequeue), not the dispatch lambda. Each tell also allocated a
    Tuple2, an AsyncInput, and a mailbox Envelope.
    
    Modification:
    
    Lazy `getStageActor` now installs an MPSC dispatch (`LazyDispatch`) that:
      - enqueues (sender, msg) into a Vyukov MPSC queue (`AbstractNodeQueue`)
      - elects a single drain via IDLE -> SCHEDULED CAS; only the elected
        producer pays a mailbox enqueue
      - drains on the interpreter thread in a tight loop bounded by
        `stage-actor-drain-batch` (default 16), then either publishes IDLE
        (and re-checks the queue, re-electing a drain if a producer enqueued
        during the publish window) or re-schedules another envelope so other
        BoundaryEvents interleave naturally via the actor mailbox
      - preserves `isStageCompleted` semantics: items added after completion are
        dropped exactly as the old per-tell path silently skipped them.
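    The election and drain steps above can be sketched outside Pekko. This
    is a hypothetical stand-in, not the code in this patch. The name
    `CoalescingDispatch` is invented, and `ConcurrentLinkedQueue` replaces
    `AbstractNodeQueue`. An `AtomicInteger` replaces the VarHandle-driven
    `scheduledState`, and a single-threaded `ExecutorService` plays the role
    of the interpreter actor's mailbox. The `isStageCompleted` check is
    omitted.

```scala
import java.util.concurrent.{ ConcurrentLinkedQueue, ExecutorService }
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical sketch of MPSC drain coalescing (not Pekko's LazyDispatch).
final class CoalescingDispatch[T <: AnyRef](mailbox: ExecutorService, batch: Int)(handler: T => Unit) {
  require(batch >= 1, "batch must be >= 1")

  private val Idle = 0
  private val Scheduled = 1
  private val queue = new ConcurrentLinkedQueue[T]()
  private val state = new AtomicInteger(Idle)

  /** Number of drain tasks submitted to the mailbox, exposed only for observation. */
  val envelopes = new AtomicInteger(0)

  /** Producer side: may be called from any thread. */
  def tell(msg: T): Unit = {
    queue.add(msg)
    // Double-checked CAS: only the IDLE -> SCHEDULED winner submits a drain task.
    if (state.get() == Idle && state.compareAndSet(Idle, Scheduled)) scheduleDrain()
  }

  private def scheduleDrain(): Unit = {
    envelopes.incrementAndGet()
    mailbox.execute(() => drain())
  }

  /** Consumer side: always runs on the mailbox's single thread. */
  private def drain(): Unit = {
    var processed = 0
    while (processed < batch) {
      val item = queue.poll()
      if (item eq null) {
        state.set(Idle) // volatile store, so the isEmpty read below cannot move before it
        // A producer that enqueued after poll() returned null saw SCHEDULED and did not
        // submit a task, so re-elect a drain if anything is left.
        if (!queue.isEmpty && state.compareAndSet(Idle, Scheduled)) scheduleDrain()
        return
      }
      handler(item)
      processed += 1
    }
    // Batch cap reached: stay SCHEDULED and yield so other queued tasks run first.
    scheduleDrain()
  }
}
```

    Producers on any thread call `tell`. Only the thread that wins the
    IDLE -> SCHEDULED CAS submits a task, and every `handler` call runs on
    the executor's single thread.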
    
    The eager construction path (used before stream demand) is unchanged and
    still routes through the materializer supervisor + `AsyncCallback`.
    
    JIT/GC notes:
      - `LazyDispatch` is a `final class` and extends `AbstractNodeQueue`
        directly so it is its own queue (one fewer allocation and field deref).
      - `scheduledState` is a plain `@volatile var Int` driven by a static
        `VarHandle` (created via `MethodHandles.privateLookupIn`), avoiding the
        per-instance `AtomicBoolean` wrapper. Same pattern as
        `AbstractNodeQueue` itself.
      - The dispatch `apply` is monomorphic per StageActor instance; the drain
        callback is allocated once and reused. The FunctionRef lambda is
        rewritten as `(sender, msg) =>` to skip the Tuple2 allocation on the
        PoisonPill / Kill warning path.
      - Per-tell allocation is now 1 Node + 1 Tuple2 (the Tuple2 is forced by
        the public `StageActorRef.Receive` type); AsyncInput and Envelope are
        amortized across the batch.
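    The following is a rough count of the allocation claims above. The
    hypothetical helper only tallies the objects named in this message. On
    the old path, that is a Tuple2, an AsyncInput and an Envelope per tell.
    On the new path, it is a Node and a Tuple2 per tell, plus an AsyncInput
    and an Envelope per drain. The model assumes an idealised burst in which
    every tell is queued before the first drain runs. When producers and
    drains overlap, the real drain count can be higher.

```scala
// Hypothetical back-of-the-envelope model; counts only the objects listed in this message.
object DrainArithmetic {
  /**
   * Drains needed for a burst of `tells` that is fully queued before the first drain runs.
   * Every drain that hits the cap re-schedules, so a burst that is an exact multiple of
   * `batch` ends with one extra drain that finds the queue empty.
   */
  def drains(tells: Int, batch: Int): Int = {
    require(tells >= 0 && batch >= 1)
    if (tells == 0) 0 else tells / batch + 1
  }

  /** Old path: Tuple2 + AsyncInput + mailbox Envelope per tell. */
  def oldAllocations(tells: Int): Int = 3 * tells

  /** New path: Node + Tuple2 per tell, plus AsyncInput + Envelope per drain. */
  def newAllocations(tells: Int, batch: Int): Int = 2 * tells + 2 * drains(tells, batch)
}
```

    Take one 10,000-tell benchmark invocation with the default cap of 16.
    This model gives 626 drains, so 626 AsyncInput/Envelope pairs are
    allocated instead of 10,000. The counted allocations total 21,252
    instead of 30,000.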
    
    Configuration:
    
    `pekko.stream.materializer.stage-actor-drain-batch` (default 16) bounds the
    per-envelope drain. The default aligns with `InputBuffer.max` and keeps the
    per-actor-wakeup work in the same order of magnitude as the dispatcher
    throughput; smaller values trade tell throughput for tighter interleaving
    with upstream/downstream events, larger values do the opposite.
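    The fairness side of this trade-off shows up in a toy, single-threaded
    model of the interpreter mailbox (hypothetical names, not Pekko code).
    A drain envelope sits just ahead of one other BoundaryEvent. A drain
    that hits the cap re-enqueues itself at the back of the mailbox, so
    that event waits for at most one batch of stage-actor messages.

```scala
import scala.collection.mutable

// Toy single-threaded model of the interpreter mailbox; names are hypothetical.
object DrainFairness {
  sealed trait Envelope
  case object Drain extends Envelope // one coalesced stage-actor drain
  case object Push extends Envelope  // any other BoundaryEvent (pull/push/complete)

  /** Stage-actor messages handled before the Push envelope is processed. */
  def handledBeforePush(tells: Int, batch: Int): Int = {
    require(tells >= 0 && batch >= 1)
    val mailbox = mutable.Queue[Envelope](Drain, Push)
    var pending = tells
    var handled = 0
    var handledAtPush = -1
    while (mailbox.nonEmpty) {
      mailbox.dequeue() match {
        case Push => handledAtPush = handled
        case Drain =>
          val n = math.min(pending, batch)
          pending -= n
          handled += n
          // A drain that hits the cap re-enqueues itself at the back, behind Push.
          if (n == batch) mailbox.enqueue(Drain)
      }
    }
    handledAtPush
  }
}
```

    In this model, `handledBeforePush(100, 16)` is 16 and
    `handledBeforePush(100, 8)` is 8. The cap therefore bounds how long a
    queued pull/push/complete is delayed by a burst of tells.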
    
    Binary compatibility:
    
    The original 4-arg `private[pekko] StageActor` constructor
    (`materializer, getAsyncCallback, initialReceive, name`) is preserved as
    an auxiliary constructor and continues to use the eager
    `AsyncCallback` path. A new 5-arg `private[pekko]` constructor
    (`materializer, interpreter, logic, initialReceive, name`) is added for the
    lazy path. `sbt stream/mimaReportBinaryIssues` passes clean.
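    The constructor layout above follows a general Scala pattern for
    keeping a JVM constructor signature while the class internals change.
    The primary constructor becomes private and takes the varying
    strategy. Each entry point is an auxiliary constructor that delegates
    to it. `Formatter` is a made-up class that only shows the shape; it is
    not part of Pekko.

```scala
// Hypothetical class used only to illustrate the constructor pattern; not Pekko code.
// Before the change the class was simply: final class Formatter(prefix: String)
final class Formatter private (format: String => String, val strategy: String) {

  // Old signature, kept as an auxiliary constructor so already-compiled callers still link.
  def this(prefix: String) = this((s: String) => prefix + s, "prefix")

  // New entry point that selects a different strategy.
  def this(prefix: String, upperCase: Boolean) =
    this(
      if (upperCase) ((s: String) => (prefix + s).toUpperCase) else ((s: String) => prefix + s),
      if (upperCase) "upper-case" else "prefix")

  def apply(s: String): String = format(s)
}
```

    `def this(prefix: String)` still compiles to a `<init>(String)`
    constructor, so callers compiled against the old class keep linking.
    `mimaReportBinaryIssues` compares this kind of constructor descriptor
    for the real `StageActor`.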
    
    Result:
    
    `StageActorRefBenchmark.lazy_stage_actor_ref_tell_10k` (JMH 2 forks x 10
    iter x 2s, macOS) - throughput is now bounded by Vyukov enqueue + drain
    loop rather than per-tell mailbox traffic:
    
    | Variant                          | Throughput (K ops/s) | vs main |
    |----------------------------------|----------------------|---------|
    | main                             | 6,588 +- 616         | 1.00x   |
    | MPSC + drain coalescing (cap=16) | 13,045 +- 1,525      | 1.98x   |
    | MPSC + drain coalescing (cap=8)  | 13,590 +- 2,114      | 2.06x   |
    
    BroadcastHubBenchmark is unchanged in this measurement (its bottleneck is
    fan-out broadcasting, not stage-actor tell traffic). The cap=8 and cap=16
    results overlap within their reported error.
    
    Tests:
    
    - sbt "stream / compile" "stream / mimaReportBinaryIssues"
    - sbt "stream-tests / Test / testOnly
      org.apache.pekko.stream.scaladsl.StageActorRefSpec" (11/11)
    - sbt "stream-tests / Test / testOnly
      org.apache.pekko.stream.scaladsl.ActorRefSinkSpec
      org.apache.pekko.stream.scaladsl.ActorRefSourceSpec
      org.apache.pekko.stream.scaladsl.ActorRefBackpressureSinkSpec
      org.apache.pekko.stream.scaladsl.ActorRefBackpressureSourceSpec" (42/42)
    - sbt "stream-tests / Test / testOnly
      org.apache.pekko.stream.scaladsl.QueueSinkSpec
      org.apache.pekko.stream.scaladsl.QueueSourceSpec
      org.apache.pekko.stream.scaladsl.HubSpec" (94/94)
    - sbt scalafmt headerCheck
    - sbt "bench-jmh / Jmh / run -i 10 -wi 5 -f 2 -r 2s -w 2s
      .*StageActorRefBenchmark.*"
    
    References:
    
    Refs https://github.com/akka/akka-core/issues/26857 (public issue only;
    clean-room implementation)
---
 .../pekko/stream/StageActorRefBenchmark.scala      | 136 +++++++++++++
 .../pekko/stream/scaladsl/StageActorRefSpec.scala  | 114 ++++++++++-
 stream/src/main/resources/reference.conf           |   7 +
 .../org/apache/pekko/stream/stage/GraphStage.scala | 211 ++++++++++++++++++---
 4 files changed, 441 insertions(+), 27 deletions(-)

diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/StageActorRefBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/StageActorRefBenchmark.scala
new file mode 100644
index 0000000000..007253e275
--- /dev/null
+++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/StageActorRefBenchmark.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream
+
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.Await
+import scala.concurrent.Promise
+import scala.concurrent.duration._
+
+import org.openjdk.jmh.annotations._
+
+import org.apache.pekko
+import pekko.actor.ActorRef
+import pekko.actor.ActorSystem
+import pekko.actor.NoSerializationVerificationNeeded
+import pekko.stream.scaladsl.Keep
+import pekko.stream.scaladsl.Sink
+import pekko.stream.scaladsl.Source
+import pekko.stream.stage.GraphStageLogic
+import pekko.stream.stage.GraphStageWithMaterializedValue
+import pekko.stream.stage.InHandler
+
+object StageActorRefBenchmark {
+  final val OperationsPerInvocation = 10000
+  private case object CountDown extends NoSerializationVerificationNeeded
+
+  private final class Control {
+    private val ready = new CountDownLatch(1)
+    @volatile private var ref: ActorRef = _
+    @volatile private var latch: CountDownLatch = _
+
+    def init(ref: ActorRef): Unit = {
+      this.ref = ref
+      ready.countDown()
+    }
+
+    def stageActorRef: ActorRef = {
+      if (!ready.await(10, TimeUnit.SECONDS))
+        throw new RuntimeException("Stage actor ref was not initialized")
+      ref
+    }
+
+    def reset(expectedMessages: Int): Unit =
+      latch = new CountDownLatch(expectedMessages)
+
+    def countDown(): Unit =
+      latch.countDown()
+
+    def awaitDone(): Unit =
+      if (!latch.await(10, TimeUnit.SECONDS))
+        throw new RuntimeException("Stage actor ref benchmark messages timed out")
+  }
+
+  private final class StageActorSink extends GraphStageWithMaterializedValue[SinkShape[Any], Control] {
+    val in: Inlet[Any] = Inlet("StageActorSink.in")
+    override val shape: SinkShape[Any] = SinkShape(in)
+
+    override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Control) = {
+      val control = new Control
+
+      val logic = new GraphStageLogic(shape) {
+        override def preStart(): Unit = {
+          control.init(getStageActor {
+            case (_, CountDown) => control.countDown()
+          }.ref)
+          pull(in)
+        }
+
+        setHandler(
+          in,
+          new InHandler {
+            override def onPush(): Unit = pull(in)
+          })
+      }
+
+      logic -> control
+    }
+  }
+}
+
+@State(Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@BenchmarkMode(Array(Mode.Throughput))
+class StageActorRefBenchmark {
+  import StageActorRefBenchmark._
+
+  implicit val system: ActorSystem = ActorSystem("StageActorRefBenchmark")
+
+  private var completion: Promise[Option[Any]] = _
+  private var control: Control = _
+  private var stageActorRef: ActorRef = _
+
+  @Setup
+  def setup(): Unit = {
+    SystemMaterializer(system).materializer
+    val materialized = Source.maybe[Any].toMat(Sink.fromGraph(new StageActorSink))(Keep.both).run()
+    completion = materialized._1
+    control = materialized._2
+    stageActorRef = control.stageActorRef
+  }
+
+  @TearDown
+  def shutdown(): Unit = {
+    completion.trySuccess(None)
+    Await.result(system.terminate(), 5.seconds)
+  }
+
+  @Benchmark
+  @OperationsPerInvocation(OperationsPerInvocation)
+  def lazy_stage_actor_ref_tell_10k(): Unit = {
+    control.reset(OperationsPerInvocation)
+    var remaining = OperationsPerInvocation
+    while (remaining > 0) {
+      stageActorRef ! CountDown
+      remaining -= 1
+    }
+    control.awaitDone()
+  }
+}
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StageActorRefSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StageActorRefSpec.scala
index 372e98f3ea..b5bc5722d7 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StageActorRefSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/StageActorRefSpec.scala
@@ -18,16 +18,19 @@ import scala.concurrent.Promise
 import scala.concurrent.duration._
 
 import org.apache.pekko
+import pekko.actor.ActorPath
 import pekko.actor.ActorRef
 import pekko.actor.Kill
 import pekko.actor.NoSerializationVerificationNeeded
 import pekko.actor.PoisonPill
 import pekko.event.Logging
 import pekko.stream._
+import pekko.stream.impl.fusing.GraphInterpreter
 import pekko.stream.stage.GraphStageLogic
 import pekko.stream.stage.GraphStageWithMaterializedValue
 import pekko.stream.stage.InHandler
 import pekko.stream.testkit.StreamSpec
+import pekko.stream.testkit.scaladsl.TestSink
 import pekko.testkit.EventFilter
 import pekko.testkit.ImplicitSender
 import pekko.testkit.TestEvent
@@ -181,6 +184,50 @@ class StageActorRefSpec extends StreamSpec with ImplicitSender {
       res.futureValue should ===(42)
     }
 
+    "run non-eager stage actor messages in the graph interpreter actor" in {
+      val (_, res) = Source.maybe[Int].toMat(sumStage(testActor))(Keep.both).run()
+
+      val stageRef = expectMsgType[ActorRef]
+      stageRef ! AddAndTell(1)
+      expectMsg(1)
+
+      stageRef ! ReportStageActorInterpreter
+      val location = expectMsgType[StageActorLocation]
+
+      location.stageActorParent should ===(location.interpreter)
+
+      stageRef ! StopNow
+      res.futureValue should ===(1)
+    }
+
+    "keep eagerly materialized stage actors usable before stream demand" in {
+      val (ref, probe) = Source
+        .actorRef[Int]({
+            case CompleteNow => CompletionStrategy.Immediately
+          }, PartialFunction.empty, bufferSize = 8, OverflowStrategy.fail)
+        .toMat(TestSink[Int]())(Keep.both)
+        .run()
+
+      ref ! 1
+      probe.request(1).expectNext(1)
+      ref ! CompleteNow
+      probe.expectComplete()
+    }
+
+    "keep eagerly materialized stage actors attached to the stream supervisor" in {
+      val (source, res) = Source.maybe[Int].toMat(eagerLocationStage(testActor))(Keep.both).run()
+
+      val stageRef = expectMsgType[ActorRef]
+      stageRef ! ReportEagerStageActorInterpreter
+      val location = expectMsgType[EagerStageActorLocation]
+
+      location.stageActorParent should ===(location.supervisor)
+      location.stageActorParent should !==(location.interpreter)
+
+      source.success(None)
+      res.futureValue should ===(0)
+    }
+
   }
 
 }
@@ -194,10 +241,19 @@ object StageActorRefSpec {
     case object BecomeStringEcho extends NoSerializationVerificationNeeded
     case object PullNow extends NoSerializationVerificationNeeded
     case object StopNow extends NoSerializationVerificationNeeded
+    case object ReportStageActorInterpreter extends NoSerializationVerificationNeeded
+    case object ReportEagerStageActorInterpreter extends NoSerializationVerificationNeeded
+    case object CompleteNow extends NoSerializationVerificationNeeded
+    final case class StageActorLocation(stageActorParent: ActorPath, interpreter: ActorPath)
+        extends NoSerializationVerificationNeeded
+    final case class EagerStageActorLocation(stageActorParent: ActorPath, supervisor: ActorPath, interpreter: ActorPath)
+        extends NoSerializationVerificationNeeded
   }
 
   import ControlProtocol._
 
+  def eagerLocationStage(probe: ActorRef) = EagerLocationStage(probe)
+
   case class SumTestStage(probe: ActorRef) extends GraphStageWithMaterializedValue[SinkShape[Int], Future[Int]] {
     val in = Inlet[Int]("IntSum.in")
     override val shape: SinkShape[Int] = SinkShape.of(in)
@@ -216,10 +272,12 @@ object StageActorRefSpec {
 
         def behavior(m: (ActorRef, Any)): Unit = {
           m match {
-            case (_, Add(n))                     => sum += n
-            case (_, PullNow)                    => pull(in)
-            case (sender, CallInitStageActorRef) => sender ! getStageActor(behavior).ref
-            case (_, BecomeStringEcho)           =>
+            case (_, Add(n))                           => sum += n
+            case (_, PullNow)                          => pull(in)
+            case (sender, CallInitStageActorRef)       => sender ! getStageActor(behavior).ref
+            case (sender, ReportStageActorInterpreter) =>
+              sender ! StageActorLocation(stageActor.ref.path.parent, GraphInterpreter.currentInterpreter.context.path)
+            case (_, BecomeStringEcho) =>
               getStageActor {
                 case (theSender, msg) => theSender ! msg.toString
               }
@@ -258,4 +316,52 @@ object StageActorRefSpec {
     }
   }
 
+  case class EagerLocationStage(probe: ActorRef) extends GraphStageWithMaterializedValue[SinkShape[Int], Future[Int]] {
+    val in = Inlet[Int]("EagerLocation.in")
+    override val shape: SinkShape[Int] = SinkShape.of(in)
+
+    override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Int]) = {
+      val p: Promise[Int] = Promise()
+
+      val logic = new GraphStageLogic(shape) {
+        var stageRef: ActorRef = _
+        var interpreterPath: ActorPath = _
+        var supervisorPath: ActorPath = _
+
+        override def preStart(): Unit = {
+          interpreterPath = interpreter.context.path
+          supervisorPath = interpreter.materializer.supervisor.path
+          stageRef = getEagerStageActor(interpreter.materializer) {
+            case (sender, ReportEagerStageActorInterpreter) =>
+              sender ! EagerStageActorLocation(stageRef.path.parent, supervisorPath, interpreterPath)
+            case _ => throw new RuntimeException("unexpected message")
+          }.ref
+          pull(in)
+          probe ! stageRef
+        }
+
+        setHandler(
+          in,
+          new InHandler {
+            override def onPush(): Unit = {
+              p.trySuccess(grab(in))
+              completeStage()
+            }
+
+            override def onUpstreamFinish(): Unit = {
+              p.trySuccess(0)
+              completeStage()
+            }
+
+            override def onUpstreamFailure(ex: Throwable): Unit = {
+              p.tryFailure(ex)
+              failStage(ex)
+            }
+          })
+      }
+
+      logic -> p.future
+    }
+  }
+
 }
diff --git a/stream/src/main/resources/reference.conf b/stream/src/main/resources/reference.conf
index f4e4bd3193..059d28079a 100644
--- a/stream/src/main/resources/reference.conf
+++ b/stream/src/main/resources/reference.conf
@@ -83,6 +83,13 @@ pekko {
      # Allows to accelerate message processing that happening within same actor but keep system responsive.
       sync-processing-limit = 1000
 
+      # Upper bound on stage-actor messages drained per envelope for non-eager `getStageActor` refs. Lazy
+      # stage actors batch external `tell` deliveries into an MPSC queue and elect a single drain envelope;
+      # this cap bounds the burst so that other BoundaryEvents (pull/push/complete) can still interleave
+      # naturally via the actor mailbox. Smaller = better fairness for upstream/downstream events;
+      # larger = better tell throughput. Values below 1 are treated as 1.
+      stage-actor-drain-batch = 16
+
       debug {
        # Enables the fuzzing mode which increases the chance of race conditions
         # by aggressively reordering events and making certain operations more
diff --git a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala
index 5e1aec586c..b676e1c29d 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala
@@ -14,6 +14,7 @@
 package org.apache.pekko.stream.stage
 
 import java.util.Spliterator
+import java.lang.invoke.{ MethodHandles, VarHandle }
 import java.util.concurrent.{ CompletionStage, ConcurrentHashMap }
 import java.util.concurrent.atomic.AtomicReference
 
@@ -27,6 +28,7 @@ import org.apache.pekko
 import pekko.{ Done, NotUsed }
 import pekko.actor._
 import pekko.annotation.InternalApi
+import pekko.dispatch.AbstractNodeQueue
 import pekko.japi.function.{ Effect, Procedure }
 import pekko.stream._
 import pekko.stream.Attributes.SourceLocation
@@ -206,29 +208,61 @@ object GraphStageLogic {
    *
    * Not for user instantiation, use [[GraphStageLogic.getStageActor]].
    */
-  final class StageActor @InternalApi() private[pekko] (
+  final class StageActor @InternalApi() private (
       materializer: Materializer,
-      getAsyncCallback: StageActorRef.Receive => AsyncCallback[(ActorRef, Any)],
       initialReceive: StageActorRef.Receive,
-      name: String) {
+      name: String,
+      cell: ActorCell,
+      buildDispatch: StageActorRef.Receive => ((ActorRef, Any)) => Unit) {
+
+    @InternalApi private[pekko] def this(
+        materializer: Materializer,
+        getAsyncCallback: StageActorRef.Receive => AsyncCallback[(ActorRef, Any)],
+        initialReceive: StageActorRef.Receive,
+        name: String) =
+      this(
+        materializer,
+        initialReceive,
+        name,
+        StageActor.localCell(materializer.supervisor, "Stream supervisor"),
+        receive => getAsyncCallback(receive).invoke)
+
+    @InternalApi private[pekko] def this(
+        materializer: Materializer,
+        interpreter: GraphInterpreter,
+        logic: GraphStageLogic,
+        initialReceive: StageActorRef.Receive,
+        name: String) =
+      this(
+        materializer,
+        initialReceive,
+        name,
+        StageActor.localCell(interpreter.context, "Graph interpreter"),
+        // Coalesce per-tell mailbox traffic: N tells produce 1 AsyncInput envelope (amortized).
+        receive =>
+          new StageActor.LazyDispatch(
+            interpreter,
+            logic,
+            receive.asInstanceOf[Any => Unit],
+            StageActor.drainBatchSize(materializer)))
+
+    // Monomorphic Function1 captured once; JIT can inline the apply at the FunctionRef call site.
+    private val dispatch: ((ActorRef, Any)) => Unit = buildDispatch(internalReceive)
 
-    private val callback = getAsyncCallback(internalReceive)
-
-    private def cell = materializer.supervisor match {
-      case ref: LocalActorRef => ref.underlying
-      case unknown            =>
-        throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${unknown.getClass.getName}]")
-    }
     private val functionRef: FunctionRef = {
-      val f: (ActorRef, Any) => Unit = {
-        case (_, m @ (PoisonPill | Kill)) =>
-          materializer.logger.warning(
-            "{} message sent to StageActor({}) will be ignored, since it is not a real Actor." +
-            "Use a custom message type to communicate with it instead.",
-            m,
-            functionRef.path)
-        case pair => callback.invoke(pair)
-      }
+      // Explicit (sender, msg) lambda (not a pattern-match Function2 literal) so the PoisonPill / Kill
+      // branch matches on `msg` directly and does not allocate a Tuple2. The regular branch still
+      // constructs one tuple per tell, as required by the `((ActorRef, Any)) => Unit` public Receive type.
+      val f: (ActorRef, Any) => Unit = (sender, msg) =>
+        msg match {
+          case PoisonPill | Kill =>
+            materializer.logger.warning(
+              "{} message sent to StageActor({}) will be ignored, since it is not a real Actor. " +
+              "Use a custom message type to communicate with it instead.",
+              msg,
+              functionRef.path)
+          case _ => dispatch((sender, msg))
+        }
 
       cell.addFunctionRef(f, name)
     }
@@ -275,6 +309,124 @@ object GraphStageLogic {
     type Receive = ((ActorRef, Any)) => Unit
   }
 
+  private object StageActor {
+    def localCell(ref: ActorRef, description: String): ActorCell =
+      ref match {
+        case ref: LocalActorRef       => ref.underlying
+        case ref: RepointableActorRef =>
+          ref.underlying match {
+            case cell: ActorCell => cell
+            case unknown         =>
+              throw new IllegalStateException(s"$description must be a local actor, was [${unknown.getClass.getName}]")
+          }
+        case unknown =>
+          throw new IllegalStateException(s"$description must be a local actor, was [${unknown.getClass.getName}]")
+      }
+
+    /**
+     * Reads `pekko.stream.materializer.stage-actor-drain-batch` from the materializer's ActorSystem config.
+     * Called once per lazy StageActor construction (never on the hot path). Bounded to `>= 1`.
+     */
+    def drainBatchSize(materializer: Materializer): Int =
+      Math.max(1, materializer.system.settings.config.getInt("pekko.stream.materializer.stage-actor-drain-batch"))
+
+    private final val SchedStateIdle: Int = 0
+    private final val SchedStateScheduled: Int = 1
+
+    /**
+     * VarHandle for [[LazyDispatch.scheduledState]]: one static handle shared across all instances; the
+     * per-instance state is just a primitive `int` on the LazyDispatch object, with no AtomicBoolean wrapper.
+     * `privateLookupIn` grants access to LazyDispatch's private fields from this (same-package) companion;
+     * this is the same pattern used by [[AbstractNodeQueue]] itself.
+     */
+    private val schedStateHandle: VarHandle = {
+      val lookup = MethodHandles.privateLookupIn(classOf[LazyDispatch], MethodHandles.lookup())
+      lookup.findVarHandle(classOf[LazyDispatch], "scheduledState", java.lang.Integer.TYPE)
+    }
+
+    /**
+     * Lazy-path dispatch: producers enqueue into a Vyukov MPSC queue and elect a single drain via
+     * IDLE -> SCHEDULED CAS; only the elected producer pays a mailbox enqueue. The drain runs on the
+     * interpreter thread, polls in a tight loop bounded by `drainBatchSize`, then either publishes IDLE
+     * (with a recheck for the publish-window race) or re-schedules another envelope to yield to other
+     * BoundaryEvents.
+     *
+     * JIT/GC notes:
+     *  - `final class` + monomorphic per-StageActor instance → lets the JIT devirtualize the apply at the
+     *    FunctionRef call site.
+     *  - Extends [[AbstractNodeQueue]] directly so the queue head atomic and the dispatch function share one
+     *    object (one allocation per StageActor, one fewer field deref on the producer hot path).
+     *  - `scheduledState` is a plain `@volatile var Int` updated via a shared `VarHandle` (companion-static)
+     *    so the per-instance state cost is one `int` field instead of a separate AtomicBoolean object.
+     *  - All other hot-path state is `private[this]` → direct field access, no accessor methods.
+     *  - `drainBatchSize` is read once into a stack-local at the top of `drain` so the JIT can treat the loop
+     *    bound as a constant.
+     *  - Per-tell allocation = 1 Node (`AbstractNodeQueue.Node`, ~24 bytes) + 1 Tuple2 (~24 bytes). The Tuple2
+     *    is forced by the public `StageActorRef.Receive` type. No AsyncInput / Envelope per tell; those are
+     *    amortized across the batch.
+     */
+    private final class LazyDispatch(
+        interpreter: GraphInterpreter,
+        logic: GraphStageLogic,
+        handler: Any => Unit,
+        drainBatchSize: Int)
+        extends AbstractNodeQueue[(ActorRef, Any)]
+        with (((ActorRef, Any)) => Unit) {
+
+      // Updated only via `schedStateHandle` (CAS and the IDLE publishes). The compiler can't see those
+      // reflective accesses, hence the @nowarn for "never updated" and "never used".
+      @nowarn("cat=unused-privates")
+      @nowarn("msg=never updated")
+      @volatile private var scheduledState: Int = SchedStateIdle
+
+      // Reused across all drain batches; allocated once at construction.
+      private[this] val drainCallback: Any => Unit = (_: Any) => drain()
+
+      override def apply(pair: (ActorRef, Any)): Unit = {
+        add(pair) // Vyukov producer path: getAndSet + release-store, no CAS spin
+        // Double-checked CAS: the uncontended fast path is one volatile load (volatile, so it cannot move above
+        // the enqueue); only the IDLE->SCHEDULED winner pays a CAS + mailbox push.
+        if (schedStateHandle.getVolatile(this).asInstanceOf[Int] == SchedStateIdle &&
+          schedStateHandle.compareAndSet(this, SchedStateIdle, SchedStateScheduled))
+          scheduleDrain()
+      }
+
+      private def scheduleDrain(): Unit =
+        // 1 AsyncInput + 1 Envelope per drain batch (amortized across up to drainBatchSize tells).
+        interpreter.onAsyncInput(logic, null, NoPromise, drainCallback)
+
+      private def drain(): Unit = {
+        val limit = drainBatchSize // hoisted to a local so JIT treats it as a loop-invariant constant
+        var processed = 0
+        while (processed < limit) {
+          if (interpreter.isStageCompleted(logic)) {
+            // Stage completed mid-drain; drop the remainder (matches the original per-tell behaviour where
+            // runAsyncInput silently skipped completed stages). Don't reschedule; no future drain will run.
+            while (poll() ne null) ()
+            schedStateHandle.setRelease(this, SchedStateIdle)
+            return
+          }
+          val item = poll()
+          if (item eq null) {
+            schedStateHandle.setVolatile(this, SchedStateIdle)
+            // Recheck race: a producer that added after `poll == null` saw SCHEDULED and skipped the mailbox
+            // send, so we must re-elect. The IDLE store is volatile so this `isEmpty` load cannot precede it.
+            if (!isEmpty &&
+              schedStateHandle.compareAndSet(this, SchedStateIdle, SchedStateScheduled))
+              scheduleDrain()
+            return
+          }
+          handler(item)
+          processed += 1
+        }
+        // Hit batch cap with items potentially still queued. Re-schedule another envelope so other
+        // BoundaryEvents (pull/push/complete) can interleave via the actor mailbox. `scheduledState` stays
+        // SCHEDULED: concurrent producers correctly observe SCHEDULED and skip; the new envelope will drain.
+        scheduleDrain()
+      }
+    }
+  }
+
   /**
    * Internal API
    *
@@ -1339,8 +1491,8 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
 
   /**
   * Initialize a [[GraphStageLogic.StageActorRef]] which can be used to interact with from the outside world "as-if" a [[pekko.actor.Actor]].
-   * The messages are looped through the [[getAsyncCallback]] mechanism of [[GraphStage]] so they are safe to modify
-   * internal state of this operator.
+   * The messages are delivered through the owning stream interpreter so they are safe to modify internal state of this
+   * operator.
    *
   * This method must (the earliest) be called after the [[GraphStageLogic]] constructor has finished running,
   * for example from the [[preStart]] callback the graph operator logic provides.
@@ -1358,7 +1510,20 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
    * @return minimal actor with watch method
    */
   final protected def getStageActor(receive: ((ActorRef, Any)) => Unit): StageActor =
-    getEagerStageActor(interpreter.materializer)(receive)
+    _stageActor match {
+      case null =>
+        val currentInterpreter = interpreter
+        _stageActor = new StageActor(
+          currentInterpreter.materializer,
+          currentInterpreter,
+          this,
+          receive,
+          stageActorName)
+        _stageActor
+      case existing =>
+        existing.become(receive)
+        existing
+    }
 
   /**
    * INTERNAL API
@@ -1382,7 +1547,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
    * Override and return a name to be given to the StageActor of this operator.
    *
   * This method will be only invoked and used once, during the first [[getStageActor]]
-   * invocation whichc reates the actor, since subsequent `getStageActors` calls function
+   * invocation which creates the actor, since subsequent `getStageActors` calls function
    * like `become`, rather than creating new actors.
    *
   * Returns an empty string by default, which means that the name will a unique generated String (e.g. "$$a").

