This is an automated email from the ASF dual-hosted git repository.
lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir.git
The following commit(s) were added to refs/heads/master by this push:
new cc61a83 [BAHIR-141] Support GCP JSON key type as binary array
cc61a83 is described below
commit cc61a83a79d912f8eb842507ecca0b2d82f734e6
Author: Lukasz Antoniak <[email protected]>
AuthorDate: Thu Jan 24 04:06:57 2019 -0800
[BAHIR-141] Support GCP JSON key type as binary array
Closes #82
Closes #53
---
streaming-pubsub/README.md | 8 +-
.../streaming/pubsub/SparkGCPCredentials.scala | 161 ++++++++++++---------
.../pubsub/SparkGCPCredentialsBuilderSuite.scala | 100 +++++++------
3 files changed, 149 insertions(+), 120 deletions(-)
diff --git a/streaming-pubsub/README.md b/streaming-pubsub/README.md
index a8f374f..f20e5c9 100644
--- a/streaming-pubsub/README.md
+++ b/streaming-pubsub/README.md
@@ -27,11 +27,13 @@ The `--packages` argument can also be used with
`bin/spark-submit`.
First you need to create credentials via SparkGCPCredentials; it supports four
types of credentials
* application default
`SparkGCPCredentials.builder.build()`
-* json type service account
+* JSON type service account (based on file or its binary content)
`SparkGCPCredentials.builder.jsonServiceAccount(PATH_TO_JSON_KEY).build()`
-* p12 type service account
+ `SparkGCPCredentials.builder.jsonServiceAccount(JSON_KEY_BYTES).build()`
+* P12 type service account
`SparkGCPCredentials.builder.p12ServiceAccount(PATH_TO_P12_KEY,
EMAIL_ACCOUNT).build()`
-* metadata service account(running on dataproc)
+ `SparkGCPCredentials.builder.p12ServiceAccount(P12_KEY_BYTES,
EMAIL_ACCOUNT).build()`
+* Metadata service account (running on Dataproc)
`SparkGCPCredentials.builder.metadataServiceAccount().build()`
### Scala API
diff --git
a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala
b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala
index f15a521..4352d7f 100644
---
a/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala
+++
b/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentials.scala
@@ -17,119 +17,146 @@
package org.apache.spark.streaming.pubsub
+import java.io.{ByteArrayInputStream, File, FileOutputStream}
+import java.nio.file.{Files, Paths}
+import java.util
+
import com.google.api.client.auth.oauth2.Credential
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential
+import com.google.api.client.http.HttpTransport
import com.google.api.client.json.jackson.JacksonFactory
import com.google.api.services.pubsub.PubsubScopes
import com.google.cloud.hadoop.util.{CredentialFactory, HttpTransportFactory}
-import java.io.{ByteArrayInputStream, File, FileNotFoundException,
FileOutputStream}
-import java.nio.file.{Files, Paths}
-import java.util
-import org.apache.hadoop.conf.Configuration
/**
* Serializable interface providing a method executors can call to obtain an
* GCPCredentialsProvider instance for authenticating to GCP services.
*/
private[pubsub] sealed trait SparkGCPCredentials extends Serializable {
-
def provider: Credential
+
+ def jacksonFactory(): JacksonFactory = new JacksonFactory
+
+ def httpTransport(): HttpTransport =
HttpTransportFactory.createHttpTransport(
+ HttpTransportFactory.HttpTransportType.JAVA_NET, null
+ )
+
+ def scopes(): util.Collection[String] = PubsubScopes.all()
}
/**
- * Returns application default type credential
+ * Application default credentials.
*/
private[pubsub] final case object ApplicationDefaultCredentials extends
SparkGCPCredentials {
-
override def provider: Credential = {
GoogleCredential.getApplicationDefault.createScoped(PubsubScopes.all())
}
}
/**
- * Returns a Service Account type Credential instance.
- * If all parameters are None, then try metadata service type
- * If jsonFilePath available, try json type
- * If jsonFilePath is None and p12FilePath and emailAccount available, try p12
type
- *
- * @param jsonFilePath file path for json
- * @param p12FilePath file path for p12
- * @param emailAccount email account for p12
+ * Credentials based on JSON key file.
*/
-private[pubsub] final case class ServiceAccountCredentials(
- jsonFilePath: Option[String] = None,
- p12FilePath: Option[String] = None,
- emailAccount: Option[String] = None)
+private[pubsub] final case class JsonConfigCredentials(jsonContent:
Array[Byte])
extends SparkGCPCredentials {
- private val fileBytes = getFileBuffer
+ def this(jsonFilePath: String) =
this(Files.readAllBytes(Paths.get(jsonFilePath)))
override def provider: Credential = {
- val jsonFactory = new JacksonFactory
- val scopes = new util.ArrayList(PubsubScopes.all())
- val transport = HttpTransportFactory.createHttpTransport(
- HttpTransportFactory.HttpTransportType.JAVA_NET, null)
-
- if (!jsonFilePath.isEmpty) {
- val stream = new ByteArrayInputStream(fileBytes)
- CredentialFactory.GoogleCredentialWithRetry.fromGoogleCredential(
- GoogleCredential.fromStream(stream, transport, jsonFactory)
- .createScoped(scopes))
- } else if (!p12FilePath.isEmpty && !emailAccount.isEmpty) {
- val tempFile = File.createTempFile(emailAccount.get, ".p12")
- tempFile.deleteOnExit
- val p12Out = new FileOutputStream(tempFile)
- p12Out.write(fileBytes, 0, fileBytes.length)
- p12Out.close
-
- new CredentialFactory.GoogleCredentialWithRetry(
- new GoogleCredential.Builder().setTransport(transport)
- .setJsonFactory(jsonFactory)
- .setServiceAccountId(emailAccount.get)
- .setServiceAccountScopes(scopes)
- .setServiceAccountPrivateKeyFromP12File(tempFile)
- .setRequestInitializer(new
CredentialFactory.CredentialHttpRetryInitializer()))
- } else (new CredentialFactory).getCredentialFromMetadataServiceAccount
+ val stream = new ByteArrayInputStream(jsonContent)
+ val credentials =
CredentialFactory.GoogleCredentialWithRetry.fromGoogleCredential(
+ GoogleCredential.fromStream(
+ stream, httpTransport(), jacksonFactory()
+ ).createScoped(scopes())
+ )
+ stream.close()
+
+ credentials
}
+}
- private def getFileBuffer: Array[Byte] = {
- val filePath = jsonFilePath orElse p12FilePath
- if (filePath.isEmpty) Array[Byte]()
- else if (!Files.exists(Paths.get(filePath.get))) {
- throw new FileNotFoundException(s"The key file path(${filePath.get})
doesn't exist.")
- } else Files.readAllBytes(Paths.get(filePath.get))
+/**
+ * Credentials based on e-mail account and P12 key.
+ */
+private[pubsub] final case class EMailPrivateKeyCredentials(
+ emailAccount: String, p12Content: Array[Byte]
+ ) extends SparkGCPCredentials {
+ def this(emailAccount: String, p12FilePath: String) = {
+ this(emailAccount, Files.readAllBytes(Paths.get(p12FilePath)))
}
+ override def provider: Credential = {
+ val tempFile = File.createTempFile(emailAccount, ".p12")
+ tempFile.deleteOnExit()
+ val p12Out = new FileOutputStream(tempFile)
+ p12Out.write(p12Content, 0, p12Content.length)
+ p12Out.flush()
+ p12Out.close()
+
+ new CredentialFactory.GoogleCredentialWithRetry(
+ new GoogleCredential.Builder().setTransport(httpTransport())
+ .setJsonFactory(jacksonFactory())
+ .setServiceAccountId(emailAccount)
+ .setServiceAccountScopes(scopes())
+ .setServiceAccountPrivateKeyFromP12File(tempFile)
+ .setRequestInitializer(new
CredentialFactory.CredentialHttpRetryInitializer())
+ )
+ }
}
-object SparkGCPCredentials {
+/**
+ * Credentials based on metadata service.
+ */
+private[pubsub] final case class MetadataServiceCredentials() extends
SparkGCPCredentials {
+ override def provider: Credential = {
+ (new CredentialFactory).getCredentialFromMetadataServiceAccount
+ }
+}
+object SparkGCPCredentials {
/**
* Builder for SparkGCPCredentials instance.
*/
class Builder {
- private var creds: Option[SparkGCPCredentials] = None
+ private var credentials: Option[SparkGCPCredentials] = None
/**
- * Use a json type key file for service account credential
- *
- * @param jsonFilePath json type key file
+ * Use a JSON type key file for service account credential
+ * @param jsonFilePath JSON type key file
* @return Reference to this SparkGCPCredentials.Builder
*/
def jsonServiceAccount(jsonFilePath: String): Builder = {
- creds = Option(ServiceAccountCredentials(Option(jsonFilePath)))
+ credentials = Option(new JsonConfigCredentials(jsonFilePath))
this
}
/**
- * Use a p12 type key file service account credential
- *
- * @param p12FilePath p12 type key file
+ * Use a JSON type key file for service account credential
+ * @param jsonFileBuffer binary content of JSON type key file
+ * @return Reference to this SparkGCPCredentials.Builder
+ */
+ def jsonServiceAccount(jsonFileBuffer: Array[Byte]): Builder = {
+ credentials = Option(JsonConfigCredentials(jsonFileBuffer))
+ this
+ }
+
+ /**
+ * Use a P12 type key file service account credential
+ * @param p12FilePath P12 type key file
* @param emailAccount email of service account
* @return Reference to this SparkGCPCredentials.Builder
*/
def p12ServiceAccount(p12FilePath: String, emailAccount: String): Builder
= {
- creds = Option(ServiceAccountCredentials(
- p12FilePath = Option(p12FilePath), emailAccount =
Option(emailAccount)))
+ credentials = Option(new EMailPrivateKeyCredentials(emailAccount,
p12FilePath))
+ this
+ }
+
+ /**
+ * Use a P12 type key file service account credential
+ * @param p12FileBuffer binary content of P12 type key file
+ * @param emailAccount email of service account
+ * @return Reference to this SparkGCPCredentials.Builder
+ */
+ def p12ServiceAccount(p12FileBuffer: Array[Byte], emailAccount: String):
Builder = {
+ credentials = Option(EMailPrivateKeyCredentials(emailAccount,
p12FileBuffer))
this
}
@@ -138,27 +165,23 @@ object SparkGCPCredentials {
* @return Reference to this SparkGCPCredentials.Builder
*/
def metadataServiceAccount(): Builder = {
- creds = Option(ServiceAccountCredentials())
+ credentials = Option(MetadataServiceCredentials())
this
}
/**
* Returns the appropriate instance of SparkGCPCredentials given the
configured
* parameters.
- *
* - The service account credentials will be returned if they were
provided.
- *
* - The application default credentials will be returned otherwise.
- * @return
+ * @return SparkGCPCredentials object
*/
- def build(): SparkGCPCredentials =
creds.getOrElse(ApplicationDefaultCredentials)
-
+ def build(): SparkGCPCredentials =
credentials.getOrElse(ApplicationDefaultCredentials)
}
/**
* Creates a SparkGCPCredentials.Builder for constructing
* SparkGCPCredentials instance.
- *
* @return SparkGCPCredentials.Builder instance
*/
def builder: Builder = new Builder
diff --git
a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala
b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala
index be47e18..4f50639 100644
---
a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala
+++
b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/SparkGCPCredentialsBuilderSuite.scala
@@ -19,11 +19,11 @@ package org.apache.spark.streaming.pubsub
import java.nio.file.{Files, Paths}
-import org.scalatest.concurrent.TimeLimits
import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.TimeLimits
-import org.apache.spark.util.Utils
import org.apache.spark.SparkFunSuite
+import org.apache.spark.util.Utils
class SparkGCPCredentialsBuilderSuite
extends SparkFunSuite with TimeLimits with BeforeAndAfter{
@@ -33,17 +33,17 @@ class SparkGCPCredentialsBuilderSuite
private val p12FilePath =
sys.env.get(PubsubTestUtils.envVarNameForP12KeyPath)
private val emailAccount = sys.env.get(PubsubTestUtils.envVarNameForAccount)
- private def jsonAssumption {
+ private def jsonAssumption() {
assume(
- !jsonFilePath.isEmpty,
+ jsonFilePath.isDefined,
s"as the environment variable
${PubsubTestUtils.envVarNameForJsonKeyPath} is not set.")
}
- private def p12Assumption {
+ private def p12Assumption() {
assume(
- !p12FilePath.isEmpty,
+ p12FilePath.isDefined,
s"as the environment variable ${PubsubTestUtils.envVarNameForP12KeyPath}
is not set.")
assume(
- !emailAccount.isEmpty,
+ emailAccount.isDefined,
s"as the environment variable ${PubsubTestUtils.envVarNameForAccount} is
not set.")
}
@@ -52,71 +52,75 @@ class SparkGCPCredentialsBuilderSuite
}
test("should build json service account") {
- jsonAssumption
+ jsonAssumption()
- val jsonCreds = ServiceAccountCredentials(jsonFilePath = jsonFilePath)
- assertResult(jsonCreds) {
- builder.jsonServiceAccount(jsonFilePath.get).build()
- }
+ assert(builder.jsonServiceAccount(jsonFilePath.get).build() != null)
}
- test("should provide json creds") {
- jsonAssumption
+ test("should provide json credentials based on file") {
+ jsonAssumption()
- val jsonCreds = ServiceAccountCredentials(jsonFilePath = jsonFilePath)
- val credential = jsonCreds.provider
- assert(credential.refreshToken, "Failed to retrive a new access token.")
+ val jsonCred = new JsonConfigCredentials(jsonFilePath.get)
+ assert(jsonCred.provider.refreshToken, "Failed to retrieve new access
token.")
+ }
+
+ test("should provide json credentials based on binary content") {
+ jsonAssumption()
+
+ val fileContent = Files.readAllBytes(Paths.get(jsonFilePath.get))
+ val jsonCred = JsonConfigCredentials(fileContent)
+ assert(jsonCred.provider.refreshToken, "Failed to retrieve new access
token.")
}
test("should build p12 service account") {
- p12Assumption
+ p12Assumption()
- val p12Creds = ServiceAccountCredentials(
- p12FilePath = p12FilePath, emailAccount = emailAccount)
- assertResult(p12Creds) {
- builder.p12ServiceAccount(p12FilePath.get, emailAccount.get).build()
- }
+ assert(builder.p12ServiceAccount(p12FilePath.get,
emailAccount.get).build() != null)
}
- test("should provide p12 creds") {
- p12Assumption
+ test("should provide p12 credentials based on file") {
+ p12Assumption()
- val p12Creds = ServiceAccountCredentials(
- p12FilePath = p12FilePath, emailAccount = emailAccount)
- val credential = p12Creds.provider
- assert(credential.refreshToken, "Failed to retrive a new access token.")
+ val p12Cred = new EMailPrivateKeyCredentials(emailAccount.get,
p12FilePath.get)
+ assert(p12Cred.provider.refreshToken, "Failed to retrieve new access
token.")
+ }
+
+ test("should provide p12 credentials based on binary content") {
+ p12Assumption()
+
+ val fileContent = Files.readAllBytes(Paths.get(p12FilePath.get))
+ val p12Cred = EMailPrivateKeyCredentials(emailAccount.get, fileContent)
+ assert(p12Cred.provider.refreshToken, "Failed to retrieve new access
token.")
}
test("should build metadata service account") {
- val metadataCreds = ServiceAccountCredentials()
- assertResult(metadataCreds) {
+ val metadataCred = MetadataServiceCredentials()
+ assertResult(metadataCred) {
builder.metadataServiceAccount().build()
}
}
test("SparkGCPCredentials classes should be serializable") {
- jsonAssumption
- p12Assumption
-
- val jsonCreds = ServiceAccountCredentials(jsonFilePath = jsonFilePath)
- val p12Creds = ServiceAccountCredentials(
- p12FilePath = p12FilePath, emailAccount = emailAccount)
- val metadataCreds = ServiceAccountCredentials()
- assertResult(jsonCreds) {
- Utils.deserialize[ServiceAccountCredentials](Utils.serialize(jsonCreds))
- }
+ jsonAssumption()
+ p12Assumption()
- assertResult(p12Creds) {
- Utils.deserialize[ServiceAccountCredentials](Utils.serialize(p12Creds))
- }
+ val jsonCred = new JsonConfigCredentials(jsonFilePath.get)
+ val p12Cred = new EMailPrivateKeyCredentials(emailAccount.get,
p12FilePath.get)
+ val metadataCred = MetadataServiceCredentials()
+
+ val jsonCredDeserialized: JsonConfigCredentials =
Utils.deserialize(Utils.serialize(jsonCred))
+ assert(jsonCredDeserialized != null)
- assertResult(metadataCreds) {
-
Utils.deserialize[ServiceAccountCredentials](Utils.serialize(metadataCreds))
+ val p12CredDeserialized: EMailPrivateKeyCredentials =
+ Utils.deserialize(Utils.serialize(p12Cred))
+ assert(p12CredDeserialized != null)
+
+ assertResult(metadataCred) {
+ Utils.deserialize(Utils.serialize(metadataCred))
}
assertResult(ApplicationDefaultCredentials) {
-
Utils.deserialize[ServiceAccountCredentials](Utils.serialize(ApplicationDefaultCredentials))
+ Utils.deserialize(Utils.serialize(ApplicationDefaultCredentials))
}
}
-
}