This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 973446d1c7 Optimize stream boundary event allocation (#2916)
973446d1c7 is described below

commit 973446d1c759c371c406a3427d81bfff96f2557f
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat May 9 18:02:52 2026 +0800

    Optimize stream boundary event allocation (#2916)
    
    * Optimize stream boundary event allocation
    
    * fix for stray } in exception message
    
    Co-authored-by: Copilot Autofix powered by AI <[email protected]>
    
    ---------
    
    Co-authored-by: PJ Fanning <[email protected]>
    Co-authored-by: Copilot Autofix powered by AI <[email protected]>
---
 .../ActorGraphInterpreterBoundaryBenchmark.scala   | 186 +++++++++++++++++++++
 .../pr-2916-boundary-event-allocation.excludes     |  30 ++++
 .../stream/impl/fusing/ActorGraphInterpreter.scala | 182 ++++++++++----------
 3 files changed, 310 insertions(+), 88 deletions(-)

diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/ActorGraphInterpreterBoundaryBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/ActorGraphInterpreterBoundaryBenchmark.scala
new file mode 100644
index 0000000000..d737283cc4
--- /dev/null
+++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/ActorGraphInterpreterBoundaryBenchmark.scala
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pekko.stream
+
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import org.openjdk.jmh.annotations._
+import org.openjdk.jmh.annotations.OperationsPerInvocation
+import org.openjdk.jmh.infra.Blackhole
+import org.reactivestreams.Publisher
+import org.reactivestreams.Subscriber
+import org.reactivestreams.Subscription
+
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.stream.scaladsl.Keep
+import pekko.stream.scaladsl.RunnableGraph
+import pekko.stream.scaladsl.Sink
+import pekko.stream.scaladsl.Source
+
+import com.typesafe.config.ConfigFactory
+
+object ActorGraphInterpreterBoundaryBenchmark {
+  final val ElementCount = 100 * 1000
+  final val CancelAfter = ElementCount / 2
+
+  final class SynchronousPublisher(elements: Array[MutableElement]) extends 
Publisher[MutableElement] {
+    override def subscribe(subscriber: Subscriber[_ >: MutableElement]): Unit 
= {
+      if (subscriber eq null) throw new NullPointerException("subscriber")
+
+      subscriber.onSubscribe(new Subscription {
+        private[this] var cancelled = false
+        private[this] var index = 0
+
+        override def request(n: Long): Unit =
+          if (!cancelled) {
+            if (n <= 0) {
+              cancelled = true
+              subscriber.onError(new IllegalArgumentException("non-positive 
subscription request"))
+            } else {
+              var remaining = n
+              while (remaining > 0 && index < elements.length && !cancelled) {
+                subscriber.onNext(elements(index))
+                index += 1
+                remaining -= 1
+              }
+              if (index == elements.length && !cancelled) {
+                cancelled = true
+                subscriber.onComplete()
+              }
+            }
+          }
+
+        override def cancel(): Unit = cancelled = true
+      })
+    }
+  }
+
+  final class RequestOneSubscriber(
+      blackhole: Blackhole,
+      latch: CountDownLatch,
+      cancelAfter: Int)
+      extends Subscriber[MutableElement] {
+    private[this] var subscription: Subscription = _
+    private[this] var seen = 0
+    private[this] val failure = new AtomicReference[Throwable]
+
+    override def onSubscribe(subscription: Subscription): Unit = {
+      this.subscription = subscription
+      subscription.request(1)
+    }
+
+    override def onNext(element: MutableElement): Unit = {
+      blackhole.consume(element.value)
+      seen += 1
+      if (cancelAfter > 0 && seen == cancelAfter) {
+        subscription.cancel()
+        latch.countDown()
+      } else {
+        subscription.request(1)
+      }
+    }
+
+    override def onError(cause: Throwable): Unit = {
+      failure.set(cause)
+      latch.countDown()
+    }
+
+    override def onComplete(): Unit = latch.countDown()
+
+    def awaitExpected(expected: Int): Unit = {
+      if (!latch.await(10, TimeUnit.SECONDS))
+        throw new RuntimeException("Latch timed out")
+
+      val cause = failure.get()
+      if (cause ne null) throw new RuntimeException("Subscriber failed", cause)
+      if (seen < expected) throw new RuntimeException(s"Expected at least 
[$expected] elements but got [$seen]")
+    }
+  }
+}
+
+@State(Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@BenchmarkMode(Array(Mode.Throughput))
+class ActorGraphInterpreterBoundaryBenchmark {
+  import ActorGraphInterpreterBoundaryBenchmark._
+
+  implicit val system: ActorSystem = ActorSystem(
+    "test",
+    ConfigFactory.parseString(s"""
+      pekko.stream.materializer.sync-processing-limit = ${Int.MaxValue}
+    """))
+
+  private var publisherSource: RunnableGraph[CountDownLatch] = _
+  private var publisherSink: RunnableGraph[Publisher[MutableElement]] = _
+
+  @Setup
+  def setup(): Unit = {
+    SystemMaterializer(system).materializer
+    val testElements = Array.fill(ElementCount)(new MutableElement(1))
+    val testSource = Source.fromGraph(new TestSource(testElements))
+
+    publisherSource =
+      Source
+        .fromPublisher(new SynchronousPublisher(testElements))
+        .toMat(Sink.fromGraph(new JitSafeCompletionLatch))(Keep.right)
+    publisherSink =
+      testSource.toMat(Sink.asPublisher(false))(Keep.right)
+  }
+
+  @Benchmark
+  @OperationsPerInvocation(ElementCount)
+  def external_publisher_to_sink(blackhole: Blackhole): Unit = {
+    FusedGraphsBenchmark.blackhole = blackhole
+    val latch = publisherSource.run()
+    if (!latch.await(10, TimeUnit.SECONDS))
+      throw new RuntimeException("Latch timed out")
+  }
+
+  @Benchmark
+  @OperationsPerInvocation(ElementCount)
+  def source_to_external_subscriber_request_one(blackhole: Blackhole): Unit = {
+    val latch = new CountDownLatch(1)
+    val subscriber = new RequestOneSubscriber(blackhole, latch, cancelAfter = 
0)
+
+    publisherSink.run().subscribe(subscriber)
+    subscriber.awaitExpected(ElementCount)
+  }
+
+  @Benchmark
+  @OperationsPerInvocation(CancelAfter)
+  def source_to_external_subscriber_cancel_halfway(blackhole: Blackhole): Unit 
= {
+    val latch = new CountDownLatch(1)
+    val subscriber = new RequestOneSubscriber(blackhole, latch, cancelAfter = 
CancelAfter)
+
+    publisherSink.run().subscribe(subscriber)
+    subscriber.awaitExpected(CancelAfter)
+  }
+
+  @TearDown
+  def shutdown(): Unit = {
+    Await.result(system.terminate(), 5.seconds)
+  }
+}
diff --git a/stream/src/main/mima-filters/2.0.x.backwards.excludes/pr-2916-boundary-event-allocation.excludes b/stream/src/main/mima-filters/2.0.x.backwards.excludes/pr-2916-boundary-event-allocation.excludes
new file mode 100644
index 0000000000..d8a5fbb51a
--- /dev/null
+++ b/stream/src/main/mima-filters/2.0.x.backwards.excludes/pr-2916-boundary-event-allocation.excludes
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Optimize internal boundary event allocation
+ProblemFilters.exclude[Problem]("org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter*BoundaryEvent*")
+ProblemFilters.exclude[Problem]("org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter*SimpleBoundaryEvent*")
+ProblemFilters.exclude[Problem]("org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter*BatchingActorInputBoundary*OnError*")
+ProblemFilters.exclude[Problem]("org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter*BatchingActorInputBoundary*OnComplete*")
+ProblemFilters.exclude[Problem]("org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter*BatchingActorInputBoundary*OnNext*")
+ProblemFilters.exclude[Problem]("org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter*BatchingActorInputBoundary*OnSubscribe*")
+ProblemFilters.exclude[Problem]("org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter*Cancel*")
+ProblemFilters.exclude[Problem]("org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter*RequestMore*")
+ProblemFilters.exclude[Problem]("org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter*SubscribePending*")
+ProblemFilters.exclude[Problem]("org.apache.pekko.stream.impl.fusing.GraphInterpreterShell*Abort*")
+ProblemFilters.exclude[Problem]("org.apache.pekko.stream.impl.fusing.GraphInterpreterShell*AsyncInput*")
+ProblemFilters.exclude[Problem]("org.apache.pekko.stream.impl.fusing.GraphInterpreterShell*ResumeShell*")
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala
index c1442bccc8..39e16aa9c3 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala
@@ -53,7 +53,7 @@ import org.reactivestreams.Subscription
   object Resume extends DeadLetterSuppression with 
NoSerializationVerificationNeeded
   object Snapshot extends NoSerializationVerificationNeeded
 
-  trait BoundaryEvent extends DeadLetterSuppression with 
NoSerializationVerificationNeeded {
+  sealed abstract class BoundaryEvent extends DeadLetterSuppression with 
NoSerializationVerificationNeeded {
     def shell: GraphInterpreterShell
 
     @InternalStableApi
@@ -63,7 +63,7 @@ import org.reactivestreams.Subscription
     def cancel(): Unit
   }
 
-  trait SimpleBoundaryEvent extends BoundaryEvent {
+  sealed abstract class SimpleBoundaryEvent extends BoundaryEvent {
     final override def execute(eventLimit: Int): Int = {
       val wasNotShutdown = !shell.interpreter.isStageCompleted(logic)
       execute()
@@ -79,60 +79,64 @@ import org.reactivestreams.Subscription
   def props(shell: GraphInterpreterShell): Props =
     Props(new ActorGraphInterpreter(shell)).withDeploy(Deploy.local)
 
-  @InternalStableApi
-  class BatchingActorInputBoundary(
-      size: Int,
-      shell: GraphInterpreterShell,
-      publisher: Publisher[Any],
-      internalPortName: String)
-      extends UpstreamBoundaryStageLogic[Any]
-      with OutHandler {
-
-    // can't be final because of SI-4440
-    case class OnError(shell: GraphInterpreterShell, cause: Throwable) extends 
SimpleBoundaryEvent {
-      override def execute(): Unit = {
-        if (GraphInterpreter.Debug) println(s"${interpreter.Name}  onError 
port=$internalPortName")
-        BatchingActorInputBoundary.this.onError(cause)
-      }
+  final class OnError(boundary: BatchingActorInputBoundary, cause: Throwable) 
extends SimpleBoundaryEvent {
+    override def execute(): Unit = {
+      if (GraphInterpreter.Debug)
+        println(s"${boundary.shell.interpreter.Name}  onError 
port=${boundary.internalPortName}")
+      boundary.onError(cause)
+    }
 
-      override def logic: GraphStageLogic = BatchingActorInputBoundary.this
+    override def shell: GraphInterpreterShell = boundary.shell
+    override def logic: GraphStageLogic = boundary
+    override def cancel(): Unit = ()
+  }
 
-      override def cancel(): Unit = ()
+  final class OnComplete(boundary: BatchingActorInputBoundary) extends 
SimpleBoundaryEvent {
+    override def execute(): Unit = {
+      if (GraphInterpreter.Debug)
+        println(s"${boundary.shell.interpreter.Name}  onComplete 
port=${boundary.internalPortName}")
+      boundary.onComplete()
     }
-    // can't be final because of SI-4440
-    case class OnComplete(shell: GraphInterpreterShell) extends 
SimpleBoundaryEvent {
-      override def execute(): Unit = {
-        if (GraphInterpreter.Debug) println(s"${interpreter.Name}  onComplete 
port=$internalPortName")
-        BatchingActorInputBoundary.this.onComplete()
-      }
 
-      override def logic: GraphStageLogic = BatchingActorInputBoundary.this
+    override def shell: GraphInterpreterShell = boundary.shell
+    override def logic: GraphStageLogic = boundary
+    override def cancel(): Unit = ()
+  }
 
-      override def cancel(): Unit = ()
+  final class OnNext(boundary: BatchingActorInputBoundary, e: Any) extends 
SimpleBoundaryEvent {
+    override def execute(): Unit = {
+      if (GraphInterpreter.Debug)
+        println(s"${boundary.shell.interpreter.Name} onNext $e 
port=${boundary.internalPortName}")
+      boundary.onNext(e)
     }
-    // can't be final because of SI-4440
-    case class OnNext(shell: GraphInterpreterShell, e: Any) extends 
SimpleBoundaryEvent {
-      override def execute(): Unit = {
-        if (GraphInterpreter.Debug) println(s"${interpreter.Name} onNext $e 
port=$internalPortName")
-        BatchingActorInputBoundary.this.onNext(e)
-      }
 
-      override def logic: GraphStageLogic = BatchingActorInputBoundary.this
+    override def shell: GraphInterpreterShell = boundary.shell
+    override def logic: GraphStageLogic = boundary
+    override def cancel(): Unit = ()
+  }
 
-      override def cancel(): Unit = ()
+  final class OnSubscribe(boundary: BatchingActorInputBoundary, subscription: 
Subscription)
+      extends SimpleBoundaryEvent {
+    override def execute(): Unit = {
+      if (GraphInterpreter.Debug)
+        println(s"${boundary.shell.interpreter.Name}  onSubscribe 
port=${boundary.internalPortName}")
+      boundary.shell.subscribeArrived()
+      boundary.onSubscribe(subscription)
     }
-    // can't be final because of SI-4440
-    case class OnSubscribe(shell: GraphInterpreterShell, subscription: 
Subscription) extends SimpleBoundaryEvent {
-      override def execute(): Unit = {
-        if (GraphInterpreter.Debug) println(s"${interpreter.Name}  onSubscribe 
port=$internalPortName")
-        shell.subscribeArrived()
-        BatchingActorInputBoundary.this.onSubscribe(subscription)
-      }
 
-      override def logic: GraphStageLogic = BatchingActorInputBoundary.this
+    override def shell: GraphInterpreterShell = boundary.shell
+    override def logic: GraphStageLogic = boundary
+    override def cancel(): Unit = ()
+  }
 
-      override def cancel(): Unit = ()
-    }
+  @InternalStableApi
+  class BatchingActorInputBoundary(
+      size: Int,
+      val shell: GraphInterpreterShell,
+      publisher: Publisher[Any],
+      val internalPortName: String)
+      extends UpstreamBoundaryStageLogic[Any]
+      with OutHandler {
 
     if (size <= 0) throw new IllegalArgumentException("buffer size cannot be 
zero")
     if ((size & (size - 1)) != 0) throw new IllegalArgumentException("buffer 
size must be a power of two")
@@ -158,21 +162,21 @@ import org.reactivestreams.Subscription
       publisher.subscribe(new Subscriber[Any] {
         override def onError(t: Throwable): Unit = {
           ReactiveStreamsCompliance.requireNonNullException(t)
-          actor ! OnError(shell, t)
+          actor ! new OnError(BatchingActorInputBoundary.this, t)
         }
 
         override def onSubscribe(s: Subscription): Unit = {
           ReactiveStreamsCompliance.requireNonNullSubscription(s)
-          actor ! OnSubscribe(shell, s)
+          actor ! new OnSubscribe(BatchingActorInputBoundary.this, s)
         }
 
         override def onComplete(): Unit = {
-          actor ! OnComplete(shell)
+          actor ! new OnComplete(BatchingActorInputBoundary.this)
         }
 
         override def onNext(t: Any): Unit = {
           ReactiveStreamsCompliance.requireNonNullElement(t)
-          actor ! OnNext(shell, t)
+          actor ! new OnNext(BatchingActorInputBoundary.this, t)
         }
       })
     }
@@ -286,7 +290,7 @@ import org.reactivestreams.Subscription
       s"BatchingActorInputBoundary(forPort=$internalPortName, 
fill=$inputBufferElements/$size, completed=$upstreamCompleted, 
canceled=$downstreamCanceled)"
   }
 
-  final case class SubscribePending(boundary: ActorOutputBoundary) extends 
SimpleBoundaryEvent {
+  final class SubscribePending(boundary: ActorOutputBoundary) extends 
SimpleBoundaryEvent {
     override def execute(): Unit = boundary.subscribePending()
 
     override def shell: GraphInterpreterShell = boundary.shell
@@ -296,7 +300,7 @@ import org.reactivestreams.Subscription
     override def cancel(): Unit = ()
   }
 
-  final case class RequestMore(boundary: ActorOutputBoundary, demand: Long) 
extends SimpleBoundaryEvent {
+  final class RequestMore(boundary: ActorOutputBoundary, demand: Long) extends 
SimpleBoundaryEvent {
     override def execute(): Unit = {
       if (GraphInterpreter.Debug)
         println(s"${boundary.shell.interpreter.Name}  request  $demand 
port=${boundary.internalPortName}")
@@ -306,7 +310,7 @@ import org.reactivestreams.Subscription
     override def logic: GraphStageLogic = boundary
     override def cancel(): Unit = ()
   }
-  final case class Cancel(boundary: ActorOutputBoundary, cause: Throwable) 
extends SimpleBoundaryEvent {
+  final class Cancel(boundary: ActorOutputBoundary, cause: Throwable) extends 
SimpleBoundaryEvent {
     override def execute(): Unit = {
       if (GraphInterpreter.Debug)
         println(
@@ -330,7 +334,7 @@ import org.reactivestreams.Subscription
     // the shutdown method. Subscription attempts after shutdown can be denied 
immediately.
     private val pendingSubscribers = new 
AtomicReference[immutable.Seq[Subscriber[Any]]](Nil)
 
-    protected val wakeUpMsg: Any = SubscribePending(boundary)
+    protected val wakeUpMsg: Any = new SubscribePending(boundary)
 
     override def subscribe(subscriber: Subscriber[_ >: Any]): Unit = {
       requireNonNullSubscriber(subscriber)
@@ -457,8 +461,8 @@ import org.reactivestreams.Subscription
         if (subscriber eq null) {
           subscriber = sub
           val subscription = new Subscription with 
SubscriptionWithCancelException {
-            override def request(elements: Long): Unit = actor ! 
RequestMore(ActorOutputBoundary.this, elements)
-            override def cancel(cause: Throwable): Unit = actor ! 
Cancel(ActorOutputBoundary.this, cause)
+            override def request(elements: Long): Unit = actor ! new 
RequestMore(ActorOutputBoundary.this, elements)
+            override def cancel(cause: Throwable): Unit = actor ! new 
Cancel(ActorOutputBoundary.this, cause)
 
             override def toString = s"BoundarySubscription[$actor, 
$internalPortName]"
           }
@@ -498,24 +502,15 @@ import org.reactivestreams.Subscription
 /**
  * INTERNAL API
  */
-@InternalApi private[pekko] final class GraphInterpreterShell(
-    var connections: Array[Connection],
-    var logics: Array[GraphStageLogic],
-    val attributes: Attributes,
-    val mat: ExtendedActorMaterializer) {
-
-  import ActorGraphInterpreter._
-
-  private var self: ActorRef = _
-  lazy val log = Logging(mat.system.eventStream, self)
+@InternalApi private[pekko] object GraphInterpreterShell {
+  import ActorGraphInterpreter.BoundaryEvent
 
   /**
    * @param promise Will be completed upon processing the event, or failed if 
processing the event throws
    *                if the event isn't ever processed the promise (the 
operator stops) is failed elsewhere
    */
-  // can't be final because of SI-4440
-  case class AsyncInput(
-      shell: GraphInterpreterShell,
+  final class AsyncInput(
+      override val shell: GraphInterpreterShell,
       logic: GraphStageLogic,
       evt: Any,
       promise: Promise[Done],
@@ -524,12 +519,12 @@ import org.reactivestreams.Subscription
 
     @InternalStableApi
     override def execute(eventLimit: Int): Int = {
-      if (!waitingForShutdown) {
-        interpreter.runAsyncInput(logic, evt, promise, handler)
-        if (eventLimit == 1 && interpreter.isSuspended) {
-          sendResume(true)
+      if (!shell.waitingForShutdown) {
+        shell.interpreter.runAsyncInput(logic, evt, promise, handler)
+        if (eventLimit == 1 && shell.interpreter.isSuspended) {
+          shell.sendResume(true)
           0
-        } else runBatch(eventLimit - 1)
+        } else shell.runBatch(eventLimit - 1)
       } else {
         eventLimit
       }
@@ -538,40 +533,51 @@ import org.reactivestreams.Subscription
     override def cancel(): Unit = ()
   }
 
-  // can't be final because of SI-4440
-  case class ResumeShell(shell: GraphInterpreterShell) extends BoundaryEvent {
+  final class ResumeShell(override val shell: GraphInterpreterShell) extends 
BoundaryEvent {
     override def execute(eventLimit: Int): Int =
-      if (!waitingForShutdown) {
-        if (GraphInterpreter.Debug) println(s"${interpreter.Name}  resume")
-        if (interpreter.isSuspended) runBatch(eventLimit) else eventLimit
+      if (!shell.waitingForShutdown) {
+        if (GraphInterpreter.Debug) println(s"${shell.interpreter.Name}  
resume")
+        if (shell.interpreter.isSuspended) shell.runBatch(eventLimit) else 
eventLimit
       } else eventLimit
 
     override def cancel(): Unit = ()
   }
 
-  // can't be final because of SI-4440
-  case class Abort(shell: GraphInterpreterShell) extends BoundaryEvent {
+  final class Abort(override val shell: GraphInterpreterShell) extends 
BoundaryEvent {
     override def execute(eventLimit: Int): Int = {
-      if (waitingForShutdown) {
-        subscribesPending = 0
-        val subscriptionTimeout = 
attributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout].timeout
-        tryAbort(
+      if (shell.waitingForShutdown) {
+        shell.subscribesPending = 0
+        val subscriptionTimeout = 
shell.attributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout].timeout
+        shell.tryAbort(
           new TimeoutException(
             "Streaming actor has been already stopped processing (normally), 
but not all of its " +
-            s"inputs or outputs have been subscribed in 
[$subscriptionTimeout}]. Aborting actor now."))
+            s"inputs or outputs have been subscribed in 
[$subscriptionTimeout]. Aborting actor now."))
       }
       0
     }
 
     override def cancel(): Unit = ()
   }
+}
+
+@InternalApi private[pekko] final class GraphInterpreterShell(
+    var connections: Array[Connection],
+    var logics: Array[GraphStageLogic],
+    val attributes: Attributes,
+    val mat: ExtendedActorMaterializer) {
+
+  import ActorGraphInterpreter._
+  import GraphInterpreterShell._
+
+  private var self: ActorRef = _
+  lazy val log = Logging(mat.system.eventStream, self)
 
   private var enqueueToShortCircuit: (Any) => Unit = _
 
   lazy val interpreter: GraphInterpreter =
     new GraphInterpreter(mat, log, logics, connections,
       (logic, event, promise, handler) => {
-        val asyncInput = AsyncInput(this, logic, event, promise, handler)
+        val asyncInput = new AsyncInput(this, logic, event, promise, handler)
         val currentInterpreter = GraphInterpreter.currentInterpreterOrNull
         if ((currentInterpreter eq null) || (currentInterpreter.context ne 
self))
           self ! asyncInput
@@ -648,7 +654,7 @@ import org.reactivestreams.Subscription
 
   private var waitingForShutdown: Boolean = false
 
-  val resume = ResumeShell(this)
+  val resume = new ResumeShell(this)
 
   def sendResume(sendResume: Boolean): Unit = {
     resumeScheduled = true
@@ -666,7 +672,7 @@ import org.reactivestreams.Subscription
         else {
           waitingForShutdown = true
           val subscriptionTimeout = 
attributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout].timeout
-          mat.scheduleOnce(subscriptionTimeout, () => self ! 
Abort(GraphInterpreterShell.this))
+          mat.scheduleOnce(subscriptionTimeout, () => self ! new 
Abort(GraphInterpreterShell.this))
         }
       } else if (interpreter.isSuspended && !resumeScheduled) 
sendResume(!usingShellLimit)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to