This is an automated email from the ASF dual-hosted git repository.
mdedetrich pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git
The following commit(s) were added to refs/heads/1.3.x by this push:
new a93fdc80c AWS S3: Handle Illegal Headers in Copy Part
a93fdc80c is described below
commit a93fdc80cafad890c149b916a5e9f92351428a22
Author: Luka J <[email protected]>
AuthorDate: Fri Nov 28 09:36:08 2025 +0100
AWS S3: Handle Illegal Headers in Copy Part
(cherry picked from commit 53d7b1dabbd50aface0a89b51a0c20ea1624630e)
---
s3/src/main/resources/reference.conf | 210 +++++++++++++++++++++
.../pekko/stream/connectors/s3/S3Headers.scala | 6 +-
.../stream/connectors/s3/impl/S3Request.scala | 97 ++++++++--
.../pekko/stream/connectors/s3/impl/S3Stream.scala | 60 +++---
.../pekko/stream/connectors/s3/settings.scala | 112 +++++++++--
s3/src/test/scala/docs/scaladsl/S3SinkSpec.scala | 14 +-
.../pekko/stream/connectors/s3/S3HeadersSpec.scala | 113 +++++++++++
.../stream/connectors/s3/S3SettingsSpec.scala | 59 ++++++
.../connectors/s3/impl/HttpRequestsSpec.scala | 2 +-
.../stream/connectors/s3/impl/S3StreamSpec.scala | 4 +-
10 files changed, 615 insertions(+), 62 deletions(-)
diff --git a/s3/src/main/resources/reference.conf
b/s3/src/main/resources/reference.conf
index d24cd82ee..4521e6475 100644
--- a/s3/src/main/resources/reference.conf
+++ b/s3/src/main/resources/reference.conf
@@ -125,4 +125,214 @@ pekko.connectors.s3 {
# Add signature headers to requests when aws.credentials.provider is anon
sign-anonymous-requests = true
+
+ # An allow list of headers for each S3Request type as defined by the AWS
specification - this is merged with additional-allowed-headers
+ allowed-headers {
+ #
https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html#API_GetObject_RequestSyntax
+ GetObject = [
+ "Host",
+ "If-Match",
+ "If-Modified-Since",
+ "If-None-Match",
+ "If-Unmodified-Since",
+ "Range",
+ "x-amz-server-side-encryption-customer-algorithm",
+ "x-amz-server-side-encryption-customer-key",
+ "x-amz-server-side-encryption-customer-key-MD5",
+ "x-amz-request-payer",
+ "x-amz-expected-bucket-owner",
+ "x-amz-checksum-mode"
+ ]
+ #
https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_RequestSyntax
+ HeadObject = [
+ "Host",
+ "If-Match",
+ "If-Modified-Since",
+ "If-None-Match",
+ "If-Unmodified-Since",
+ "Range",
+ "x-amz-server-side-encryption-customer-algorithm",
+ "x-amz-server-side-encryption-customer-key",
+ "x-amz-server-side-encryption-customer-key-MD5",
+ "x-amz-request-payer",
+ "x-amz-expected-bucket-owner",
+ "x-amz-checksum-mode"
+ ]
+ #
https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html#API_PutObject_RequestSyntax
+ PutObject = [
+ "Host",
+ "x-amz-acl",
+ "Cache-Control",
+ "Content-Disposition",
+ "Content-Encoding",
+ "Content-Language",
+ "Content-Length",
+ "Content-MD5",
+ "Content-Type",
+ "x-amz-sdk-checksum-algorithm",
+ "x-amz-checksum-crc32",
+ "x-amz-checksum-crc32c",
+ "x-amz-checksum-crc64nvme",
+ "x-amz-checksum-sha1",
+ "x-amz-checksum-sha256",
+ "Expires",
+ "If-Match",
+ "If-None-Match",
+ "x-amz-grant-full-control",
+ "x-amz-grant-read",
+ "x-amz-grant-read-acp",
+ "x-amz-grant-write-acp",
+ "x-amz-write-offset-bytes",
+ "x-amz-server-side-encryption",
+ "x-amz-storage-class",
+ "x-amz-website-redirect-location",
+ "x-amz-server-side-encryption-customer-algorithm",
+ "x-amz-server-side-encryption-customer-key",
+ "x-amz-server-side-encryption-customer-key-MD5",
+ "x-amz-server-side-encryption-aws-kms-key-id",
+ "x-amz-server-side-encryption-context",
+ "x-amz-server-side-encryption-bucket-key-enabled",
+ "x-amz-request-payer",
+ "x-amz-tagging",
+ "x-amz-object-lock-mode",
+ "x-amz-object-lock-retain-until-date",
+ "x-amz-object-lock-legal-hold",
+ "x-amz-expected-bucket-owner"
+ ]
+ #
https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html#API_CreateMultipartUpload_RequestSyntax
+ InitiateMultipartUpload = [
+ "Host",
+ "x-amz-acl",
+ "Cache-Control",
+ "Content-Disposition",
+ "Content-Encoding",
+ "Content-Language",
+ "Content-Type",
+ "Expires",
+ "x-amz-grant-full-control",
+ "x-amz-grant-read",
+ "x-amz-grant-read-acp",
+ "x-amz-grant-write-acp",
+ "x-amz-server-side-encryption",
+ "x-amz-storage-class",
+ "x-amz-website-redirect-location",
+ "x-amz-server-side-encryption-customer-algorithm",
+ "x-amz-server-side-encryption-customer-key",
+ "x-amz-server-side-encryption-customer-key-MD5",
+ "x-amz-server-side-encryption-aws-kms-key-id",
+ "x-amz-server-side-encryption-context",
+ "x-amz-server-side-encryption-bucket-key-enabled",
+ "x-amz-request-payer",
+ "x-amz-tagging",
+ "x-amz-object-lock-mode",
+ "x-amz-object-lock-retain-until-date",
+ "x-amz-object-lock-legal-hold",
+ "x-amz-expected-bucket-owner",
+ "x-amz-checksum-algorithm",
+ "x-amz-checksum-type"
+ ]
+ #
https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html#API_UploadPart_RequestSyntax
+ UploadPart = [
+ "Host",
+ "Content-Length",
+ "Content-MD5",
+ "x-amz-sdk-checksum-algorithm",
+ "x-amz-checksum-crc32",
+ "x-amz-checksum-crc32c",
+ "x-amz-checksum-crc64nvme",
+ "x-amz-checksum-sha1",
+ "x-amz-checksum-sha256",
+ "x-amz-server-side-encryption-customer-algorithm",
+ "x-amz-server-side-encryption-customer-key",
+ "x-amz-server-side-encryption-customer-key-MD5",
+ "x-amz-request-payer",
+ "x-amz-expected-bucket-owner"
+ ]
+ #
https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html#API_UploadPartCopy_RequestSyntax
+ CopyPart = [
+ "Host",
+ "x-amz-copy-source",
+ "x-amz-copy-source-if-match",
+ "x-amz-copy-source-if-modified-since",
+ "x-amz-copy-source-if-none-match",
+ "x-amz-copy-source-if-unmodified-since",
+ "x-amz-copy-source-range",
+ "x-amz-server-side-encryption-customer-algorithm",
+ "x-amz-server-side-encryption-customer-key",
+ "x-amz-server-side-encryption-customer-key-MD5",
+ "x-amz-copy-source-server-side-encryption-customer-algorithm",
+ "x-amz-copy-source-server-side-encryption-customer-key",
+ "x-amz-copy-source-server-side-encryption-customer-key-MD5",
+ "x-amz-request-payer",
+ "x-amz-expected-bucket-owner",
+ "x-amz-source-expected-bucket-owner"
+ ]
+ #
https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html#API_DeleteObject_RequestSyntax
+ DeleteObject = [
+ "Host",
+ "x-amz-mfa",
+ "x-amz-request-payer",
+ "x-amz-bypass-governance-retention",
+ "x-amz-expected-bucket-owner",
+ "If-Match",
+ "x-amz-if-match-last-modified-time",
+ "x-amz-if-match-size"
+ ]
+ #
https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListBuckets.html#API_ListBuckets_RequestSyntax
+ ListBucket = [
+ "Host"
+ ]
+ #
https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateBucket.html#API_CreateBucket_RequestSyntax
+ MakeBucket = [
+ "Host",
+ "x-amz-acl",
+ "x-amz-grant-full-control",
+ "x-amz-grant-read",
+ "x-amz-grant-read-acp",
+ "x-amz-grant-write",
+ "x-amz-grant-write-acp",
+ "x-amz-bucket-object-lock-enabled",
+ "x-amz-object-ownership"
+ ]
+ #
https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteBucket.html#API_DeleteBucket_RequestSyntax
+ DeleteBucket = [
+ "Host",
+ "x-amz-expected-bucket-owner"
+ ]
+ #
https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadBucket.html#API_HeadBucket_RequestSyntax
+ CheckBucket = [
+ "Host",
+ "x-amz-expected-bucket-owner"
+ ]
+ #
https://docs.aws.amazon.com/AmazonS3/latest/API/API_control_PutBucketVersioning.html#API_control_PutBucketVersioning_RequestSyntax
+ PutBucketVersioning = [
+ "Host",
+ "Content-MD5",
+ "x-amz-sdk-checksum-algorithm",
+ "x-amz-mfa",
+ "x-amz-expected-bucket-owner"
+ ]
+ #
https://docs.aws.amazon.com/AmazonS3/latest/API/API_control_GetBucketVersioning.html#API_control_GetBucketVersioning_RequestSyntax
+ GetBucketVersioning = [
+ "Host",
+ "x-amz-expected-bucket-owner"
+ ]
+ }
+
+ # An additional allow list for each S3Request type for cases where non-standard
request headers are needed or the S3 specification evolves to include
new headers
+ additional-allowed-headers {
+ GetObject = []
+ HeadObject = []
+ PutObject = []
+ InitiateMultipartUpload = []
+ UploadPart = []
+ CopyPart = []
+ DeleteObject = []
+ ListBucket = []
+ MakeBucket = []
+ DeleteBucket = []
+ CheckBucket = []
+ PutBucketVersioning = []
+ GetBucketVersioning = []
+ }
}
diff --git
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/S3Headers.scala
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/S3Headers.scala
index 7ab95395b..71cdb126d 100644
--- a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/S3Headers.scala
+++ b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/S3Headers.scala
@@ -76,8 +76,10 @@ final class S3Headers private (val cannedAcl:
Option[CannedAcl] = None,
RawHeader(header._1, header._2)
}
- @InternalApi private[s3] def headersFor(request: S3Request) =
- headers ++ serverSideEncryption.toIndexedSeq.flatMap(_.headersFor(request))
+ @InternalApi private[s3] def headersFor(request: S3Request)(implicit
s3Settings: S3Settings) =
+ headers.filter(header =>
+ s3Settings.concreteAllowedHeaders.getOrElse(request, Set.empty).contains(
+ header.name())) ++
serverSideEncryption.toIndexedSeq.flatMap(_.headersFor(request))
def withCannedAcl(cannedAcl: CannedAcl): S3Headers = copy(cannedAcl =
Some(cannedAcl))
def withMetaHeaders(metaHeaders: MetaHeaders): S3Headers = copy(metaHeaders
= Some(metaHeaders))
diff --git
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Request.scala
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Request.scala
index 19e3858f2..4789ca19f 100644
---
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Request.scala
+++
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Request.scala
@@ -13,7 +13,10 @@
package org.apache.pekko.stream.connectors.s3.impl
-import org.apache.pekko.annotation.InternalApi
+import org.apache.pekko
+
+import pekko.annotation.InternalApi
+import pekko.util.OptionVal
/**
* Internal Api
@@ -23,64 +26,130 @@ import org.apache.pekko.annotation.InternalApi
/**
* Internal Api
*/
-@InternalApi private[s3] case object GetObject extends S3Request
+@InternalApi private[s3] object S3Request {
+ def fromString(str: String): OptionVal[S3Request] = {
+ str match {
+ case "GetObject" => OptionVal(GetObject)
+ case "HeadObject" => OptionVal(HeadObject)
+ case "PutObject" => OptionVal(PutObject)
+ case "InitiateMultipartUpload" => OptionVal(InitiateMultipartUpload)
+ case "UploadPart" => OptionVal(UploadPart)
+ case "CopyPart" => OptionVal(CopyPart)
+ case "DeleteObject" => OptionVal(DeleteObject)
+ case "ListBucket" => OptionVal(ListBucket)
+ case "MakeBucket" => OptionVal(MakeBucket)
+ case "DeleteBucket" => OptionVal(DeleteBucket)
+ case "CheckBucket" => OptionVal(CheckBucket)
+ case "PutBucketVersioning" => OptionVal(PutBucketVersioning)
+ case "GetBucketVersioning" => OptionVal(GetBucketVersioning)
+ case _ => OptionVal.None
+ }
+ }
+
+ val allRequests: List[S3Request] = List(
+ GetObject,
+ HeadObject,
+ PutObject,
+ InitiateMultipartUpload,
+ UploadPart,
+ CopyPart,
+ DeleteObject,
+ ListBucket,
+ MakeBucket,
+ DeleteBucket,
+ CheckBucket,
+ PutBucketVersioning,
+ GetBucketVersioning
+ )
+}
+
+/**
+ * Internal Api
+ */
+@InternalApi private[s3] case object GetObject extends S3Request {
+ override def toString() = "GetObject"
+}
/**
* Internal Api
*/
-@InternalApi private[s3] case object HeadObject extends S3Request
+@InternalApi private[s3] case object HeadObject extends S3Request {
+ override def toString() = "HeadObject"
+}
/**
* Internal Api
*/
-@InternalApi private[s3] case object PutObject extends S3Request
+@InternalApi private[s3] case object PutObject extends S3Request {
+ override def toString() = "PutObject"
+}
/**
* Internal Api
*/
-@InternalApi private[s3] case object InitiateMultipartUpload extends S3Request
+@InternalApi private[s3] case object InitiateMultipartUpload extends S3Request
{
+ override def toString() = "InitiateMultipartUpload"
+}
/**
* Internal Api
*/
-@InternalApi private[s3] case object UploadPart extends S3Request
+@InternalApi private[s3] case object UploadPart extends S3Request {
+ override def toString() = "UploadPart"
+}
/**
* Internal Api
*/
-@InternalApi private[s3] case object CopyPart extends S3Request
+@InternalApi private[s3] case object CopyPart extends S3Request {
+ override def toString() = "CopyPart"
+}
/**
* Internal Api
*/
-@InternalApi private[s3] case object DeleteObject extends S3Request
+@InternalApi private[s3] case object DeleteObject extends S3Request {
+ override def toString() = "DeleteObject"
+}
/**
* Internal Api
*/
-@InternalApi private[s3] case object ListBucket extends S3Request
+@InternalApi private[s3] case object ListBucket extends S3Request {
+ override def toString() = "ListBucket"
+}
/**
* Internal Api
*/
-@InternalApi private[s3] case object MakeBucket extends S3Request
+@InternalApi private[s3] case object MakeBucket extends S3Request {
+ override def toString() = "MakeBucket"
+}
/**
* Internal Api
*/
-@InternalApi private[s3] case object DeleteBucket extends S3Request
+@InternalApi private[s3] case object DeleteBucket extends S3Request {
+ override def toString() = "DeleteBucket"
+}
/**
* Internal Api
*/
-@InternalApi private[s3] case object CheckBucket extends S3Request
+@InternalApi private[s3] case object CheckBucket extends S3Request {
+ override def toString() = "CheckBucket"
+}
/**
* Internal Api
*/
-@InternalApi private[s3] case object PutBucketVersioning extends S3Request
+@InternalApi private[s3] case object PutBucketVersioning extends S3Request {
+ override def toString() = "PutBucketVersioning"
+}
/**
* Internal Api
*/
-@InternalApi private[s3] case object GetBucketVersioning extends S3Request
+@InternalApi private[s3] case object GetBucketVersioning extends S3Request {
+ override def toString() = "GetBucketVersioning"
+}
diff --git
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
index 89341f9a4..1b080e24a 100644
---
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
+++
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
@@ -187,11 +187,12 @@ import scala.util.{ Failure, Success, Try }
range: Option[ByteRange],
versionId: Option[String],
s3Headers: S3Headers): Source[Option[(Source[ByteString, NotUsed],
ObjectMetadata)], NotUsed] = {
- val headers = s3Headers.headersFor(GetObject)
Source
.fromMaterializer { (mat, attr) =>
implicit val materializer: Materializer = mat
+ implicit val conf: S3Settings = resolveSettings(attr, mat.system)
+ val headers = s3Headers.headersFor(GetObject)
issueRequest(s3Location, rangeOption = range, versionId = versionId,
s3Headers = headers)(mat, attr)
.map(response =>
response.withEntity(response.entity.withoutSizeLimit))
.mapAsync(parallelism = 1)(entityForSuccess)
@@ -210,12 +211,13 @@ import scala.util.{ Failure, Success, Try }
range: Option[ByteRange],
versionId: Option[String],
s3Headers: S3Headers): Source[ByteString, Future[ObjectMetadata]] = {
- val headers = s3Headers.headersFor(GetObject)
Source
.fromMaterializer { (mat, attr) =>
val objectMetadataMat = Promise[ObjectMetadata]()
implicit val materializer: Materializer = mat
+ implicit val conf: S3Settings = resolveSettings(attr, mat.system)
+ val headers = s3Headers.headersFor(GetObject)
issueRequest(s3Location, rangeOption = range, versionId = versionId,
s3Headers = headers)(mat, attr)
.map(response =>
response.withEntity(response.entity.withoutSizeLimit))
.mapAsync(parallelism = 1)(entityForSuccess)
@@ -580,6 +582,7 @@ import scala.util.{ Failure, Success, Try }
.fromMaterializer { (mat, attr) =>
implicit val materializer: Materializer = mat
import mat.executionContext
+ implicit val conf: S3Settings = resolveSettings(attr, mat.system)
val headers = s3Headers.headersFor(HeadObject)
issueRequest(S3Location(bucket, key), HttpMethods.HEAD, versionId =
versionId, s3Headers = headers)(mat, attr)
.flatMapConcat {
@@ -610,7 +613,7 @@ import scala.util.{ Failure, Success, Try }
Source
.fromMaterializer { (mat, attr) =>
implicit val m: Materializer = mat
-
+ implicit val conf: S3Settings = resolveSettings(attr, mat.system)
val headers = s3Headers.headersFor(DeleteObject)
issueRequest(s3Location, HttpMethods.DELETE, versionId = versionId,
s3Headers = headers)(mat, attr)
.flatMapConcat {
@@ -659,8 +662,6 @@ import scala.util.{ Failure, Success, Try }
// TODO can we take in a Source[ByteString, NotUsed] without forcing
chunking
// chunked requests are causing S3 to think this is a multipart upload
- val headers = s3Headers.headersFor(PutObject)
-
Source
.fromMaterializer { (mat, attr) =>
implicit val materializer: Materializer = mat
@@ -668,6 +669,7 @@ import scala.util.{ Failure, Success, Try }
import mat.executionContext
implicit val sys: ActorSystem = mat.system
implicit val conf: S3Settings = resolveSettings(attr, mat.system)
+ val headers = s3Headers.headersFor(PutObject)
val req = uploadRequest(s3Location, data, contentLength, contentType,
headers)
@@ -717,8 +719,9 @@ import scala.util.{ Failure, Success, Try }
case _ => downloadRequest
}
- private def bucketManagementRequest(bucket: String)(method: HttpMethod,
conf: S3Settings): HttpRequest =
- HttpRequests.bucketManagementRequest(S3Location(bucket, key = ""),
method)(conf)
+ private def bucketManagementRequest(bucket: String, headers: S3Headers,
request: S3Request)(method: HttpMethod,
+ conf: S3Settings): HttpRequest =
+ HttpRequests.bucketManagementRequest(S3Location(bucket, key = ""), method,
headers.headersFor(request)(conf))(conf)
def makeBucketSource(bucket: String, headers: S3Headers): Source[Done,
NotUsed] = {
Source
@@ -738,8 +741,7 @@ import scala.util.{ Failure, Success, Try }
s3ManagementRequest[Done](
bucket = bucket,
method = HttpMethods.PUT,
- httpRequest = bucketManagementRequest(bucket),
- headers.headersFor(MakeBucket),
+ httpRequest = bucketManagementRequest(bucket, headers, MakeBucket),
process = processS3LifecycleResponse,
httpEntity = maybeRegionPayload)
}
@@ -753,9 +755,9 @@ import scala.util.{ Failure, Success, Try }
s3ManagementRequest[Done](
bucket = bucket,
method = HttpMethods.DELETE,
- httpRequest = bucketManagementRequest(bucket),
- headers.headersFor(DeleteBucket),
- process = processS3LifecycleResponse)
+ httpRequest = bucketManagementRequest(bucket, headers, DeleteBucket),
+ process = processS3LifecycleResponse
+ )
def deleteBucket(bucket: String, headers: S3Headers)(implicit mat:
Materializer, attr: Attributes): Future[Done] =
deleteBucketSource(bucket,
headers).withAttributes(attr).runWith(Sink.ignore)
@@ -764,42 +766,42 @@ import scala.util.{ Failure, Success, Try }
s3ManagementRequest[BucketAccess](
bucket = bucketName,
method = HttpMethods.HEAD,
- httpRequest = bucketManagementRequest(bucketName),
- headers.headersFor(CheckBucket),
+ httpRequest = bucketManagementRequest(bucketName, headers, CheckBucket),
process = processCheckIfExistsResponse)
def checkIfBucketExists(bucket: String, headers: S3Headers)(implicit mat:
Materializer,
attr: Attributes): Future[BucketAccess] =
checkIfBucketExistsSource(bucket,
headers).withAttributes(attr).runWith(Sink.head)
- private def uploadManagementRequest(bucket: String, key: String, uploadId:
String)(method: HttpMethod,
+ private def uploadManagementRequest(bucket: String, key: String, uploadId:
String, s3Headers: S3Headers,
+ s3Request: S3Request)(method: HttpMethod,
conf: S3Settings): HttpRequest =
- HttpRequests.uploadManagementRequest(S3Location(bucket, key), uploadId,
method)(conf)
+ HttpRequests.uploadManagementRequest(S3Location(bucket, key), uploadId,
method,
+ s3Headers.headersFor(s3Request)(conf))(conf)
def deleteUploadSource(bucket: String, key: String, uploadId: String,
headers: S3Headers): Source[Done, NotUsed] =
s3ManagementRequest[Done](
bucket = bucket,
method = HttpMethods.DELETE,
- httpRequest = uploadManagementRequest(bucket, key, uploadId),
- headers.headersFor(DeleteBucket),
+ httpRequest = uploadManagementRequest(bucket, key, uploadId, headers,
DeleteBucket),
process = processS3LifecycleResponse)
def deleteUpload(bucket: String, key: String, uploadId: String, headers:
S3Headers)(implicit mat: Materializer,
attr: Attributes): Future[Done] =
deleteUploadSource(bucket, key, uploadId,
headers).withAttributes(attr).runWith(Sink.ignore)
- private def bucketVersioningRequest(bucket: String, mfaStatus:
Option[MFAStatus], headers: S3Headers)(
+ private def bucketVersioningRequest(bucket: String, mfaStatus:
Option[MFAStatus], headers: S3Headers,
+ s3Request: S3Request)(
method: HttpMethod,
conf: S3Settings): HttpRequest =
- HttpRequests.bucketVersioningRequest(bucket, mfaStatus, method,
headers.headers)(conf)
+ HttpRequests.bucketVersioningRequest(bucket, mfaStatus, method,
headers.headersFor(s3Request)(conf))(conf)
def putBucketVersioningSource(
bucket: String, bucketVersioning: BucketVersioning, headers: S3Headers):
Source[Done, NotUsed] =
s3ManagementRequest[Done](
bucket = bucket,
method = HttpMethods.PUT,
- httpRequest = bucketVersioningRequest(bucket,
bucketVersioning.mfaDelete, headers),
- headers.headersFor(PutBucketVersioning),
+ httpRequest = bucketVersioningRequest(bucket,
bucketVersioning.mfaDelete, headers, PutBucketVersioning),
process = processS3LifecycleResponse,
httpEntity =
Some(putBucketVersioningPayload(bucketVersioning)(ExecutionContexts.parasitic)))
@@ -813,8 +815,7 @@ import scala.util.{ Failure, Success, Try }
s3ManagementRequest[BucketVersioningResult](
bucket = bucket,
method = HttpMethods.GET,
- httpRequest = bucketVersioningRequest(bucket, None, headers),
- headers.headersFor(GetBucketVersioning),
+ httpRequest = bucketVersioningRequest(bucket, None, headers,
GetBucketVersioning),
process = { (response: HttpResponse, mat: Materializer) =>
response match {
case HttpResponse(status, _, entity, _) if status.isSuccess() =>
@@ -833,7 +834,6 @@ import scala.util.{ Failure, Success, Try }
bucket: String,
method: HttpMethod,
httpRequest: (HttpMethod, S3Settings) => HttpRequest,
- headers: Seq[HttpHeader],
process: (HttpResponse, Materializer) => Future[T],
httpEntity: Option[Future[RequestEntity]] = None): Source[T, NotUsed] =
Source
@@ -1337,7 +1337,8 @@ import scala.util.{ Failure, Success, Try }
private def requestInfoOrUploadState(s3Location: S3Location,
contentType: ContentType,
s3Headers: S3Headers,
- initialUploadState: Option[(String, Int)]): Source[(MultipartUpload,
Int), NotUsed] = {
+ initialUploadState: Option[(String, Int)])(
+ implicit s3Settings: S3Settings): Source[(MultipartUpload, Int),
NotUsed] = {
initialUploadState match {
case Some((uploadId, initialIndex)) =>
// We are resuming from a previously aborted Multipart upload so
rather than creating a new MultipartUpload
@@ -1504,15 +1505,14 @@ import scala.util.{ Failure, Success, Try }
contentType: ContentType,
s3Headers: S3Headers,
partitions: Source[List[CopyPartition], NotUsed]) = {
- val requestInfo: Source[(MultipartUpload, Int), NotUsed] =
- initiateUpload(location, contentType,
s3Headers.headersFor(InitiateMultipartUpload))
-
- val headers = s3Headers.headersFor(CopyPart)
Source
.fromMaterializer { (mat, attr) =>
implicit val conf: S3Settings = resolveSettings(attr, mat.system)
+ val headers = s3Headers.headersFor(CopyPart)
+ val requestInfo: Source[(MultipartUpload, Int), NotUsed] =
+ initiateUpload(location, contentType,
s3Headers.headersFor(InitiateMultipartUpload))
requestInfo
.zipWith(partitions) {
case ((upload, _), ls) =>
diff --git
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/settings.scala
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/settings.scala
index 5e32a228a..8d19d685f 100644
--- a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/settings.scala
+++ b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/settings.scala
@@ -22,7 +22,10 @@ import org.apache.pekko
import pekko.actor.{ ActorSystem, ClassicActorSystemProvider }
import pekko.http.scaladsl.model.Uri
import pekko.stream.connectors.s3.AccessStyle.{ PathAccessStyle,
VirtualHostAccessStyle }
+import pekko.stream.connectors.s3.impl.S3Request
import pekko.util.OptionConverters._
+import pekko.util.OptionVal
+import pekko.util.ccompat.JavaConverters._
import com.typesafe.config.Config
import org.slf4j.LoggerFactory
import software.amazon.awssdk.auth.credentials._
@@ -359,7 +362,9 @@ final class S3Settings private (
val validateObjectKey: Boolean,
val retrySettings: RetrySettings,
val multipartUploadSettings: MultipartUploadSettings,
- val signAnonymousRequests: Boolean) {
+ val signAnonymousRequests: Boolean,
+ val allowedHeaders: Map[String, Set[String]]
+) {
/** Java API */
def getBufferType: BufferType = bufferType
@@ -420,6 +425,17 @@ final class S3Settings private (
def withSignAnonymousRequests(value: Boolean): S3Settings =
if (signAnonymousRequests == value) this else copy(signAnonymousRequests =
value)
+ private[s3] val concreteAllowedHeaders: Map[S3Request, Set[String]] = {
+ allowedHeaders.foldLeft(Map.empty[S3Request, Set[String]]) {
+ case (acc, (header, value)) =>
+ S3Request.fromString(header) match {
+ case OptionVal.Some(header) => acc + (header -> value)
+ case OptionVal.None => acc
+ case other => throw new MatchError(other)
+ }
+ }
+ }
+
private def copy(
bufferType: BufferType = bufferType,
credentialsProvider: AwsCredentialsProvider = credentialsProvider,
@@ -431,7 +447,9 @@ final class S3Settings private (
validateObjectKey: Boolean = validateObjectKey,
retrySettings: RetrySettings = retrySettings,
multipartUploadSettings: MultipartUploadSettings =
multipartUploadSettings,
- signAnonymousRequests: Boolean = signAnonymousRequests): S3Settings =
new S3Settings(
+ signAnonymousRequests: Boolean = signAnonymousRequests,
+ allowedHeaders: Map[String, Set[String]] = allowedHeaders
+ ): S3Settings = new S3Settings(
bufferType,
credentialsProvider,
s3RegionProvider,
@@ -442,7 +460,9 @@ final class S3Settings private (
validateObjectKey,
retrySettings,
multipartUploadSettings,
- signAnonymousRequests)
+ signAnonymousRequests,
+ allowedHeaders
+ )
override def toString: String =
"S3Settings(" +
@@ -455,8 +475,15 @@ final class S3Settings private (
s"forwardProxy=$forwardProxy," +
s"validateObjectKey=$validateObjectKey" +
s"retrySettings=$retrySettings" +
- s"multipartUploadSettings=$multipartUploadSettings)" +
- s"signAnonymousRequests=$signAnonymousRequests"
+ s"multipartUploadSettings=$multipartUploadSettings" +
+ s"signAnonymousRequests=$signAnonymousRequests" +
+ s"allowedHeaders=${
+ val entries = allowedHeaders.toSeq.sortBy(_._1).map { case (key,
values) =>
+ s"$key -> Set(${values.mkString(", ")})"
+ }.mkString(", ")
+ s"Map($entries)"
+ }" +
+ ")"
override def equals(other: Any): Boolean = other match {
case that: S3Settings =>
@@ -470,7 +497,8 @@ final class S3Settings private (
this.validateObjectKey == that.validateObjectKey &&
Objects.equals(this.retrySettings, that.retrySettings) &&
Objects.equals(this.multipartUploadSettings, multipartUploadSettings) &&
- this.signAnonymousRequests == that.signAnonymousRequests
+ this.signAnonymousRequests == that.signAnonymousRequests &&
+ this.allowedHeaders == that.allowedHeaders
case _ => false
}
@@ -628,6 +656,37 @@ object S3Settings {
val signAnonymousRequests = c.getBoolean("sign-anonymous-requests")
+ val allowedHeadersConfig = c.getConfig("allowed-headers")
+
+ val allowedHeadersBase = S3Request.allRequests.map {
+ requestType =>
+ val requestTypeString = requestType.toString()
+ val value = if (allowedHeadersConfig.hasPath(requestTypeString)) {
+ allowedHeadersConfig.getStringList(requestTypeString).asScala.toSet
+ } else {
+ Set.empty[String]
+ }
+ (requestType.toString(), value)
+ }.toMap
+
+ val additionalAllowedHeadersConfig =
c.getConfig("additional-allowed-headers")
+
+ val additionalAllowedHeaders: Map[String, Set[String]] =
+ S3Request.allRequests.map {
+ requestType =>
+ val requestTypeString = requestType.toString()
+ val value = if
(additionalAllowedHeadersConfig.hasPath(requestTypeString)) {
+
additionalAllowedHeadersConfig.getStringList(requestTypeString).asScala.toSet
+ } else {
+ Set.empty[String]
+ }
+ (requestType.toString(), value)
+ }.toMap
+
+ val finalAllowedHeaders = allowedHeadersBase ++
additionalAllowedHeaders.map { case (k, v) =>
+ k -> (v ++ allowedHeadersBase.getOrElse(k, Set.empty[String]))
+ }
+
new S3Settings(
bufferType,
credentialsProvider,
@@ -639,7 +698,9 @@ object S3Settings {
validateObjectKey,
retrySettings,
multipartUploadSettings,
- signAnonymousRequests)
+ signAnonymousRequests,
+ finalAllowedHeaders
+ )
}
/**
@@ -648,11 +709,27 @@ object S3Settings {
def create(c: Config): S3Settings = apply(c)
/** Scala API */
+
+ def apply(
+ bufferType: BufferType,
+ credentialsProvider: AwsCredentialsProvider,
+ s3RegionProvider: AwsRegionProvider,
+ listBucketApiVersion: ApiVersion): S3Settings =
+ apply(
+ bufferType,
+ credentialsProvider,
+ s3RegionProvider,
+ listBucketApiVersion,
+ allowedHeaders = Map.empty[String, Set[String]] // default value
+ )
+
def apply(
bufferType: BufferType,
credentialsProvider: AwsCredentialsProvider,
s3RegionProvider: AwsRegionProvider,
- listBucketApiVersion: ApiVersion): S3Settings = new S3Settings(
+ listBucketApiVersion: ApiVersion,
+ allowedHeaders: Map[String, Set[String]]
+ ): S3Settings = new S3Settings(
bufferType,
credentialsProvider,
s3RegionProvider,
@@ -663,18 +740,31 @@ object S3Settings {
validateObjectKey = true,
RetrySettings.default,
MultipartUploadSettings(RetrySettings.default),
- signAnonymousRequests = true)
+ signAnonymousRequests = true,
+ allowedHeaders = allowedHeaders
+ )
/** Java API */
+
def create(
bufferType: BufferType,
credentialsProvider: AwsCredentialsProvider,
s3RegionProvider: AwsRegionProvider,
- listBucketApiVersion: ApiVersion): S3Settings = apply(
+ listBucketApiVersion: ApiVersion): S3Settings =
+ create(bufferType, credentialsProvider, s3RegionProvider,
listBucketApiVersion, Map.empty)
+
+ def create(
+ bufferType: BufferType,
+ credentialsProvider: AwsCredentialsProvider,
+ s3RegionProvider: AwsRegionProvider,
+ listBucketApiVersion: ApiVersion,
+ allowedHeaders: Map[String, Set[String]]): S3Settings = apply(
bufferType,
credentialsProvider,
s3RegionProvider,
- listBucketApiVersion)
+ listBucketApiVersion,
+ allowedHeaders
+ )
/**
* Scala API: Creates [[S3Settings]] from the [[com.typesafe.config.Config
Config]] attached to an actor system.
diff --git a/s3/src/test/scala/docs/scaladsl/S3SinkSpec.scala
b/s3/src/test/scala/docs/scaladsl/S3SinkSpec.scala
index bfb809b41..de669ee75 100644
--- a/s3/src/test/scala/docs/scaladsl/S3SinkSpec.scala
+++ b/s3/src/test/scala/docs/scaladsl/S3SinkSpec.scala
@@ -240,6 +240,8 @@ class S3SinkSpec extends S3WireMockBase with
S3ClientIntegrationSpec with Option
val requestPayerHeader = "x-amz-request-payer"
val requestPayerHeaderValue = "requester"
+ val storageClassHeader = "x-amz-storage-class"
+ val storageClassHeaderValue = "STANDARD_IA"
val keys = ServerSideEncryption
.customerKeys(sseCustomerKey)
@@ -263,7 +265,13 @@ class S3SinkSpec extends S3WireMockBase with
S3ClientIntegrationSpec with Option
targetBucketKey,
s3Headers = S3Headers()
.withServerSideEncryption(keys)
- .withCustomHeaders(Map(requestPayerHeader ->
requestPayerHeaderValue)))
+ .withCustomHeaders(
+ Map(
+ requestPayerHeader -> requestPayerHeaderValue,
+ storageClassHeader -> storageClassHeaderValue
+ )
+ )
+ )
.run()
result.futureValue shouldBe MultipartUploadResult(targetUrl, targetBucket,
targetBucketKey, etag, None)
@@ -286,7 +294,9 @@ class S3SinkSpec extends S3WireMockBase with
S3ClientIntegrationSpec with Option
.withHeader(sseCKeyHeader, new EqualToPattern(sseCKeyHeaderValue))
.withHeader(sseCSourceAlgorithmHeader, new
EqualToPattern(sseCSourceAlgorithmHeaderValue))
.withHeader(sseCSourceKeyHeader, new
EqualToPattern(sseCSourceKeyHeaderValue))
- .withHeader(requestPayerHeader, new
EqualToPattern(requestPayerHeaderValue)))
+ .withHeader(requestPayerHeader, new
EqualToPattern(requestPayerHeaderValue))
+ .withoutHeader(storageClassHeader)
+ )
// SSE headers only
mock.verifyThat(
diff --git a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/S3HeadersSpec.scala b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/S3HeadersSpec.scala
new file mode 100644
index 000000000..a5e7cec31
--- /dev/null
+++ b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/S3HeadersSpec.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.s3
+
+import com.typesafe.config.ConfigFactory
+import org.apache.pekko
+import org.scalatest.flatspec.AnyFlatSpecLike
+import org.scalatest.matchers.should.Matchers
+
+import pekko.stream.connectors.s3.impl._
+
+class S3HeadersSpec extends AnyFlatSpecLike with Matchers {
+ it should "filter headers based on what's allowed" in {
+ val testOverrideConfig = ConfigFactory.parseString("""
+ | pekko.connectors.s3 {
+ | allowed-headers {
+ | GetObject = [base]
+ | HeadObject = [base]
+ | PutObject = [base]
+ | InitiateMultipartUpload = [base]
+ | UploadPart = [base]
+ | CopyPart = [base]
+ | DeleteObject = [base]
+ | ListBucket = [base]
+ | MakeBucket = [base]
+ | DeleteBucket = [base]
+ | CheckBucket = [base]
+ | PutBucketVersioning = [base]
+ | GetBucketVersioning = [base]
+ | }
+ | additional-allowed-headers {
+ | GetObject = [allowedExtra]
+ | HeadObject = [allowedExtra]
+ | PutObject = [allowedExtra]
+ | InitiateMultipartUpload = [allowedExtra]
+ | UploadPart = [allowedExtra]
+ | CopyPart = [allowedExtra]
+ | DeleteObject = [allowedExtra]
+ | ListBucket = [allowedExtra]
+ | MakeBucket = [allowedExtra]
+ | DeleteBucket = [allowedExtra]
+ | CheckBucket = [allowedExtra]
+ | PutBucketVersioning = [allowedExtra]
+ | GetBucketVersioning = [allowedExtra]
+ | }
+ |}
+ |""".stripMargin)
+
+ val defaultConfig = ConfigFactory.load()
+ val finalConfig = testOverrideConfig.withFallback(defaultConfig)
+
+ S3Request.allRequests.foreach {
+ requestType =>
+ val extraHeaders = Map("allowedExtra" -> "allGood", "notAllowed" -> "shouldBeGone")
+ val header = S3Headers().withCustomHeaders(Map("base" -> requestType.toString()) ++ extraHeaders)
+ val s3Config = finalConfig.getConfig("pekko.connectors.s3")
+ val headerFilter = header.headersFor(requestType)(S3Settings.apply(s3Config))
+ val result = headerFilter.map(header => (header.name(), header.value()))
+ result should contain allElementsOf (Seq("base" -> requestType.toString) ++ Seq("allowedExtra" -> "allGood"))
+ }
+
+ }
+
+ it should "be able to convert all headers toString and back correctly" in {
+ val roundTrip = S3Request.allRequests
+ .map(_.toString())
+ .flatMap(S3Request.fromString(_).toOption)
+
+ roundTrip should contain allElementsOf S3Request.allRequests
+
+ }
+
+ it should "contain all S3Request types" in {
+ val actualSet = S3Request.allRequests
+
+ actualSet should have size 13
+
+ // Use exhaustive match to verify all are present
+ def verifyAllPresent(req: S3Request): Boolean = req match {
+ case e @ GetObject => actualSet.contains(e)
+ case e @ HeadObject => actualSet.contains(e)
+ case e @ PutObject => actualSet.contains(e)
+ case e @ InitiateMultipartUpload => actualSet.contains(e)
+ case e @ UploadPart => actualSet.contains(e)
+ case e @ CopyPart => actualSet.contains(e)
+ case e @ DeleteObject => actualSet.contains(e)
+ case e @ ListBucket => actualSet.contains(e)
+ case e @ MakeBucket => actualSet.contains(e)
+ case e @ DeleteBucket => actualSet.contains(e)
+ case e @ CheckBucket => actualSet.contains(e)
+ case e @ PutBucketVersioning => actualSet.contains(e)
+ case e @ GetBucketVersioning => actualSet.contains(e)
+ }
+
+ actualSet.foreach(req => verifyAllPresent(req) shouldBe true)
+
+ }
+}
diff --git a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/S3SettingsSpec.scala b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/S3SettingsSpec.scala
index cfdb66f5e..9dd1bd6bb 100644
--- a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/S3SettingsSpec.scala
+++ b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/S3SettingsSpec.scala
@@ -41,6 +41,36 @@ class S3SettingsSpec extends S3WireMockBase with S3ClientIntegrationSpec with Op
|}
|multipart-upload.retry-settings = $${retry-settings}
|sign-anonymous-requests = true
+ |allowed-headers {
+ | GetObject = [GetObject0]
+ | HeadObject = [HeadObject0]
+ | PutObject = [PutObject0]
+ | InitiateMultipartUpload = [InitiateMultipartUpload0]
+ | UploadPart = [UploadPart0]
+ | CopyPart = [CopyPart0]
+ | DeleteObject = [DeleteObject0]
+ | ListBucket = [ListBucket0]
+ | MakeBucket = [MakeBucket0]
+ | DeleteBucket = [DeleteBucket0]
+ | CheckBucket = [CheckBucket0]
+ | PutBucketVersioning = [PutBucketVersioning0]
+ | GetBucketVersioning = [GetBucketVersioning0]
+ |}
+ |additional-allowed-headers {
+ | GetObject = []
+ | HeadObject = []
+ | PutObject = []
+ | InitiateMultipartUpload = []
+ | UploadPart = []
+ | CopyPart = []
+ | DeleteObject = []
+ | ListBucket = []
+ | MakeBucket = []
+ | DeleteBucket = []
+ | CheckBucket = []
+ | PutBucketVersioning = []
+ | GetBucketVersioning = []
+ |}
|$more
""".stripMargin)
.resolve)
@@ -273,4 +303,33 @@ class S3SettingsSpec extends S3WireMockBase with S3ClientIntegrationSpec with Op
val settings = mkSettings("sign-anonymous-requests = false")
settings.signAnonymousRequests shouldBe false
}
+
+ it should "parse additional-allowed-headers" in {
+ val settings = mkSettings("""
+ |additional-allowed-headers {
+ | GetObject = [GetObject1, GetObject2]
+ | HeadObject = [HeadObject1, HeadObject2]
+ | PutObject = [PutObject1, PutObject2]
+ | InitiateMultipartUpload = [InitiateMultipartUpload1, InitiateMultipartUpload2 ]
+ | UploadPart = [UploadPart1, UploadPart2]
+ | CopyPart = [CopyPart1, CopyPart2]
+ | DeleteObject = [DeleteObject1, DeleteObject2]
+ | ListBucket = [ListBucket1, ListBucket2]
+ | MakeBucket = [MakeBucket1, MakeBucket2]
+ | DeleteBucket = [DeleteBucket1, DeleteBucket2]
+ | CheckBucket = [CheckBucket1, CheckBucket2]
+ | PutBucketVersioning = [PutBucketVersioning1, PutBucketVersioning2]
+ | GetBucketVersioning = [GetBucketVersioning1, GetBucketVersioning2]
+ |}""".stripMargin)
+
+ settings.allowedHeaders.keySet should contain allElementsOf
(pekko.stream.connectors.s3.impl.S3Request.allRequests.map(
+ _.toString()))
+ settings.allowedHeaders.foreach {
+ case (key, value) =>
+ assert(value.nonEmpty)
+ val result = value.map(_.replaceAll(key, "").toInt)
+ result should contain allElementsOf (Seq(0, 1, 2))
+ }
+ }
+
}
diff --git a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequestsSpec.scala b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequestsSpec.scala
index f5d851291..e901e2764 100644
--- a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequestsSpec.scala
+++ b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequestsSpec.scala
@@ -46,7 +46,7 @@ class HttpRequestsSpec extends AnyFlatSpec with Matchers with ScalaFutures with
def getRegion = s3Region
}
- S3Settings(bufferType, awsCredentials, regionProvider, listBucketApiVersion)
+ S3Settings(bufferType, awsCredentials, regionProvider, listBucketApiVersion, Map.empty)
}
val location = S3Location("bucket", "image-1024@2x")
diff --git a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/S3StreamSpec.scala b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/S3StreamSpec.scala
index 5c24dbc7f..5801061fc 100644
--- a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/S3StreamSpec.scala
+++ b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/S3StreamSpec.scala
@@ -63,7 +63,7 @@ class S3StreamSpec(_system: ActorSystem)
val location = S3Location("test-bucket", "test-key")
implicit val settings: S3Settings =
- S3Settings(MemoryBufferType, credentialsProvider, regionProvider, ApiVersion.ListBucketVersion2)
+ S3Settings(MemoryBufferType, credentialsProvider, regionProvider, ApiVersion.ListBucketVersion2, Map.empty)
val result: HttpRequest =
S3Stream.invokePrivate(requestHeaders(getDownloadRequest(location), None))
result.headers.size shouldBe 2
@@ -87,7 +87,7 @@ class S3StreamSpec(_system: ActorSystem)
val range = ByteRange(1, 4)
implicit val settings: S3Settings =
- S3Settings(MemoryBufferType, credentialsProvider, regionProvider, ApiVersion.ListBucketVersion2)
+ S3Settings(MemoryBufferType, credentialsProvider, regionProvider, ApiVersion.ListBucketVersion2, Map.empty)
val result: HttpRequest =
S3Stream.invokePrivate(requestHeaders(getDownloadRequest(location),
Some(range)))
result.headers.size shouldBe 3
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]