This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 28d61c5368 perf: replace ArrayList consumer wheel with LongMap for O(1) keyed removal (#3063)
28d61c5368 is described below

commit 28d61c53684dcd22302560220698d74a7c5da789
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Jun 14 19:15:54 2026 +0800

    perf: replace ArrayList consumer wheel with LongMap for O(1) keyed removal (#3063)
    
    * perf: replace ArrayList consumer wheel with LongMap for O(1) keyed removal
    
    Motivation:
    BroadcastHub's findAndRemoveConsumer used ArrayList.removeIf which is
    O(k) per event with lambda allocation on every call. In high-fan-out
    scenarios (thousands of consumers clustered in the same wheel slot),
    this creates a producer backpressure bottleneck: the head can only
    advance after the head slot is empty, and draining a large slot
    requires k linear scans each of O(k) cost.
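
As a rough illustration of the cost described above, the sketch below counts
predicate evaluations when a slot holding k entries is drained with one
removeIf call per entry. `RemoveIfCost` and `drainCost` are hypothetical names,
and boxed Long ids stand in for Consumer objects; this is not code from the
commit.

```scala
import java.util.ArrayList

object RemoveIfCost {
  // Counts how many times the removeIf predicate runs while a slot that
  // starts with k entries is drained one id at a time.
  def drainCost(k: Int): Long = {
    val slot = new ArrayList[java.lang.Long](k)
    var i = 0
    while (i < k) { slot.add(java.lang.Long.valueOf(i.toLong)); i += 1 }

    var evaluations = 0L
    var id = 0
    while (id < k) {
      val target = id.toLong
      // removeIf tests every remaining element, even after a match is found.
      slot.removeIf(x => { evaluations += 1; x.longValue == target })
      id += 1
    }
    evaluations
  }
}
```

Each call scans all remaining entries, so draining k entries costs
k + (k - 1) + ... + 1 = k(k + 1)/2 evaluations: 2,001,000 for k = 2000.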
    
    Modification:
    Replace Array[java.util.ArrayList[Consumer]] with Array[LongMap[Consumer]]
    keyed by Consumer.id. Slots are lazily allocated (null = empty) and
    released to null when drained, eliminating baseline memory for empty
    slots and enabling GC of drained LongMaps.
    
    Hot path uses getOrNull + -= (two primitive hash lookups) instead of
    remove (which would allocate Option), achieving zero heap allocation
    per add/remove cycle. No Long boxing since LongMap stores primitive
    long keys.
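
A minimal standalone sketch of the slot handling described above, assuming a
power-of-two slot count. `WheelSketch`, `Wheel`, and the stripped-down
`Consumer` are hypothetical stand-ins rather than the classes in Hub.scala;
only the LongMap calls (`getOrNull`, `-=`, `update`, `isEmpty`) are the ones
the commit message names.

```scala
import scala.collection.mutable.LongMap

object WheelSketch {
  // Hypothetical stand-in for the hub's Consumer; only the id matters here.
  final class Consumer(val id: Long)

  final class Wheel(slots: Int) {
    require(slots > 0 && (slots & (slots - 1)) == 0, "slots must be a power of two")
    private val mask = slots - 1
    // Slots start as null; a map is allocated only when a consumer arrives.
    private val wheel = new Array[LongMap[Consumer]](slots)

    def add(consumer: Consumer, offset: Int): Unit = {
      val slot = offset & mask
      var bucket = wheel(slot)
      if (bucket eq null) {
        bucket = LongMap.empty[Consumer]
        wheel(slot) = bucket
      }
      bucket.update(consumer.id, consumer)
    }

    // Returns the removed consumer, or null if it was not in that slot.
    def remove(id: Long, offset: Int): Consumer = {
      val slot = offset & mask
      val bucket = wheel(slot)
      if (bucket eq null) null
      else {
        val consumer = bucket.getOrNull(id) // no Option is allocated
        if (consumer ne null) {
          bucket -= id
          if (bucket.isEmpty) wheel(slot) = null // release the drained map
        }
        consumer
      }
    }

    def isSlotEmpty(offset: Int): Boolean = {
      val bucket = wheel(offset & mask)
      (bucket eq null) || bucket.isEmpty
    }
  }
}
```

Callers have to handle a null result from `remove`, which is why the commit
also adds null guards to the event handlers.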
    
    Adds null guards in Advance/NeedWakeup event handlers to prevent
    latent NPE when findAndRemoveConsumer returns null. Updates
    onUpstreamFailure and wakeupIdx to skip null (empty) slots.
    
    Result:
    Consumer add/remove is O(1) with zero Long boxing and zero Option
    allocation. High-consumer lockstep scenarios see dramatically reduced
    producer backpressure from wheel slot contention. Memory for empty
    wheel slots drops from ~40 bytes per ArrayList to 0 (null).
    
    Tests:
    - sbt "stream-tests/Test/testOnly *HubSpec" → 50 passed, 0 failed
    - sbt "++3.3.8; stream/compile" → success
    - sbt "stream/mimaReportBinaryIssues" → no issues
    - sbt "bench-jmh/compile" → success
    
    References:
    Inspired by akkadotnet/akka.net#8264 (Dictionary-based consumer wheel).
    Pekko uses scala.collection.mutable.LongMap instead of HashMap for
    zero boxing on Long keys and contiguous open-addressing memory layout.
    
    * perf: add standalone BroadcastHub wheel benchmark runner
    
    Adds BroadcastHubBenchRunner for direct measurement of consumer wheel
    throughput under high-fan-out scenarios, bypassing JMH infrastructure
    classpath issues in the bench-jmh module.
    
    Measures lockstep broadcast throughput at 4 consumer counts (64, 256,
    1000, 2000) across 2 buffer sizes (64, 256) with 2 warmup + 3 measured
    runs per configuration.
    
    Results on Apple M-series (elements/sec, higher is better):
    
    Buffer=64 (128 wheel slots, max clustering):
      64 consumers:    296,756 elem/s
      256 consumers:    76,075 elem/s
      1000 consumers:   19,737 elem/s
      2000 consumers:   10,223 elem/s
    
    Buffer=256 (512 wheel slots, moderate clustering):
      64 consumers:  1,148,340 elem/s
      256 consumers:   271,505 elem/s
      1000 consumers:   70,727 elem/s
      2000 consumers:   33,717 elem/s
    
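    Throughput falls roughly in inverse proportion to consumer count (about
    4x lower at 256 consumers than at 64), with no steeper collapse at 1000
    or 2000 consumers. That is consistent with per-removal cost staying O(1)
    under high per-slot contention.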
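
To make the slot counts in the headings above concrete, here is a small sketch
of the wheel arithmetic. It assumes the wheel mask equals bufferSize * 2 - 1
(the diff uses `WheelMask` but does not show its definition); `WheelMath` and
its methods are hypothetical names. The per-slot average assumes consumers are
spread evenly, whereas in lockstep they bunch into far fewer slots.

```scala
object WheelMath {
  // The consumer wheel is allocated with bufferSize * 2 slots.
  def wheelSlots(bufferSize: Int): Int = bufferSize * 2

  // Assumed mask: slot count minus one (valid for power-of-two sizes).
  def slotFor(offset: Int, bufferSize: Int): Int =
    offset & (wheelSlots(bufferSize) - 1)

  // Average occupancy if consumers were spread evenly across all slots.
  def avgConsumersPerSlot(consumers: Int, bufferSize: Int): Double =
    consumers.toDouble / wheelSlots(bufferSize)
}
```

For example, 2000 consumers over 128 slots average 15.625 per slot, and over
512 slots about 3.9 per slot.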
    
    Tests:
    - sbt "bench-jmh/compile" → success
    - sbt "bench-jmh/runMain org.apache.pekko.stream.BroadcastHubBenchRunner" → completed
    
    References:
    Refs #3063
    
    * perf: add license header to BroadcastHubBenchRunner and comparison results
    
    Run headerCreateAll for the new benchmark runner file.
    
    Comparison benchmark results (old ArrayList vs new LongMap):
    
    Buffer=64 (128 wheel slots):
      Consumers  ArrayList (elem/s)  LongMap (elem/s)  Speedup
      64                    305,657           296,756    0.97x
      256                    72,446            76,075    1.05x
      1000                   13,070            19,737    1.51x
      2000                    4,348            10,223    2.35x
    
    Buffer=256 (512 wheel slots):
      Consumers  ArrayList (elem/s)  LongMap (elem/s)  Speedup
      64                  1,099,345         1,148,340    1.04x
      256                   197,676           271,505    1.37x
      1000                   27,804            70,727    2.54x
      2000                    7,943            33,717    4.24x
    
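    The LongMap change yields a 2.35x (buffer=64) to 4.24x (buffer=256)
    speedup at 2000 consumers, and the gap widens as consumer count
    increases, which is consistent with the O(k) linear scan having been the
    dominant bottleneck. At 64 consumers the two implementations are within
    about 4% of each other.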
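
The Speedup column is simply the LongMap rate divided by the ArrayList rate.
A small sketch that reproduces it from the figures reported above
(`SpeedupCheck` and its methods are hypothetical names):

```scala
object SpeedupCheck {
  // Ratio of new throughput to old throughput.
  def speedup(arrayListRate: Double, longMapRate: Double): Double =
    longMapRate / arrayListRate

  // Round to two decimals, matching the precision used in the tables.
  def rounded(x: Double): Double = math.round(x * 100) / 100.0

  def main(args: Array[String]): Unit = {
    // Reported 2000-consumer rows: buffer=64, then buffer=256.
    println(rounded(speedup(4348, 10223)))
    println(rounded(speedup(7943, 33717)))
  }
}
```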
    
    Tests:
    - sbt "bench-jmh/headerCreateAll" → header created
    - sbt "bench-jmh/compile" → success
    
    References:
    Refs #3063
---
 .../pekko/stream/BroadcastHubBenchRunner.scala     | 113 +++++++++++++++++++++
 .../pekko/stream/BroadcastHubBenchmark.scala       |  50 ++++++++-
 .../org/apache/pekko/stream/scaladsl/HubSpec.scala |  45 ++++++++
 .../org/apache/pekko/stream/scaladsl/Hub.scala     |  72 ++++++++-----
 4 files changed, 254 insertions(+), 26 deletions(-)

diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchRunner.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchRunner.scala
new file mode 100644
index 0000000000..0efe9e8d49
--- /dev/null
+++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchRunner.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream
+
+import java.util.concurrent.{ CountDownLatch, TimeUnit }
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.remote.artery.{ BenchTestSource, LatchSink }
+import org.apache.pekko.stream.scaladsl._
+
+import com.typesafe.config.ConfigFactory
+
+/**
+ * Standalone benchmark runner for BroadcastHub consumer wheel performance.
+ * Run with: sbt "bench-jmh/runMain org.apache.pekko.stream.BroadcastHubBenchRunner"
+ */
+object BroadcastHubBenchRunner {
+
+  final val Elements = 100000
+  final val SmallBuffer = 64
+  final val LargeBuffer = 256
+  final val WarmupRuns = 2
+  final val MeasureRuns = 3
+
+  def main(args: Array[String]): Unit = {
+    val config = ConfigFactory.parseString("""
+      pekko.actor.default-dispatcher {
+        executor = "fork-join-executor"
+        fork-join-executor {
+          parallelism-factor = 1
+        }
+      }
+    """)
+
+    val consumerCounts = Array(64, 256, 1000, 2000)
+
+    println("=" * 80)
+    println("BroadcastHub Consumer Wheel Benchmark")
+    println(s"Elements per run: $Elements")
+    println(s"Warmup: $WarmupRuns runs, Measure: $MeasureRuns runs")
+    println("=" * 80)
+
+    for (bufferSize <- Array(SmallBuffer, LargeBuffer)) {
+      println(s"\n--- Buffer size: $bufferSize (wheel slots: ${bufferSize * 2}) ---")
+      println(f"${"Consumers"}%-12s ${"Avg (elem/s)"}%16s ${"Min"}%12s ${"Max"}%12s ${"StdDev"}%10s")
+      println("-" * 70)
+
+      for (consumerCount <- consumerCounts) {
+        implicit val system: ActorSystem = ActorSystem(s"bench-$consumerCount-$bufferSize", config)
+
+        // eager init
+        SystemMaterializer(system).materializer
+
+        val results = new Array[Double](WarmupRuns + MeasureRuns)
+
+        for (run <- 0 until WarmupRuns + MeasureRuns) {
+          val latch = new CountDownLatch(consumerCount)
+          val broadcastSink =
+            BroadcastHub.sink[java.lang.Integer](bufferSize = bufferSize, startAfterNrOfConsumers = consumerCount)
+          val testSource = Source.fromGraph(new BenchTestSource(Elements))
+          val source = testSource.runWith(broadcastSink)
+
+          val start = System.nanoTime()
+          var idx = 0
+          while (idx < consumerCount) {
+            source.runWith(new LatchSink(Elements, latch))
+            idx += 1
+          }
+
+          if (!latch.await(120, TimeUnit.SECONDS)) {
+            println(s"  TIMEOUT at consumers=$consumerCount buffer=$bufferSize run=$run")
+            Await.result(system.terminate(), 10.seconds)
+            System.exit(1)
+          }
+          val elapsed = (System.nanoTime() - start) / 1e9
+          results(run) = Elements / elapsed
+        }
+
+        val measured = results.drop(WarmupRuns)
+        val avg = measured.sum / measured.length
+        val min = measured.min
+        val max = measured.max
+        val variance = measured.map(x => (x - avg) * (x - avg)).sum / measured.length
+        val stddev = math.sqrt(variance)
+
+        println(f"$consumerCount%-12d $avg%16.0f $min%12.0f $max%12.0f $stddev%10.0f")
+
+        Await.result(system.terminate(), 10.seconds)
+      }
+    }
+
+    println("\n" + "=" * 80)
+    println("Done.")
+  }
+}
diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchmark.scala
index c46bcf5aee..687f695183 100644
--- a/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchmark.scala
+++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchmark.scala
@@ -32,8 +32,24 @@ import org.apache.pekko.stream.testkit.scaladsl.StreamTestKit
 
 import com.typesafe.config.ConfigFactory
 
+/**
+ * Benchmarks BroadcastHub throughput under high-fan-out lockstep consumer scenarios.
+ *
+ * The consumer wheel uses a LongMap per slot for O(1) keyed add/remove without Long boxing.
+ * In lockstep, all consumers cluster in the same wheel slot, maximizing per-slot contention.
+ * With a small buffer (64), the wheel has only 128 slots, so `consumerCount / 128` consumers
+ * share each slot — the old ArrayList.removeIf was O(k) per removal, now O(1).
+ *
+ * The `broadcast` benchmark parameterizes over consumer count with a fixed small buffer,
+ * measuring how throughput scales as wheel slot pressure increases.
+ *
+ * The `broadcastLargeBuffer` benchmark uses a larger buffer (256) for comparison,
+ * showing how the optimization holds up when consumers are spread across more slots.
+ */
 object BroadcastHubBenchmark {
   final val OperationsPerInvocation = 100000
+  final val SmallBufferSize = 64
+  final val LargeBufferSize = 256
 }
 
 @State(Scope.Benchmark)
@@ -56,7 +72,7 @@ class BroadcastHubBenchmark {
 
   var testSource: Source[java.lang.Integer, NotUsed] = _
 
-  @Param(Array("64", "256"))
+  @Param(Array("64", "256", "1000", "2000"))
   var parallelism = 0
 
   @Setup
@@ -71,12 +87,40 @@ class BroadcastHubBenchmark {
     Await.result(system.terminate(), 5.seconds)
   }
 
+  /**
+   * Lockstep broadcast with small buffer (64).
+   * All consumers stay at roughly the same wheel offset, clustering in the same slot.
+   * With 128 wheel slots and 2000 consumers, ~16 consumers share each slot on average;
+   * during NeedWakeup bursts, thousands cluster in a single slot.
+   * This maximizes the O(1) vs O(k) per-removal difference.
+   */
   @Benchmark
   @OperationsPerInvocation(OperationsPerInvocation)
   def broadcast(): Unit = {
     val latch = new CountDownLatch(parallelism)
     val broadcastSink =
-      BroadcastHub.sink[java.lang.Integer](bufferSize = parallelism, startAfterNrOfConsumers = parallelism)
+      BroadcastHub.sink[java.lang.Integer](bufferSize = SmallBufferSize, startAfterNrOfConsumers = parallelism)
+    val sink = new LatchSink(OperationsPerInvocation, latch)
+    val source = testSource.runWith(broadcastSink)
+    var idx = 0
+    while (idx < parallelism) {
+      source.runWith(sink)
+      idx += 1
+    }
+    awaitLatch(latch)
+  }
+
+  /**
+   * Lockstep broadcast with larger buffer (256) for comparison.
+   * The wheel has 512 slots, so consumers are spread more thinly.
+   * Shows how the optimization scales when per-slot pressure is lower.
+   */
+  @Benchmark
+  @OperationsPerInvocation(OperationsPerInvocation)
+  def broadcastLargeBuffer(): Unit = {
+    val latch = new CountDownLatch(parallelism)
+    val broadcastSink =
+      BroadcastHub.sink[java.lang.Integer](bufferSize = LargeBufferSize, startAfterNrOfConsumers = parallelism)
     val sink = new LatchSink(OperationsPerInvocation, latch)
     val source = testSource.runWith(broadcastSink)
     var idx = 0
@@ -88,7 +132,7 @@ class BroadcastHubBenchmark {
   }
 
   private def awaitLatch(latch: CountDownLatch): Unit = {
-    if (!latch.await(30, TimeUnit.SECONDS)) {
+    if (!latch.await(60, TimeUnit.SECONDS)) {
       StreamTestKit.printDebugDump(SystemMaterializer(system).materializer.supervisor)
       throw new RuntimeException("Latch didn't complete in time")
     }
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
index a6d57dee19..b32c8746b1 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
@@ -629,6 +629,51 @@ class HubSpec extends StreamSpec {
       in.sendComplete()
       sinkProbe2.cancel()
     }
+
+    "deliver all elements in order to many consumers" in {
+      val consumerCount = 200
+      val messageCount = 2000
+
+      val source = Source(0 until messageCount).runWith(BroadcastHub.sink(bufferSize = 256,
+        startAfterNrOfConsumers = consumerCount))
+
+      val futures = (0 until consumerCount).map { _ =>
+        source.runWith(Sink.seq)
+      }
+
+      val results = Await.result(Future.sequence(futures), 30.seconds)
+      results.foreach { result =>
+        result should ===(0 until messageCount)
+      }
+    }
+
+    "handle many consumers when some cancel mid-stream" in {
+      val totalConsumers = 64
+      val cancellingConsumers = 16
+      val cancelAfter = 64
+      val messageCount = 512
+
+      val source = Source(0 until messageCount).runWith(
+        BroadcastHub.sink(bufferSize = 256, startAfterNrOfConsumers = totalConsumers))
+
+      val cancellingFutures = (0 until cancellingConsumers).map { _ =>
+        source.take(cancelAfter).runWith(Sink.seq)
+      }
+
+      val remainingFutures = (0 until (totalConsumers - cancellingConsumers)).map { _ =>
+        source.runWith(Sink.seq)
+      }
+
+      val cancellingResults = Await.result(Future.sequence(cancellingFutures), 30.seconds)
+      cancellingResults.foreach { result =>
+        result should ===(0 until cancelAfter)
+      }
+
+      val remainingResults = Await.result(Future.sequence(remainingFutures), 30.seconds)
+      remainingResults.foreach { result =>
+        result should ===(0 until messageCount)
+      }
+    }
   }
 
   "PartitionHub" must {
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
index 4b6ca0063e..8849529e6d 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
@@ -536,14 +536,17 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
      * of priorities always fall to a range
      *
      * This wheel tracks the position of Consumers relative to the slowest ones. Every slot
-     * contains a list of Consumers being known at that location (this might be out of date!).
+     * contains a map of Consumers being known at that location (this might be out of date!).
      * Consumers from time to time send Advance messages to indicate that they have progressed
      * by reading from the broadcast queue. Consumers that are blocked (due to reaching tail) request
      * a wakeup and update their position at the same time.
      *
+     * Each slot uses a LongMap keyed by Consumer.id for O(1) add/remove without Long boxing.
+     * Empty slots are null (no backing map allocated), reducing baseline memory and GC pressure.
+     * When a slot drains to zero consumers, its map is released (set to null).
      */
     private[this] val consumerWheel =
-      Array.fill[java.util.ArrayList[Consumer]](bufferSize * 2)(new util.ArrayList[Consumer]())
+      new Array[LongMap[Consumer]](bufferSize * 2)
     private[this] var activeConsumers = 0
 
     override def preStart(): Unit = {
@@ -574,15 +577,19 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
           val newOffset = previousOffset + DemandThreshold
           // Move the consumer from its last known offset to its new one. Check if we are unblocked.
           val consumer = findAndRemoveConsumer(id, previousOffset)
-          addConsumer(consumer, newOffset)
+          if (consumer ne null) {
+            addConsumer(consumer, newOffset)
+          }
           checkUnblock(previousOffset)
         case NeedWakeup(id, previousOffset, currentOffset) =>
           // Move the consumer from its last known offset to its new one. Check if we are unblocked.
           val consumer = findAndRemoveConsumer(id, previousOffset)
-          addConsumer(consumer, currentOffset)
+          if (consumer ne null) {
+            addConsumer(consumer, currentOffset)
 
-          // Also check if the consumer is now unblocked since we published an element since it went asleep.
-          if (currentOffset != tail) consumer.callback.invoke(Wakeup)
+            // Also check if the consumer is now unblocked since we published an element since it went asleep.
+            if (currentOffset != tail) consumer.callback.invoke(Wakeup)
+          }
           checkUnblock(previousOffset)
 
         case RegistrationPending =>
@@ -650,10 +657,14 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
         consumer.callback.invoke(failMessage)
       }
 
-      // Notify registered consumers
+      // Notify registered consumers — skip null (empty) slots
       var idx = 0
       while (idx < consumerWheel.length) {
-        consumerWheel(idx).forEach(_.callback.invoke(failMessage))
+        val bucket = consumerWheel(idx)
+        if (bucket ne null) {
+          val itr = bucket.valuesIterator
+          while (itr.hasNext) itr.next().callback.invoke(failMessage)
+        }
         idx += 1
       }
       failStage(ex)
@@ -664,21 +675,19 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
      *
      * NB: You cannot remove a consumer without knowing its last offset! Consumers on the Source side always must
      * track this so this can be a fast operation.
+     *
+     * Uses LongMap.getOrNull + -= to avoid Option allocation on the hot path.
      */
     private def findAndRemoveConsumer(id: Long, offset: Int): Consumer = {
-      // TODO: Try to eliminate modulo division somehow...
       val wheelSlot = offset & WheelMask
-      val consumersInSlot = consumerWheel(wheelSlot)
-      var removedConsumer: Consumer = null
-      if (consumersInSlot.size() > 0) {
-        consumersInSlot.removeIf(consumer => {
-          if (consumer.id == id) {
-            removedConsumer = consumer
-            true
-          } else false
-        })
+      val bucket = consumerWheel(wheelSlot)
+      if (bucket eq null) return null
+      val consumer = bucket.getOrNull(id)
+      if (consumer ne null) {
+        bucket -= id
+        if (bucket.isEmpty) consumerWheel(wheelSlot) = null
       }
-      removedConsumer
+      consumer
     }
 
     /*
@@ -697,7 +706,7 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
       if (offsetOfConsumerRemoved == head) {
         // Try to advance along the wheel. We can skip any wheel slots which have no waiting Consumers, until
         // we either find a nonempty one, or we reached the end of the buffer.
-        while (consumerWheel(head & WheelMask).isEmpty && head != tail) {
+        while (isConsumerWheelSlotEmpty(head & WheelMask) && head != tail) {
           queue(head & Mask) = null
           head += 1
           unblocked = true
@@ -706,18 +715,35 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
       unblocked
     }
 
+    private def isConsumerWheelSlotEmpty(slot: Int): Boolean = {
+      val bucket = consumerWheel(slot)
+      (bucket eq null) || bucket.isEmpty
+    }
+
     private def addConsumer(consumer: Consumer, offset: Int): Unit = {
       val slot = offset & WheelMask
-      consumerWheel(slot).add(consumer)
+      val bucket = consumerWheel(slot)
+      if (bucket ne null) bucket.update(consumer.id, consumer)
+      else {
+        val newBucket = LongMap.empty[Consumer]
+        newBucket.update(consumer.id, consumer)
+        consumerWheel(slot) = newBucket
+      }
     }
 
     /*
      * Send a wakeup signal to all the Consumers at a certain wheel index. Note, this needs the actual index,
      * which is offset modulo (bufferSize + 1).
+     *
+     * Enumeration order of the bucket is not semantically significant — every 
consumer receives the same
+     * wakeup signal independently.
      */
     private def wakeupIdx(idx: Int): Unit = {
-      val itr = consumerWheel(idx).iterator
-      while (itr.hasNext) itr.next().callback.invoke(Wakeup)
+      val bucket = consumerWheel(idx)
+      if (bucket ne null) {
+        val itr = bucket.valuesIterator
+        while (itr.hasNext) itr.next().callback.invoke(Wakeup)
+      }
     }
 
     private def complete(): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]