This is an automated email from the ASF dual-hosted git repository. He-Pin pushed a commit to branch optimize/stream-materializer-wiring in repository https://gitbox.apache.org/repos/asf/pekko.git
commit e9bac4d88380e87ee658bef67fb62b825fb938d7 Author: He-Pin <[email protected]> AuthorDate: Sun Jun 14 04:01:00 2026 +0800 perf: optimize stream materializer wiring with HashMap and ArrayList replacements Motivation: The PhasedFusingActorMaterializer uses an ArrayList with an O(n) linear scan to resolve forward wires during graph materialization, and GraphStageIsland uses a Scala List for outConnections, which allocates a cons cell on every addition. Both add unnecessary overhead for complex graphs. Modification: - Replace the forwardWires ArrayList with a HashMap<Integer, ForwardWire> keyed by toGlobalOffset, giving expected O(1) lookup and removal in wireIn/wireOut - Replace the outConnections Scala List with a lazily allocated ArrayList, eliminating cons cell allocation and enabling indexed iteration in onIslandReady. Note that onIslandReady now places out connections into the final connection array in insertion order rather than in the reverse order produced by prepending to the List - Add a shared emptyOutSlots constant in TraversalBuilder to avoid allocating zero-length arrays for stages with no outlets - Add MaterializerWiringBenchmark (JMH) to measure materialization throughput for linear, broadcast-merge, and imported-flow graph topologies - Add AsyncBoundaryThroughputBenchmark (JMH) to measure cross-island element throughput with varying numbers of async boundaries Result: Forward wire resolution drops from an O(n) scan to an expected O(1) hash lookup, outConnections avoids Scala List overhead, and stages with no outlets reuse a shared empty array. New benchmarks provide regression coverage for these code paths. 
Tests: - sbt "stream-tests / Test / testOnly *MaterializerSpec *PhasedFusingSpec *TraversalBuilderSpec" - 30 tests passed - sbt "stream-tests / Test / testOnly *GraphInterpreterSpec *ActorGraphInterpreterSpec" - 112 tests passed - sbt "bench-jmh / compile" - passed References: None - performance optimization identified through code review --- .../stream/AsyncBoundaryThroughputBenchmark.scala | 113 +++++++++++++++++++ .../pekko/stream/MaterializerWiringBenchmark.scala | 124 +++++++++++++++++++++ .../impl/PhasedFusingActorMaterializer.scala | 41 +++---- .../pekko/stream/impl/TraversalBuilder.scala | 4 +- 4 files changed, 256 insertions(+), 26 deletions(-) diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/AsyncBoundaryThroughputBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/AsyncBoundaryThroughputBenchmark.scala new file mode 100644 index 0000000000..dd0545c399 --- /dev/null +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/AsyncBoundaryThroughputBenchmark.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. 
+ */ + +package org.apache.pekko.stream + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + +import scala.annotation.nowarn +import scala.concurrent.Await +import scala.concurrent.duration._ + +import org.openjdk.jmh.annotations._ +import org.openjdk.jmh.infra.Blackhole + +import org.apache.pekko +import pekko.NotUsed +import pekko.actor.ActorSystem +import pekko.stream.scaladsl._ +import pekko.stream.stage._ + +import com.typesafe.config.ConfigFactory + +object AsyncBoundaryThroughputBenchmark { + final val ElementCount = 100 * 1000 +} + +@State(Scope.Benchmark) +@OutputTimeUnit(TimeUnit.SECONDS) +@BenchmarkMode(Array(Mode.Throughput)) +class AsyncBoundaryThroughputBenchmark { + + import AsyncBoundaryThroughputBenchmark._ + + val config = ConfigFactory.parseString(s""" + pekko.stream.materializer.sync-processing-limit = ${Int.MaxValue} + """) + + implicit val system: ActorSystem = ActorSystem("AsyncBoundaryThroughputBenchmark", config) + + @Param(Array("1", "3", "10")) + var asyncBoundaries = 0 + + var source: Source[Int, NotUsed] = _ + var flow: Flow[Int, Int, NotUsed] = _ + + @Setup + def setup(): Unit = { + SystemMaterializer(system).materializer + source = Source(1 to ElementCount) + var f: Flow[Int, Int, NotUsed] = Flow[Int] + for (_ <- 1 to asyncBoundaries) { + f = f.map(identity).async + } + flow = f + } + + @Benchmark + @OperationsPerInvocation(ElementCount) + def async_boundary_throughput(blackhole: Blackhole): CountDownLatch = { + FusedGraphsBenchmark.blackhole = blackhole + val latch = source + .via(flow) + .toMat(Sink.fromGraph(new JitSafeCompletionLatchInt))(Keep.right) + .run() + if (!latch.await(30, TimeUnit.SECONDS)) + throw new RuntimeException("Latch timed out") + latch + } + + @TearDown + def shutdown(): Unit = { + Await.result(system.terminate(), 5.seconds) + } +} + +class JitSafeCompletionLatchInt extends GraphStageWithMaterializedValue[SinkShape[Int], CountDownLatch] { + val in = 
Inlet[Int]("JitSafeCompletionLatchInt.in") + override val shape = SinkShape(in) + + @nowarn("cat=unused-params") + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, CountDownLatch) = { + val latch = new CountDownLatch(1) + val logic = new GraphStageLogic(shape) with InHandler { + private var count = 0 + + override def preStart(): Unit = pull(in) + override def onPush(): Unit = { + count += 1 + pull(in) + } + + override def onUpstreamFinish(): Unit = { + FusedGraphsBenchmark.blackhole.consume(count) + latch.countDown() + } + + override def onUpstreamFailure(ex: Throwable): Unit = { + latch.countDown() + throw ex + } + + setHandler(in, this) + } + (logic, latch) + } +} diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/MaterializerWiringBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/MaterializerWiringBenchmark.scala new file mode 100644 index 0000000000..9b60428bf0 --- /dev/null +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/MaterializerWiringBenchmark.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. 
+ */ + +package org.apache.pekko.stream + +import java.util.concurrent.TimeUnit + +import scala.concurrent.Await +import scala.concurrent.duration._ + +import org.openjdk.jmh.annotations._ + +import org.apache.pekko +import pekko.NotUsed +import pekko.actor.ActorSystem +import pekko.stream.scaladsl._ + +object MaterializerWiringBenchmark { + + val linearFlowBuilder: Int => RunnableGraph[NotUsed] = numOfOperators => { + var source = Source.single(()) + for (_ <- 1 to numOfOperators) { + source = source.map(identity) + } + source.to(Sink.ignore) + } + + val broadcastMergeBuilder: Int => RunnableGraph[NotUsed] = numOfJunctions => + RunnableGraph.fromGraph(GraphDSL.create() { implicit b => + import GraphDSL.Implicits._ + + val broadcast = b.add(Broadcast[Unit](numOfJunctions)) + var outlet = broadcast.out(0) + for (i <- 1 until numOfJunctions) { + val merge = b.add(Merge[Unit](2)) + outlet ~> merge + broadcast.out(i) ~> merge + outlet = merge.out + } + + Source.single(()) ~> broadcast + outlet ~> Sink.ignore + ClosedShape + }) + + val broadcastMergeImmediateBuilder: Int => RunnableGraph[NotUsed] = numOfJunctions => + RunnableGraph.fromGraph(GraphDSL.create() { implicit b => + import GraphDSL.Implicits._ + + val broadcast = b.add(Broadcast[Unit](numOfJunctions)) + val merge = b.add(Merge[Unit](numOfJunctions)) + for (_ <- 0 until numOfJunctions) { + broadcast ~> merge + } + + Source.single(()) ~> broadcast + merge ~> Sink.ignore + ClosedShape + }) + + val importedFlowBuilder: Int => RunnableGraph[NotUsed] = numOfFlows => + RunnableGraph.fromGraph(GraphDSL.createGraph(Source.single(())) { implicit b => source => + import GraphDSL.Implicits._ + val flow = Flow[Unit].map(identity) + var out: Outlet[Unit] = source.out + for (_ <- 0 until numOfFlows) { + val flowShape = b.add(flow) + out ~> flowShape + out = flowShape.outlet + } + out ~> Sink.ignore + ClosedShape + }) +} + +@State(Scope.Benchmark) +@OutputTimeUnit(TimeUnit.SECONDS) +@BenchmarkMode(Array(Mode.Throughput)) 
+class MaterializerWiringBenchmark { + + import MaterializerWiringBenchmark._ + + implicit val system: ActorSystem = ActorSystem("MaterializerWiringBenchmark") + + var linearFlow: RunnableGraph[NotUsed] = _ + var broadcastMerge: RunnableGraph[NotUsed] = _ + var broadcastMergeImmediate: RunnableGraph[NotUsed] = _ + var importedFlow: RunnableGraph[NotUsed] = _ + + @Param(Array("10", "50", "100", "200")) + var complexity = 0 + + @Setup + def setup(): Unit = { + SystemMaterializer(system).materializer + linearFlow = linearFlowBuilder(complexity) + broadcastMerge = broadcastMergeBuilder(complexity) + broadcastMergeImmediate = broadcastMergeImmediateBuilder(complexity) + importedFlow = importedFlowBuilder(complexity) + } + + @Benchmark + def linear(): NotUsed = linearFlow.run() + + @Benchmark + def broadcast_merge_gradual(): NotUsed = broadcastMerge.run() + + @Benchmark + def broadcast_merge_immediate(): NotUsed = broadcastMergeImmediate.run() + + @Benchmark + def imported_flow(): NotUsed = importedFlow.run() + + @TearDown + def shutdown(): Unit = { + Await.result(system.terminate(), 5.seconds) + } +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala index 29b6b740c3..e77cf528a3 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala @@ -198,7 +198,7 @@ private final case class SavedIslandData( private var segments: java.util.ArrayList[SegmentInfo] = _ private var activePhases: java.util.ArrayList[PhaseIsland[Any]] = _ - private var forwardWires: java.util.ArrayList[ForwardWire] = _ + private var forwardWires: java.util.HashMap[java.lang.Integer, ForwardWire] = _ private var islandStateStack: java.util.ArrayList[SavedIslandData] = _ private var currentPhase: PhaseIsland[Any] = 
defaultPhase.apply(settings, attributes, materializer, nextIslandName()) @@ -277,22 +277,11 @@ private final case class SavedIslandData( currentPhase.assignPort(in, localInSlot, logic) // Check if there was any forward wiring that has this offset/slot as its target - // First try to find such wiring - var forwardWire: ForwardWire = null - if ((forwardWires ne null) && !forwardWires.isEmpty) { - var i = 0 - while (i < forwardWires.size()) { - forwardWire = forwardWires.get(i) - if (forwardWire.toGlobalOffset == currentGlobalOffset) { - if (Debug) println(s" there is a forward wire to this slot $forwardWire") - forwardWires.remove(i) - i = Int.MaxValue // Exit the loop - } else { - forwardWire = null // Didn't found it yet - i += 1 - } - } - } + val forwardWire: ForwardWire = + if (forwardWires ne null) forwardWires.remove(currentGlobalOffset) + else null + if ((forwardWire ne null) && Debug) + println(s" there is a forward wire to this slot $forwardWire") // If there is a forward wiring we need to resolve it if (forwardWire ne null) { @@ -362,7 +351,7 @@ private final case class SavedIslandData( // The forward wire tracking data structure is only allocated when needed. Many graphs have no forward wires // even though it might have islands. 
if (forwardWires eq null) { - forwardWires = new java.util.ArrayList[ForwardWire](8) + forwardWires = new java.util.HashMap[java.lang.Integer, ForwardWire](8) } val forwardWire = ForwardWire( @@ -373,7 +362,7 @@ private final case class SavedIslandData( currentPhase) if (Debug) println(s" wiring is forward, recording $forwardWire") - forwardWires.add(forwardWire) + forwardWires.put(absoluteOffset, forwardWire) } } @@ -702,7 +691,7 @@ private[pekko] object GraphStageIsland { private var connections = new Array[Connection](16) private var maxConnections = 0 - private var outConnections: List[Connection] = Nil + private var outConnections: java.util.ArrayList[Connection] = _ private var fullIslandName: OptionVal[String] = OptionVal.None val shell = new GraphInterpreterShell(connections = null, logics = null, effectiveAttributes, materializer) @@ -742,7 +731,8 @@ private[pekko] object GraphStageIsland { def outConn(): Connection = { val connection = new Connection(0, null, null, null, null) - outConnections ::= connection + if (outConnections eq null) outConnections = new util.ArrayList[Connection](4) + outConnections.add(connection) connection } @@ -800,14 +790,15 @@ private[pekko] object GraphStageIsland { override def onIslandReady(): Unit = { - val totalConnections = maxConnections + outConnections.size + 1 + val outConnSize = if (outConnections ne null) outConnections.size() else 0 + val totalConnections = maxConnections + outConnSize + 1 val finalConnections = java.util.Arrays.copyOf(connections, totalConnections) var i = maxConnections + 1 - var outConns = outConnections + var j = 0 while (i < totalConnections) { - val conn = outConns.head - outConns = outConns.tail + val conn = outConnections.get(j) + j += 1 if (conn.inHandler eq null) failOnMissingHandler(conn.inOwner) else if (conn.outHandler eq null) failOnMissingHandler(conn.outOwner) finalConnections(i) = conn diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala index 980d56c021..d4a7da2074 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala @@ -213,6 +213,8 @@ import pekko.util.OptionVal private val cachedEmptyCompleted = CompletedTraversalBuilder(PushNotUsed, 0, Map.empty, Attributes.none) + private val emptyOutSlots: Array[Int] = Array.emptyIntArray + /** * INTERNAL API * @@ -269,7 +271,7 @@ import pekko.util.OptionVal val builder = if (module.shape.outlets.isEmpty) { val b = CompletedTraversalBuilder( - traversalSoFar = MaterializeAtomic(module, new Array[Int](module.shape.outlets.size)), + traversalSoFar = MaterializeAtomic(module, emptyOutSlots), inSlots = module.shape.inlets.size, inToOffset = module.shape.inlets.map(in => in -> in.id).toMap, Attributes.none) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
