This is an automated email from the ASF dual-hosted git repository.

mdedetrich pushed a commit to branch 1.5.x
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/1.5.x by this push:
     new 28d86d86d5 chore: Use array list for better performance in 
BroadcastHub (#2262)
28d86d86d5 is described below

commit 28d86d86d5a5fb7c4020f953eb4c395b2b3a7e37
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Thu Sep 25 08:50:27 2025 +0800

    chore: Use array list for better performance in BroadcastHub (#2262)
    
    * chore: Use array list for better performance in BroadcastHub
    
    * chore: add benchmark
    
    (cherry picked from commit 3cfe37f016ae2397c375fcfd267323f9ef85fa55)
---
 .../pekko/stream/BroadcastHubBenchmark.scala       | 94 ++++++++++++++++++++++
 .../org/apache/pekko/stream/scaladsl/Hub.scala     | 29 +++----
 2 files changed, 109 insertions(+), 14 deletions(-)

diff --git 
a/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchmark.scala 
b/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchmark.scala
new file mode 100644
index 0000000000..ede811c798
--- /dev/null
+++ 
b/bench-jmh/src/main/scala/org/apache/pekko/stream/BroadcastHubBenchmark.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream
+
+import com.typesafe.config.ConfigFactory
+import org.apache.pekko.NotUsed
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.remote.artery.{ BenchTestSource, LatchSink }
+import org.apache.pekko.stream.scaladsl._
+import org.apache.pekko.stream.testkit.scaladsl.StreamTestKit
+import org.openjdk.jmh.annotations._
+
+import java.util.concurrent.{ CountDownLatch, TimeUnit }
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+object BroadcastHubBenchmark {
+  final val OperationsPerInvocation = 100000
+}
+
+@State(Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@BenchmarkMode(Array(Mode.Throughput))
+class BroadcastHubBenchmark {
+  import BroadcastHubBenchmark._
+
+  val config = ConfigFactory.parseString("""
+    pekko.actor.default-dispatcher {
+      executor = "fork-join-executor"
+      fork-join-executor {
+        parallelism-factor = 1
+      }
+    }
+    """)
+
+  implicit val system: ActorSystem = ActorSystem("BroadcastHubBenchmark", 
config)
+  import system.dispatcher
+
+  var testSource: Source[java.lang.Integer, NotUsed] = _
+
+  @Param(Array("64", "256"))
+  var parallelism = 0
+
+  @Setup
+  def setup(): Unit = {
+    // eager init of materializer
+    SystemMaterializer(system).materializer
+    testSource = Source.fromGraph(new BenchTestSource(OperationsPerInvocation))
+  }
+
+  @TearDown
+  def shutdown(): Unit = {
+    Await.result(system.terminate(), 5.seconds)
+  }
+
+  @Benchmark
+  @OperationsPerInvocation(OperationsPerInvocation)
+  def broadcast(): Unit = {
+    val latch = new CountDownLatch(parallelism)
+    val broadcastSink =
+      BroadcastHub.sink[java.lang.Integer](bufferSize = parallelism, 
startAfterNrOfConsumers = parallelism)
+    val sink = new LatchSink(OperationsPerInvocation, latch)
+    val source = testSource.runWith(broadcastSink)
+    var idx = 0
+    while (idx < parallelism) {
+      source.runWith(sink)
+      idx += 1
+    }
+    awaitLatch(latch)
+  }
+
+  private def awaitLatch(latch: CountDownLatch): Unit = {
+    if (!latch.await(30, TimeUnit.SECONDS)) {
+      
StreamTestKit.printDebugDump(SystemMaterializer(system).materializer.supervisor)
+      throw new RuntimeException("Latch didn't complete in time")
+    }
+  }
+
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
index 218c605c23..b997ba74a2 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
@@ -543,7 +543,8 @@ private[pekko] class 
BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
      * a wakeup and update their position at the same time.
      *
      */
-    private[this] val consumerWheel = Array.fill[List[Consumer]](bufferSize * 
2)(Nil)
+    private[this] val consumerWheel =
+      Array.fill[java.util.ArrayList[Consumer]](bufferSize * 2)(new 
util.ArrayList[Consumer]())
     private[this] var activeConsumers = 0
 
     override def preStart(): Unit = {
@@ -651,8 +652,10 @@ private[pekko] class 
BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
       }
 
       // Notify registered consumers
-      consumerWheel.iterator.flatMap(_.iterator).foreach { consumer =>
-        consumer.callback.invoke(failMessage)
+      var idx = 0
+      while (idx < consumerWheel.length) {
+        consumerWheel(idx).forEach(_.callback.invoke(failMessage))
+        idx += 1
       }
       failStage(ex)
     }
@@ -666,18 +669,16 @@ private[pekko] class 
BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
     private def findAndRemoveConsumer(id: Long, offset: Int): Consumer = {
       // TODO: Try to eliminate modulo division somehow...
       val wheelSlot = offset & WheelMask
-      var consumersInSlot = consumerWheel(wheelSlot)
-      // debug(s"consumers before removal $consumersInSlot")
-      var remainingConsumersInSlot: List[Consumer] = Nil
+      val consumersInSlot = consumerWheel(wheelSlot)
       var removedConsumer: Consumer = null
-
-      while (consumersInSlot.nonEmpty) {
-        val consumer = consumersInSlot.head
-        if (consumer.id != id) remainingConsumersInSlot = consumer :: 
remainingConsumersInSlot
-        else removedConsumer = consumer
-        consumersInSlot = consumersInSlot.tail
+      if (consumersInSlot.size() > 0) {
+        consumersInSlot.removeIf(consumer => {
+          if (consumer.id == id) {
+            removedConsumer = consumer
+            true
+          } else false
+        })
       }
-      consumerWheel(wheelSlot) = remainingConsumersInSlot
       removedConsumer
     }
 
@@ -708,7 +709,7 @@ private[pekko] class 
BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
 
     private def addConsumer(consumer: Consumer, offset: Int): Unit = {
       val slot = offset & WheelMask
-      consumerWheel(slot) = consumer :: consumerWheel(slot)
+      consumerWheel(slot).add(consumer)
     }
 
     /*


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to