This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git
The following commit(s) were added to refs/heads/main by this push:
new d2d1902ce4 =str Fold InHandler and OutHandler into GraphStageLogic for
wiretap.
d2d1902ce4 is described below
commit d2d1902ce40309703b6a40373c810c5fddc2841e
Author: He-Pin <[email protected]>
AuthorDate: Sat Sep 2 21:13:48 2023 +0800
=str Fold InHandler and OutHandler into GraphStageLogic for wiretap.
---
.../org/apache/pekko/stream/scaladsl/Graph.scala | 88 +++++++++++-----------
1 file changed, 42 insertions(+), 46 deletions(-)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala
index b3d0ec2591..f5a5b92bfd 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala
@@ -710,61 +710,57 @@ private[stream] final class WireTap[T] extends
GraphStage[FanOutShape2[T, T, T]]
val in: Inlet[T] = Inlet[T]("WireTap.in")
val outMain: Outlet[T] = Outlet[T]("WireTap.outMain")
val outTap: Outlet[T] = Outlet[T]("WireTap.outTap")
- override def initialAttributes = DefaultAttributes.wireTap
+ override def initialAttributes: Attributes = DefaultAttributes.wireTap
override val shape: FanOutShape2[T, T, T] = new FanOutShape2(in, outMain,
outTap)
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
- private var pendingTap: Option[T] = None
+ override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+ new GraphStageLogic(shape) with InHandler with OutHandler {
+ private var pendingTap: Option[T] = None
- setHandler(in,
- new InHandler {
- override def onPush() = {
- val elem = grab(in)
- push(outMain, elem)
- if (isAvailable(outTap)) {
- push(outTap, elem)
- } else {
- pendingTap = Some(elem)
- }
+ override def onPush(): Unit = {
+ val elem = grab(in)
+ push(outMain, elem)
+ if (isAvailable(outTap)) {
+ push(outTap, elem)
+ } else {
+ pendingTap = Some(elem)
}
- })
+ }
+ override def onPull(): Unit = {
+ pull(in)
+ }
- setHandler(outMain,
- new OutHandler {
- override def onPull() = {
- pull(in)
- }
+ override def onDownstreamFinish(cause: Throwable): Unit = {
+ cancelStage(cause)
+ }
- override def onDownstreamFinish(cause: Throwable): Unit = {
- cancelStage(cause)
- }
- })
+ // The 'tap' output can neither backpressure, nor cancel, the stage.
+ setHandler(
+ outTap,
+ new OutHandler {
+ override def onPull() = {
+ pendingTap match {
+ case Some(elem) =>
+ push(outTap, elem)
+ pendingTap = None
+ case None => // no pending element to emit
+ }
+ }
- // The 'tap' output can neither backpressure, nor cancel, the stage.
- setHandler(
- outTap,
- new OutHandler {
- override def onPull() = {
- pendingTap match {
- case Some(elem) =>
- push(outTap, elem)
- pendingTap = None
- case None => // no pending element to emit
+ override def onDownstreamFinish(cause: Throwable): Unit = {
+ setHandler(in,
+ new InHandler {
+ override def onPush() = {
+ push(outMain, grab(in))
+ }
+ })
+ // Allow any outstanding element to be garbage-collected
+ pendingTap = None
}
- }
+ })
- override def onDownstreamFinish(cause: Throwable): Unit = {
- setHandler(in,
- new InHandler {
- override def onPush() = {
- push(outMain, grab(in))
- }
- })
- // Allow any outstanding element to be garbage-collected
- pendingTap = None
- }
- })
- }
+ setHandlers(in, outMain, this)
+ }
override def toString = "WireTap"
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]