This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch optimize/broadcast-hub-wheel-map
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit ad9dbde8e844247669f19cd9b8e4d7797604d0a2
Author: He-Pin <[email protected]>
AuthorDate: Sun Jun 14 06:05:56 2026 +0800

    perf: replace ArrayList consumer wheel with LongMap for O(1) keyed removal
    
    Motivation:
    BroadcastHub's findAndRemoveConsumer used ArrayList.removeIf which is
    O(k) per event with lambda allocation on every call. In high-fan-out
    scenarios (thousands of consumers clustered in the same wheel slot),
    this creates a producer backpressure bottleneck: the head can only
    advance after the head slot is empty, and draining a slot of k
    consumers requires k linear scans of O(k) each, i.e. O(k^2) in total.
    
    Modification:
    Replace Array[java.util.ArrayList[Consumer]] with Array[LongMap[Consumer]]
    keyed by Consumer.id. Slots are lazily allocated (null = empty) and
    released to null when drained, eliminating baseline memory for empty
    slots and enabling GC of drained LongMaps.
    
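    Hot path uses getOrNull + -= (two primitive hash lookups) instead of
    remove (which would allocate an Option), so moving a consumer between
    slots that already hold a map allocates nothing. No Long boxing since
    LongMap stores primitive long keys. A new LongMap is still allocated
    when a consumer is added to an empty (null) slot, and a map may
    reallocate its backing arrays when it grows.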
    
    Adds null guards in Advance/NeedWakeup event handlers to prevent
    latent NPE when findAndRemoveConsumer returns null. Updates
    onUpstreamFailure and wakeupIdx to skip null (empty) slots, and makes
    checkUnblock treat null slots as empty via isConsumerWheelSlotEmpty.
    
    Result:
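    Consumer add/remove is expected O(1) with zero Long boxing and zero
    Option allocation. High-consumer lockstep scenarios should see
    substantially reduced producer backpressure from wheel slot contention
    (measured by the extended BroadcastHubBenchmark). Memory for empty
    wheel slots drops from one ArrayList each (~40 bytes) to 0 (null);
    occupied slots now hold a LongMap, which is larger than an ArrayList.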
    
    Tests:
    - sbt "stream-tests/Test/testOnly *HubSpec" → 50 passed, 0 failed
    - sbt "++3.3.8; stream/compile" → success
    - sbt "stream/mimaReportBinaryIssues" → no issues
    - sbt "bench-jmh/compile" → success
    
    References:
    Inspired by akkadotnet/akka.net#8264 (Dictionary-based consumer wheel).
    This change uses scala.collection.mutable.LongMap instead of HashMap
    for zero boxing on Long keys and a contiguous open-addressing layout.
---
 .../pekko/stream/BroadcastHubBenchmark.scala       | 50 ++++++++++++++-
 .../org/apache/pekko/stream/scaladsl/HubSpec.scala | 45 ++++++++++++++
 .../org/apache/pekko/stream/scaladsl/Hub.scala     | 72 +++++++++++++++-------
 3 files changed, 141 insertions(+), 26 deletions(-)

diff --git 
a/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchmark.scala 
b/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchmark.scala
index c46bcf5aee..687f695183 100644
--- 
a/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchmark.scala
+++ 
b/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchmark.scala
@@ -32,8 +32,24 @@ import org.apache.pekko.stream.testkit.scaladsl.StreamTestKit
 
 import com.typesafe.config.ConfigFactory
 
+/**
+ * Benchmarks BroadcastHub throughput under high-fan-out lockstep consumer 
scenarios.
+ *
+ * The consumer wheel uses a LongMap per slot for O(1) keyed add/remove 
without Long boxing.
+ * In lockstep, consumers advance together and cluster in a few wheel slots, maximizing contention.
+ * With a small buffer (64) the wheel has only 128 slots, so even an even spread puts about
+ * `consumerCount / 128` consumers in each slot; the old ArrayList.removeIf was O(k) per removal, now O(1).
+ *
+ * The `broadcast` benchmark parameterizes over consumer count with a fixed 
small buffer,
+ * measuring how throughput scales as wheel slot pressure increases.
+ *
+ * The `broadcastLargeBuffer` benchmark uses a larger buffer (256) for 
comparison,
+ * showing how the optimization holds up when consumers are spread across more 
slots.
+ */
 object BroadcastHubBenchmark {
   final val OperationsPerInvocation = 100000
+  final val SmallBufferSize = 64
+  final val LargeBufferSize = 256
 }
 
 @State(Scope.Benchmark)
@@ -56,7 +72,7 @@ class BroadcastHubBenchmark {
 
   var testSource: Source[java.lang.Integer, NotUsed] = _
 
-  @Param(Array("64", "256"))
+  @Param(Array("64", "256", "1000", "2000"))
   var parallelism = 0
 
   @Setup
@@ -71,12 +87,40 @@ class BroadcastHubBenchmark {
     Await.result(system.terminate(), 5.seconds)
   }
 
+  /**
+   * Lockstep broadcast with small buffer (64).
+   * All consumers stay at roughly the same wheel offset, so they cluster in a few slots.
+   * With 128 wheel slots and 2000 consumers an even spread would be ~16 consumers per slot;
+   * with clustering, especially during NeedWakeup bursts, a single slot can hold most of them.
+   * This maximizes the O(1) vs O(k) per-removal difference.
+   */
   @Benchmark
   @OperationsPerInvocation(OperationsPerInvocation)
   def broadcast(): Unit = {
     val latch = new CountDownLatch(parallelism)
     val broadcastSink =
-      BroadcastHub.sink[java.lang.Integer](bufferSize = parallelism, 
startAfterNrOfConsumers = parallelism)
+      BroadcastHub.sink[java.lang.Integer](bufferSize = SmallBufferSize, 
startAfterNrOfConsumers = parallelism)
+    val sink = new LatchSink(OperationsPerInvocation, latch)
+    val source = testSource.runWith(broadcastSink)
+    var idx = 0
+    while (idx < parallelism) {
+      source.runWith(sink)
+      idx += 1
+    }
+    awaitLatch(latch)
+  }
+
+  /**
+   * Lockstep broadcast with larger buffer (256) for comparison.
+   * The wheel has 512 slots, so consumers are spread more thinly.
+   * Shows how the optimization scales when per-slot pressure is lower.
+   */
+  @Benchmark
+  @OperationsPerInvocation(OperationsPerInvocation)
+  def broadcastLargeBuffer(): Unit = {
+    val latch = new CountDownLatch(parallelism)
+    val broadcastSink =
+      BroadcastHub.sink[java.lang.Integer](bufferSize = LargeBufferSize, 
startAfterNrOfConsumers = parallelism)
     val sink = new LatchSink(OperationsPerInvocation, latch)
     val source = testSource.runWith(broadcastSink)
     var idx = 0
@@ -88,7 +132,7 @@ class BroadcastHubBenchmark {
   }
 
   private def awaitLatch(latch: CountDownLatch): Unit = {
-    if (!latch.await(30, TimeUnit.SECONDS)) {
+    if (!latch.await(60, TimeUnit.SECONDS)) {
       
StreamTestKit.printDebugDump(SystemMaterializer(system).materializer.supervisor)
       throw new RuntimeException("Latch didn't complete in time")
     }
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
index a6d57dee19..b32c8746b1 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
@@ -629,6 +629,51 @@ class HubSpec extends StreamSpec {
       in.sendComplete()
       sinkProbe2.cancel()
     }
+
+    "deliver all elements in order to many consumers" in {
+      val consumerCount = 200
+      val messageCount = 2000
+
+      val source = Source(0 until 
messageCount).runWith(BroadcastHub.sink(bufferSize = 256,
+        startAfterNrOfConsumers = consumerCount))
+
+      val futures = (0 until consumerCount).map { _ =>
+        source.runWith(Sink.seq)
+      }
+
+      val results = Await.result(Future.sequence(futures), 30.seconds)
+      results.foreach { result =>
+        result should ===(0 until messageCount)
+      }
+    }
+
+    "handle many consumers when some cancel mid-stream" in {
+      val totalConsumers = 64
+      val cancellingConsumers = 16
+      val cancelAfter = 64
+      val messageCount = 512
+
+      val source = Source(0 until messageCount).runWith(
+        BroadcastHub.sink(bufferSize = 256, startAfterNrOfConsumers = 
totalConsumers))
+
+      val cancellingFutures = (0 until cancellingConsumers).map { _ =>
+        source.take(cancelAfter).runWith(Sink.seq)
+      }
+
+      val remainingFutures = (0 until (totalConsumers - 
cancellingConsumers)).map { _ =>
+        source.runWith(Sink.seq)
+      }
+
+      val cancellingResults = Await.result(Future.sequence(cancellingFutures), 
30.seconds)
+      cancellingResults.foreach { result =>
+        result should ===(0 until cancelAfter)
+      }
+
+      val remainingResults = Await.result(Future.sequence(remainingFutures), 
30.seconds)
+      remainingResults.foreach { result =>
+        result should ===(0 until messageCount)
+      }
+    }
   }
 
   "PartitionHub" must {
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
index 4b6ca0063e..8849529e6d 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
@@ -536,14 +536,17 @@ private[pekko] class 
BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
      * of priorities always fall to a range
      *
      * This wheel tracks the position of Consumers relative to the slowest 
ones. Every slot
-     * contains a list of Consumers being known at that location (this might 
be out of date!).
+     * contains a map of Consumers being known at that location (this might be 
out of date!).
      * Consumers from time to time send Advance messages to indicate that they 
have progressed
      * by reading from the broadcast queue. Consumers that are blocked (due to 
reaching tail) request
      * a wakeup and update their position at the same time.
      *
+     * Each slot uses a LongMap keyed by Consumer.id for O(1) add/remove 
without Long boxing.
+     * Empty slots are null (no backing map allocated), reducing baseline 
memory and GC pressure.
+     * When a slot drains to zero consumers, its map is released (set to null).
      */
     private[this] val consumerWheel =
-      Array.fill[java.util.ArrayList[Consumer]](bufferSize * 2)(new 
util.ArrayList[Consumer]())
+      new Array[LongMap[Consumer]](bufferSize * 2)
     private[this] var activeConsumers = 0
 
     override def preStart(): Unit = {
@@ -574,15 +577,19 @@ private[pekko] class 
BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
           val newOffset = previousOffset + DemandThreshold
           // Move the consumer from its last known offset to its new one. 
Check if we are unblocked.
           val consumer = findAndRemoveConsumer(id, previousOffset)
-          addConsumer(consumer, newOffset)
+          if (consumer ne null) {
+            addConsumer(consumer, newOffset)
+          }
           checkUnblock(previousOffset)
         case NeedWakeup(id, previousOffset, currentOffset) =>
           // Move the consumer from its last known offset to its new one. 
Check if we are unblocked.
           val consumer = findAndRemoveConsumer(id, previousOffset)
-          addConsumer(consumer, currentOffset)
+          if (consumer ne null) {
+            addConsumer(consumer, currentOffset)
 
-          // Also check if the consumer is now unblocked since we published an 
element since it went asleep.
-          if (currentOffset != tail) consumer.callback.invoke(Wakeup)
+            // Also check if the consumer is now unblocked since we published 
an element since it went asleep.
+            if (currentOffset != tail) consumer.callback.invoke(Wakeup)
+          }
           checkUnblock(previousOffset)
 
         case RegistrationPending =>
@@ -650,10 +657,14 @@ private[pekko] class 
BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
         consumer.callback.invoke(failMessage)
       }
 
-      // Notify registered consumers
+      // Notify registered consumers — skip null (empty) slots
       var idx = 0
       while (idx < consumerWheel.length) {
-        consumerWheel(idx).forEach(_.callback.invoke(failMessage))
+        val bucket = consumerWheel(idx)
+        if (bucket ne null) {
+          val itr = bucket.valuesIterator
+          while (itr.hasNext) itr.next().callback.invoke(failMessage)
+        }
         idx += 1
       }
       failStage(ex)
@@ -664,21 +675,19 @@ private[pekko] class 
BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
      *
      * NB: You cannot remove a consumer without knowing its last offset! 
Consumers on the Source side always must
      * track this so this can be a fast operation.
+     *
+     * Uses LongMap.getOrNull + -= to avoid Option allocation on the hot path.
      */
     private def findAndRemoveConsumer(id: Long, offset: Int): Consumer = {
-      // TODO: Try to eliminate modulo division somehow...
       val wheelSlot = offset & WheelMask
-      val consumersInSlot = consumerWheel(wheelSlot)
-      var removedConsumer: Consumer = null
-      if (consumersInSlot.size() > 0) {
-        consumersInSlot.removeIf(consumer => {
-          if (consumer.id == id) {
-            removedConsumer = consumer
-            true
-          } else false
-        })
+      val bucket = consumerWheel(wheelSlot)
+      if (bucket eq null) return null
+      val consumer = bucket.getOrNull(id)
+      if (consumer ne null) {
+        bucket -= id
+        if (bucket.isEmpty) consumerWheel(wheelSlot) = null
       }
-      removedConsumer
+      consumer
     }
 
     /*
@@ -697,7 +706,7 @@ private[pekko] class 
BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
       if (offsetOfConsumerRemoved == head) {
         // Try to advance along the wheel. We can skip any wheel slots which 
have no waiting Consumers, until
         // we either find a nonempty one, or we reached the end of the buffer.
-        while (consumerWheel(head & WheelMask).isEmpty && head != tail) {
+        while (isConsumerWheelSlotEmpty(head & WheelMask) && head != tail) {
           queue(head & Mask) = null
           head += 1
           unblocked = true
@@ -706,18 +715,35 @@ private[pekko] class 
BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
       unblocked
     }
 
+    private def isConsumerWheelSlotEmpty(slot: Int): Boolean = {
+      val bucket = consumerWheel(slot)
+      (bucket eq null) || bucket.isEmpty
+    }
+
     private def addConsumer(consumer: Consumer, offset: Int): Unit = {
       val slot = offset & WheelMask
-      consumerWheel(slot).add(consumer)
+      val bucket = consumerWheel(slot)
+      if (bucket ne null) bucket.update(consumer.id, consumer)
+      else {
+        val newBucket = LongMap.empty[Consumer]
+        newBucket.update(consumer.id, consumer)
+        consumerWheel(slot) = newBucket
+      }
     }
 
     /*
      * Send a wakeup signal to all the Consumers at a certain wheel index. 
Note, this needs the actual index,
      * which is offset modulo (bufferSize + 1).
+     *
+     * Enumeration order of the bucket is not semantically significant — every 
consumer receives the same
+     * wakeup signal independently.
      */
     private def wakeupIdx(idx: Int): Unit = {
-      val itr = consumerWheel(idx).iterator
-      while (itr.hasNext) itr.next().callback.invoke(Wakeup)
+      val bucket = consumerWheel(idx)
+      if (bucket ne null) {
+        val itr = bucket.valuesIterator
+        while (itr.hasNext) itr.next().callback.invoke(Wakeup)
+      }
     }
 
     private def complete(): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to