This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new b13d68f622 refactor: inline onEvent into MergeHub.tryProcessNext
(#2901)
b13d68f622 is described below
commit b13d68f62218cf3e1679d50ac9af699fe084cb37
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun May 10 03:51:22 2026 +0800
refactor: inline onEvent into MergeHub.tryProcessNext (#2901)
Drop the Boolean-returning onEvent helper; pattern-match the event
directly inside tryProcessNext. Element pushes and exits, Register and
Deregister recurse via @tailrec to drain control messages from the
queue. Same protocol, no behavior change.
---
.../org/apache/pekko/stream/scaladsl/Hub.scala | 29 ++++++++++------------
1 file changed, 13 insertions(+), 16 deletions(-)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
index 7a8adf7c53..4b6ca0063e 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala
@@ -201,21 +201,6 @@ private[pekko] class MergeHub[T](perProducerBufferSize:
Int, drainingEnabled: Bo
setHandler(out, this)
- // Returns true when we have not consumed demand, false otherwise
- private def onEvent(ev: Event): Boolean = ev match {
- case Element(id, elem) =>
- demands(id).onElement()
- push(out, elem)
- false
- case Register(id, callback) =>
- demands.put(id, new InputState(callback))
- true
- case Deregister(id) =>
- demands.remove(id)
- if (drainingEnabled && draining) tryCompleteOnDraining()
- true
- }
-
private def tryCompleteOnDraining(): Unit = {
if (demands.isEmpty && (queue.peek() eq null)) {
completeStage()
@@ -235,7 +220,19 @@ private[pekko] class MergeHub[T](perProducerBufferSize:
Int, drainingEnabled: Bo
// queue, but then we will eventually reach the Deregister message, too.
if (nextElem ne null) {
needWakeup = false
- if (onEvent(nextElem)) tryProcessNext(firstAttempt = true)
+ nextElem match {
+ case Element(id, elem) =>
+ demands(id).onElement()
+ push(out, elem)
+ // demand consumed by push — exit and wait for the next onPull
+ case Register(id, callback) =>
+ demands.put(id, new InputState(callback))
+ tryProcessNext(firstAttempt = true)
+ case Deregister(id) =>
+ demands.remove(id)
+ if (drainingEnabled && draining) tryCompleteOnDraining()
+ tryProcessNext(firstAttempt = true)
+ }
} else {
needWakeup = true
// additional poll() to grab any elements that might missed the
needWakeup
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]