This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git
The following commit(s) were added to refs/heads/main by this push:
new fecd517d48 =str Fold InHandler and outHandler for
UniqueBidiKillSwitchStage.
fecd517d48 is described below
commit fecd517d482207780779745c9c22e91e1be0c7fa
Author: He-Pin <[email protected]>
AuthorDate: Mon Dec 18 12:17:03 2023 +0800
=str Fold InHandler and outHandler for UniqueBidiKillSwitchStage.
---
.../scala/org/apache/pekko/stream/KillSwitch.scala | 20 +++++++-------------
1 file changed, 7 insertions(+), 13 deletions(-)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/KillSwitch.scala
b/stream/src/main/scala/org/apache/pekko/stream/KillSwitch.scala
index ca169e6309..2d7f8a19c3 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/KillSwitch.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/KillSwitch.scala
@@ -118,29 +118,23 @@ object KillSwitches {
val logic = new KillableGraphStageLogic(promise.future, shape) {
- setHandler(shape.in1,
- new InHandler {
+ setHandlers(shape.in1, shape.out1,
+ new InHandler with OutHandler {
override def onPush(): Unit = push(shape.out1, grab(shape.in1))
override def onUpstreamFinish(): Unit = complete(shape.out1)
override def onUpstreamFailure(ex: Throwable): Unit =
fail(shape.out1, ex)
+ override def onPull(): Unit = pull(shape.in1)
+ override def onDownstreamFinish(cause: Throwable): Unit =
cancel(shape.in1, cause)
})
- setHandler(shape.in2,
- new InHandler {
+
+ setHandlers(shape.in2, shape.out2,
+ new InHandler with OutHandler {
override def onPush(): Unit = push(shape.out2, grab(shape.in2))
override def onUpstreamFinish(): Unit = complete(shape.out2)
override def onUpstreamFailure(ex: Throwable): Unit =
fail(shape.out2, ex)
- })
- setHandler(shape.out1,
- new OutHandler {
- override def onPull(): Unit = pull(shape.in1)
- override def onDownstreamFinish(cause: Throwable): Unit =
cancel(shape.in1, cause)
- })
- setHandler(shape.out2,
- new OutHandler {
override def onPull(): Unit = pull(shape.in2)
override def onDownstreamFinish(cause: Throwable): Unit =
cancel(shape.in2, cause)
})
-
}
(logic, switch)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]