This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-http.git


The following commit(s) were added to refs/heads/main by this push:
     new cf6e289a6 chore: fuse in/out handler to logic (#834)
cf6e289a6 is described below

commit cf6e289a6ed07442f367bd8bac5dc44ff30a6650
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat Oct 11 00:38:20 2025 +0800

    chore: fuse in/out handler to logic (#834)
    
    * chore: fuse in/out handler to logic
    
    * .
---
 .../http/impl/engine/http2/ProtocolSwitch.scala    |  6 +-
 .../rendering/HttpResponseRendererFactory.scala    | 52 +++++++--------
 .../http/impl/engine/ws/FrameEventRenderer.scala   | 76 +++++++++++-----------
 .../pekko/http/impl/engine/ws/WebSocket.scala      | 45 ++++++-------
 .../org/apache/pekko/http/impl/util/package.scala  | 73 ++++++++++-----------
 5 files changed, 119 insertions(+), 133 deletions(-)

diff --git 
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/ProtocolSwitch.scala
 
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/ProtocolSwitch.scala
index fa54b75a9..eb540e397 100644
--- 
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/ProtocolSwitch.scala
+++ 
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/ProtocolSwitch.scala
@@ -73,11 +73,7 @@ private[http] object ProtocolSwitch {
                   }
               })
 
