This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-http.git
The following commit(s) were added to refs/heads/main by this push:
new cf6e289a6 chore: fuse in/out handler to logic (#834)
cf6e289a6 is described below
commit cf6e289a6ed07442f367bd8bac5dc44ff30a6650
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat Oct 11 00:38:20 2025 +0800
chore: fuse in/out handler to logic (#834)
* chore: fuse in/out handler to logic
* .
---
.../http/impl/engine/http2/ProtocolSwitch.scala | 6 +-
.../rendering/HttpResponseRendererFactory.scala | 52 +++++++--------
.../http/impl/engine/ws/FrameEventRenderer.scala | 76 +++++++++++-----------
.../pekko/http/impl/engine/ws/WebSocket.scala | 45 ++++++-------
.../org/apache/pekko/http/impl/util/package.scala | 73 ++++++++++-----------
5 files changed, 119 insertions(+), 133 deletions(-)
diff --git
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/ProtocolSwitch.scala
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/ProtocolSwitch.scala
index fa54b75a9..eb540e397 100644
---
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/ProtocolSwitch.scala
+++
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/ProtocolSwitch.scala
@@ -73,11 +73,7 @@ private[http] object ProtocolSwitch {
}
})
- private val ignorePull = new OutHandler {
- def onPull(): Unit = ()
- }
-
- setHandler(netOut, ignorePull)
+ setHandler(netOut, GraphStageLogic.EagerTerminateOutput)
def install(serverImplementation: HttpImplementation,
firstElement: SslTlsInbound): Unit = {
val networkSide = Flow.fromSinkAndSource(serverDataIn.sink,
serverDataOut.source)
diff --git
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpResponseRendererFactory.scala
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpResponseRendererFactory.scala
index 5946b516a..1a4b58e0d 100644
---
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpResponseRendererFactory.scala
+++
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpResponseRendererFactory.scala
@@ -59,41 +59,41 @@ private[http] class HttpResponseRendererFactory(
val shape: FlowShape[ResponseRenderingContext, ResponseRenderingOutput] =
FlowShape(in, out)
def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
- new GraphStageLogic(shape) {
+ new GraphStageLogic(shape) with InHandler {
var closeMode: CloseMode = DontClose // signals what to do after the
current response
def close: Boolean = closeMode != DontClose
def closeIf(cond: Boolean): Unit = if (cond) closeMode =
CloseConnection
var transferSink: Option[SubSinkInlet[ByteString]] = None
def transferring: Boolean = transferSink.isDefined
- setHandler(in,
- new InHandler {
- override def onPush(): Unit =
- render(grab(in)) match {
- case Strict(outElement) =>
- push(out, outElement)
- if (close) completeStage()
- case HeadersAndStreamedEntity(headerData, outStream) =>
- try transfer(headerData, outStream)
- catch {
- case NonFatal(e) =>
- log.error(e,
- s"Rendering of response failed because response entity
stream materialization failed with '${e.getMessage}'. Sending out 500 response
instead.")
- push(out,
- render(ResponseRenderingContext(HttpResponse(500,
- entity =
StatusCodes.InternalServerError.defaultMessage))).asInstanceOf[Strict].bytes)
- }
+ override def onPush(): Unit =
+ render(grab(in)) match {
+ case Strict(outElement) =>
+ push(out, outElement)
+ if (close) completeStage()
+ case HeadersAndStreamedEntity(headerData, outStream) =>
+ try transfer(headerData, outStream)
+ catch {
+ case NonFatal(e) =>
+ log.error(e,
+ s"Rendering of response failed because response entity
stream materialization failed with '${e.getMessage}'. Sending out 500 response
instead.")
+ push(out,
+ render(ResponseRenderingContext(HttpResponse(500,
+ entity =
StatusCodes.InternalServerError.defaultMessage))).asInstanceOf[Strict].bytes)
}
+ }
- override def onUpstreamFinish(): Unit =
- if (transferring) closeMode = CloseConnection
- else completeStage()
+ override def onUpstreamFinish(): Unit =
+ if (transferring) closeMode = CloseConnection
+ else completeStage()
+
+ override def onUpstreamFailure(ex: Throwable): Unit = {
+ stopTransfer()
+ failStage(ex)
+ }
+
+ setHandler(in, this)
- override def onUpstreamFailure(ex: Throwable): Unit = {
- stopTransfer()
- failStage(ex)
- }
- })
private val waitForDemandHandler = new OutHandler {
def onPull(): Unit = if (!hasBeenPulled(in)) tryPull(in)
}
diff --git
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/FrameEventRenderer.scala
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/FrameEventRenderer.scala
index 603be504e..8c1051d1b 100644
---
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/FrameEventRenderer.scala
+++
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/FrameEventRenderer.scala
@@ -32,52 +32,52 @@ private[http] final class FrameEventRenderer extends
GraphStage[FlowShape[FrameE
val out = Outlet[ByteString]("FrameEventRenderer.out")
override val shape = FlowShape(in, out)
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
+ override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+ new GraphStageLogic(shape) with OutHandler {
- val Initial = new InHandler {
- override def onPush(): Unit = grab(in) match {
- case start @ FrameStart(header, data) =>
- require(header.length >= data.size)
- if (!start.lastPart && header.length > 0)
- setHandler(in, renderData(header.length - data.length, this))
+ val Initial = new InHandler {
+ override def onPush(): Unit = grab(in) match {
+ case start @ FrameStart(header, data) =>
+ require(header.length >= data.size)
+ if (!start.lastPart && header.length > 0)
+ setHandler(in, renderData(header.length - data.length, this))
- push(out, renderStart(start))
+ push(out, renderStart(start))
- case f: FrameData =>
- fail(out, new IllegalStateException("unexpected FrameData (need
FrameStart first)"))
+ case _: FrameData =>
+ fail(out, new IllegalStateException("unexpected FrameData (need
FrameStart first)"))
+ }
}
- }
- def renderData(initialRemaining: Long, nextState: InHandler): InHandler =
- new InHandler {
- var remaining: Long = initialRemaining
-
- override def onPush(): Unit = {
- grab(in) match {
- case FrameData(data, lastPart) =>
- if (data.size > remaining)
- throw new IllegalStateException(s"Expected $remaining frame
bytes but got ${data.size}")
- else if (data.size == remaining) {
- if (!lastPart) throw new IllegalStateException(s"Frame data
complete but `lastPart` flag not set")
- setHandler(in, nextState)
- push(out, data)
- } else {
- remaining -= data.size
- push(out, data)
- }
-
- case f: FrameStart =>
- fail(out, new IllegalStateException("unexpected FrameStart (need
more FrameData first)"))
+ def renderData(initialRemaining: Long, nextState: InHandler): InHandler =
+ new InHandler {
+ var remaining: Long = initialRemaining
+
+ override def onPush(): Unit = {
+ grab(in) match {
+ case FrameData(data, lastPart) =>
+ if (data.size > remaining)
+ throw new IllegalStateException(s"Expected $remaining frame
bytes but got ${data.size}")
+ else if (data.size == remaining) {
+ if (!lastPart) throw new IllegalStateException(s"Frame data
complete but `lastPart` flag not set")
+ setHandler(in, nextState)
+ push(out, data)
+ } else {
+ remaining -= data.size
+ push(out, data)
+ }
+
+ case f: FrameStart =>
+ fail(out, new IllegalStateException("unexpected FrameStart
(need more FrameData first)"))
+ }
}
}
- }
- setHandler(in, Initial)
- setHandler(out,
- new OutHandler {
- override def onPull(): Unit = pull(in)
- })
- }
+ override def onPull(): Unit = pull(in)
+
+ setHandler(in, Initial)
+ setHandler(out, this)
+ }
private def renderStart(start: FrameStart): ByteString =
renderHeader(start.header) ++ start.data
diff --git
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/WebSocket.scala
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/WebSocket.scala
index 9ca214a34..5ac882f0d 100644
---
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/WebSocket.scala
+++
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/WebSocket.scala
@@ -225,26 +225,24 @@ private[http] object WebSocket {
val shape = new FanOutShape2(outputIn, bypassOut, messageOut)
- def createLogic(effectiveAttributes: Attributes) = new
GraphStageLogic(shape) {
+ def createLogic(effectiveAttributes: Attributes) = new
GraphStageLogic(shape) with InHandler {
+
+ override def onPush(): Unit = {
+ grab(outputIn) match {
+ case b: BypassEvent with MessagePart => emit(bypassOut, b, () =>
emit(messageOut, b, pullIn))
+ case b: BypassEvent => emit(bypassOut, b, pullIn)
+ case m: MessagePart => emit(messageOut, m, pullIn)
+ }
+ }
- setHandler(outputIn,
- new InHandler {
- override def onPush(): Unit = {
- grab(outputIn) match {
- case b: BypassEvent with MessagePart => emit(bypassOut, b, () =>
emit(messageOut, b, pullIn))
- case b: BypassEvent => emit(bypassOut, b,
pullIn)
- case m: MessagePart => emit(messageOut, m,
pullIn)
- }
- }
- })
val pullIn = () => tryPull(outputIn)
+ setHandler(outputIn, this)
+
setHandler(bypassOut, eagerTerminateOutput)
setHandler(messageOut, ignoreTerminateOutput)
- override def preStart(): Unit = {
- pullIn()
- }
+ override def preStart(): Unit = tryPull(outputIn)
}
}
@@ -288,18 +286,13 @@ private[http] object WebSocket {
val shape = new FlowShape(in, out)
- def createLogic(effectiveAttributes: Attributes) = new
GraphStageLogic(shape) {
- setHandler(out,
- new OutHandler {
- override def onPull(): Unit = pull(in)
- })
- setHandler(in,
- new InHandler {
- override def onPush(): Unit = push(out, grab(in))
- override def onUpstreamFinish(): Unit = emit(out,
UserHandlerCompleted, () => completeStage())
- override def onUpstreamFailure(ex: Throwable): Unit =
- emit(out, UserHandlerErredOut(ex), () => completeStage())
- })
+ def createLogic(effectiveAttributes: Attributes) = new
GraphStageLogic(shape) with InHandler with OutHandler {
+ override def onPull(): Unit = pull(in)
+ override def onPush(): Unit = push(out, grab(in))
+ override def onUpstreamFinish(): Unit = emit(out, UserHandlerCompleted,
() => completeStage())
+ override def onUpstreamFailure(ex: Throwable): Unit = emit(out,
UserHandlerErredOut(ex), () => completeStage())
+
+ setHandlers(in, out, this)
}
}
diff --git
a/http-core/src/main/scala/org/apache/pekko/http/impl/util/package.scala
b/http-core/src/main/scala/org/apache/pekko/http/impl/util/package.scala
index feef415f7..3ffe461bb 100644
--- a/http-core/src/main/scala/org/apache/pekko/http/impl/util/package.scala
+++ b/http-core/src/main/scala/org/apache/pekko/http/impl/util/package.scala
@@ -126,46 +126,43 @@ package util {
override val shape = FlowShape(byteStringIn, httpEntityOut)
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic
= new TimerGraphStageLogic(shape) {
- val bytes = ByteString.newBuilder
- private var emptyStream = false
-
- override def preStart(): Unit = scheduleOnce("ToStrictTimeoutTimer",
timeout)
-
- setHandler(httpEntityOut,
- new OutHandler {
- override def onPull(): Unit = {
- if (emptyStream) {
- push(httpEntityOut, HttpEntity.Strict(contentType,
ByteString.empty))
- completeStage()
- } else pull(byteStringIn)
- }
- })
-
- setHandler(byteStringIn,
- new InHandler {
- override def onPush(): Unit = {
- bytes ++= grab(byteStringIn)
- maxBytes match {
- case Some(max) if bytes.length > max =>
- failStage(new EntityStreamException(new ErrorInfo("Request too
large",
- s"Request was longer than the maximum of $max")))
- case _ =>
- pull(byteStringIn)
- }
- }
- override def onUpstreamFinish(): Unit = {
- if (isAvailable(httpEntityOut)) {
- push(httpEntityOut, HttpEntity.Strict(contentType,
bytes.result()))
- completeStage()
- } else emptyStream = true
+ override def createLogic(inheritedAttributes: Attributes): GraphStageLogic
=
+ new TimerGraphStageLogic(shape) with InHandler with OutHandler {
+ private val bytes = ByteString.newBuilder
+ private var emptyStream = false
+
+ override def preStart(): Unit = scheduleOnce("ToStrictTimeoutTimer",
timeout)
+
+ override def onPull(): Unit = {
+ if (emptyStream) {
+ push(httpEntityOut, HttpEntity.Strict(contentType,
ByteString.empty))
+ completeStage()
+ } else pull(byteStringIn)
+ }
+
+ override def onPush(): Unit = {
+ bytes ++= grab(byteStringIn)
+ maxBytes match {
+ case Some(max) if bytes.length > max =>
+ failStage(new EntityStreamException(new ErrorInfo("Request too
large",
+ s"Request was longer than the maximum of $max")))
+ case _ =>
+ pull(byteStringIn)
}
- })
+ }
+ override def onUpstreamFinish(): Unit = {
+ if (isAvailable(httpEntityOut)) {
+ push(httpEntityOut, HttpEntity.Strict(contentType, bytes.result()))
+ completeStage()
+ } else emptyStream = true
+ }
- override def onTimer(key: Any): Unit =
- failStage(new java.util.concurrent.TimeoutException(
- s"HttpEntity.toStrict timed out after $timeout while still waiting
for outstanding data"))
- }
+ setHandlers(byteStringIn, httpEntityOut, this)
+
+ override def onTimer(key: Any): Unit =
+ failStage(new java.util.concurrent.TimeoutException(
+ s"HttpEntity.toStrict timed out after $timeout while still waiting
for outstanding data"))
+ }
override def toString = "ToStrict"
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]