This is an automated email from the ASF dual-hosted git repository.

mdedetrich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new 4223ac20b Use scope config for compute-engine auth
4223ac20b is described below

commit 4223ac20b3eec15fd3a2020b86397c0bcf262056
Author: Matthew de Detrich <[email protected]>
AuthorDate: Mon Jan 1 17:01:59 2024 +1100

    Use scope config for compute-engine auth
---
 .../google/auth/ComputeEngineCredentials.scala          |  9 +++++----
 .../stream/connectors/google/auth/Credentials.scala     |  6 ++++--
 .../connectors/google/auth/GoogleComputeMetadata.scala  | 17 +++++++++++++----
 3 files changed, 22 insertions(+), 10 deletions(-)

diff --git a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/ComputeEngineCredentials.scala b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/ComputeEngineCredentials.scala
index 9a7bb7a04..3709b096d 100644
--- a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/ComputeEngineCredentials.scala
+++ b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/ComputeEngineCredentials.scala
@@ -25,18 +25,19 @@ import scala.concurrent.Future
 @InternalApi
 private[auth] object ComputeEngineCredentials {
 
-  def apply()(implicit system: ClassicActorSystemProvider): Future[Credentials] =
+  def apply(scopes: Set[String])(implicit system: ClassicActorSystemProvider): Future[Credentials] = {
     GoogleComputeMetadata
       .getProjectId()
-      .map(new ComputeEngineCredentials(_))(system.classicSystem.dispatcher)
+      .map(projectId => new ComputeEngineCredentials(projectId, scopes))(system.classicSystem.dispatcher)
+  }
 
 }
 
 @InternalApi
-private final class ComputeEngineCredentials(projectId: String)(implicit mat: Materializer)
+private final class ComputeEngineCredentials(projectId: String, scopes: Set[String])(implicit mat: Materializer)
     extends OAuth2Credentials(projectId) {
   override protected def getAccessToken()(implicit mat: Materializer,
       settings: RequestSettings,
       clock: Clock): Future[AccessToken] =
-    GoogleComputeMetadata.getAccessToken()
+    GoogleComputeMetadata.getAccessToken(scopes)
 }
diff --git a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/Credentials.scala b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/Credentials.scala
index c94f34fbc..7d1ec3e24 100644
--- a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/Credentials.scala
+++ b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/Credentials.scala
@@ -75,8 +75,10 @@ object Credentials {
     ServiceAccountCredentials(c.getConfig("service-account"), scopes)
   }
 
-  private def parseComputeEngine(c: Config)(implicit system: ClassicActorSystemProvider) =
-    Await.result(ComputeEngineCredentials(), c.getDuration("compute-engine.timeout").asScala)
+  private def parseComputeEngine(c: Config)(implicit system: ClassicActorSystemProvider) = {
+    val scopes = c.getStringList("scopes").asScala.toSet
+    Await.result(ComputeEngineCredentials(scopes), c.getDuration("compute-engine.timeout").asScala)
+  }
 
   private def parseUserAccess(c: Config)(implicit system: ClassicActorSystemProvider) =
     UserAccessCredentials(c.getConfig("user-access"))
diff --git a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/GoogleComputeMetadata.scala b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/GoogleComputeMetadata.scala
index b205a11cb..6336b74aa 100644
--- a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/GoogleComputeMetadata.scala
+++ b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/GoogleComputeMetadata.scala
@@ -19,7 +19,7 @@ import pekko.annotation.InternalApi
 import pekko.http.scaladsl.Http
 import pekko.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
 import pekko.http.scaladsl.model.HttpMethods.GET
-import pekko.http.scaladsl.model.HttpRequest
+import pekko.http.scaladsl.model.{ HttpRequest, Uri }
 import pekko.http.scaladsl.model.headers.RawHeader
 import pekko.http.scaladsl.unmarshalling.Unmarshal
 import pekko.stream.Materializer
@@ -35,17 +35,26 @@ private[auth] object GoogleComputeMetadata {
   private val projectIdUrl = s"$metadataUrl/project/project-id"
   private val `Metadata-Flavor` = RawHeader("Metadata-Flavor", "Google")
 
-  private val tokenRequest = HttpRequest(GET, tokenUrl).addHeader(`Metadata-Flavor`)
+  private def tokenRequest(scopes: Set[String]) = {
+    val finalUri =
+      if (scopes.nonEmpty)
+        Uri(tokenUrl).withQuery(Uri.Query(Map(
+          "scopes" -> scopes.mkString(","))))
+      else
+        Uri(tokenUrl)
+
+    HttpRequest(GET, finalUri).addHeader(`Metadata-Flavor`)
+  }
   private val projectIdRequest = HttpRequest(GET, projectIdUrl).addHeader(`Metadata-Flavor`)
 
-  def getAccessToken()(
+  def getAccessToken(scopes: Set[String])(
       implicit mat: Materializer,
       clock: Clock): Future[AccessToken] = {
     import SprayJsonSupport._
     import mat.executionContext
     implicit val system: ActorSystem = mat.system
     for {
-      response <- Http().singleRequest(tokenRequest)
+      response <- Http().singleRequest(tokenRequest(scopes))
       token <- Unmarshal(response.entity).to[AccessToken]
     } yield token
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to