This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 03ebaf59bb perf: optimize stream materializer wiring with HashMap and
ArrayList replacements (#3062)
03ebaf59bb is described below
commit 03ebaf59bb042fb866c1456204104c611fd32e32
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Jun 14 19:17:11 2026 +0800
perf: optimize stream materializer wiring with HashMap and ArrayList
replacements (#3062)
* perf: optimize stream materializer wiring with HashMap and ArrayList
replacements
Motivation:
The PhasedFusingActorMaterializer uses an ArrayList with O(n) linear scan
to resolve forward wires during graph materialization, and GraphStageIsland
uses a Scala List for outConnections which allocates cons cells on every
addition. Both create unnecessary overhead for complex graphs.
Modification:
- Replace forwardWires ArrayList with HashMap<Integer, ForwardWire> keyed
by toGlobalOffset, giving O(1) lookup and removal in wireIn/wireOut
- Replace outConnections Scala List with ArrayList, eliminating cons cell
allocation and enabling indexed iteration in onIslandReady
- Add shared emptyOutSlots constant in TraversalBuilder to avoid allocating
zero-length arrays for stages with no outlets
- Add MaterializerWiringBenchmark (JMH) to measure materialization
throughput for linear, broadcast-merge, and imported flow graph topologies
- Add AsyncBoundaryThroughputBenchmark (JMH) to measure cross-island element
throughput with varying numbers of async boundaries
Result:
Forward wire resolution drops from O(n) to O(1) per lookup, outConnections
avoids Scala List overhead, and empty outlet stages reuse a shared array.
New benchmarks provide regression coverage for these code paths.
Tests:
- sbt "stream-tests / Test / testOnly *MaterializerSpec *PhasedFusingSpec
*TraversalBuilderSpec" - 30 tests passed
- sbt "stream-tests / Test / testOnly *GraphInterpreterSpec
*ActorGraphInterpreterSpec" - 112 tests passed
- sbt "bench-jmh / compile" - passed
References:
None - performance optimization identified through code review
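
The shared emptyOutSlots constant described above relies on the fact that a
zero-length array has no elements to mutate, so a single instance can be
reused safely. A minimal sketch of the idea follows; the object name
EmptySlotsSketch and the helper outSlotsFor are hypothetical and are not part
of TraversalBuilder:

```scala
// Hypothetical standalone sketch; not the actual TraversalBuilder code.
object EmptySlotsSketch {
  // A zero-length array has no mutable state, so one shared instance is enough.
  val emptyOutSlots: Array[Int] = Array.emptyIntArray

  // Reuse the shared array for stages without outlets; allocate otherwise.
  def outSlotsFor(outletCount: Int): Array[Int] =
    if (outletCount == 0) emptyOutSlots else new Array[Int](outletCount)
}
```

Stages with at least one outlet still get a fresh array, because their slots
are written to later and must not be shared.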
* fix: replace HashMap with sorted primitive arrays, fix benchmark headers
Motivation:
java.util.HashMap[java.lang.Integer, ForwardWire] introduces autoboxing
on every put/remove. Benchmark files had incorrect license headers
(Akka-derived header on new code).
Modification:
- Replace forwardWires HashMap with sorted parallel arrays
(Array[Int] keys + Array[ForwardWire] values). Lookup uses
binary search, insert maintains sorted order, removal compacts
via arraycopy. Zero boxing, zero per-op allocation.
- Fix benchmark file headers to use standard Apache 2.0 license
(new files should not have "derived from Akka" line).
- Update MaterializerWiringBenchmark @Param to include higher
complexity values (500, 1000) where forward wire optimization
shows measurable improvement.
Result:
At complexity=500, broadcast_merge_gradual materialization is
2.36x faster (4.027ms → 1.707ms). At complexity=1000, 2.13x faster
(5.441ms → 2.554ms). O(n) per-lookup becomes O(log n).
Tests:
- sbt "stream-tests / Test / testOnly *MaterializerSpec *PhasedFusingSpec
*TraversalBuilderSpec" - 134 passed
- sbt "stream-tests / Test / testOnly *GraphInterpreterSpec
*ActorGraphInterpreterSpec" - 22 passed
- JMH MaterializerWiringBenchmark: 2.36x at N=500, 2.13x at N=1000
References:
PR #3062 review feedback
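
The sorted parallel array scheme described above can be sketched in isolation
as follows. This is an illustration only, not the IslandTracking code: the
class name SortedWireTable and its put/remove methods are hypothetical, and
the sketch overwrites the value when a key is inserted twice, which is a
choice made here for simplicity.

```scala
import java.util.Arrays

// Hypothetical standalone sketch of an Int-keyed table backed by sorted
// parallel arrays. Lookup is a binary search; insert and remove shift the
// tail with arraycopy, so no boxing and no per-operation node allocation.
final class SortedWireTable[V >: Null <: AnyRef] {
  private var keys: Array[Int] = new Array[Int](4)
  private var values: Array[AnyRef] = new Array[AnyRef](4)
  private var count: Int = 0

  def size: Int = count

  def put(key: Int, value: V): Unit = {
    // Double both arrays when full.
    if (count == keys.length) {
      keys = Arrays.copyOf(keys, count * 2)
      values = Arrays.copyOf(values, count * 2)
    }
    val found = Arrays.binarySearch(keys, 0, count, key)
    if (found >= 0) {
      values(found) = value // sketch-only choice: overwrite an existing key
    } else {
      val pos = -(found + 1) // binarySearch encodes the insertion point
      if (pos < count) {
        System.arraycopy(keys, pos, keys, pos + 1, count - pos)
        System.arraycopy(values, pos, values, pos + 1, count - pos)
      }
      keys(pos) = key
      values(pos) = value
      count += 1
    }
  }

  // Removes and returns the value for key, or null when the key is absent.
  def remove(key: Int): V = {
    val idx = Arrays.binarySearch(keys, 0, count, key)
    if (idx < 0) null
    else {
      val removed = values(idx).asInstanceOf[V]
      count -= 1
      if (idx < count) {
        System.arraycopy(keys, idx + 1, keys, idx, count - idx)
        System.arraycopy(values, idx + 1, values, idx, count - idx)
      }
      values(count) = null // clear the trailing slot so the value can be GC'd
      removed
    }
  }
}
```

Note that only the lookup is O(log n); insertion and removal still shift up
to n entries, but they do so with a bulk arraycopy rather than per-element
work.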
* fix: clear dangling reference after forward wire removal
Motivation:
After removing a forward wire from the sorted arrays, the trailing
slot in forwardWireValues still held a reference to the consumed
ForwardWire object, preventing garbage collection.
Modification:
Set forwardWireValues(forwardWireCount) = null after removal and
array compaction.
Result:
Consumed ForwardWire objects become eligible for GC immediately
instead of lingering until the array is reused or discarded.
Tests:
- sbt "stream-tests / Test / testOnly *MaterializerSpec *PhasedFusingSpec
*TraversalBuilderSpec *GraphInterpreterSpec *ActorGraphInterpreterSpec" - 156 passed
References:
PR #3062 code review
* fix: improve GC friendliness of forwardWires and outConnections
Motivation:
After forward wires are consumed, the arrays and their entries
retained references preventing GC. Similarly, outConnections
ArrayList held references after being copied to finalConnections.
Modification:
- Null out forwardWireValues entries after removal to release
consumed ForwardWire objects immediately
- Release forwardWireKeys/Values arrays when forwardWireCount
reaches 0, allowing both arrays to be GC'd
- Set outConnections = null after onIslandReady() copies all
connections to the shell's finalConnections array
Result:
Reduced memory retention during materialization. ForwardWire
objects and their associated arrays are eligible for GC as
soon as they are consumed, rather than lingering until the
IslandTracking/GraphStageIsland instance is collected.
Tests:
- 156 tests passed (MaterializerSpec, PhasedFusingSpec,
TraversalBuilderSpec, GraphInterpreterSpec, ActorGraphInterpreterSpec)
References:
PR #3062 code review
* docs: clarify why sorted arrays over IntMap for forwardWires
IntMap's Patricia trie allocates new nodes on every updated/removed call.
Benchmarked at N=500: sorted arrays 1.707ms vs IntMap 2.537ms (1.49x).
At N=1000: sorted arrays 2.554ms vs IntMap 4.260ms (1.67x).
References:
PR #3062 review discussion
* Potential fix for pull request finding
Co-authored-by: Copilot Autofix powered by AI
<[email protected]>
* Potential fix for pull request finding
Co-authored-by: Copilot Autofix powered by AI
<[email protected]>
* Potential fix for pull request finding
Co-authored-by: Copilot Autofix powered by AI
<[email protected]>
---------
Co-authored-by: Copilot Autofix powered by AI
<[email protected]>
---
.../stream/AsyncBoundaryThroughputBenchmark.scala | 123 +++++++++++++++++++
.../pekko/stream/MaterializerWiringBenchmark.scala | 132 +++++++++++++++++++++
.../impl/PhasedFusingActorMaterializer.scala | 82 +++++++++----
.../pekko/stream/impl/TraversalBuilder.scala | 4 +-
4 files changed, 314 insertions(+), 27 deletions(-)
diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/AsyncBoundaryThroughputBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/AsyncBoundaryThroughputBenchmark.scala
new file mode 100644
index 0000000000..d508d074c3
--- /dev/null
+++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/AsyncBoundaryThroughputBenchmark.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream
+
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+
+import scala.annotation.nowarn
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import org.openjdk.jmh.annotations._
+import org.openjdk.jmh.infra.Blackhole
+
+import org.apache.pekko
+import pekko.NotUsed
+import pekko.actor.ActorSystem
+import pekko.stream.scaladsl._
+import pekko.stream.stage._
+
+import com.typesafe.config.ConfigFactory
+
+object AsyncBoundaryThroughputBenchmark {
+ final val ElementCount = 100 * 1000
+}
+
+@State(Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@BenchmarkMode(Array(Mode.Throughput))
+class AsyncBoundaryThroughputBenchmark {
+
+ import AsyncBoundaryThroughputBenchmark._
+
+ val config = ConfigFactory.parseString(s"""
+ pekko.stream.materializer.sync-processing-limit = ${Int.MaxValue}
+ """)
+
+ implicit val system: ActorSystem = ActorSystem("AsyncBoundaryThroughputBenchmark", config)
+
+ @Param(Array("1", "3", "10"))
+ var asyncBoundaries = 0
+
+ var source: Source[Int, NotUsed] = _
+ var flow: Flow[Int, Int, NotUsed] = _
+
+ @Setup
+ def setup(): Unit = {
+ SystemMaterializer(system).materializer
+ source = Source(1 to ElementCount)
+ var f: Flow[Int, Int, NotUsed] = Flow[Int]
+ for (_ <- 1 to asyncBoundaries) {
+ f = f.map(identity).async
+ }
+ flow = f
+ }
+
+ @Benchmark
+ @OperationsPerInvocation(ElementCount)
+ def async_boundary_throughput(blackhole: Blackhole): CountDownLatch = {
+ FusedGraphsBenchmark.blackhole = blackhole
+ val latch = source
+ .via(flow)
+ .toMat(Sink.fromGraph(new JitSafeCompletionLatchInt))(Keep.right)
+ .run()
+ if (!latch.await(30, TimeUnit.SECONDS))
+ throw new RuntimeException("Latch timed out")
+ latch
+ }
+
+ @TearDown
+ def shutdown(): Unit = {
+ Await.result(system.terminate(), 5.seconds)
+ }
+}
+
+class JitSafeCompletionLatchInt extends GraphStageWithMaterializedValue[SinkShape[Int], CountDownLatch] {
+ val in = Inlet[Int]("JitSafeCompletionLatchInt.in")
+ override val shape = SinkShape(in)
+
+ @nowarn("cat=unused-params")
+ override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, CountDownLatch) = {
+ val latch = new CountDownLatch(1)
+ val logic = new GraphStageLogic(shape) with InHandler {
+ private var count = 0
+
+ override def preStart(): Unit = pull(in)
+ override def onPush(): Unit = {
+ grab(in) // consume element
+ count += 1
+ pull(in)
+ }
+
+ override def onUpstreamFinish(): Unit = {
+ FusedGraphsBenchmark.blackhole.consume(count)
+ latch.countDown()
+ completeStage()
+ }
+
+ override def onUpstreamFailure(ex: Throwable): Unit = {
+ latch.countDown()
+ failStage(ex)
+ }
+
+ setHandler(in, this)
+ }
+ (logic, latch)
+ }
+}
diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/MaterializerWiringBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/MaterializerWiringBenchmark.scala
new file mode 100644
index 0000000000..00e7d70c8f
--- /dev/null
+++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/MaterializerWiringBenchmark.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream
+
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import org.openjdk.jmh.annotations._
+
+import org.apache.pekko
+import pekko.NotUsed
+import pekko.actor.ActorSystem
+import pekko.stream.scaladsl._
+
+object MaterializerWiringBenchmark {
+
+ val linearFlowBuilder: Int => RunnableGraph[NotUsed] = numOfOperators => {
+ var source = Source.single(())
+ for (_ <- 1 to numOfOperators) {
+ source = source.map(identity)
+ }
+ source.to(Sink.ignore)
+ }
+
+ val broadcastMergeBuilder: Int => RunnableGraph[NotUsed] = numOfJunctions =>
+ RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
+ import GraphDSL.Implicits._
+
+ val broadcast = b.add(Broadcast[Unit](numOfJunctions))
+ var outlet = broadcast.out(0)
+ for (i <- 1 until numOfJunctions) {
+ val merge = b.add(Merge[Unit](2))
+ outlet ~> merge
+ broadcast.out(i) ~> merge
+ outlet = merge.out
+ }
+
+ Source.single(()) ~> broadcast
+ outlet ~> Sink.ignore
+ ClosedShape
+ })
+
+ val broadcastMergeImmediateBuilder: Int => RunnableGraph[NotUsed] = numOfJunctions =>
+ RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
+ import GraphDSL.Implicits._
+
+ val broadcast = b.add(Broadcast[Unit](numOfJunctions))
+ val merge = b.add(Merge[Unit](numOfJunctions))
+ for (_ <- 0 until numOfJunctions) {
+ broadcast ~> merge
+ }
+
+ Source.single(()) ~> broadcast
+ merge ~> Sink.ignore
+ ClosedShape
+ })
+
+ val importedFlowBuilder: Int => RunnableGraph[NotUsed] = numOfFlows =>
+ RunnableGraph.fromGraph(GraphDSL.createGraph(Source.single(())) { implicit b => source =>
+ import GraphDSL.Implicits._
+ val flow = Flow[Unit].map(identity)
+ var out: Outlet[Unit] = source.out
+ for (_ <- 0 until numOfFlows) {
+ val flowShape = b.add(flow)
+ out ~> flowShape
+ out = flowShape.outlet
+ }
+ out ~> Sink.ignore
+ ClosedShape
+ })
+}
+
+@State(Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@BenchmarkMode(Array(Mode.Throughput))
+class MaterializerWiringBenchmark {
+
+ import MaterializerWiringBenchmark._
+
+ implicit val system: ActorSystem = ActorSystem("MaterializerWiringBenchmark")
+
+ var linearFlow: RunnableGraph[NotUsed] = _
+ var broadcastMerge: RunnableGraph[NotUsed] = _
+ var broadcastMergeImmediate: RunnableGraph[NotUsed] = _
+ var importedFlow: RunnableGraph[NotUsed] = _
+
+ @Param(Array("100", "500", "1000"))
+ var complexity = 0
+
+ @Setup
+ def setup(): Unit = {
+ SystemMaterializer(system).materializer
+ linearFlow = linearFlowBuilder(complexity)
+ broadcastMerge = broadcastMergeBuilder(complexity)
+ broadcastMergeImmediate = broadcastMergeImmediateBuilder(complexity)
+ importedFlow = importedFlowBuilder(complexity)
+ }
+
+ @Benchmark
+ def linear(): NotUsed = linearFlow.run()
+
+ @Benchmark
+ def broadcast_merge_gradual(): NotUsed = broadcastMerge.run()
+
+ @Benchmark
+ def broadcast_merge_immediate(): NotUsed = broadcastMergeImmediate.run()
+
+ @Benchmark
+ def imported_flow(): NotUsed = importedFlow.run()
+
+ @TearDown
+ def shutdown(): Unit = {
+ Await.result(system.terminate(), 5.seconds)
+ }
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala
index 29b6b740c3..0a3861984e 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala
@@ -198,7 +198,12 @@ private final case class SavedIslandData(
private var segments: java.util.ArrayList[SegmentInfo] = _
private var activePhases: java.util.ArrayList[PhaseIsland[Any]] = _
- private var forwardWires: java.util.ArrayList[ForwardWire] = _
+ // Forward wires stored as sorted parallel arrays by key (global offset).
+ // Avoids java.lang.Integer boxing, per-operation allocation, and provides
+ // better cache locality than IntMap's Patricia trie nodes.
+ private var forwardWireKeys: Array[Int] = _
+ private var forwardWireValues: Array[ForwardWire] = _
+ private var forwardWireCount: Int = 0
private var islandStateStack: java.util.ArrayList[SavedIslandData] = _
private var currentPhase: PhaseIsland[Any] = defaultPhase.apply(settings, attributes, materializer, nextIslandName())
@@ -277,22 +282,26 @@ private final case class SavedIslandData(
currentPhase.assignPort(in, localInSlot, logic)
// Check if there was any forward wiring that has this offset/slot as its target
- // First try to find such wiring
- var forwardWire: ForwardWire = null
- if ((forwardWires ne null) && !forwardWires.isEmpty) {
- var i = 0
- while (i < forwardWires.size()) {
- forwardWire = forwardWires.get(i)
- if (forwardWire.toGlobalOffset == currentGlobalOffset) {
- if (Debug) println(s" there is a forward wire to this slot $forwardWire")
- forwardWires.remove(i)
- i = Int.MaxValue // Exit the loop
- } else {
- forwardWire = null // Didn't found it yet
- i += 1
- }
- }
- }
+ val forwardWire: ForwardWire =
+ if (forwardWireCount > 0) {
+ val idx = java.util.Arrays.binarySearch(forwardWireKeys, 0, forwardWireCount, currentGlobalOffset)
+ if (idx >= 0) {
+ val fw = forwardWireValues(idx)
+ forwardWireCount -= 1
+ if (idx < forwardWireCount) {
+ System.arraycopy(forwardWireKeys, idx + 1, forwardWireKeys, idx, forwardWireCount - idx)
+ System.arraycopy(forwardWireValues, idx + 1, forwardWireValues, idx, forwardWireCount - idx)
+ }
+ forwardWireValues(forwardWireCount) = null
+ if (forwardWireCount == 0) {
+ forwardWireKeys = null
+ forwardWireValues = null
+ }
+ fw
+ } else null
+ } else null
+ if ((forwardWire ne null) && Debug)
+ println(s" there is a forward wire to this slot $forwardWire")
// If there is a forward wiring we need to resolve it
if (forwardWire ne null) {
@@ -361,8 +370,9 @@ private final case class SavedIslandData(
// The forward wire tracking data structure is only allocated when needed. Many graphs have no forward wires
// even though it might have islands.
- if (forwardWires eq null) {
- forwardWires = new java.util.ArrayList[ForwardWire](8)
+ if (forwardWireKeys eq null) {
+ forwardWireKeys = new Array[Int](4)
+ forwardWireValues = new Array[ForwardWire](4)
}
val forwardWire = ForwardWire(
@@ -373,7 +383,24 @@ private final case class SavedIslandData(
currentPhase)
if (Debug) println(s" wiring is forward, recording $forwardWire")
- forwardWires.add(forwardWire)
+ if (forwardWireCount == forwardWireKeys.length) {
+ val newLen = forwardWireKeys.length * 2
+ val newKeys = new Array[Int](newLen)
+ val newValues = new Array[ForwardWire](newLen)
+ System.arraycopy(forwardWireKeys, 0, newKeys, 0, forwardWireCount)
+ System.arraycopy(forwardWireValues, 0, newValues, 0, forwardWireCount)
+ forwardWireKeys = newKeys
+ forwardWireValues = newValues
+ }
+ val insertPos = java.util.Arrays.binarySearch(forwardWireKeys, 0, forwardWireCount, absoluteOffset)
+ val pos = if (insertPos < 0) -(insertPos + 1) else insertPos
+ if (pos < forwardWireCount) {
+ System.arraycopy(forwardWireKeys, pos, forwardWireKeys, pos + 1, forwardWireCount - pos)
+ System.arraycopy(forwardWireValues, pos, forwardWireValues, pos + 1, forwardWireCount - pos)
+ }
+ forwardWireKeys(pos) = absoluteOffset
+ forwardWireValues(pos) = forwardWire
+ forwardWireCount += 1
}
}
@@ -702,7 +729,7 @@ private[pekko] object GraphStageIsland {
private var connections = new Array[Connection](16)
private var maxConnections = 0
- private var outConnections: List[Connection] = Nil
+ private var outConnections: java.util.ArrayList[Connection] = _
private var fullIslandName: OptionVal[String] = OptionVal.None
val shell = new GraphInterpreterShell(connections = null, logics = null, effectiveAttributes, materializer)
@@ -742,7 +769,8 @@ private[pekko] object GraphStageIsland {
def outConn(): Connection = {
val connection = new Connection(0, null, null, null, null)
- outConnections ::= connection
+ if (outConnections eq null) outConnections = new util.ArrayList[Connection](4)
+ outConnections.add(connection)
connection
}
@@ -800,20 +828,22 @@ private[pekko] object GraphStageIsland {
override def onIslandReady(): Unit = {
- val totalConnections = maxConnections + outConnections.size + 1
+ val outConnSize = if (outConnections ne null) outConnections.size() else 0
+ val totalConnections = maxConnections + outConnSize + 1
val finalConnections = java.util.Arrays.copyOf(connections, totalConnections)
var i = maxConnections + 1
- var outConns = outConnections
+ var j = 0
while (i < totalConnections) {
- val conn = outConns.head
- outConns = outConns.tail
+ val conn = outConnections.get(j)
+ j += 1
if (conn.inHandler eq null) failOnMissingHandler(conn.inOwner)
else if (conn.outHandler eq null) failOnMissingHandler(conn.outOwner)
finalConnections(i) = conn
conn.id = i
i += 1
}
+ outConnections = null
shell.connections = finalConnections
shell.logics = logics.toArray(GraphStageIsland.emptyLogicArray)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala
index 980d56c021..d4a7da2074 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala
@@ -213,6 +213,8 @@ import pekko.util.OptionVal
private val cachedEmptyCompleted = CompletedTraversalBuilder(PushNotUsed, 0, Map.empty, Attributes.none)
+ private val emptyOutSlots: Array[Int] = Array.emptyIntArray
+
/**
* INTERNAL API
*
@@ -269,7 +271,7 @@ import pekko.util.OptionVal
val builder =
if (module.shape.outlets.isEmpty) {
val b = CompletedTraversalBuilder(
- traversalSoFar = MaterializeAtomic(module, new Array[Int](module.shape.outlets.size)),
+ traversalSoFar = MaterializeAtomic(module, emptyOutSlots),
inSlots = module.shape.inlets.size,
inToOffset = module.shape.inlets.map(in => in -> in.id).toMap,
Attributes.none)