This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 750bf235f1 chore: Fuse in and out handler for setupStage (#2259)
750bf235f1 is described below

commit 750bf235f1df8e7aaef95314c856fa519cb31ba7
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Mon Sep 22 17:16:02 2025 +0800

    chore: Fuse in and out handler for setupStage (#2259)
---
 .../remove-deprecated-methods.excludes             |   3 +-
 .../org/apache/pekko/stream/impl/SetupStage.scala  | 139 +++++++++------------
 2 files changed, 60 insertions(+), 82 deletions(-)

diff --git a/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes
index c9b81f0149..a4007bd8de 100644
--- a/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes
+++ b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes
@@ -200,4 +200,5 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.Acto
 ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.ActorMaterializer.isShutdown")
 ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.ActorMaterializer.system")
 ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.ActorMaterializer.this")
-
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.SetupStage")
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.SetupStage$")
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/SetupStage.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/SetupStage.scala
index ac47aa4afc..966fd14d56 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/SetupStage.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/SetupStage.scala
@@ -46,33 +46,43 @@ import pekko.stream.stage.OutHandler
   }
 
   @scala.annotation.nowarn("msg=inferred structural type")
-  private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) {
-    import SetupStage._
-
-    val subInlet = new SubSinkInlet[U]("SetupFlowStage")
-    val subOutlet = new SubSourceOutlet[T]("SetupFlowStage")
-
-    subInlet.setHandler(delegateToOutlet(push(out, _: U), () => complete(out), fail(out, _), subInlet))
-    subOutlet.setHandler(delegateToInlet(() => pull(in), cause => cancel(in, cause)))
-
-    setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet))
-    setHandler(out, delegateToSubInlet(subInlet))
-
-    override def preStart(): Unit = {
-      try {
-        val flow = factory(materializer, attributes)
-
-        val mat = subFusingMaterializer.materialize(
-          Source.fromGraph(subOutlet.source).viaMat(flow)(Keep.right).to(Sink.fromGraph(subInlet.sink)),
-          attributes)
-        matPromise.success(mat)
-      } catch {
-        case NonFatal(ex) =>
-          matPromise.failure(ex)
-          throw ex
+  private def createStageLogic(matPromise: Promise[M]) =
+    new GraphStageLogic(shape) with InHandler with OutHandler {
+      private val subInlet = new SubSinkInlet[U]("SetupFlowStage")
+      private val subOutlet = new SubSourceOutlet[T]("SetupFlowStage")
+      private val subletInOutHandler = new InHandler with OutHandler {
+        override def onPush(): Unit = push(out, subInlet.grab())
+        override def onUpstreamFinish(): Unit = complete(out)
+        override def onUpstreamFailure(ex: Throwable): Unit = fail(out, ex)
+        override def onPull(): Unit = pull(in)
+        override def onDownstreamFinish(cause: Throwable): Unit = cancel(in, cause)
+      }
+      subInlet.setHandler(subletInOutHandler)
+      subOutlet.setHandler(subletInOutHandler)
+
+      override def onPush(): Unit = subOutlet.push(grab(in))
+      override def onUpstreamFinish(): Unit = subOutlet.complete()
+      override def onUpstreamFailure(ex: Throwable): Unit = subOutlet.fail(ex)
+      override def onPull(): Unit = subInlet.pull()
+      override def onDownstreamFinish(cause: Throwable): Unit = subInlet.cancel(cause)
+
+      setHandlers(in, out, this)
+
+      override def preStart(): Unit = {
+        try {
+          val flow = factory(materializer, attributes)
+
+          val mat = subFusingMaterializer.materialize(
+            Source.fromGraph(subOutlet.source).viaMat(flow)(Keep.right).to(Sink.fromGraph(subInlet.sink)),
+            attributes)
+          matPromise.success(mat)
+        } catch {
+          case NonFatal(ex) =>
+            matPromise.failure(ex)
+            throw ex
+        }
       }
     }
-  }
 }
 
 /** Internal Api */
@@ -90,62 +100,29 @@ import pekko.stream.stage.OutHandler
   }
 
   @scala.annotation.nowarn("msg=inferred structural type")
-  private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) {
-    import SetupStage._
-
-    val subInlet = new SubSinkInlet[T]("SetupSourceStage")
-    subInlet.setHandler(delegateToOutlet(push(out, _: T), () => complete(out), fail(out, _), subInlet))
-    setHandler(out, delegateToSubInlet(subInlet))
-
-    override def preStart(): Unit = {
-      try {
-        val source = factory(materializer, attributes)
-
-        val mat = subFusingMaterializer.materialize(source.to(Sink.fromGraph(subInlet.sink)), attributes)
-        matPromise.success(mat)
-      } catch {
-        case NonFatal(ex) =>
-          matPromise.failure(ex)
-          throw ex
+  private def createStageLogic(matPromise: Promise[M]) =
+    new GraphStageLogic(shape) with OutHandler with InHandler {
+      private val subInlet = new SubSinkInlet[T]("SetupSourceStage")
+      override def onPush(): Unit = push(out, subInlet.grab())
+      override def onUpstreamFinish(): Unit = complete(out)
+      override def onUpstreamFailure(ex: Throwable): Unit = fail(out, ex)
+      override def onPull(): Unit = subInlet.pull()
+      override def onDownstreamFinish(cause: Throwable): Unit = subInlet.cancel(cause)
+
+      subInlet.setHandler(this)
+      setHandler(out, this)
+
+      override def preStart(): Unit = {
+        try {
+          val source = factory(materializer, attributes)
+
+          val mat = subFusingMaterializer.materialize(source.to(Sink.fromGraph(subInlet.sink)), attributes)
+          matPromise.success(mat)
+        } catch {
+          case NonFatal(ex) =>
+            matPromise.failure(ex)
+            throw ex
+        }
       }
     }
-  }
-}
-
-private object SetupStage {
-  def delegateToSubOutlet[T](grab: () => T, subOutlet: GraphStageLogic#SubSourceOutlet[T]) = new InHandler {
-    override def onPush(): Unit =
-      subOutlet.push(grab())
-    override def onUpstreamFinish(): Unit =
-      subOutlet.complete()
-    override def onUpstreamFailure(ex: Throwable): Unit =
-      subOutlet.fail(ex)
-  }
-
-  def delegateToOutlet[T](
-      push: T => Unit,
-      complete: () => Unit,
-      fail: Throwable => Unit,
-      subInlet: GraphStageLogic#SubSinkInlet[T]) = new InHandler {
-    override def onPush(): Unit =
-      push(subInlet.grab())
-    override def onUpstreamFinish(): Unit =
-      complete()
-    override def onUpstreamFailure(ex: Throwable): Unit =
-      fail(ex)
-  }
-
-  def delegateToSubInlet[T](subInlet: GraphStageLogic#SubSinkInlet[T]) = new OutHandler {
-    override def onPull(): Unit =
-      subInlet.pull()
-    override def onDownstreamFinish(cause: Throwable): Unit =
-      subInlet.cancel(cause)
-  }
-
-  def delegateToInlet(pull: () => Unit, cancel: (Throwable) => Unit) = new OutHandler {
-    override def onPull(): Unit =
-      pull()
-    override def onDownstreamFinish(cause: Throwable): Unit =
-      cancel(cause)
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to