This is an automated email from the ASF dual-hosted git repository. He-Pin pushed a commit to branch optimize/broadcast-hub-wheel-map in repository https://gitbox.apache.org/repos/asf/pekko.git
commit ad9dbde8e844247669f19cd9b8e4d7797604d0a2 Author: He-Pin <[email protected]> AuthorDate: Sun Jun 14 06:05:56 2026 +0800 perf: replace ArrayList consumer wheel with LongMap for O(1) keyed removal Motivation: BroadcastHub's findAndRemoveConsumer used ArrayList.removeIf which is O(k) per event with lambda allocation on every call. In high-fan-out scenarios (thousands of consumers clustered in the same wheel slot), this creates a producer backpressure bottleneck: the head can only advance after the head slot is empty, and draining a large slot requires k linear scans each of O(k) cost. Modification: Replace Array[java.util.ArrayList[Consumer]] with Array[LongMap[Consumer]] keyed by Consumer.id. Slots are lazily allocated (null = empty) and released to null when drained, eliminating baseline memory for empty slots and enabling GC of drained LongMaps. Hot path uses getOrNull + -= (two primitive hash lookups) instead of remove (which would allocate Option), so an add/remove cycle allocates no Option and no boxed Long; the remaining allocations are a new LongMap whenever an empty (null) slot is repopulated, plus occasional LongMap growth. No Long boxing since LongMap stores primitive long keys. Adds null guards in Advance/NeedWakeup event handlers to prevent latent NPE when findAndRemoveConsumer returns null. Updates onUpstreamFailure and wakeupIdx to skip null (empty) slots. Result: Consumer add/remove is O(1) with zero Long boxing and zero Option allocation. High-consumer lockstep scenarios see dramatically reduced producer backpressure from wheel slot contention. Memory for empty wheel slots drops from ~40 bytes per ArrayList to 0 (null). Tests: - sbt "stream-tests/Test/testOnly *HubSpec" → 50 passed, 0 failed - sbt "++3.3.8; stream/compile" → success - sbt "stream/mimaReportBinaryIssues" → no issues - sbt "bench-jmh/compile" → success References: Inspired by akkadotnet/akka.net#8264 (Dictionary-based consumer wheel). Pekko uses scala.collection.mutable.LongMap instead of HashMap for zero boxing on Long keys and contiguous open-addressing memory layout. 
--- .../pekko/stream/BroadcastHubBenchmark.scala | 50 ++++++++++++++- .../org/apache/pekko/stream/scaladsl/HubSpec.scala | 45 ++++++++++++++ .../org/apache/pekko/stream/scaladsl/Hub.scala | 72 +++++++++++++++------- 3 files changed, 141 insertions(+), 26 deletions(-) diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchmark.scala index c46bcf5aee..687f695183 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchmark.scala @@ -32,8 +32,24 @@ import org.apache.pekko.stream.testkit.scaladsl.StreamTestKit import com.typesafe.config.ConfigFactory +/** + * Benchmarks BroadcastHub throughput under high-fan-out lockstep consumer scenarios. + * + * The consumer wheel uses a LongMap per slot for O(1) keyed add/remove without Long boxing. + * In lockstep, all consumers cluster in the same wheel slot, maximizing per-slot contention. + * With a small buffer (64) the wheel has only 128 slots; a slot is chosen by read offset (not by id), + * so one slot can hold up to `consumerCount` consumers — ArrayList.removeIf was O(k) per removal, now O(1). + * + * The `broadcast` benchmark parameterizes over consumer count with a fixed small buffer, + * measuring how throughput scales as wheel slot pressure increases. + * + * The `broadcastLargeBuffer` benchmark uses a larger buffer (256) for comparison, + * showing how the optimization holds up when consumers can drift further apart across more slots. 
+ */ object BroadcastHubBenchmark { final val OperationsPerInvocation = 100000 + final val SmallBufferSize = 64 + final val LargeBufferSize = 256 } @State(Scope.Benchmark) @@ -56,7 +72,7 @@ class BroadcastHubBenchmark { var testSource: Source[java.lang.Integer, NotUsed] = _ - @Param(Array("64", "256")) + @Param(Array("64", "256", "1000", "2000")) var parallelism = 0 @Setup @@ -71,12 +87,40 @@ class BroadcastHubBenchmark { Await.result(system.terminate(), 5.seconds) } + /** + * Lockstep broadcast with small buffer (64). + * All consumers stay at roughly the same wheel offset, clustering in the same slot. + * With 128 wheel slots and 2000 consumers a uniform spread would be only ~16 per slot, but because + * slots follow read offset, lockstep consumers pile into a few slots — up to thousands in a single one. + * This maximizes the O(1) vs O(k) per-removal difference. + */ @Benchmark @OperationsPerInvocation(OperationsPerInvocation) def broadcast(): Unit = { val latch = new CountDownLatch(parallelism) val broadcastSink = - BroadcastHub.sink[java.lang.Integer](bufferSize = parallelism, startAfterNrOfConsumers = parallelism) + BroadcastHub.sink[java.lang.Integer](bufferSize = SmallBufferSize, startAfterNrOfConsumers = parallelism) + val sink = new LatchSink(OperationsPerInvocation, latch) + val source = testSource.runWith(broadcastSink) + var idx = 0 + while (idx < parallelism) { + source.runWith(sink) + idx += 1 + } + awaitLatch(latch) + } + + /** + * Lockstep broadcast with larger buffer (256) for comparison. + * The wheel has 512 slots, and the larger buffer lets consumers drift further apart before blocking. + * Shows how the optimization scales when per-slot pressure is lower. 
+ */ + @Benchmark + @OperationsPerInvocation(OperationsPerInvocation) + def broadcastLargeBuffer(): Unit = { + val latch = new CountDownLatch(parallelism) + val broadcastSink = + BroadcastHub.sink[java.lang.Integer](bufferSize = LargeBufferSize, startAfterNrOfConsumers = parallelism) val sink = new LatchSink(OperationsPerInvocation, latch) val source = testSource.runWith(broadcastSink) var idx = 0 @@ -88,7 +132,7 @@ class BroadcastHubBenchmark { } private def awaitLatch(latch: CountDownLatch): Unit = { - if (!latch.await(30, TimeUnit.SECONDS)) { + if (!latch.await(60, TimeUnit.SECONDS)) { StreamTestKit.printDebugDump(SystemMaterializer(system).materializer.supervisor) throw new RuntimeException("Latch didn't complete in time") } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala index a6d57dee19..b32c8746b1 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala @@ -629,6 +629,51 @@ class HubSpec extends StreamSpec { in.sendComplete() sinkProbe2.cancel() } + + "deliver all elements in order to many consumers" in { + val consumerCount = 200 + val messageCount = 2000 + + val source = Source(0 until messageCount).runWith(BroadcastHub.sink(bufferSize = 256, + startAfterNrOfConsumers = consumerCount)) + + val futures = (0 until consumerCount).map { _ => + source.runWith(Sink.seq) + } + + val results = Await.result(Future.sequence(futures), 30.seconds) + results.foreach { result => + result should ===(0 until messageCount) + } + } + + "handle many consumers when some cancel mid-stream" in { + val totalConsumers = 64 + val cancellingConsumers = 16 + val cancelAfter = 64 + val messageCount = 512 + + val source = Source(0 until messageCount).runWith( + BroadcastHub.sink(bufferSize = 256, startAfterNrOfConsumers = totalConsumers)) + + val 
cancellingFutures = (0 until cancellingConsumers).map { _ => + source.take(cancelAfter).runWith(Sink.seq) + } + + val remainingFutures = (0 until (totalConsumers - cancellingConsumers)).map { _ => + source.runWith(Sink.seq) + } + + val cancellingResults = Await.result(Future.sequence(cancellingFutures), 30.seconds) + cancellingResults.foreach { result => + result should ===(0 until cancelAfter) + } + + val remainingResults = Await.result(Future.sequence(remainingFutures), 30.seconds) + remainingResults.foreach { result => + result should ===(0 until messageCount) + } + } } "PartitionHub" must { diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala index 4b6ca0063e..8849529e6d 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala @@ -536,14 +536,17 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I * of priorities always fall to a range * * This wheel tracks the position of Consumers relative to the slowest ones. Every slot - * contains a list of Consumers being known at that location (this might be out of date!). + * contains a map of Consumers being known at that location (this might be out of date!). * Consumers from time to time send Advance messages to indicate that they have progressed * by reading from the broadcast queue. Consumers that are blocked (due to reaching tail) request * a wakeup and update their position at the same time. * + * Each slot uses a LongMap keyed by Consumer.id for O(1) add/remove without Long boxing. + * Empty slots are null (no backing map allocated), reducing baseline memory and GC pressure. + * When a slot drains to zero consumers, its map is released (set to null). 
*/ private[this] val consumerWheel = - Array.fill[java.util.ArrayList[Consumer]](bufferSize * 2)(new util.ArrayList[Consumer]()) + new Array[LongMap[Consumer]](bufferSize * 2) private[this] var activeConsumers = 0 override def preStart(): Unit = { @@ -574,15 +577,19 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I val newOffset = previousOffset + DemandThreshold // Move the consumer from its last known offset to its new one. Check if we are unblocked. val consumer = findAndRemoveConsumer(id, previousOffset) - addConsumer(consumer, newOffset) + if (consumer ne null) { + addConsumer(consumer, newOffset) + } checkUnblock(previousOffset) case NeedWakeup(id, previousOffset, currentOffset) => // Move the consumer from its last known offset to its new one. Check if we are unblocked. val consumer = findAndRemoveConsumer(id, previousOffset) - addConsumer(consumer, currentOffset) + if (consumer ne null) { + addConsumer(consumer, currentOffset) - // Also check if the consumer is now unblocked since we published an element since it went asleep. - if (currentOffset != tail) consumer.callback.invoke(Wakeup) + // Also check if the consumer is now unblocked since we published an element since it went asleep. 
+ if (currentOffset != tail) consumer.callback.invoke(Wakeup) + } checkUnblock(previousOffset) case RegistrationPending => @@ -650,10 +657,14 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I consumer.callback.invoke(failMessage) } - // Notify registered consumers + // Notify registered consumers — skip null (empty) slots var idx = 0 while (idx < consumerWheel.length) { - consumerWheel(idx).forEach(_.callback.invoke(failMessage)) + val bucket = consumerWheel(idx) + if (bucket ne null) { + val itr = bucket.valuesIterator + while (itr.hasNext) itr.next().callback.invoke(failMessage) + } idx += 1 } failStage(ex) @@ -664,21 +675,19 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I * * NB: You cannot remove a consumer without knowing its last offset! Consumers on the Source side always must * track this so this can be a fast operation. + * + * Uses LongMap.getOrNull + -= to avoid Option allocation on the hot path. */ private def findAndRemoveConsumer(id: Long, offset: Int): Consumer = { - // TODO: Try to eliminate modulo division somehow... val wheelSlot = offset & WheelMask - val consumersInSlot = consumerWheel(wheelSlot) - var removedConsumer: Consumer = null - if (consumersInSlot.size() > 0) { - consumersInSlot.removeIf(consumer => { - if (consumer.id == id) { - removedConsumer = consumer - true - } else false - }) + val bucket = consumerWheel(wheelSlot) + if (bucket eq null) return null + val consumer = bucket.getOrNull(id) + if (consumer ne null) { + bucket -= id + if (bucket.isEmpty) consumerWheel(wheelSlot) = null } - removedConsumer + consumer } /* @@ -697,7 +706,7 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I if (offsetOfConsumerRemoved == head) { // Try to advance along the wheel. We can skip any wheel slots which have no waiting Consumers, until // we either find a nonempty one, or we reached the end of the buffer. 
- while (consumerWheel(head & WheelMask).isEmpty && head != tail) { + while (isConsumerWheelSlotEmpty(head & WheelMask) && head != tail) { queue(head & Mask) = null head += 1 unblocked = true @@ -706,18 +715,35 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I unblocked } + private def isConsumerWheelSlotEmpty(slot: Int): Boolean = { + val bucket = consumerWheel(slot) + (bucket eq null) || bucket.isEmpty + } + private def addConsumer(consumer: Consumer, offset: Int): Unit = { val slot = offset & WheelMask - consumerWheel(slot).add(consumer) + val bucket = consumerWheel(slot) + if (bucket ne null) bucket.update(consumer.id, consumer) + else { + val newBucket = LongMap.empty[Consumer] + newBucket.update(consumer.id, consumer) + consumerWheel(slot) = newBucket + } } /* * Send a wakeup signal to all the Consumers at a certain wheel index. Note, this needs the actual index, * which is offset modulo (bufferSize + 1). + * + * Enumeration order of the bucket is not semantically significant — every consumer receives the same + * wakeup signal independently. */ private def wakeupIdx(idx: Int): Unit = { - val itr = consumerWheel(idx).iterator - while (itr.hasNext) itr.next().callback.invoke(Wakeup) + val bucket = consumerWheel(idx) + if (bucket ne null) { + val itr = bucket.valuesIterator + while (itr.hasNext) itr.next().callback.invoke(Wakeup) + } } private def complete(): Unit = { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
