This is an automated email from the ASF dual-hosted git repository.

mdedetrich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new 56d5aee76 Force US_EAST_1 for S3 listBuckets call
56d5aee76 is described below

commit 56d5aee761b4ef89d1aafb0ce7d0f3ee080a2b59
Author: Matthew de Detrich <[email protected]>
AuthorDate: Thu Mar 16 19:21:14 2023 +0100

    Force US_EAST_1 for S3 listBuckets call
---
 .../pekko/stream/connectors/s3/impl/S3Stream.scala | 24 +++++++++++++---------
 .../connectors/s3/scaladsl/S3IntegrationSpec.scala | 15 ++++++++++++++
 2 files changed, 29 insertions(+), 10 deletions(-)

diff --git 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
index ec70ae8fb..04d836af6 100644
--- 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
+++ 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
@@ -174,12 +174,12 @@ import scala.util.{ Failure, Success, Try }
     Flow[ByteString].orElse(Source.single(ByteString.empty))
 
   // def because tokens can expire
-  private def signingKey(implicit settings: S3Settings) = {
+  private def signingKey(overrideRegion: Option[Region] = None)(implicit 
settings: S3Settings) = {
     val requestDate = ZonedDateTime.now(ZoneOffset.UTC)
     SigningKey(
       requestDate,
       settings.credentialsProvider,
-      CredentialScope(requestDate.toLocalDate, 
settings.s3RegionProvider.getRegion, "s3"))
+      CredentialScope(requestDate.toLocalDate, 
overrideRegion.getOrElse(settings.s3RegionProvider.getRegion), "s3"))
   }
 
   def download(
@@ -336,7 +336,9 @@ import scala.util.{ Failure, Success, Try }
         Source
           .future {
             signAndGetAs[ListBucketsResult](
-              HttpRequests.listBuckets(s3Headers.headers)).map { (res: 
ListBucketsResult) =>
+              // This request only works when its called from US_EAST_1. Note 
that buckets are region
+              // agnostic
+              HttpRequests.listBuckets(s3Headers.headers), 
Some(Region.US_EAST_1)).map { (res: ListBucketsResult) =>
               res.buckets
             }(ExecutionContexts.parasitic)
           }
@@ -1128,7 +1130,7 @@ import scala.util.{ Failure, Success, Try }
             }
             .flatMapConcat {
               case (req, info) =>
-                Signer.signedRequest(req, signingKey, 
conf.signAnonymousRequests).zip(Source.single(info))
+                Signer.signedRequest(req, signingKey(), 
conf.signAnonymousRequests).zip(Source.single(info))
             }
             .via(superPool[(MultipartUpload, Int)])
 
@@ -1220,7 +1222,7 @@ import scala.util.{ Failure, Success, Try }
             }
             .flatMapConcat {
               case ((req, info), allContext) =>
-                Signer.signedRequest(req, signingKey, 
conf.signAnonymousRequests).zip(Source.single(info)).map {
+                Signer.signedRequest(req, signingKey(), 
conf.signAnonymousRequests).zip(Source.single(info)).map {
                   case (httpRequest, data) => (httpRequest, (data, allContext))
                 }
             }
@@ -1356,12 +1358,13 @@ import scala.util.{ Failure, Success, Try }
       .mapMaterializedValue(_.flatMap(identity)(ExecutionContexts.parasitic))
 
   private def signAndGetAs[T](
-      request: HttpRequest)(
+      request: HttpRequest,
+      overrideRegion: Option[Region] = None)(
       implicit um: Unmarshaller[ResponseEntity, T], mat: Materializer, attr: 
Attributes): Future[T] = {
     import mat.executionContext
     implicit val sys: ActorSystem = mat.system
     for {
-      response <- signAndRequest(request).runWith(Sink.head)
+      response <- signAndRequest(request, overrideRegion).runWith(Sink.head)
       (entity, _) <- entityForSuccess(response)
       t <- Unmarshal(entity).to[T]
     } yield t
@@ -1382,14 +1385,15 @@ import scala.util.{ Failure, Success, Try }
   }
 
   private def signAndRequest(
-      request: HttpRequest)(
+      request: HttpRequest,
+      overrideRegion: Option[Region] = None)(
       implicit sys: ActorSystem, mat: Materializer, attr: Attributes): 
Source[HttpResponse, NotUsed] = {
     implicit val conf: S3Settings = resolveSettings(attr, sys)
     import conf.retrySettings._
     import mat.executionContext
 
     val retriableFlow = Flow[HttpRequest]
-      .flatMapConcat(req => Signer.signedRequest(req, signingKey, 
conf.signAnonymousRequests))
+      .flatMapConcat(req => Signer.signedRequest(req, 
signingKey(overrideRegion), conf.signAnonymousRequests))
       .mapAsync(parallelism = 1)(req =>
         singleRequest(req)
           .map(Success.apply)
@@ -1461,7 +1465,7 @@ import scala.util.{ Failure, Success, Try }
           .mapConcat(identity)
           .flatMapConcat {
             case (req, info) =>
-              Signer.signedRequest(req, signingKey, 
conf.signAnonymousRequests).zip(Source.single(info))
+              Signer.signedRequest(req, signingKey(), 
conf.signAnonymousRequests).zip(Source.single(info))
           }
       }
       .mapMaterializedValue(_ => NotUsed)
diff --git 
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
index 0532d1530..4e9c4ad8a 100644
--- 
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
+++ 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
@@ -113,6 +113,21 @@ trait S3IntegrationSpec
     buckets.map(_.name) should contain(defaultBucket)
   }
 
+  it should "list buckets in current AWS account using non US_EAST_1 region" 
in {
+    // Its only AWS that complains if listBuckets is called from a non 
US_EAST_1 region
+    assume(this.isInstanceOf[AWSS3IntegrationSpec])
+    val result = for {
+      buckets <- S3.listBuckets().withAttributes(
+        S3Attributes.settings(defaultS3Settings.withS3RegionProvider(new 
AwsRegionProvider {
+          override def getRegion: Region = Region.EU_CENTRAL_1
+        }))).runWith(Sink.seq)
+    } yield buckets
+
+    val buckets = result.futureValue
+
+    buckets.map(_.name) should contain(defaultBucket)
+  }
+
   it should "list with real credentials" in {
     val result = S3
       .listBucket(defaultBucket, None)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to