-            private val ignorePull = new OutHandler {
-              def onPull(): Unit = ()
-            }
-
-            setHandler(netOut, ignorePull)
+            setHandler(netOut, GraphStageLogic.EagerTerminateOutput)
 
             def install(serverImplementation: HttpImplementation, 
firstElement: SslTlsInbound): Unit = {
               val networkSide = Flow.fromSinkAndSource(serverDataIn.sink, 
serverDataOut.source)
diff --git 
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpResponseRendererFactory.scala
 
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpResponseRendererFactory.scala
index 5946b516a..1a4b58e0d 100644
--- 
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpResponseRendererFactory.scala
+++ 
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpResponseRendererFactory.scala
@@ -59,41 +59,41 @@ private[http] class HttpResponseRendererFactory(
     val shape: FlowShape[ResponseRenderingContext, ResponseRenderingOutput] = 
FlowShape(in, out)
 
     def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
-      new GraphStageLogic(shape) {
+      new GraphStageLogic(shape) with InHandler {
         var closeMode: CloseMode = DontClose // signals what to do after the 
current response
         def close: Boolean = closeMode != DontClose
         def closeIf(cond: Boolean): Unit = if (cond) closeMode = 
CloseConnection
         var transferSink: Option[SubSinkInlet[ByteString]] = None
         def transferring: Boolean = transferSink.isDefined
 
-        setHandler(in,
-          new InHandler {
-            override def onPush(): Unit =
-              render(grab(in)) match {
-                case Strict(outElement) =>
-                  push(out, outElement)
-                  if (close) completeStage()
-                case HeadersAndStreamedEntity(headerData, outStream) =>
-                  try transfer(headerData, outStream)
-                  catch {
-                    case NonFatal(e) =>
-                      log.error(e,
-                        s"Rendering of response failed because response entity 
stream materialization failed with '${e.getMessage}'. Sending out 500 response 
instead.")
-                      push(out,
-                        render(ResponseRenderingContext(HttpResponse(500,
-                          entity = 
StatusCodes.InternalServerError.defaultMessage))).asInstanceOf[Strict].bytes)
-                  }
+        override def onPush(): Unit =
+          render(grab(in)) match {
+            case Strict(outElement) =>
+              push(out, outElement)
+              if (close) completeStage()
+            case HeadersAndStreamedEntity(headerData, outStream) =>
+              try transfer(headerData, outStream)
+              catch {
+                case NonFatal(e) =>
+                  log.error(e,
+                    s"Rendering of response failed because response entity 
stream materialization failed with '${e.getMessage}'. Sending out 500 response 
instead.")
+                  push(out,
+                    render(ResponseRenderingContext(HttpResponse(500,
+                      entity = 
StatusCodes.InternalServerError.defaultMessage))).asInstanceOf[Strict].bytes)
               }
+          }
 
-            override def onUpstreamFinish(): Unit =
-              if (transferring) closeMode = CloseConnection
-              else completeStage()
+        override def onUpstreamFinish(): Unit =
+          if (transferring) closeMode = CloseConnection
+          else completeStage()
+
+        override def onUpstreamFailure(ex: Throwable): Unit = {
+          stopTransfer()
+          failStage(ex)
+        }
+
+        setHandler(in, this)
 
-            override def onUpstreamFailure(ex: Throwable): Unit = {
-              stopTransfer()
-              failStage(ex)
-            }
-          })
         private val waitForDemandHandler = new OutHandler {
           def onPull(): Unit = if (!hasBeenPulled(in)) tryPull(in)
         }
diff --git 
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/FrameEventRenderer.scala
 
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/FrameEventRenderer.scala
index 603be504e..8c1051d1b 100644
--- 
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/FrameEventRenderer.scala
+++ 
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/FrameEventRenderer.scala
@@ -32,52 +32,52 @@ private[http] final class FrameEventRenderer extends 
GraphStage[FlowShape[FrameE
   val out = Outlet[ByteString]("FrameEventRenderer.out")
   override val shape = FlowShape(in, out)
 
-  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
new GraphStageLogic(shape) {
+  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+    new GraphStageLogic(shape) with OutHandler {
 
-    val Initial = new InHandler {
-      override def onPush(): Unit = grab(in) match {
-        case start @ FrameStart(header, data) =>
-          require(header.length >= data.size)
-          if (!start.lastPart && header.length > 0)
-            setHandler(in, renderData(header.length - data.length, this))
+      val Initial = new InHandler {
+        override def onPush(): Unit = grab(in) match {
+          case start @ FrameStart(header, data) =>
+            require(header.length >= data.size)
+            if (!start.lastPart && header.length > 0)
+              setHandler(in, renderData(header.length - data.length, this))
 
-          push(out, renderStart(start))
+            push(out, renderStart(start))
 
-        case f: FrameData =>
-          fail(out, new IllegalStateException("unexpected FrameData (need 
FrameStart first)"))
+          case _: FrameData =>
+            fail(out, new IllegalStateException("unexpected FrameData (need 
FrameStart first)"))
+        }
       }
-    }
 
-    def renderData(initialRemaining: Long, nextState: InHandler): InHandler =
-      new InHandler {
-        var remaining: Long = initialRemaining
-
-        override def onPush(): Unit = {
-          grab(in) match {
-            case FrameData(data, lastPart) =>
-              if (data.size > remaining)
-                throw new IllegalStateException(s"Expected $remaining frame 
bytes but got ${data.size}")
-              else if (data.size == remaining) {
-                if (!lastPart) throw new IllegalStateException(s"Frame data 
complete but `lastPart` flag not set")
-                setHandler(in, nextState)
-                push(out, data)
-              } else {
-                remaining -= data.size
-                push(out, data)
-              }
-
-            case f: FrameStart =>
-              fail(out, new IllegalStateException("unexpected FrameStart (need 
more FrameData first)"))
+      def renderData(initialRemaining: Long, nextState: InHandler): InHandler =
+        new InHandler {
+          var remaining: Long = initialRemaining
+
+          override def onPush(): Unit = {
+            grab(in) match {
+              case FrameData(data, lastPart) =>
+                if (data.size > remaining)
+                  throw new IllegalStateException(s"Expected $remaining frame 
bytes but got ${data.size}")
+                else if (data.size == remaining) {
+                  if (!lastPart) throw new IllegalStateException(s"Frame data 
complete but `lastPart` flag not set")
+                  setHandler(in, nextState)
+                  push(out, data)
+                } else {
+                  remaining -= data.size
+                  push(out, data)
+                }
+
+              case f: FrameStart =>
+                fail(out, new IllegalStateException("unexpected FrameStart 
(need more FrameData first)"))
+            }
           }
         }
-      }
 
-    setHandler(in, Initial)
-    setHandler(out,
-      new OutHandler {
-        override def onPull(): Unit = pull(in)
-      })
-  }
+      override def onPull(): Unit = pull(in)
+
+      setHandler(in, Initial)
+      setHandler(out, this)
+    }
 
   private def renderStart(start: FrameStart): ByteString = 
renderHeader(start.header) ++ start.data
 
diff --git 
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/WebSocket.scala 
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/WebSocket.scala
index 9ca214a34..5ac882f0d 100644
--- 
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/WebSocket.scala
+++ 
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/WebSocket.scala
@@ -225,26 +225,24 @@ private[http] object WebSocket {
 
     val shape = new FanOutShape2(outputIn, bypassOut, messageOut)
 
-    def createLogic(effectiveAttributes: Attributes) = new 
GraphStageLogic(shape) {
+    def createLogic(effectiveAttributes: Attributes) = new 
GraphStageLogic(shape) with InHandler {
+
+      override def onPush(): Unit = {
+        grab(outputIn) match {
+          case b: BypassEvent with MessagePart => emit(bypassOut, b, () => 
emit(messageOut, b, pullIn))
+          case b: BypassEvent                  => emit(bypassOut, b, pullIn)
+          case m: MessagePart                  => emit(messageOut, m, pullIn)
+        }
+      }
 
-      setHandler(outputIn,
-        new InHandler {
-          override def onPush(): Unit = {
-            grab(outputIn) match {
-              case b: BypassEvent with MessagePart => emit(bypassOut, b, () => 
emit(messageOut, b, pullIn))
-              case b: BypassEvent                  => emit(bypassOut, b, 
pullIn)
-              case m: MessagePart                  => emit(messageOut, m, 
pullIn)
-            }
-          }
-        })
       val pullIn = () => tryPull(outputIn)
 
+      setHandler(outputIn, this)
+
       setHandler(bypassOut, eagerTerminateOutput)
       setHandler(messageOut, ignoreTerminateOutput)
 
-      override def preStart(): Unit = {
-        pullIn()
-      }
+      override def preStart(): Unit = tryPull(outputIn)
     }
   }
 
@@ -288,18 +286,13 @@ private[http] object WebSocket {
 
     val shape = new FlowShape(in, out)
 
-    def createLogic(effectiveAttributes: Attributes) = new 
GraphStageLogic(shape) {
-      setHandler(out,
-        new OutHandler {
-          override def onPull(): Unit = pull(in)
-        })
-      setHandler(in,
-        new InHandler {
-          override def onPush(): Unit = push(out, grab(in))
-          override def onUpstreamFinish(): Unit = emit(out, 
UserHandlerCompleted, () => completeStage())
-          override def onUpstreamFailure(ex: Throwable): Unit =
-            emit(out, UserHandlerErredOut(ex), () => completeStage())
-        })
+    def createLogic(effectiveAttributes: Attributes) = new 
GraphStageLogic(shape) with InHandler with OutHandler {
+      override def onPull(): Unit = pull(in)
+      override def onPush(): Unit = push(out, grab(in))
+      override def onUpstreamFinish(): Unit = emit(out, UserHandlerCompleted, 
() => completeStage())
+      override def onUpstreamFailure(ex: Throwable): Unit = emit(out, 
UserHandlerErredOut(ex), () => completeStage())
+
+      setHandlers(in, out, this)
     }
   }
 
diff --git 
a/http-core/src/main/scala/org/apache/pekko/http/impl/util/package.scala 
b/http-core/src/main/scala/org/apache/pekko/http/impl/util/package.scala
index feef415f7..3ffe461bb 100644
--- a/http-core/src/main/scala/org/apache/pekko/http/impl/util/package.scala
+++ b/http-core/src/main/scala/org/apache/pekko/http/impl/util/package.scala
@@ -126,46 +126,43 @@ package util {
 
     override val shape = FlowShape(byteStringIn, httpEntityOut)
 
-    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic 
= new TimerGraphStageLogic(shape) {
-      val bytes = ByteString.newBuilder
-      private var emptyStream = false
-
-      override def preStart(): Unit = scheduleOnce("ToStrictTimeoutTimer", 
timeout)
-
-      setHandler(httpEntityOut,
-        new OutHandler {
-          override def onPull(): Unit = {
-            if (emptyStream) {
-              push(httpEntityOut, HttpEntity.Strict(contentType, 
ByteString.empty))
-              completeStage()
-            } else pull(byteStringIn)
-          }
-        })
-
-      setHandler(byteStringIn,
-        new InHandler {
-          override def onPush(): Unit = {
-            bytes ++= grab(byteStringIn)
-            maxBytes match {
-              case Some(max) if bytes.length > max =>
-                failStage(new EntityStreamException(new ErrorInfo("Request too 
large",
-                  s"Request was longer than the maximum of $max")))
-              case _ =>
-                pull(byteStringIn)
-            }
-          }
-          override def onUpstreamFinish(): Unit = {
-            if (isAvailable(httpEntityOut)) {
-              push(httpEntityOut, HttpEntity.Strict(contentType, 
bytes.result()))
-              completeStage()
-            } else emptyStream = true
+    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic 
=
+      new TimerGraphStageLogic(shape) with InHandler with OutHandler {
+        private val bytes = ByteString.newBuilder
+        private var emptyStream = false
+
+        override def preStart(): Unit = scheduleOnce("ToStrictTimeoutTimer", 
timeout)
+
+        override def onPull(): Unit = {
+          if (emptyStream) {
+            push(httpEntityOut, HttpEntity.Strict(contentType, 
ByteString.empty))
+            completeStage()
+          } else pull(byteStringIn)
+        }
+
+        override def onPush(): Unit = {
+          bytes ++= grab(byteStringIn)
+          maxBytes match {
+            case Some(max) if bytes.length > max =>
+              failStage(new EntityStreamException(new ErrorInfo("Request too 
large",
+                s"Request was longer than the maximum of $max")))
+            case _ =>
+              pull(byteStringIn)
           }
-        })
+        }
+        override def onUpstreamFinish(): Unit = {
+          if (isAvailable(httpEntityOut)) {
+            push(httpEntityOut, HttpEntity.Strict(contentType, bytes.result()))
+            completeStage()
+          } else emptyStream = true
+        }
 
-      override def onTimer(key: Any): Unit =
-        failStage(new java.util.concurrent.TimeoutException(
-          s"HttpEntity.toStrict timed out after $timeout while still waiting 
for outstanding data"))
-    }
+        setHandlers(byteStringIn, httpEntityOut, this)
+
+        override def onTimer(key: Any): Unit =
+          failStage(new java.util.concurrent.TimeoutException(
+            s"HttpEntity.toStrict timed out after $timeout while still waiting 
for outstanding data"))
+      }
 
     override def toString = "ToStrict"
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to