This is an automated email from the ASF dual-hosted git repository.

lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir.git


The following commit(s) were added to refs/heads/master by this push:
     new cc61a83  [BAHIR-141] Support GCP JSON key type as binary array
cc61a83 is described below

commit cc61a83a79d912f8eb842507ecca0b2d82f734e6
Author: Lukasz Antoniak <[email protected]>
AuthorDate: Thu Jan 24 04:06:57 2019 -0800

    [BAHIR-141] Support GCP JSON key type as binary array
    
    Closes #82
    Closes #53
---
 streaming-pubsub/README.md                         |   8 +-
 .../streaming/pubsub/SparkGCPCredentials.scala     | 161 ++++++++++++---------
 .../pubsub/SparkGCPCredentialsBuilderSuite.scala   | 100 +++++++------
 3 files changed, 149 insertions(+), 120 deletions(-)

diff --git a/streaming-pubsub/README.md b/streaming-pubsub/README.md
index a8f374f..f20e5c9 100644
--- a/streaming-pubsub/README.md
+++ b/streaming-pubsub/README.md
@@ -27,11 +27,13 @@ The `--packages` argument can also be used with 
`bin/spark-submit`.
 First you need to create credentials with SparkGCPCredentials; it supports four 
types of credentials
 * application default
     `SparkGCPCredentials.builder.build()`
-* json type service account
+* JSON type service account (based on file or its binary content)
     `SparkGCPCredentials.builder.jsonServiceAccount(PATH_TO_JSON_KEY).build()`
-* p12 type service account
+    `SparkGCPCredentials.builder.jsonServiceAccount(JSON_KEY_BYTES).build()`
+* P12 type service account
     `SparkGCPCredentials.builder.p12ServiceAccount(PATH_TO_P12_KEY, 
EMAIL_ACCOUNT).build()`
-* metadata service account(running on dataproc)
+    `SparkGCPCredentials.builder.p12ServiceAccount(P12_KEY_BYTES, 
EMAIL_ACCOUNT).build()`
+* Metadata service account (running on dataproc)
     `SparkGCPCredentials.builder.metadataServiceAccount().build()`
 
 ### Scala API
diff --git 
a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala
 
b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala
index f15a521..4352d7f 100644
--- 
a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala
+++ 
b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala
@@ -17,119 +17,146 @@
 
 package org.apache.spark.streaming.pubsub
 
+import java.io.{ByteArrayInputStream, File, FileOutputStream}
+import java.nio.file.{Files, Paths}
+import java.util
+
 import com.google.api.client.auth.oauth2.Credential
 import com.google.api.client.googleapis.auth.oauth2.GoogleCredential
+import com.google.api.client.http.HttpTransport
 import com.google.api.client.json.jackson.JacksonFactory
 import com.google.api.services.pubsub.PubsubScopes
 import com.google.cloud.hadoop.util.{CredentialFactory, HttpTransportFactory}
-import java.io.{ByteArrayInputStream, File, FileNotFoundException, 
FileOutputStream}
-import java.nio.file.{Files, Paths}
-import java.util
-import org.apache.hadoop.conf.Configuration
 
 /**
  * Serializable interface providing a method executors can call to obtain a
  * GCPCredentialsProvider instance for authenticating to GCP services.
  */
 private[pubsub] sealed trait SparkGCPCredentials extends Serializable {
-
   def provider: Credential
+
+  def jacksonFactory(): JacksonFactory = new JacksonFactory
+
+  def httpTransport(): HttpTransport = 
HttpTransportFactory.createHttpTransport(
+    HttpTransportFactory.HttpTransportType.JAVA_NET, null
+  )
+
+  def scopes(): util.Collection[String] = PubsubScopes.all()
 }
 
 /**
- * Returns application default type credential
+ * Application default credentials.
  */
 private[pubsub] final case object ApplicationDefaultCredentials extends 
SparkGCPCredentials {
-
   override def provider: Credential = {
     GoogleCredential.getApplicationDefault.createScoped(PubsubScopes.all())
   }
 }
 
 /**
- * Returns a Service Account type Credential instance.
- * If all parameters are None, then try metadata service type
- * If jsonFilePath available, try json type
- * If jsonFilePath is None and p12FilePath and emailAccount available, try p12 
type
- *
- * @param jsonFilePath file path for json
- * @param p12FilePath  file path for p12
- * @param emailAccount email account for p12
+ * Credentials based on JSON key file.
  */
