This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-http.git


The following commit(s) were added to refs/heads/main by this push:
     new 6d9eb8ade close input streams (#814)
6d9eb8ade is described below

commit 6d9eb8ade83fcc4c5fba8340ac0a57f46d634c97
Author: PJ Fanning <[email protected]>
AuthorDate: Thu Oct 2 18:19:33 2025 +0100

    close input streams (#814)
    
    * close input streams
    
    * Update HeaderDecompression.scala
---
 .../impl/engine/http2/hpack/HeaderDecompression.scala |  5 ++++-
 .../apache/pekko/http/scaladsl/coding/CoderSpec.scala |  8 +++-----
 .../impl/engine/http2/Http2FrameHpackSupport.scala    | 19 ++++++++++---------
 3 files changed, 17 insertions(+), 15 deletions(-)

diff --git 
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/hpack/HeaderDecompression.scala
 
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/hpack/HeaderDecompression.scala
index a22e7c6c4..b496090f0 100644
--- 
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/hpack/HeaderDecompression.scala
+++ 
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/hpack/HeaderDecompression.scala
@@ -92,8 +92,9 @@ private[http2] final class 
HeaderDecompression(masterHeaderParser: HttpHeaderPar
             }
           }
         }
+        val bis = ByteStringInputStream(payload)
         try {
-          decoder.decode(ByteStringInputStream(payload), Receiver)
+          decoder.decode(bis, Receiver)
           decoder.endHeaderBlock() // TODO: do we have to check the result 
here?
 
           push(eventsOut, ParsedHeadersFrame(streamId, endStream, 
headers.result(), prioInfo))
@@ -102,6 +103,8 @@ private[http2] final class 
HeaderDecompression(masterHeaderParser: HttpHeaderPar
             // this is signalled by the decoder when it failed, we want to 
react to this by rendering a GOAWAY frame
             fail(eventsOut,
               new 
Http2Compliance.Http2ProtocolException(ErrorCode.COMPRESSION_ERROR, 
"Decompression failed."))
+        } finally {
+          bis.close()
         }
       }
 
diff --git 
a/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/coding/CoderSpec.scala
 
b/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/coding/CoderSpec.scala
index 672adf0ef..708d39aed 100644
--- 
a/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/coding/CoderSpec.scala
+++ 
b/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/coding/CoderSpec.scala
@@ -13,11 +13,10 @@
 
 package org.apache.pekko.http.scaladsl.coding
 
-import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, InputStream, 
OutputStream }
+import java.io.{ ByteArrayOutputStream, InputStream, OutputStream }
 import java.util.concurrent.ThreadLocalRandom
 import java.util.zip.DataFormatException
 
-import scala.annotation.nowarn
 import scala.annotation.tailrec
 import scala.concurrent.Await
 import scala.concurrent.ExecutionContext.Implicits.global
@@ -36,7 +35,6 @@ import pekko.util.ByteString
 import org.scalatest.Inspectors
 import org.scalatest.wordspec.AnyWordSpec
 
-@nowarn("msg=deprecated .* is internal API")
 abstract class CoderSpec extends AnyWordSpec with CodecSpecSupport with 
Inspectors {
   protected def Coder: Coder
   protected def newDecodedInputStream(underlying: InputStream): InputStream
@@ -191,7 +189,7 @@ abstract class CoderSpec extends AnyWordSpec with 
CodecSpecSupport with Inspecto
 
   def streamDecode(bytes: ByteString): ByteString = {
     val output = new ByteArrayOutputStream()
-    val input = newDecodedInputStream(new ByteArrayInputStream(bytes.toArray))
+    val input = newDecodedInputStream(bytes.asInputStream)
 
     val buffer = new Array[Byte](500)
     @tailrec def copy(from: InputStream, to: OutputStream): Unit = {
@@ -203,7 +201,7 @@ abstract class CoderSpec extends AnyWordSpec with 
CodecSpecSupport with Inspecto
     }
 
     copy(input, output)
-    ByteString(output.toByteArray)
+    ByteString.fromArrayUnsafe(output.toByteArray)
   }
 
   def decodeChunks(input: Source[ByteString, NotUsed]): ByteString =
diff --git 
a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2FrameHpackSupport.scala
 
b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2FrameHpackSupport.scala
index 8dfe2da65..07cff1158 100644
--- 
a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2FrameHpackSupport.scala
+++ 
b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2FrameHpackSupport.scala
@@ -59,16 +59,17 @@ trait Http2FrameHpackSupport extends 
Http2FrameProbeDelegator with Http2FrameSen
   val decoder = new Decoder(Http2Protocol.InitialMaxHeaderListSize, 
Http2Protocol.InitialMaxHeaderTableSize)
 
   def decodeHeaders(bytes: ByteString): Seq[(String, String)] = {
-    val bis = ByteStringInputStream(bytes)
     val hs = new VectorBuilder[(String, String)]()
-
-    decoder.decode(bis,
-      new HeaderListener {
-        def addHeader(name: String, value: String, parsedValue: AnyRef, 
sensitive: Boolean): AnyRef = {
-          hs += name -> value
-          parsedValue
-        }
-      })
+    val bis = ByteStringInputStream(bytes)
+    try
+      decoder.decode(bis,
+        new HeaderListener {
+          def addHeader(name: String, value: String, parsedValue: AnyRef, 
sensitive: Boolean): AnyRef = {
+            hs += name -> value
+            parsedValue
+          }
+        })
+    finally bis.close()
     hs.result()
   }
   def decodeHeadersToResponse(bytes: ByteString): HttpResponse =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to