This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 750bf235f1 chore: Fuse in and out handler for setupStage (#2259)
750bf235f1 is described below
commit 750bf235f1df8e7aaef95314c856fa519cb31ba7
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Mon Sep 22 17:16:02 2025 +0800
chore: Fuse in and out handler for setupStage (#2259)
---
.../remove-deprecated-methods.excludes | 3 +-
.../org/apache/pekko/stream/impl/SetupStage.scala | 139 +++++++++------------
2 files changed, 60 insertions(+), 82 deletions(-)
diff --git a/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes
index c9b81f0149..a4007bd8de 100644
--- a/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes
+++ b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes
@@ -200,4 +200,5 @@
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.Acto
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.ActorMaterializer.isShutdown")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.ActorMaterializer.system")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.ActorMaterializer.this")
-
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.SetupStage")
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.SetupStage$")
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/SetupStage.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/SetupStage.scala
index ac47aa4afc..966fd14d56 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/SetupStage.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/SetupStage.scala
@@ -46,33 +46,43 @@ import pekko.stream.stage.OutHandler
}
@scala.annotation.nowarn("msg=inferred structural type")
- private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) {
- import SetupStage._
-
- val subInlet = new SubSinkInlet[U]("SetupFlowStage")
- val subOutlet = new SubSourceOutlet[T]("SetupFlowStage")
-
- subInlet.setHandler(delegateToOutlet(push(out, _: U), () => complete(out), fail(out, _), subInlet))
- subOutlet.setHandler(delegateToInlet(() => pull(in), cause => cancel(in, cause)))
-
- setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet))
- setHandler(out, delegateToSubInlet(subInlet))
-
- override def preStart(): Unit = {
- try {
- val flow = factory(materializer, attributes)
-
- val mat = subFusingMaterializer.materialize(
- Source.fromGraph(subOutlet.source).viaMat(flow)(Keep.right).to(Sink.fromGraph(subInlet.sink)),
- attributes)
- matPromise.success(mat)
- } catch {
- case NonFatal(ex) =>
- matPromise.failure(ex)
- throw ex
+ private def createStageLogic(matPromise: Promise[M]) =
+ new GraphStageLogic(shape) with InHandler with OutHandler {
+ private val subInlet = new SubSinkInlet[U]("SetupFlowStage")
+ private val subOutlet = new SubSourceOutlet[T]("SetupFlowStage")
+ private val subletInOutHandler = new InHandler with OutHandler {
+ override def onPush(): Unit = push(out, subInlet.grab())
+ override def onUpstreamFinish(): Unit = complete(out)
+ override def onUpstreamFailure(ex: Throwable): Unit = fail(out, ex)
+ override def onPull(): Unit = pull(in)
+ override def onDownstreamFinish(cause: Throwable): Unit = cancel(in, cause)
+ }
+ subInlet.setHandler(subletInOutHandler)
+ subOutlet.setHandler(subletInOutHandler)
+
+ override def onPush(): Unit = subOutlet.push(grab(in))
+ override def onUpstreamFinish(): Unit = subOutlet.complete()
+ override def onUpstreamFailure(ex: Throwable): Unit = subOutlet.fail(ex)
+ override def onPull(): Unit = subInlet.pull()
+ override def onDownstreamFinish(cause: Throwable): Unit = subInlet.cancel(cause)
+
+ setHandlers(in, out, this)
+
+ override def preStart(): Unit = {
+ try {
+ val flow = factory(materializer, attributes)
+
+ val mat = subFusingMaterializer.materialize(
+ Source.fromGraph(subOutlet.source).viaMat(flow)(Keep.right).to(Sink.fromGraph(subInlet.sink)),
+ attributes)
+ matPromise.success(mat)
+ } catch {
+ case NonFatal(ex) =>
+ matPromise.failure(ex)
+ throw ex
+ }
}
}
- }
}
/** Internal Api */
@@ -90,62 +100,29 @@ import pekko.stream.stage.OutHandler
}
@scala.annotation.nowarn("msg=inferred structural type")
- private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) {
- import SetupStage._
-
- val subInlet = new SubSinkInlet[T]("SetupSourceStage")
- subInlet.setHandler(delegateToOutlet(push(out, _: T), () => complete(out), fail(out, _), subInlet))
- setHandler(out, delegateToSubInlet(subInlet))
-
- override def preStart(): Unit = {
- try {
- val source = factory(materializer, attributes)
-
- val mat = subFusingMaterializer.materialize(source.to(Sink.fromGraph(subInlet.sink)), attributes)
- matPromise.success(mat)
- } catch {
- case NonFatal(ex) =>
- matPromise.failure(ex)
- throw ex
+ private def createStageLogic(matPromise: Promise[M]) =
+ new GraphStageLogic(shape) with OutHandler with InHandler {
+ private val subInlet = new SubSinkInlet[T]("SetupSourceStage")
+ override def onPush(): Unit = push(out, subInlet.grab())
+ override def onUpstreamFinish(): Unit = complete(out)
+ override def onUpstreamFailure(ex: Throwable): Unit = fail(out, ex)
+ override def onPull(): Unit = subInlet.pull()
+ override def onDownstreamFinish(cause: Throwable): Unit = subInlet.cancel(cause)
+
+ subInlet.setHandler(this)
+ setHandler(out, this)
+
+ override def preStart(): Unit = {
+ try {
+ val source = factory(materializer, attributes)
+
+ val mat = subFusingMaterializer.materialize(source.to(Sink.fromGraph(subInlet.sink)), attributes)
+ matPromise.success(mat)
+ } catch {
+ case NonFatal(ex) =>
+ matPromise.failure(ex)
+ throw ex
+ }
}
}
- }
-}
-
-private object SetupStage {
- def delegateToSubOutlet[T](grab: () => T, subOutlet: GraphStageLogic#SubSourceOutlet[T]) = new InHandler {
- override def onPush(): Unit =
- subOutlet.push(grab())
- override def onUpstreamFinish(): Unit =
- subOutlet.complete()
- override def onUpstreamFailure(ex: Throwable): Unit =
- subOutlet.fail(ex)
- }
-
- def delegateToOutlet[T](
- push: T => Unit,
- complete: () => Unit,
- fail: Throwable => Unit,
- subInlet: GraphStageLogic#SubSinkInlet[T]) = new InHandler {
- override def onPush(): Unit =
- push(subInlet.grab())
- override def onUpstreamFinish(): Unit =
- complete()
- override def onUpstreamFailure(ex: Throwable): Unit =
- fail(ex)
- }
-
- def delegateToSubInlet[T](subInlet: GraphStageLogic#SubSinkInlet[T]) = new OutHandler {
- override def onPull(): Unit =
- subInlet.pull()
- override def onDownstreamFinish(cause: Throwable): Unit =
- subInlet.cancel(cause)
- }
-
- def delegateToInlet(pull: () => Unit, cancel: (Throwable) => Unit) = new OutHandler {
- override def onPull(): Unit =
- pull()
- override def onDownstreamFinish(cause: Throwable): Unit =
- cancel(cause)
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]