This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 956d478919 test: fix InterpreterBenchmark so it produces trustworthy numbers (#2985)
956d478919 is described below
commit 956d478919f7183fce7ca49f225934a9b6b443ad
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Thu May 21 17:14:39 2026 +0800
test: fix InterpreterBenchmark so it produces trustworthy numbers (#2985)
* test: stop leaking ActorSystem in InterpreterBenchmark per invocation
Motivation:
The previous shape `new GraphInterpreterSpecKit { new TestSetup { ... } }`
ran inside @Benchmark, so each invocation built (and never tore down) a fresh
ActorSystem. Long iterations exhausted native threads, and JMH reported empty
results once the JVM ran out of resources.
Modification:
Make the benchmark class itself extend GraphInterpreterSpecKit so JMH's
@State(Scope.Benchmark) lifecycle reuses one ActorSystem across all
invocations. Add @TearDown(Level.Trial) to terminate it cleanly.
Result:
The benchmark now runs to completion and produces stable numbers, which is a
prerequisite for measuring follow-up GraphInterpreter optimizations.
Tests:
sbt 'bench-jmh/compile'
* test: use per-instance IdentityStage in InterpreterBenchmark
Motivation:
GraphStages.identity is a singleton, so its Inlet/Outlet shape is shared by
every reference. Chaining N references to it into the assembly
(numberOfIds = 5/10) collapses them onto a single shape and mis-wires the
connections. This surfaced during the benchmark as a flood of runtime
"Cannot pull port twice" errors and produced nonsense throughput numbers
(5 and 10 stages were reported as faster than 1).
Modification:
Define a local IdentityStage class that creates its own Inlet/Outlet per
instance, and build the chain with
Vector.fill(numberOfIds)(new IdentityStage[Int]).
Result:
The benchmark wires N distinct stages and produces stable, monotonic numbers
(throughput decreases as numberOfIds grows, as expected).
Tests:
sbt 'bench-jmh/compile'
---
.../apache/pekko/stream/InterpreterBenchmark.scala | 69 ++++++++++++++--------
1 file changed, 45 insertions(+), 24 deletions(-)
diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/InterpreterBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/InterpreterBenchmark.scala
index 30724ab775..2045852a69 100644
--- a/bench-jmh/src/main/scala/org/apache/pekko/stream/InterpreterBenchmark.scala
+++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/InterpreterBenchmark.scala
@@ -15,19 +15,20 @@ package org.apache.pekko.stream
import java.util.concurrent.TimeUnit
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
import org.openjdk.jmh.annotations._
import org.apache.pekko
-import pekko.event._
import pekko.stream.impl.fusing.GraphInterpreter.{
DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic }
import pekko.stream.impl.fusing.GraphInterpreterSpecKit
-import pekko.stream.impl.fusing.GraphStages
import pekko.stream.stage._
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.Throughput))
-class InterpreterBenchmark {
+class InterpreterBenchmark extends GraphInterpreterSpecKit {
import InterpreterBenchmark._
// manual, and not via @Param, because we want @OperationsPerInvocation on
our tests
@@ -36,32 +37,59 @@ class InterpreterBenchmark {
@Param(Array("1", "5", "10"))
var numberOfIds: Int = 0
+ // Earlier this benchmark instantiated `new GraphInterpreterSpecKit` inside
@Benchmark, which
+ // created (and leaked) a fresh ActorSystem on every invocation and would
exhaust native threads
+ // on long runs. Extending the SpecKit means JMH's @State(Scope.Benchmark)
lifecycle reuses a
+ // single ActorSystem across all invocations.
+
+ @TearDown(Level.Trial)
+ def shutdown(): Unit = {
+ Await.result(system.terminate(), 10.seconds)
+ }
+
@Benchmark
@OperationsPerInvocation(100000)
def graph_interpreter_100k_elements(): Unit = {
- new GraphInterpreterSpecKit {
- new TestSetup {
- val identities = Vector.fill(numberOfIds)(GraphStages.identity[Int])
- val source = new GraphDataSource("source", data100k)
- val sink = new GraphDataSink[Int]("sink", data100k.size)
-
- val b = builder(identities: _*).connect(source,
identities.head.in).connect(identities.last.out, sink)
+ new TestSetup {
+ val identities = Vector.fill(numberOfIds)(new IdentityStage[Int])
+ val source = new GraphDataSource("source", data100k)
+ val sink = new GraphDataSink[Int]("sink", data100k.size)
- // FIXME: This should not be here, this is pure setup overhead
- for (i <- 0 until identities.size - 1) {
- b.connect(identities(i).out, identities(i + 1).in)
- }
+ val b = builder(identities: _*).connect(source,
identities.head.in).connect(identities.last.out, sink)
- b.init()
- sink.requestOne()
- interpreter.execute(Int.MaxValue)
+ // FIXME: This should not be here, this is pure setup overhead
+ for (i <- 0 until identities.size - 1) {
+ b.connect(identities(i).out, identities(i + 1).in)
}
+
+ b.init()
+ sink.requestOne()
+ interpreter.execute(Int.MaxValue)
}
}
}
object InterpreterBenchmark {
+ /**
+ * Per-instance identity stage. Cannot reuse [[GraphStages.identity]]
because it is a singleton
+ * whose Inlet/Outlet shape is shared across all references — chaining N
copies of the singleton
+ * collapses to a single shape and mis-wires the assembly (manifests as
`Cannot pull port twice`).
+ */
+ final class IdentityStage[T] extends GraphStage[FlowShape[T, T]] {
+ val in = Inlet[T]("Identity.in")
+ val out = Outlet[T]("Identity.out")
+ override val shape: FlowShape[T, T] = FlowShape(in, out)
+
+ override def createLogic(inheritedAttributes: Attributes): GraphStageLogic
=
+ new GraphStageLogic(shape) with InHandler with OutHandler {
+ override def onPush(): Unit = push(out, grab(in))
+ override def onPull(): Unit = pull(in)
+ setHandler(in, this)
+ setHandler(out, this)
+ }
+ }
+
case class GraphDataSource[T](override val toString: String, data:
Vector[T]) extends UpstreamBoundaryStageLogic[T] {
var idx: Int = 0
override val out: pekko.stream.Outlet[T] = Outlet[T]("out")
@@ -98,11 +126,4 @@ object InterpreterBenchmark {
def requestOne(): Unit = pull(in)
}
-
- val NoopBus = new LoggingBus {
- override def subscribe(subscriber: Subscriber, to: Classifier): Boolean =
true
- override def publish(event: Event): Unit = ()
- override def unsubscribe(subscriber: Subscriber, from: Classifier):
Boolean = true
- override def unsubscribe(subscriber: Subscriber): Unit = ()
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]