This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-http.git
The following commit(s) were added to refs/heads/main by this push:
new 6d9eb8ade close input streams (#814)
6d9eb8ade is described below
commit 6d9eb8ade83fcc4c5fba8340ac0a57f46d634c97
Author: PJ Fanning <[email protected]>
AuthorDate: Thu Oct 2 18:19:33 2025 +0100
close input streams (#814)
* close input streams
* Update HeaderDecompression.scala
---
.../impl/engine/http2/hpack/HeaderDecompression.scala | 5 ++++-
.../apache/pekko/http/scaladsl/coding/CoderSpec.scala | 8 +++-----
.../impl/engine/http2/Http2FrameHpackSupport.scala | 19 ++++++++++---------
3 files changed, 17 insertions(+), 15 deletions(-)
diff --git
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/hpack/HeaderDecompression.scala
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/hpack/HeaderDecompression.scala
index a22e7c6c4..b496090f0 100644
---
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/hpack/HeaderDecompression.scala
+++
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/hpack/HeaderDecompression.scala
@@ -92,8 +92,9 @@ private[http2] final class
HeaderDecompression(masterHeaderParser: HttpHeaderPar
}
}
}
+ val bis = ByteStringInputStream(payload)
try {
- decoder.decode(ByteStringInputStream(payload), Receiver)
+ decoder.decode(bis, Receiver)
decoder.endHeaderBlock() // TODO: do we have to check the result
here?
push(eventsOut, ParsedHeadersFrame(streamId, endStream,
headers.result(), prioInfo))
@@ -102,6 +103,8 @@ private[http2] final class
HeaderDecompression(masterHeaderParser: HttpHeaderPar
// this is signalled by the decoder when it failed, we want to
react to this by rendering a GOAWAY frame
fail(eventsOut,
new
Http2Compliance.Http2ProtocolException(ErrorCode.COMPRESSION_ERROR,
"Decompression failed."))
+ } finally {
+ bis.close()
}
}
diff --git
a/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/coding/CoderSpec.scala
b/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/coding/CoderSpec.scala
index 672adf0ef..708d39aed 100644
---
a/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/coding/CoderSpec.scala
+++
b/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/coding/CoderSpec.scala
@@ -13,11 +13,10 @@
package org.apache.pekko.http.scaladsl.coding
-import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, InputStream,
OutputStream }
+import java.io.{ ByteArrayOutputStream, InputStream, OutputStream }
import java.util.concurrent.ThreadLocalRandom
import java.util.zip.DataFormatException
-import scala.annotation.nowarn
import scala.annotation.tailrec
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
@@ -36,7 +35,6 @@ import pekko.util.ByteString
import org.scalatest.Inspectors
import org.scalatest.wordspec.AnyWordSpec
-@nowarn("msg=deprecated .* is internal API")
abstract class CoderSpec extends AnyWordSpec with CodecSpecSupport with
Inspectors {
protected def Coder: Coder
protected def newDecodedInputStream(underlying: InputStream): InputStream
@@ -191,7 +189,7 @@ abstract class CoderSpec extends AnyWordSpec with
CodecSpecSupport with Inspecto
def streamDecode(bytes: ByteString): ByteString = {
val output = new ByteArrayOutputStream()
- val input = newDecodedInputStream(new ByteArrayInputStream(bytes.toArray))
+ val input = newDecodedInputStream(bytes.asInputStream)
val buffer = new Array[Byte](500)
@tailrec def copy(from: InputStream, to: OutputStream): Unit = {
@@ -203,7 +201,7 @@ abstract class CoderSpec extends AnyWordSpec with
CodecSpecSupport with Inspecto
}
copy(input, output)
- ByteString(output.toByteArray)
+ ByteString.fromArrayUnsafe(output.toByteArray)
}
def decodeChunks(input: Source[ByteString, NotUsed]): ByteString =
diff --git
a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2FrameHpackSupport.scala
b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2FrameHpackSupport.scala
index 8dfe2da65..07cff1158 100644
---
a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2FrameHpackSupport.scala
+++
b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2FrameHpackSupport.scala
@@ -59,16 +59,17 @@ trait Http2FrameHpackSupport extends
Http2FrameProbeDelegator with Http2FrameSen
val decoder = new Decoder(Http2Protocol.InitialMaxHeaderListSize,
Http2Protocol.InitialMaxHeaderTableSize)
def decodeHeaders(bytes: ByteString): Seq[(String, String)] = {
- val bis = ByteStringInputStream(bytes)
val hs = new VectorBuilder[(String, String)]()
-
- decoder.decode(bis,
- new HeaderListener {
- def addHeader(name: String, value: String, parsedValue: AnyRef,
sensitive: Boolean): AnyRef = {
- hs += name -> value
- parsedValue
- }
- })
+ val bis = ByteStringInputStream(bytes)
+ try
+ decoder.decode(bis,
+ new HeaderListener {
+ def addHeader(name: String, value: String, parsedValue: AnyRef,
sensitive: Boolean): AnyRef = {
+ hs += name -> value
+ parsedValue
+ }
+ })
+ finally bis.close()
hs.result()
}
def decodeHeadersToResponse(bytes: ByteString): HttpResponse =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]