This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new b13d68f622 refactor: inline onEvent into MergeHub.tryProcessNext 
(#2901)
b13d68f622 is described below

commit b13d68f62218cf3e1679d50ac9af699fe084cb37
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun May 10 03:51:22 2026 +0800

    refactor: inline onEvent into MergeHub.tryProcessNext (#2901)
    
    Drop the Boolean-returning onEvent helper; pattern-match the event
    directly inside tryProcessNext. Element pushes and exits, Register and
    Deregister recurse via @tailrec to drain control messages from the
    queue. Same protocol, no behavior change.
---
 .../org/apache/pekko/stream/scaladsl/Hub.scala     | 29 ++++++++++------------
 1 file changed, 13 insertions(+), 16 deletions(-)

diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
index 7a8adf7c53..4b6ca0063e 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
@@ -201,21 +201,6 @@ private[pekko] class MergeHub[T](perProducerBufferSize: 
Int, drainingEnabled: Bo
 
     setHandler(out, this)
 
-    // Returns true when we have not consumed demand, false otherwise
-    private def onEvent(ev: Event): Boolean = ev match {
-      case Element(id, elem) =>
-        demands(id).onElement()
-        push(out, elem)
-        false
-      case Register(id, callback) =>
-        demands.put(id, new InputState(callback))
-        true
-      case Deregister(id) =>
-        demands.remove(id)
-        if (drainingEnabled && draining) tryCompleteOnDraining()
-        true
-    }
-
     private def tryCompleteOnDraining(): Unit = {
       if (demands.isEmpty && (queue.peek() eq null)) {
         completeStage()
@@ -235,7 +220,19 @@ private[pekko] class MergeHub[T](perProducerBufferSize: 
Int, drainingEnabled: Bo
       // queue, but then we will eventually reach the Deregister message, too.
       if (nextElem ne null) {
         needWakeup = false
-        if (onEvent(nextElem)) tryProcessNext(firstAttempt = true)
+        nextElem match {
+          case Element(id, elem) =>
+            demands(id).onElement()
+            push(out, elem)
+          // demand consumed by push — exit and wait for the next onPull
+          case Register(id, callback) =>
+            demands.put(id, new InputState(callback))
+            tryProcessNext(firstAttempt = true)
+          case Deregister(id) =>
+            demands.remove(id)
+            if (drainingEnabled && draining) tryCompleteOnDraining()
+            tryProcessNext(firstAttempt = true)
+        }
       } else {
         needWakeup = true
         // additional poll() to grab any elements that might missed the 
needWakeup


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to