This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 0eed6a128b =str Fold InHandler and OutHandler for operator Scan.
0eed6a128b is described below
commit 0eed6a128b28e8aaf866639397a2c5dfedbd6aac
Author: He-Pin <[email protected]>
AuthorDate: Sun Dec 17 15:47:32 2023 +0800
=str Fold InHandler and OutHandler for operator Scan.
---
.../org/apache/pekko/stream/impl/fusing/Ops.scala | 18 ++++++++----------
1 file changed, 8 insertions(+), 10 deletions(-)
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
index 2b2eb94705..5c9f45689e 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
@@ -424,17 +424,10 @@ private[stream] object Collect {
import shape.{ in, out }
// Initial behavior makes sure that the zero gets flushed if upstream is
empty
- setHandler(out,
- new OutHandler {
- override def onPull(): Unit = {
- push(out, aggregator)
- setHandlers(in, out, self)
- }
- })
-
- setHandler(
+ setHandlers(
in,
- new InHandler {
+ out,
+ new InHandler with OutHandler {
override def onPush(): Unit = ()
override def onUpstreamFinish(): Unit =
@@ -445,6 +438,11 @@ private[stream] object Collect {
completeStage()
}
})
+
+ override def onPull(): Unit = {
+ push(out, aggregator)
+ setHandlers(in, out, self)
+ }
})
override def onPull(): Unit = pull(in)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]