This is an automated email from the ASF dual-hosted git repository. hepin pushed a commit to branch fuse in repository https://gitbox.apache.org/repos/asf/pekko-http.git
commit 0def8222f9eb3c52f912ad5cc64cbbfeb5b24a3e Author: He-Pin <[email protected]> AuthorDate: Sat Oct 11 00:15:10 2025 +0800 chore: fuse in/out handler to logic --- .../http/impl/engine/http2/ProtocolSwitch.scala | 6 +- .../rendering/HttpResponseRendererFactory.scala | 52 +++++++-------- .../http/impl/engine/ws/FrameEventRenderer.scala | 76 +++++++++++----------- .../pekko/http/impl/engine/ws/WebSocket.scala | 50 ++++++-------- .../org/apache/pekko/http/impl/util/package.scala | 73 ++++++++++----------- 5 files changed, 121 insertions(+), 136 deletions(-) diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/ProtocolSwitch.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/ProtocolSwitch.scala index fa54b75a9..eb540e397 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/ProtocolSwitch.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/ProtocolSwitch.scala @@ -73,11 +73,7 @@ private[http] object ProtocolSwitch { } }) - private val ignorePull = new OutHandler { - def onPull(): Unit = () - } - - setHandler(netOut, ignorePull) + setHandler(netOut, GraphStageLogic.EagerTerminateOutput) def install(serverImplementation: HttpImplementation, firstElement: SslTlsInbound): Unit = { val networkSide = Flow.fromSinkAndSource(serverDataIn.sink, serverDataOut.source) diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpResponseRendererFactory.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpResponseRendererFactory.scala index 5946b516a..1a4b58e0d 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpResponseRendererFactory.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpResponseRendererFactory.scala @@ -59,41 +59,41 @@ private[http] class HttpResponseRendererFactory( val shape: FlowShape[ResponseRenderingContext, ResponseRenderingOutput] = FlowShape(in, out) def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new GraphStageLogic(shape) { + new GraphStageLogic(shape) with InHandler { var closeMode: CloseMode = DontClose // signals what to do after the current response def close: Boolean = closeMode != DontClose def closeIf(cond: Boolean): Unit = if (cond) closeMode = CloseConnection var transferSink: Option[SubSinkInlet[ByteString]] = None def transferring: Boolean = transferSink.isDefined - setHandler(in, - new InHandler { - override def onPush(): Unit = - render(grab(in)) match { - case Strict(outElement) => - push(out, outElement) - if (close) completeStage() - case HeadersAndStreamedEntity(headerData, outStream) => - try transfer(headerData, outStream) - catch { - case NonFatal(e) => - log.error(e, - s"Rendering of response failed because response entity stream materialization failed with '${e.getMessage}'. Sending out 500 response instead.") - push(out, - render(ResponseRenderingContext(HttpResponse(500, - entity = StatusCodes.InternalServerError.defaultMessage))).asInstanceOf[Strict].bytes) - } + override def onPush(): Unit = + render(grab(in)) match { + case Strict(outElement) => + push(out, outElement) + if (close) completeStage() + case HeadersAndStreamedEntity(headerData, outStream) => + try transfer(headerData, outStream) + catch { + case NonFatal(e) => + log.error(e, + s"Rendering of response failed because response entity stream materialization failed with '${e.getMessage}'. Sending out 500 response instead.") + push(out, + render(ResponseRenderingContext(HttpResponse(500, + entity = StatusCodes.InternalServerError.defaultMessage))).asInstanceOf[Strict].bytes) } + } - override def onUpstreamFinish(): Unit = - if (transferring) closeMode = CloseConnection - else completeStage() + override def onUpstreamFinish(): Unit = + if (transferring) closeMode = CloseConnection + else completeStage() + + override def onUpstreamFailure(ex: Throwable): Unit = { + stopTransfer() + failStage(ex) + } + + setHandler(in, this) - override def onUpstreamFailure(ex: Throwable): Unit = { - stopTransfer() - failStage(ex) - } - }) private val waitForDemandHandler = new OutHandler { def onPull(): Unit = if (!hasBeenPulled(in)) tryPull(in) } diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/FrameEventRenderer.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/FrameEventRenderer.scala index 603be504e..8c1051d1b 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/FrameEventRenderer.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/FrameEventRenderer.scala @@ -32,52 +32,52 @@ private[http] final class FrameEventRenderer extends GraphStage[FlowShape[FrameE val out = Outlet[ByteString]("FrameEventRenderer.out") override val shape = FlowShape(in, out) - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with OutHandler { - val Initial = new InHandler { - override def onPush(): Unit = grab(in) match { - case start @ FrameStart(header, data) => - require(header.length >= data.size) - if (!start.lastPart && header.length > 0) - setHandler(in, renderData(header.length - data.length, this)) + val Initial = new InHandler { + override def onPush(): Unit = grab(in) match { + case start @ FrameStart(header, data) => + require(header.length >= data.size) + if (!start.lastPart && header.length > 0) + setHandler(in, renderData(header.length - data.length, this)) - push(out, renderStart(start)) + push(out, renderStart(start)) - case f: FrameData => - fail(out, new IllegalStateException("unexpected FrameData (need FrameStart first)")) + case _: FrameData => + fail(out, new IllegalStateException("unexpected FrameData (need FrameStart first)")) + } } - } - def renderData(initialRemaining: Long, nextState: InHandler): InHandler = - new InHandler { - var remaining: Long = initialRemaining - - override def onPush(): Unit = { - grab(in) match { - case FrameData(data, lastPart) => - if (data.size > remaining) - throw new IllegalStateException(s"Expected $remaining frame bytes but got ${data.size}") - else if (data.size == remaining) { - if (!lastPart) throw new IllegalStateException(s"Frame data complete but `lastPart` flag not set") - setHandler(in, nextState) - push(out, data) - } else { - remaining -= data.size - push(out, data) - } - - case f: FrameStart => - fail(out, new IllegalStateException("unexpected FrameStart (need more FrameData first)")) + def renderData(initialRemaining: Long, nextState: InHandler): InHandler = + new InHandler { + var remaining: Long = initialRemaining + + override def onPush(): Unit = { + grab(in) match { + case FrameData(data, lastPart) => + if (data.size > remaining) + throw new IllegalStateException(s"Expected $remaining frame bytes but got ${data.size}") + else if (data.size == remaining) { + if (!lastPart) throw new IllegalStateException(s"Frame data complete but `lastPart` flag not set") + setHandler(in, nextState) + push(out, data) + } else { + remaining -= data.size + push(out, data) + } + + case f: FrameStart => + fail(out, new IllegalStateException("unexpected FrameStart (need more FrameData first)")) + } } } - } - setHandler(in, Initial) - setHandler(out, - new OutHandler { - override def onPull(): Unit = pull(in) - }) - } + override def onPull(): Unit = pull(in) + + setHandler(in, Initial) + setHandler(out, this) + } private def renderStart(start: FrameStart): ByteString = renderHeader(start.header) ++ start.data diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/WebSocket.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/WebSocket.scala index 9ca214a34..41c17cb00 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/WebSocket.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/WebSocket.scala @@ -225,27 +225,24 @@ private[http] object WebSocket { val shape = new FanOutShape2(outputIn, bypassOut, messageOut) - def createLogic(effectiveAttributes: Attributes) = new GraphStageLogic(shape) { - - setHandler(outputIn, - new InHandler { - override def onPush(): Unit = { - grab(outputIn) match { - case b: BypassEvent with MessagePart => emit(bypassOut, b, () => emit(messageOut, b, pullIn)) - case b: BypassEvent => emit(bypassOut, b, pullIn) - case m: MessagePart => emit(messageOut, m, pullIn) - } + def createLogic(effectiveAttributes: Attributes) = + new GraphStageLogic(shape) with InHandler { + + override def onPush(): Unit = { + grab(outputIn) match { + case b: BypassEvent with MessagePart => emit(bypassOut, b, () => emit(messageOut, b, pullIn)) + case b: BypassEvent => emit(bypassOut, b, pullIn) + case m: MessagePart => emit(messageOut, m, pullIn) } - }) - val pullIn = () => tryPull(outputIn) + } - setHandler(bypassOut, eagerTerminateOutput) - setHandler(messageOut, ignoreTerminateOutput) + setHandler(outputIn, this) - override def preStart(): Unit = { - pullIn() + setHandler(bypassOut, eagerTerminateOutput) + setHandler(messageOut, ignoreTerminateOutput) + + override def preStart(): Unit = tryPull(outputIn) } - } } private case object BypassMerge extends GraphStage[FanInShape3[BypassEvent, AnyRef, Tick.type, AnyRef]] { @@ -288,18 +285,13 @@ private[http] object WebSocket { val shape = new FlowShape(in, out) - def createLogic(effectiveAttributes: Attributes) = new GraphStageLogic(shape) { - setHandler(out, - new OutHandler { - override def onPull(): Unit = pull(in) - }) - setHandler(in, - new InHandler { - override def onPush(): Unit = push(out, grab(in)) - override def onUpstreamFinish(): Unit = emit(out, UserHandlerCompleted, () => completeStage()) - override def onUpstreamFailure(ex: Throwable): Unit = - emit(out, UserHandlerErredOut(ex), () => completeStage()) - }) + def createLogic(effectiveAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler { + override def onPull(): Unit = pull(in) + override def onPush(): Unit = push(out, grab(in)) + override def onUpstreamFinish(): Unit = emit(out, UserHandlerCompleted, () => completeStage()) + override def onUpstreamFailure(ex: Throwable): Unit = emit(out, UserHandlerErredOut(ex), () => completeStage()) + + setHandlers(in, out, this) } } diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/util/package.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/util/package.scala index feef415f7..3ffe461bb 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/util/package.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/util/package.scala @@ -126,46 +126,43 @@ package util { override val shape = FlowShape(byteStringIn, httpEntityOut) - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { - val bytes = ByteString.newBuilder - private var emptyStream = false - - override def preStart(): Unit = scheduleOnce("ToStrictTimeoutTimer", timeout) - - setHandler(httpEntityOut, - new OutHandler { - override def onPull(): Unit = { - if (emptyStream) { - push(httpEntityOut, HttpEntity.Strict(contentType, ByteString.empty)) - completeStage() - } else pull(byteStringIn) - } - }) - - setHandler(byteStringIn, - new InHandler { - override def onPush(): Unit = { - bytes ++= grab(byteStringIn) - maxBytes match { - case Some(max) if bytes.length > max => - failStage(new EntityStreamException(new ErrorInfo("Request too large", - s"Request was longer than the maximum of $max"))) - case _ => - pull(byteStringIn) - } - } - override def onUpstreamFinish(): Unit = { - if (isAvailable(httpEntityOut)) { - push(httpEntityOut, HttpEntity.Strict(contentType, bytes.result())) - completeStage() - } else emptyStream = true + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new TimerGraphStageLogic(shape) with InHandler with OutHandler { + private val bytes = ByteString.newBuilder + private var emptyStream = false + + override def preStart(): Unit = scheduleOnce("ToStrictTimeoutTimer", timeout) + + override def onPull(): Unit = { + if (emptyStream) { + push(httpEntityOut, HttpEntity.Strict(contentType, ByteString.empty)) + completeStage() + } else pull(byteStringIn) + } + + override def onPush(): Unit = { + bytes ++= grab(byteStringIn) + maxBytes match { + case Some(max) if bytes.length > max => + failStage(new EntityStreamException(new ErrorInfo("Request too large", + s"Request was longer than the maximum of $max"))) + case _ => + pull(byteStringIn) } - }) + } + override def onUpstreamFinish(): Unit = { + if (isAvailable(httpEntityOut)) { + push(httpEntityOut, HttpEntity.Strict(contentType, bytes.result())) + completeStage() + } else emptyStream = true + } - override def onTimer(key: Any): Unit = - failStage(new java.util.concurrent.TimeoutException( - s"HttpEntity.toStrict timed out after $timeout while still waiting for outstanding data")) - } + setHandlers(byteStringIn, httpEntityOut, this) + + override def onTimer(key: Any): Unit = + failStage(new java.util.concurrent.TimeoutException( + s"HttpEntity.toStrict timed out after $timeout while still waiting for outstanding data")) + } override def toString = "ToStrict" } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
