This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 973446d1c7 Optimize stream boundary event allocation (#2916)
973446d1c7 is described below
commit 973446d1c759c371c406a3427d81bfff96f2557f
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat May 9 18:02:52 2026 +0800
Optimize stream boundary event allocation (#2916)
* Optimize stream boundary event allocation
* fix for stray } in exception message
Co-authored-by: Copilot Autofix powered by AI
<[email protected]>
---------
Co-authored-by: PJ Fanning <[email protected]>
Co-authored-by: Copilot Autofix powered by AI
<[email protected]>
---
.../ActorGraphInterpreterBoundaryBenchmark.scala | 186 +++++++++++++++++++++
.../pr-2916-boundary-event-allocation.excludes | 30 ++++
.../stream/impl/fusing/ActorGraphInterpreter.scala | 182 ++++++++++----------
3 files changed, 310 insertions(+), 88 deletions(-)
diff --git
a/bench-jmh/src/main/scala/org/apache/pekko/stream/ActorGraphInterpreterBoundaryBenchmark.scala
b/bench-jmh/src/main/scala/org/apache/pekko/stream/ActorGraphInterpreterBoundaryBenchmark.scala
new file mode 100644
index 0000000000..d737283cc4
--- /dev/null
+++
b/bench-jmh/src/main/scala/org/apache/pekko/stream/ActorGraphInterpreterBoundaryBenchmark.scala
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pekko.stream
+
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import org.openjdk.jmh.annotations._
+import org.openjdk.jmh.annotations.OperationsPerInvocation
+import org.openjdk.jmh.infra.Blackhole
+import org.reactivestreams.Publisher
+import org.reactivestreams.Subscriber
+import org.reactivestreams.Subscription
+
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.stream.scaladsl.Keep
+import pekko.stream.scaladsl.RunnableGraph
+import pekko.stream.scaladsl.Sink
+import pekko.stream.scaladsl.Source
+
+import com.typesafe.config.ConfigFactory
+
+object ActorGraphInterpreterBoundaryBenchmark {
+ final val ElementCount = 100 * 1000
+ final val CancelAfter = ElementCount / 2
+
+ final class SynchronousPublisher(elements: Array[MutableElement]) extends
Publisher[MutableElement] {
+ override def subscribe(subscriber: Subscriber[_ >: MutableElement]): Unit
= {
+ if (subscriber eq null) throw new NullPointerException("subscriber")
+
+ subscriber.onSubscribe(new Subscription {
+ private[this] var cancelled = false
+ private[this] var index = 0
+
+ override def request(n: Long): Unit =
+ if (!cancelled) {
+ if (n <= 0) {
+ cancelled = true
+ subscriber.onError(new IllegalArgumentException("non-positive
subscription request"))
+ } else {
+ var remaining = n
+ while (remaining > 0 && index < elements.length && !cancelled) {
+ subscriber.onNext(elements(index))
+ index += 1
+ remaining -= 1
+ }
+ if (index == elements.length && !cancelled) {
+ cancelled = true
+ subscriber.onComplete()
+ }
+ }
+ }
+
+ override def cancel(): Unit = cancelled = true
+ })
+ }
+ }
+
+ final class RequestOneSubscriber(
+ blackhole: Blackhole,
+ latch: CountDownLatch,
+ cancelAfter: Int)
+ extends Subscriber[MutableElement] {
+ private[this] var subscription: Subscription = _
+ private[this] var seen = 0
+ private[this] val failure = new AtomicReference[Throwable]
+
+ override def onSubscribe(subscription: Subscription): Unit = {
+ this.subscription = subscription
+ subscription.request(1)
+ }
+
+ override def onNext(element: MutableElement): Unit = {
+ blackhole.consume(element.value)
+ seen += 1
+ if (cancelAfter > 0 && seen == cancelAfter) {
+ subscription.cancel()
+ latch.countDown()
+ } else {
+ subscription.request(1)
+ }
+ }
+
+ override def onError(cause: Throwable): Unit = {
+ failure.set(cause)
+ latch.countDown()
+ }
+
+ override def onComplete(): Unit = latch.countDown()
+
+ def awaitExpected(expected: Int): Unit = {
+ if (!latch.await(10, TimeUnit.SECONDS))
+ throw new RuntimeException("Latch timed out")
+
+ val cause = failure.get()
+ if (cause ne null) throw new RuntimeException("Subscriber failed", cause)
+ if (seen < expected) throw new RuntimeException(s"Expected at least
[$expected] elements but got [$seen]")
+ }
+ }
+}
+
+@State(Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@BenchmarkMode(Array(Mode.Throughput))
+class ActorGraphInterpreterBoundaryBenchmark {
+ import ActorGraphInterpreterBoundaryBenchmark._
+
+ implicit val system: ActorSystem = ActorSystem(
+ "test",
+ ConfigFactory.parseString(s"""
+ pekko.stream.materializer.sync-processing-limit = ${Int.MaxValue}
+ """))
+
+ private var publisherSource: RunnableGraph[CountDownLatch] = _
+ private var publisherSink: RunnableGraph[Publisher[MutableElement]] = _
+
+ @Setup
+ def setup(): Unit = {
+ SystemMaterializer(system).materializer
+ val testElements = Array.fill(ElementCount)(new MutableElement(1))
+ val testSource = Source.fromGraph(new TestSource(testElements))
+
+ publisherSource =
+ Source
+ .fromPublisher(new SynchronousPublisher(testElements))
+ .toMat(Sink.fromGraph(new JitSafeCompletionLatch))(Keep.right)
+ publisherSink =
+ testSource.toMat(Sink.asPublisher(false))(Keep.right)
+ }
+
+ @Benchmark
+ @OperationsPerInvocation(ElementCount)
+ def external_publisher_to_sink(blackhole: Blackhole): Unit = {
+ FusedGraphsBenchmark.blackhole = blackhole
+ val latch = publisherSource.run()
+ if (!latch.await(10, TimeUnit.SECONDS))
+ throw new RuntimeException("Latch timed out")
+ }
+
+ @Benchmark
+ @OperationsPerInvocation(ElementCount)
+ def source_to_external_subscriber_request_one(blackhole: Blackhole): Unit = {
+ val latch = new CountDownLatch(1)
+ val subscriber = new RequestOneSubscriber(blackhole, latch, cancelAfter =
0)
+
+ publisherSink.run().subscribe(subscriber)
+ subscriber.awaitExpected(ElementCount)
+ }
+
+ @Benchmark
+ @OperationsPerInvocation(CancelAfter)
+ def source_to_external_subscriber_cancel_halfway(blackhole: Blackhole): Unit
= {
+ val latch = new CountDownLatch(1)
+ val subscriber = new RequestOneSubscriber(blackhole, latch, cancelAfter =
CancelAfter)
+
+ publisherSink.run().subscribe(subscriber)
+ subscriber.awaitExpected(CancelAfter)
+ }
+
+ @TearDown
+ def shutdown(): Unit = {
+ Await.result(system.terminate(), 5.seconds)
+ }
+}
diff --git
a/stream/src/main/mima-filters/2.0.x.backwards.excludes/pr-2916-boundary-event-allocation.excludes
b/stream/src/main/mima-filters/2.0.x.backwards.excludes/pr-2916-boundary-event-allocation.excludes
new file mode 100644
index 0000000000..d8a5fbb51a
--- /dev/null
+++
b/stream/src/main/mima-filters/2.0.x.backwards.excludes/pr-2916-boundary-event-allocation.excludes
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Optimize internal boundary event allocation
+ProblemFilters.exclude[Problem]("org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter*BoundaryEvent*")
+ProblemFilters.exclude[Problem]("org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter*SimpleBoundaryEvent*")
+ProblemFilters.exclude[Problem]("org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter*BatchingActorInputBoundary*OnError*")
+ProblemFilters.exclude[Problem]("org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter*BatchingActorInputBoundary*OnComplete*")
+ProblemFilters.exclude[Problem]("org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter*BatchingActorInputBoundary*OnNext*")
+ProblemFilters.exclude[Problem]("org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter*BatchingActorInputBoundary*OnSubscribe*")
+ProblemFilters.exclude[Problem]("org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter*Cancel*")
+ProblemFilters.exclude[Problem]("org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter*RequestMore*")
+ProblemFilters.exclude[Problem]("org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter*SubscribePending*")
+ProblemFilters.exclude[Problem]("org.apache.pekko.stream.impl.fusing.GraphInterpreterShell*Abort*")
+ProblemFilters.exclude[Problem]("org.apache.pekko.stream.impl.fusing.GraphInterpreterShell*AsyncInput*")
+ProblemFilters.exclude[Problem]("org.apache.pekko.stream.impl.fusing.GraphInterpreterShell*ResumeShell*")
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala
index c1442bccc8..39e16aa9c3 100644
---
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala
+++
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala
@@ -53,7 +53,7 @@ import org.reactivestreams.Subscription
object Resume extends DeadLetterSuppression with
NoSerializationVerificationNeeded
object Snapshot extends NoSerializationVerificationNeeded
- trait BoundaryEvent extends DeadLetterSuppression with
NoSerializationVerificationNeeded {
+ sealed abstract class BoundaryEvent extends DeadLetterSuppression with
NoSerializationVerificationNeeded {
def shell: GraphInterpreterShell
@InternalStableApi
@@ -63,7 +63,7 @@ import org.reactivestreams.Subscription
def cancel(): Unit
}
- trait SimpleBoundaryEvent extends BoundaryEvent {
+ sealed abstract class SimpleBoundaryEvent extends BoundaryEvent {
final override def execute(eventLimit: Int): Int = {
val wasNotShutdown = !shell.interpreter.isStageCompleted(logic)
execute()
@@ -79,60 +79,64 @@ import org.reactivestreams.Subscription
def props(shell: GraphInterpreterShell): Props =
Props(new ActorGraphInterpreter(shell)).withDeploy(Deploy.local)
- @InternalStableApi
- class BatchingActorInputBoundary(
- size: Int,
- shell: GraphInterpreterShell,
- publisher: Publisher[Any],
- internalPortName: String)
- extends UpstreamBoundaryStageLogic[Any]
- with OutHandler {
-
- // can't be final because of SI-4440
- case class OnError(shell: GraphInterpreterShell, cause: Throwable) extends
SimpleBoundaryEvent {
- override def execute(): Unit = {
- if (GraphInterpreter.Debug) println(s"${interpreter.Name} onError
port=$internalPortName")
- BatchingActorInputBoundary.this.onError(cause)
- }
+ final class OnError(boundary: BatchingActorInputBoundary, cause: Throwable)
extends SimpleBoundaryEvent {
+ override def execute(): Unit = {
+ if (GraphInterpreter.Debug)
+ println(s"${boundary.shell.interpreter.Name} onError
port=${boundary.internalPortName}")
+ boundary.onError(cause)
+ }
- override def logic: GraphStageLogic = BatchingActorInputBoundary.this
+ override def shell: GraphInterpreterShell = boundary.shell
+ override def logic: GraphStageLogic = boundary
+ override def cancel(): Unit = ()
+ }
- override def cancel(): Unit = ()
+ final class OnComplete(boundary: BatchingActorInputBoundary) extends
SimpleBoundaryEvent {
+ override def execute(): Unit = {
+ if (GraphInterpreter.Debug)
+ println(s"${boundary.shell.interpreter.Name} onComplete
port=${boundary.internalPortName}")
+ boundary.onComplete()
}
- // can't be final because of SI-4440
- case class OnComplete(shell: GraphInterpreterShell) extends
SimpleBoundaryEvent {
- override def execute(): Unit = {
- if (GraphInterpreter.Debug) println(s"${interpreter.Name} onComplete
port=$internalPortName")
- BatchingActorInputBoundary.this.onComplete()
- }
- override def logic: GraphStageLogic = BatchingActorInputBoundary.this
+ override def shell: GraphInterpreterShell = boundary.shell
+ override def logic: GraphStageLogic = boundary
+ override def cancel(): Unit = ()
+ }
- override def cancel(): Unit = ()
+ final class OnNext(boundary: BatchingActorInputBoundary, e: Any) extends
SimpleBoundaryEvent {
+ override def execute(): Unit = {
+ if (GraphInterpreter.Debug)
+ println(s"${boundary.shell.interpreter.Name} onNext $e
port=${boundary.internalPortName}")
+ boundary.onNext(e)
}
- // can't be final because of SI-4440
- case class OnNext(shell: GraphInterpreterShell, e: Any) extends
SimpleBoundaryEvent {
- override def execute(): Unit = {
- if (GraphInterpreter.Debug) println(s"${interpreter.Name} onNext $e
port=$internalPortName")
- BatchingActorInputBoundary.this.onNext(e)
- }
- override def logic: GraphStageLogic = BatchingActorInputBoundary.this
+ override def shell: GraphInterpreterShell = boundary.shell
+ override def logic: GraphStageLogic = boundary
+ override def cancel(): Unit = ()
+ }
- override def cancel(): Unit = ()
+ final class OnSubscribe(boundary: BatchingActorInputBoundary, subscription:
Subscription)
+ extends SimpleBoundaryEvent {
+ override def execute(): Unit = {
+ if (GraphInterpreter.Debug)
+ println(s"${boundary.shell.interpreter.Name} onSubscribe
port=${boundary.internalPortName}")
+ boundary.shell.subscribeArrived()
+ boundary.onSubscribe(subscription)
}
- // can't be final because of SI-4440
- case class OnSubscribe(shell: GraphInterpreterShell, subscription:
Subscription) extends SimpleBoundaryEvent {
- override def execute(): Unit = {
- if (GraphInterpreter.Debug) println(s"${interpreter.Name} onSubscribe
port=$internalPortName")
- shell.subscribeArrived()
- BatchingActorInputBoundary.this.onSubscribe(subscription)
- }
- override def logic: GraphStageLogic = BatchingActorInputBoundary.this
+ override def shell: GraphInterpreterShell = boundary.shell
+ override def logic: GraphStageLogic = boundary
+ override def cancel(): Unit = ()
+ }
- override def cancel(): Unit = ()
- }
+ @InternalStableApi
+ class BatchingActorInputBoundary(
+ size: Int,
+ val shell: GraphInterpreterShell,
+ publisher: Publisher[Any],
+ val internalPortName: String)
+ extends UpstreamBoundaryStageLogic[Any]
+ with OutHandler {
if (size <= 0) throw new IllegalArgumentException("buffer size cannot be
zero")
if ((size & (size - 1)) != 0) throw new IllegalArgumentException("buffer
size must be a power of two")
@@ -158,21 +162,21 @@ import org.reactivestreams.Subscription
publisher.subscribe(new Subscriber[Any] {
override def onError(t: Throwable): Unit = {
ReactiveStreamsCompliance.requireNonNullException(t)
- actor ! OnError(shell, t)
+ actor ! new OnError(BatchingActorInputBoundary.this, t)
}
override def onSubscribe(s: Subscription): Unit = {
ReactiveStreamsCompliance.requireNonNullSubscription(s)
- actor ! OnSubscribe(shell, s)
+ actor ! new OnSubscribe(BatchingActorInputBoundary.this, s)
}
override def onComplete(): Unit = {
- actor ! OnComplete(shell)
+ actor ! new OnComplete(BatchingActorInputBoundary.this)
}
override def onNext(t: Any): Unit = {
ReactiveStreamsCompliance.requireNonNullElement(t)
- actor ! OnNext(shell, t)
+ actor ! new OnNext(BatchingActorInputBoundary.this, t)
}
})
}
@@ -286,7 +290,7 @@ import org.reactivestreams.Subscription
s"BatchingActorInputBoundary(forPort=$internalPortName,
fill=$inputBufferElements/$size, completed=$upstreamCompleted,
canceled=$downstreamCanceled)"
}
- final case class SubscribePending(boundary: ActorOutputBoundary) extends
SimpleBoundaryEvent {
+ final class SubscribePending(boundary: ActorOutputBoundary) extends
SimpleBoundaryEvent {
override def execute(): Unit = boundary.subscribePending()
override def shell: GraphInterpreterShell = boundary.shell
@@ -296,7 +300,7 @@ import org.reactivestreams.Subscription
override def cancel(): Unit = ()
}
- final case class RequestMore(boundary: ActorOutputBoundary, demand: Long)
extends SimpleBoundaryEvent {
+ final class RequestMore(boundary: ActorOutputBoundary, demand: Long) extends
SimpleBoundaryEvent {
override def execute(): Unit = {
if (GraphInterpreter.Debug)
println(s"${boundary.shell.interpreter.Name} request $demand
port=${boundary.internalPortName}")
@@ -306,7 +310,7 @@ import org.reactivestreams.Subscription
override def logic: GraphStageLogic = boundary
override def cancel(): Unit = ()
}
- final case class Cancel(boundary: ActorOutputBoundary, cause: Throwable)
extends SimpleBoundaryEvent {
+ final class Cancel(boundary: ActorOutputBoundary, cause: Throwable) extends
SimpleBoundaryEvent {
override def execute(): Unit = {
if (GraphInterpreter.Debug)
println(
@@ -330,7 +334,7 @@ import org.reactivestreams.Subscription
// the shutdown method. Subscription attempts after shutdown can be denied
immediately.
private val pendingSubscribers = new
AtomicReference[immutable.Seq[Subscriber[Any]]](Nil)
- protected val wakeUpMsg: Any = SubscribePending(boundary)
+ protected val wakeUpMsg: Any = new SubscribePending(boundary)
override def subscribe(subscriber: Subscriber[_ >: Any]): Unit = {
requireNonNullSubscriber(subscriber)
@@ -457,8 +461,8 @@ import org.reactivestreams.Subscription
if (subscriber eq null) {
subscriber = sub
val subscription = new Subscription with
SubscriptionWithCancelException {
- override def request(elements: Long): Unit = actor !
RequestMore(ActorOutputBoundary.this, elements)
- override def cancel(cause: Throwable): Unit = actor !
Cancel(ActorOutputBoundary.this, cause)
+ override def request(elements: Long): Unit = actor ! new
RequestMore(ActorOutputBoundary.this, elements)
+ override def cancel(cause: Throwable): Unit = actor ! new
Cancel(ActorOutputBoundary.this, cause)
override def toString = s"BoundarySubscription[$actor,
$internalPortName]"
}
@@ -498,24 +502,15 @@ import org.reactivestreams.Subscription
/**
* INTERNAL API
*/
-@InternalApi private[pekko] final class GraphInterpreterShell(
- var connections: Array[Connection],
- var logics: Array[GraphStageLogic],
- val attributes: Attributes,
- val mat: ExtendedActorMaterializer) {
-
- import ActorGraphInterpreter._
-
- private var self: ActorRef = _
- lazy val log = Logging(mat.system.eventStream, self)
+@InternalApi private[pekko] object GraphInterpreterShell {
+ import ActorGraphInterpreter.BoundaryEvent
/**
* @param promise Will be completed upon processing the event, or failed if
processing the event throws
* if the event isn't ever processed the promise (the
operator stops) is failed elsewhere
*/
- // can't be final because of SI-4440
- case class AsyncInput(
- shell: GraphInterpreterShell,
+ final class AsyncInput(
+ override val shell: GraphInterpreterShell,
logic: GraphStageLogic,
evt: Any,
promise: Promise[Done],
@@ -524,12 +519,12 @@ import org.reactivestreams.Subscription
@InternalStableApi
override def execute(eventLimit: Int): Int = {
- if (!waitingForShutdown) {
- interpreter.runAsyncInput(logic, evt, promise, handler)
- if (eventLimit == 1 && interpreter.isSuspended) {
- sendResume(true)
+ if (!shell.waitingForShutdown) {
+ shell.interpreter.runAsyncInput(logic, evt, promise, handler)
+ if (eventLimit == 1 && shell.interpreter.isSuspended) {
+ shell.sendResume(true)
0
- } else runBatch(eventLimit - 1)
+ } else shell.runBatch(eventLimit - 1)
} else {
eventLimit
}
@@ -538,40 +533,51 @@ import org.reactivestreams.Subscription
override def cancel(): Unit = ()
}
- // can't be final because of SI-4440
- case class ResumeShell(shell: GraphInterpreterShell) extends BoundaryEvent {
+ final class ResumeShell(override val shell: GraphInterpreterShell) extends
BoundaryEvent {
override def execute(eventLimit: Int): Int =
- if (!waitingForShutdown) {
- if (GraphInterpreter.Debug) println(s"${interpreter.Name} resume")
- if (interpreter.isSuspended) runBatch(eventLimit) else eventLimit
+ if (!shell.waitingForShutdown) {
+ if (GraphInterpreter.Debug) println(s"${shell.interpreter.Name}
resume")
+ if (shell.interpreter.isSuspended) shell.runBatch(eventLimit) else
eventLimit
} else eventLimit
override def cancel(): Unit = ()
}
- // can't be final because of SI-4440
- case class Abort(shell: GraphInterpreterShell) extends BoundaryEvent {
+ final class Abort(override val shell: GraphInterpreterShell) extends
BoundaryEvent {
override def execute(eventLimit: Int): Int = {
- if (waitingForShutdown) {
- subscribesPending = 0
- val subscriptionTimeout =
attributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout].timeout
- tryAbort(
+ if (shell.waitingForShutdown) {
+ shell.subscribesPending = 0
+ val subscriptionTimeout =
shell.attributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout].timeout
+ shell.tryAbort(
new TimeoutException(
"Streaming actor has been already stopped processing (normally),
but not all of its " +
- s"inputs or outputs have been subscribed in
[$subscriptionTimeout}]. Aborting actor now."))
+ s"inputs or outputs have been subscribed in
[$subscriptionTimeout]. Aborting actor now."))
}
0
}
override def cancel(): Unit = ()
}
+}
+
+@InternalApi private[pekko] final class GraphInterpreterShell(
+ var connections: Array[Connection],
+ var logics: Array[GraphStageLogic],
+ val attributes: Attributes,
+ val mat: ExtendedActorMaterializer) {
+
+ import ActorGraphInterpreter._
+ import GraphInterpreterShell._
+
+ private var self: ActorRef = _
+ lazy val log = Logging(mat.system.eventStream, self)
private var enqueueToShortCircuit: (Any) => Unit = _
lazy val interpreter: GraphInterpreter =
new GraphInterpreter(mat, log, logics, connections,
(logic, event, promise, handler) => {
- val asyncInput = AsyncInput(this, logic, event, promise, handler)
+ val asyncInput = new AsyncInput(this, logic, event, promise, handler)
val currentInterpreter = GraphInterpreter.currentInterpreterOrNull
if ((currentInterpreter eq null) || (currentInterpreter.context ne
self))
self ! asyncInput
@@ -648,7 +654,7 @@ import org.reactivestreams.Subscription
private var waitingForShutdown: Boolean = false
- val resume = ResumeShell(this)
+ val resume = new ResumeShell(this)
def sendResume(sendResume: Boolean): Unit = {
resumeScheduled = true
@@ -666,7 +672,7 @@ import org.reactivestreams.Subscription
else {
waitingForShutdown = true
val subscriptionTimeout =
attributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout].timeout
- mat.scheduleOnce(subscriptionTimeout, () => self !
Abort(GraphInterpreterShell.this))
+ mat.scheduleOnce(subscriptionTimeout, () => self ! new
Abort(GraphInterpreterShell.this))
}
} else if (interpreter.isSuspended && !resumeScheduled)
sendResume(!usingShellLimit)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]