-private[pubsub] final case class ServiceAccountCredentials(
-    jsonFilePath: Option[String] = None,
-    p12FilePath: Option[String] = None,
-    emailAccount: Option[String] = None)
+private[pubsub] final case class JsonConfigCredentials(jsonContent: 
Array[Byte])
     extends SparkGCPCredentials {
-  private val fileBytes = getFileBuffer
+  def this(jsonFilePath: String) = 
this(Files.readAllBytes(Paths.get(jsonFilePath)))
 
   override def provider: Credential = {
-    val jsonFactory = new JacksonFactory
-    val scopes = new util.ArrayList(PubsubScopes.all())
-    val transport = HttpTransportFactory.createHttpTransport(
-      HttpTransportFactory.HttpTransportType.JAVA_NET, null)
-
-    if (!jsonFilePath.isEmpty) {
-      val stream = new ByteArrayInputStream(fileBytes)
-      CredentialFactory.GoogleCredentialWithRetry.fromGoogleCredential(
-        GoogleCredential.fromStream(stream, transport, jsonFactory)
-        .createScoped(scopes))
-    } else if (!p12FilePath.isEmpty && !emailAccount.isEmpty) {
-      val tempFile = File.createTempFile(emailAccount.get, ".p12")
-      tempFile.deleteOnExit
-      val p12Out = new FileOutputStream(tempFile)
-      p12Out.write(fileBytes, 0, fileBytes.length)
-      p12Out.close
-
-      new CredentialFactory.GoogleCredentialWithRetry(
-        new GoogleCredential.Builder().setTransport(transport)
-        .setJsonFactory(jsonFactory)
-        .setServiceAccountId(emailAccount.get)
-        .setServiceAccountScopes(scopes)
-        .setServiceAccountPrivateKeyFromP12File(tempFile)
-        .setRequestInitializer(new 
CredentialFactory.CredentialHttpRetryInitializer()))
-    } else (new CredentialFactory).getCredentialFromMetadataServiceAccount
+    val stream = new ByteArrayInputStream(jsonContent)
+    val credentials = 
CredentialFactory.GoogleCredentialWithRetry.fromGoogleCredential(
+      GoogleCredential.fromStream(
+        stream, httpTransport(), jacksonFactory()
+      ).createScoped(scopes())
+    )
+    stream.close()
+
+    credentials
   }
+}
 
