This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new dfed9afab fix(aws-spi-pekko-http): preserve SDK Content-Length to prevent chunked encoding breaking SigV4 signing (#1546)
dfed9afab is described below

commit dfed9afab3772f581854580f2506d4617a9e8903
Author: PJ Fanning <[email protected]>
AuthorDate: Fri Apr 3 13:04:59 2026 +0200

    fix(aws-spi-pekko-http): preserve SDK Content-Length to prevent chunked encoding breaking SigV4 signing (#1546)
    
    * Initial plan
    
    * fix: preserve Content-Length from AWS SDK headers to prevent chunked transfer encoding in S3 multipart uploads
    
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-connectors/sessions/5e1c413d-04aa-4dd9-b961-bb747147fe43
    
    Co-authored-by: pjfanning <[email protected]>
    
    * scalafmt
    
    * fix: use HttpEntity.Chunked.fromData when content length is unknown
    
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-connectors/sessions/71bfd062-b42b-4547-b302-04d169bdb895
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Revert "fix: use HttpEntity.Chunked.fromData when content length is unknown"
    
    This reverts commit 9155a0634e0d0ac4d3ce8e925ff6b816f0903129.
    
    * Create modify-convert-headers.backwards.excludes
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../modify-convert-headers.backwards.excludes      | 20 +++++++
 .../stream/connectors/awsspi/PekkoHttpClient.scala | 65 +++++++++++++---------
 .../connectors/awsspi/PekkoHttpClientSpec.scala    | 50 ++++++++++++++++-
 3 files changed, 107 insertions(+), 28 deletions(-)

diff --git 
a/aws-spi-pekko-http/src/main/mima-filters/1.4.x.backward.excludes/modify-convert-headers.backwards.excludes
 
b/aws-spi-pekko-http/src/main/mima-filters/1.4.x.backward.excludes/modify-convert-headers.backwards.excludes
new file mode 100644
index 000000000..7b6b0a081
--- /dev/null
+++ 
b/aws-spi-pekko-http/src/main/mima-filters/1.4.x.backward.excludes/modify-convert-headers.backwards.excludes
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# convertHeaders and entityForMethodAndContentType are private[awsspi]; their signatures changed to pass through the SDK-provided Content-Length
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient.convertHeaders")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient.entityForMethodAndContentType")
diff --git 
a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala
 
b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala
index b4d73d7ff..18895e9ec 100644
--- 
a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala
+++ 
b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala
@@ -78,22 +78,30 @@ object PekkoHttpClient {
 
   private[awsspi] def toPekkoRequest(request: SdkHttpRequest,
       contentPublisher: SdkHttpContentPublisher): HttpRequest = {
-    val (contentTypeHeader, reqheaders) = convertHeaders(request.headers())
+    val (contentTypeHeader, reqheaders, sdkContentLength) = 
convertHeaders(request.headers())
     val method = convertMethod(request.method().name())
     HttpRequest(
       method = method,
       uri = Uri(request.getUri.toString),
       headers = reqheaders,
-      entity =
-        entityForMethodAndContentType(method, 
contentTypeHeaderToContentType(contentTypeHeader), contentPublisher),
+      entity = entityForMethodAndContentType(method, 
contentTypeHeaderToContentType(contentTypeHeader),
+        contentPublisher, sdkContentLength),
       protocol = HttpProtocols.`HTTP/1.1`)
   }
 
   private[awsspi] def entityForMethodAndContentType(method: HttpMethod,
       contentType: ContentType,
-      contentPublisher: SdkHttpContentPublisher): RequestEntity =
+      contentPublisher: SdkHttpContentPublisher,
+      sdkContentLength: Option[Long] = None): RequestEntity =
     method.requestEntityAcceptance match {
-      case Expected => contentPublisher.contentLength().toScala match {
+      case Expected =>
+        // Prefer the content length from the SDK request headers over the 
publisher's value.
+        // This ensures that when the AWS SDK has set a Content-Length (which 
it always does for
+        // non-chunked-signing requests like UploadPart), Pekko HTTP sends a 
Content-Length entity
+        // rather than falling back to chunked transfer encoding, which would 
break AWS SigV4 signing.
+        val contentLength: Option[Long] =
+          
sdkContentLength.orElse(contentPublisher.contentLength().toScala.map(_.toLong))
+        contentLength match {
           case Some(length) =>
             HttpEntity(contentType, length, 
Source.fromPublisher(contentPublisher).map(ByteString(_)))
           case None => HttpEntity(contentType, 
Source.fromPublisher(contentPublisher).map(ByteString(_)))
@@ -119,35 +127,40 @@ object PekkoHttpClient {
       //
       .getOrElse(ContentTypes.NoContentType)
 
-  // This method converts the headers to Pekko-http headers and drops 
content-length and returns content-type separately
+  // This method converts the headers to Pekko-http headers, drops 
content-length (returning its value separately),
+  // and returns content-type separately
   private[awsspi] def convertHeaders(
-      headers: java.util.Map[String, java.util.List[String]]): 
(Option[HttpHeader], immutable.Seq[HttpHeader]) = {
+      headers: java.util.Map[String, java.util.List[String]]): 
(Option[HttpHeader], immutable.Seq[HttpHeader],
+      Option[Long]) = {
     val headersAsScala = {
       val builder = collection.mutable.Map.newBuilder[String, 
java.util.List[String]]
       headers.forEach { case (k, v) => builder += k -> v }
       builder.result()
     }
 
-    headersAsScala.foldLeft((Option.empty[HttpHeader], 
List.empty[HttpHeader])) { case ((ctHeader, hdrs), header) =>
-      val (headerName, headerValue) = header
-      if (headerValue.size() != 1) {
-        throw new IllegalArgumentException(
-          s"Found invalid header: key: $headerName, Value: ${val list = 
List.newBuilder[String]
-            headerValue.forEach(v => list += v)
-            list.result()}.")
-      }
-      // skip content-length as it will be calculated by pekko-http itself and 
must not be provided in the request headers
-      if (`Content-Length`.lowercaseName == headerName.toLowerCase) (ctHeader, 
hdrs)
-      else {
-        HttpHeader.parse(headerName, headerValue.get(0)) match {
-          case ok: Ok =>
-            // return content-type separately as it will be used to calculate 
ContentType, which is used on HttpEntity
-            if (ok.header.lowercaseName() == `Content-Type`.lowercaseName) 
(Some(ok.header), hdrs)
-            else (ctHeader, hdrs :+ ok.header)
-          case error: ParsingResult.Error =>
-            throw new IllegalArgumentException(s"Found invalid header: 
${error.errors}.")
+    headersAsScala.foldLeft((Option.empty[HttpHeader], List.empty[HttpHeader], 
Option.empty[Long])) {
+      case ((ctHeader, hdrs, contentLength), header) =>
+        val (headerName, headerValue) = header
+        if (headerValue.size() != 1) {
+          throw new IllegalArgumentException(
+            s"Found invalid header: key: $headerName, Value: ${val list = 
List.newBuilder[String]
+              headerValue.forEach(v => list += v)
+              list.result()}.")
+        }
+        // skip content-length as it will be managed by pekko-http in the 
entity, but capture its value
+        // so we can use it to build a fixed-length entity, preventing a 
fallback to chunked transfer encoding
+        if (`Content-Length`.lowercaseName == headerName.toLowerCase)
+          (ctHeader, hdrs, Some(headerValue.get(0).toLong))
+        else {
+          HttpHeader.parse(headerName, headerValue.get(0)) match {
+            case ok: Ok =>
+              // return content-type separately as it will be used to 
calculate ContentType, which is used on HttpEntity
+              if (ok.header.lowercaseName() == `Content-Type`.lowercaseName) 
(Some(ok.header), hdrs, contentLength)
+              else (ctHeader, hdrs :+ ok.header, contentLength)
+            case error: ParsingResult.Error =>
+              throw new IllegalArgumentException(s"Found invalid header: 
${error.errors}.")
+          }
         }
-      }
     }
   }
 
diff --git 
a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala
 
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala
index d32824dda..b9c35672e 100644
--- 
a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala
+++ 
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala
@@ -18,16 +18,19 @@
 package org.apache.pekko.stream.connectors.awsspi
 
 import java.util.Collections
+import java.nio.ByteBuffer
 import com.typesafe.config.ConfigFactory
 
 import org.apache.pekko
 import pekko.http.scaladsl.model.headers.`Content-Type`
-import pekko.http.scaladsl.model.MediaTypes
+import pekko.http.scaladsl.model.{ ContentTypes, HttpMethods, MediaTypes }
 import pekko.http.scaladsl.settings.{ ClientConnectionSettings, 
ConnectionPoolSettings }
+import org.reactivestreams.Subscriber
 import org.scalatest.OptionValues
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.wordspec.AnyWordSpec
 import software.amazon.awssdk.http.SdkHttpConfigurationOption
+import software.amazon.awssdk.http.async.SdkHttpContentPublisher
 import software.amazon.awssdk.utils.AttributeMap
 
 import scala.concurrent.duration._
@@ -49,11 +52,54 @@ class PekkoHttpClientSpec extends AnyWordSpec with Matchers 
with OptionValues {
       headers.put("Content-Length", Collections.singletonList("123"))
       headers.put("Accept", Collections.singletonList("*/*"))
 
-      val (contentTypeHeader, reqHeaders) = 
PekkoHttpClient.convertHeaders(headers)
+      val (contentTypeHeader, reqHeaders, contentLength) = 
PekkoHttpClient.convertHeaders(headers)
 
       contentTypeHeader.value.lowercaseName() shouldBe 
`Content-Type`.lowercaseName
       reqHeaders should have size 1
+      contentLength shouldBe Some(123L)
     }
+
+    "return None content length when Content-Length header is absent" in {
+      val headers = new java.util.HashMap[String, java.util.List[String]]
+      headers.put("Content-Type", Collections.singletonList("application/xml"))
+      headers.put("Accept", Collections.singletonList("*/*"))
+
+      val (_, _, contentLength) = PekkoHttpClient.convertHeaders(headers)
+
+      contentLength shouldBe None
+    }
+    "use sdk content length from headers when publisher returns empty 
contentLength" in {
+      val publisher = new SdkHttpContentPublisher {
+        override def contentLength(): java.util.Optional[java.lang.Long] = 
java.util.Optional.empty()
+        override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit = {}
+      }
+      val entity =
+        PekkoHttpClient.entityForMethodAndContentType(HttpMethods.PUT, 
ContentTypes.NoContentType, publisher,
+          Some(42L))
+      entity.contentLengthOption shouldBe Some(42L)
+    }
+
+    "use publisher contentLength when sdkContentLength is absent" in {
+      val publisher = new SdkHttpContentPublisher {
+        override def contentLength(): java.util.Optional[java.lang.Long] = 
java.util.Optional.of(99L)
+        override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit = {}
+      }
+      val entity =
+        PekkoHttpClient.entityForMethodAndContentType(HttpMethods.PUT, 
ContentTypes.NoContentType, publisher, None)
+      entity.contentLengthOption shouldBe Some(99L)
+    }
+
+    "prefer sdk content length over publisher contentLength when both are 
present" in {
+      val publisher = new SdkHttpContentPublisher {
+        override def contentLength(): java.util.Optional[java.lang.Long] = 
java.util.Optional.of(55L)
+        override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit = {}
+      }
+      val entity =
+        PekkoHttpClient.entityForMethodAndContentType(HttpMethods.PUT, 
ContentTypes.NoContentType, publisher,
+          Some(42L))
+      entity.contentLengthOption shouldBe Some(42L)
+    }
+
     "build() should use default ConnectionPoolSettings" in {
       val pekkoClient: PekkoHttpClient = new 
PekkoHttpAsyncHttpService().createAsyncHttpClientFactory()
         .build()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to