This is an automated email from the ASF dual-hosted git repository.
mdedetrich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git
The following commit(s) were added to refs/heads/main by this push:
new 56d5aee76 Force US_EAST_1 for S3 listBuckets call
56d5aee76 is described below
commit 56d5aee761b4ef89d1aafb0ce7d0f3ee080a2b59
Author: Matthew de Detrich <[email protected]>
AuthorDate: Thu Mar 16 19:21:14 2023 +0100
Force US_EAST_1 for S3 listBuckets call
---
.../pekko/stream/connectors/s3/impl/S3Stream.scala | 24 +++++++++++++---------
.../connectors/s3/scaladsl/S3IntegrationSpec.scala | 15 ++++++++++++++
2 files changed, 29 insertions(+), 10 deletions(-)
diff --git
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
index ec70ae8fb..04d836af6 100644
---
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
+++
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
@@ -174,12 +174,12 @@ import scala.util.{ Failure, Success, Try }
Flow[ByteString].orElse(Source.single(ByteString.empty))
// def because tokens can expire
- private def signingKey(implicit settings: S3Settings) = {
+ private def signingKey(overrideRegion: Option[Region] = None)(implicit
settings: S3Settings) = {
val requestDate = ZonedDateTime.now(ZoneOffset.UTC)
SigningKey(
requestDate,
settings.credentialsProvider,
- CredentialScope(requestDate.toLocalDate,
settings.s3RegionProvider.getRegion, "s3"))
+ CredentialScope(requestDate.toLocalDate,
overrideRegion.getOrElse(settings.s3RegionProvider.getRegion), "s3"))
}
def download(
@@ -336,7 +336,9 @@ import scala.util.{ Failure, Success, Try }
Source
.future {
signAndGetAs[ListBucketsResult](
- HttpRequests.listBuckets(s3Headers.headers)).map { (res:
ListBucketsResult) =>
+ // This request only works when it is called from US_EAST_1. Note
that buckets are region
+ // agnostic
+ HttpRequests.listBuckets(s3Headers.headers),
Some(Region.US_EAST_1)).map { (res: ListBucketsResult) =>
res.buckets
}(ExecutionContexts.parasitic)
}
@@ -1128,7 +1130,7 @@ import scala.util.{ Failure, Success, Try }
}
.flatMapConcat {
case (req, info) =>
- Signer.signedRequest(req, signingKey,
conf.signAnonymousRequests).zip(Source.single(info))
+ Signer.signedRequest(req, signingKey(),
conf.signAnonymousRequests).zip(Source.single(info))
}
.via(superPool[(MultipartUpload, Int)])
@@ -1220,7 +1222,7 @@ import scala.util.{ Failure, Success, Try }
}
.flatMapConcat {
case ((req, info), allContext) =>
- Signer.signedRequest(req, signingKey,
conf.signAnonymousRequests).zip(Source.single(info)).map {
+ Signer.signedRequest(req, signingKey(),
conf.signAnonymousRequests).zip(Source.single(info)).map {
case (httpRequest, data) => (httpRequest, (data, allContext))
}
}
@@ -1356,12 +1358,13 @@ import scala.util.{ Failure, Success, Try }
.mapMaterializedValue(_.flatMap(identity)(ExecutionContexts.parasitic))
private def signAndGetAs[T](
- request: HttpRequest)(
+ request: HttpRequest,
+ overrideRegion: Option[Region] = None)(
implicit um: Unmarshaller[ResponseEntity, T], mat: Materializer, attr:
Attributes): Future[T] = {
import mat.executionContext
implicit val sys: ActorSystem = mat.system
for {
- response <- signAndRequest(request).runWith(Sink.head)
+ response <- signAndRequest(request, overrideRegion).runWith(Sink.head)
(entity, _) <- entityForSuccess(response)
t <- Unmarshal(entity).to[T]
} yield t
@@ -1382,14 +1385,15 @@ import scala.util.{ Failure, Success, Try }
}
private def signAndRequest(
- request: HttpRequest)(
+ request: HttpRequest,
+ overrideRegion: Option[Region] = None)(
implicit sys: ActorSystem, mat: Materializer, attr: Attributes):
Source[HttpResponse, NotUsed] = {
implicit val conf: S3Settings = resolveSettings(attr, sys)
import conf.retrySettings._
import mat.executionContext
val retriableFlow = Flow[HttpRequest]
- .flatMapConcat(req => Signer.signedRequest(req, signingKey,
conf.signAnonymousRequests))
+ .flatMapConcat(req => Signer.signedRequest(req,
signingKey(overrideRegion), conf.signAnonymousRequests))
.mapAsync(parallelism = 1)(req =>
singleRequest(req)
.map(Success.apply)
@@ -1461,7 +1465,7 @@ import scala.util.{ Failure, Success, Try }
.mapConcat(identity)
.flatMapConcat {
case (req, info) =>
- Signer.signedRequest(req, signingKey,
conf.signAnonymousRequests).zip(Source.single(info))
+ Signer.signedRequest(req, signingKey(),
conf.signAnonymousRequests).zip(Source.single(info))
}
}
.mapMaterializedValue(_ => NotUsed)
diff --git
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
index 0532d1530..4e9c4ad8a 100644
---
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
+++
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
@@ -113,6 +113,21 @@ trait S3IntegrationSpec
buckets.map(_.name) should contain(defaultBucket)
}
+ it should "list buckets in current AWS account using non US_EAST_1 region"
in {
+ // It is only AWS that complains if listBuckets is called from a non
US_EAST_1 region
+ assume(this.isInstanceOf[AWSS3IntegrationSpec])
+ val result = for {
+ buckets <- S3.listBuckets().withAttributes(
+ S3Attributes.settings(defaultS3Settings.withS3RegionProvider(new
AwsRegionProvider {
+ override def getRegion: Region = Region.EU_CENTRAL_1
+ }))).runWith(Sink.seq)
+ } yield buckets
+
+ val buckets = result.futureValue
+
+ buckets.map(_.name) should contain(defaultBucket)
+ }
+
it should "list with real credentials" in {
val result = S3
.listBucket(defaultBucket, None)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]