-  private def getFileBuffer: Array[Byte] = {
-    val filePath = jsonFilePath orElse p12FilePath
-    if (filePath.isEmpty) Array[Byte]()
-    else if (!Files.exists(Paths.get(filePath.get))) {
-      throw new FileNotFoundException(s"The key file path(${filePath.get}) 
doesn't exist.")
-    } else Files.readAllBytes(Paths.get(filePath.get))
+/**
+ * Credentials based on e-mail account and P12 key.
+ */
+private[pubsub] final case class EMailPrivateKeyCredentials(
+    emailAccount: String, p12Content: Array[Byte]
+  ) extends SparkGCPCredentials {
+  def this(emailAccount: String, p12FilePath: String) = {
+    this(emailAccount, Files.readAllBytes(Paths.get(p12FilePath)))
   }
 
+  override def provider: Credential = {
+    val tempFile = File.createTempFile(emailAccount, ".p12")
+    tempFile.deleteOnExit()
+    val p12Out = new FileOutputStream(tempFile)
+    p12Out.write(p12Content, 0, p12Content.length)
+    p12Out.flush()
+    p12Out.close()
+
+    new CredentialFactory.GoogleCredentialWithRetry(
+      new GoogleCredential.Builder().setTransport(httpTransport())
+        .setJsonFactory(jacksonFactory())
+        .setServiceAccountId(emailAccount)
+        .setServiceAccountScopes(scopes())
+        .setServiceAccountPrivateKeyFromP12File(tempFile)
+        .setRequestInitializer(new 
CredentialFactory.CredentialHttpRetryInitializer())
+    )
+  }
 }
 
-object SparkGCPCredentials {
+/**
+ * Credentials based on metadata service.
+ */
+private[pubsub] final case class MetadataServiceCredentials() extends 
SparkGCPCredentials {
+  override def provider: Credential = {
+    (new CredentialFactory).getCredentialFromMetadataServiceAccount
+  }
+}
 
+object SparkGCPCredentials {
   /**
    * Builder for SparkGCPCredentials instance.
    */
   class Builder {
-    private var creds: Option[SparkGCPCredentials] = None
+    private var credentials: Option[SparkGCPCredentials] = None
 
     /**
-     * Use a json type key file for service account credential
-     *
-     * @param jsonFilePath json type key file
+     * Use a JSON type key file for service account credential
+     * @param jsonFilePath JSON type key file
      * @return Reference to this SparkGCPCredentials.Builder
      */
     def jsonServiceAccount(jsonFilePath: String): Builder = {
-      creds = Option(ServiceAccountCredentials(Option(jsonFilePath)))
+      credentials = Option(new JsonConfigCredentials(jsonFilePath))
       this
     }
 
     /**
-     * Use a p12 type key file service account credential
-     *
-     * @param p12FilePath p12 type key file
+     * Use the binary content of a JSON type key file for service account credential
+     * @param jsonFileBuffer binary content of JSON type key file
+     * @return Reference to this SparkGCPCredentials.Builder
+     */
+    def jsonServiceAccount(jsonFileBuffer: Array[Byte]): Builder = {
+      credentials = Option(JsonConfigCredentials(jsonFileBuffer))
+      this
+    }
+
+    /**
+     * Use a P12 type key file for service account credential
+     * @param p12FilePath P12 type key file
      * @param emailAccount email of service account
      * @return Reference to this SparkGCPCredentials.Builder
      */
     def p12ServiceAccount(p12FilePath: String, emailAccount: String): Builder 
= {
-      creds = Option(ServiceAccountCredentials(
-        p12FilePath = Option(p12FilePath), emailAccount = 
Option(emailAccount)))
+      credentials = Option(new EMailPrivateKeyCredentials(emailAccount, 
p12FilePath))
+      this
+    }
+
+    /**
+     * Use the binary content of a P12 type key file for service account credential
+     * @param p12FileBuffer binary content of P12 type key file
+     * @param emailAccount email of service account
+     * @return Reference to this SparkGCPCredentials.Builder
+     */
+    def p12ServiceAccount(p12FileBuffer: Array[Byte], emailAccount: String): 
Builder = {
+      credentials = Option(EMailPrivateKeyCredentials(emailAccount, 
p12FileBuffer))
       this
     }
 
@@ -138,27 +165,23 @@ object SparkGCPCredentials {
      * @return Reference to this SparkGCPCredentials.Builder
      */
     def metadataServiceAccount(): Builder = {
-      creds = Option(ServiceAccountCredentials())
+      credentials = Option(MetadataServiceCredentials())
       this
     }
 
     /**
      * Returns the appropriate instance of SparkGCPCredentials given the 
configured
      * parameters.
-     *
      * - The service account credentials will be returned if they were 
provided.
-     *
      * - The application default credentials will be returned otherwise.
-     * @return
+     * @return SparkGCPCredentials object
      */
-    def build(): SparkGCPCredentials = 
creds.getOrElse(ApplicationDefaultCredentials)
-
+    def build(): SparkGCPCredentials = 
credentials.getOrElse(ApplicationDefaultCredentials)
   }
 
   /**
    * Creates a SparkGCPCredentials.Builder for constructing
    * SparkGCPCredentials instance.
-   *
    * @return SparkGCPCredentials.Builder instance
    */
   def builder: Builder = new Builder
diff --git 
a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala
 
b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala
index be47e18..4f50639 100644
--- 
a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala
+++ 
b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala
@@ -19,11 +19,11 @@ package org.apache.spark.streaming.pubsub
 
 import java.nio.file.{Files, Paths}
 
-import org.scalatest.concurrent.TimeLimits
 import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.TimeLimits
 
-import org.apache.spark.util.Utils
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.util.Utils
 
 class SparkGCPCredentialsBuilderSuite
     extends SparkFunSuite with TimeLimits with BeforeAndAfter{
@@ -33,17 +33,17 @@ class SparkGCPCredentialsBuilderSuite
   private val p12FilePath = 
sys.env.get(PubsubTestUtils.envVarNameForP12KeyPath)
   private val emailAccount = sys.env.get(PubsubTestUtils.envVarNameForAccount)
 
-  private def jsonAssumption {
+  private def jsonAssumption() {
     assume(
-      !jsonFilePath.isEmpty,
+      jsonFilePath.isDefined,
       s"as the environment variable 
${PubsubTestUtils.envVarNameForJsonKeyPath} is not set.")
   }
-  private def p12Assumption {
+  private def p12Assumption() {
     assume(
-      !p12FilePath.isEmpty,
+      p12FilePath.isDefined,
       s"as the environment variable ${PubsubTestUtils.envVarNameForP12KeyPath} 
is not set.")
     assume(
-      !emailAccount.isEmpty,
+      emailAccount.isDefined,
       s"as the environment variable ${PubsubTestUtils.envVarNameForAccount} is 
not set.")
   }
 
@@ -52,71 +52,75 @@ class SparkGCPCredentialsBuilderSuite
   }
 
   test("should build json service account") {
-    jsonAssumption
+    jsonAssumption()
 
-    val jsonCreds = ServiceAccountCredentials(jsonFilePath = jsonFilePath)
-    assertResult(jsonCreds) {
-      builder.jsonServiceAccount(jsonFilePath.get).build()
-    }
+    assert(builder.jsonServiceAccount(jsonFilePath.get).build() != null)
   }
 
-  test("should provide json creds") {
-    jsonAssumption
+  test("should provide json credentials based on file") {
+    jsonAssumption()
 
-    val jsonCreds = ServiceAccountCredentials(jsonFilePath = jsonFilePath)
-    val credential = jsonCreds.provider
-    assert(credential.refreshToken, "Failed to retrive a new access token.")
+    val jsonCred = new JsonConfigCredentials(jsonFilePath.get)
+    assert(jsonCred.provider.refreshToken, "Failed to retrieve new access 
token.")
+  }
+
+  test("should provide json credentials based on binary content") {
+    jsonAssumption()
+
+    val fileContent = Files.readAllBytes(Paths.get(jsonFilePath.get))
+    val jsonCred = JsonConfigCredentials(fileContent)
+    assert(jsonCred.provider.refreshToken, "Failed to retrieve new access 
token.")
   }
 
   test("should build p12 service account") {
-    p12Assumption
+    p12Assumption()
 
-    val p12Creds = ServiceAccountCredentials(
-      p12FilePath = p12FilePath, emailAccount = emailAccount)
-    assertResult(p12Creds) {
-      builder.p12ServiceAccount(p12FilePath.get, emailAccount.get).build()
-    }
+    assert(builder.p12ServiceAccount(p12FilePath.get, 
emailAccount.get).build() != null)
   }
 
-  test("should provide p12 creds") {
-    p12Assumption
+  test("should provide p12 credentials based on file") {
+    p12Assumption()
 
-    val p12Creds = ServiceAccountCredentials(
-      p12FilePath = p12FilePath, emailAccount = emailAccount)
-    val credential = p12Creds.provider
-    assert(credential.refreshToken, "Failed to retrive a new access token.")
+    val p12Cred = new EMailPrivateKeyCredentials(emailAccount.get, 
p12FilePath.get)
+    assert(p12Cred.provider.refreshToken, "Failed to retrieve new access 
token.")
+  }
+
+  test("should provide p12 credentials based on binary content") {
+    p12Assumption()
+
+    val fileContent = Files.readAllBytes(Paths.get(p12FilePath.get))
+    val p12Cred = EMailPrivateKeyCredentials(emailAccount.get, fileContent)
+    assert(p12Cred.provider.refreshToken, "Failed to retrieve new access 
token.")
   }
 
   test("should build metadata service account") {
-    val metadataCreds = ServiceAccountCredentials()
-    assertResult(metadataCreds) {
+    val metadataCred = MetadataServiceCredentials()
+    assertResult(metadataCred) {
       builder.metadataServiceAccount().build()
     }
   }
 
   test("SparkGCPCredentials classes should be serializable") {
-    jsonAssumption
-    p12Assumption
-
-    val jsonCreds = ServiceAccountCredentials(jsonFilePath = jsonFilePath)
-    val p12Creds = ServiceAccountCredentials(
-      p12FilePath = p12FilePath, emailAccount = emailAccount)
-    val metadataCreds = ServiceAccountCredentials()
-    assertResult(jsonCreds) {
-      Utils.deserialize[ServiceAccountCredentials](Utils.serialize(jsonCreds))
-    }
+    jsonAssumption()
+    p12Assumption()
 
-    assertResult(p12Creds) {
-      Utils.deserialize[ServiceAccountCredentials](Utils.serialize(p12Creds))
-    }
+    val jsonCred = new JsonConfigCredentials(jsonFilePath.get)
+    val p12Cred = new EMailPrivateKeyCredentials(emailAccount.get, 
p12FilePath.get)
+    val metadataCred = MetadataServiceCredentials()
+
+    val jsonCredDeserialized: JsonConfigCredentials = 
Utils.deserialize(Utils.serialize(jsonCred))
+    assert(jsonCredDeserialized != null)
 
-    assertResult(metadataCreds) {
-      
Utils.deserialize[ServiceAccountCredentials](Utils.serialize(metadataCreds))
+    val p12CredDeserialized: EMailPrivateKeyCredentials =
+      Utils.deserialize(Utils.serialize(p12Cred))
+    assert(p12CredDeserialized != null)
+
+    assertResult(metadataCred) {
+      Utils.deserialize(Utils.serialize(metadataCred))
     }
 
     assertResult(ApplicationDefaultCredentials) {
-      
Utils.deserialize[ServiceAccountCredentials](Utils.serialize(ApplicationDefaultCredentials))
+      Utils.deserialize(Utils.serialize(ApplicationDefaultCredentials))
     }
   }
-
 }

Reply via email to