Repository: spark
Updated Branches:
  refs/heads/master 20278e719 -> 9b1f6c8ba


[SPARK-26304][SS] Add default value to spark.kafka.sasl.kerberos.service.name 
parameter

## What changes were proposed in this pull request?

spark.kafka.sasl.kerberos.service.name is an optional parameter, but in most 
cases the value `kafka` has to be set. As I've written in the JIRA, the 
reasoning behind this is:
* Kafka's configuration guide suggests the same value: 
https://kafka.apache.org/documentation/#security_sasl_kerberos_brokerconfig
* It would make things easier for Spark users by requiring less configuration
* Other streaming engines are doing the same

In this PR I've changed the parameter from optional to `WithDefault` and set 
`kafka` as default value.

## How was this patch tested?

Existing unit tests, plus testing on a cluster.

Closes #23254 from gaborgsomogyi/SPARK-26304.

Authored-by: Gabor Somogyi <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b1f6c8b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b1f6c8b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b1f6c8b

Branch: refs/heads/master
Commit: 9b1f6c8bab5401258c653d4e2efb50e97c6d282f
Parents: 20278e7
Author: Gabor Somogyi <[email protected]>
Authored: Fri Dec 7 13:58:02 2018 -0800
Committer: Marcelo Vanzin <[email protected]>
Committed: Fri Dec 7 13:58:02 2018 -0800

----------------------------------------------------------------------
 .../spark/deploy/security/KafkaTokenUtil.scala  |  7 ++----
 .../apache/spark/internal/config/Kafka.scala    |  2 +-
 .../deploy/security/KafkaTokenUtilSuite.scala   | 24 --------------------
 .../sql/kafka010/KafkaSecurityHelper.scala      |  5 +---
 .../sql/kafka010/KafkaSecurityHelperSuite.scala | 15 ------------
 5 files changed, 4 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9b1f6c8b/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala 
b/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala
index c890cee..aec0f72 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala
@@ -143,14 +143,11 @@ private[spark] object KafkaTokenUtil extends Logging {
   }
 
   private[security] def getKeytabJaasParams(sparkConf: SparkConf): String = {
-    val serviceName = sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)
-    require(serviceName.nonEmpty, "Kerberos service name must be defined")
-
     val params =
       s"""
       |${getKrb5LoginModuleName} required
       | useKeyTab=true
-      | serviceName="${serviceName.get}"
+      | serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}"
       | keyTab="${sparkConf.get(KEYTAB).get}"
       | principal="${sparkConf.get(PRINCIPAL).get}";
       """.stripMargin.replace("\n", "")
@@ -166,7 +163,7 @@ private[spark] object KafkaTokenUtil extends Logging {
       s"""
       |${getKrb5LoginModuleName} required
       | useTicketCache=true
-      | serviceName="${serviceName.get}";
+      | serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}";
       """.stripMargin.replace("\n", "")
     logDebug(s"Krb ticket cache JAAS params: $params")
     params

http://git-wip-us.apache.org/repos/asf/spark/blob/9b1f6c8b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala 
b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala
index 85d74c2..064fc93 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala
@@ -40,7 +40,7 @@ private[spark] object Kafka {
         "Kafka's JAAS config or in Kafka's config. For further details please 
see kafka " +
         "documentation. Only used to obtain delegation token.")
       .stringConf
-      .createOptional
+      .createWithDefault("kafka")
 
   val TRUSTSTORE_LOCATION =
     ConfigBuilder("spark.kafka.ssl.truststore.location")

http://git-wip-us.apache.org/repos/asf/spark/blob/9b1f6c8b/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala
index 682bebd..18aa537 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala
@@ -36,7 +36,6 @@ class KafkaTokenUtilSuite extends SparkFunSuite with 
BeforeAndAfterEach {
   private val keyStorePassword = "keyStoreSecret"
   private val keyPassword = "keySecret"
   private val keytab = "/path/to/keytab"
-  private val kerberosServiceName = "kafka"
   private val principal = "[email protected]"
 
   private var sparkConf: SparkConf = null
@@ -96,7 +95,6 @@ class KafkaTokenUtilSuite extends SparkFunSuite with 
BeforeAndAfterEach {
     sparkConf.set(Kafka.KEYSTORE_LOCATION, keyStoreLocation)
     sparkConf.set(Kafka.KEYSTORE_PASSWORD, keyStorePassword)
     sparkConf.set(Kafka.KEY_PASSWORD, keyPassword)
-    sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName)
 
     val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
 
@@ -119,7 +117,6 @@ class KafkaTokenUtilSuite extends SparkFunSuite with 
BeforeAndAfterEach {
     sparkConf.set(Kafka.KEYSTORE_LOCATION, keyStoreLocation)
     sparkConf.set(Kafka.KEYSTORE_PASSWORD, keyStorePassword)
     sparkConf.set(Kafka.KEY_PASSWORD, keyPassword)
-    sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName)
 
     val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
 
@@ -143,7 +140,6 @@ class KafkaTokenUtilSuite extends SparkFunSuite with 
BeforeAndAfterEach {
     sparkConf.set(Kafka.KEYSTORE_LOCATION, keyStoreLocation)
     sparkConf.set(Kafka.KEYSTORE_PASSWORD, keyStorePassword)
     sparkConf.set(Kafka.KEY_PASSWORD, keyPassword)
-    sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName)
 
     val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
 
@@ -177,7 +173,6 @@ class KafkaTokenUtilSuite extends SparkFunSuite with 
BeforeAndAfterEach {
     sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers)
     sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_SSL.name)
     sparkConf.set(KEYTAB, keytab)
-    sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName)
     sparkConf.set(PRINCIPAL, principal)
 
     val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
@@ -195,7 +190,6 @@ class KafkaTokenUtilSuite extends SparkFunSuite with 
BeforeAndAfterEach {
   test("createAdminClientProperties without keytab should set ticket cache 
dynamic jaas config") {
     sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers)
     sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_SSL.name)
-    sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName)
 
     val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
 
@@ -218,22 +212,4 @@ class KafkaTokenUtilSuite extends SparkFunSuite with 
BeforeAndAfterEach {
 
     assert(KafkaTokenUtil.isGlobalJaasConfigurationProvided)
   }
-
-  test("getKeytabJaasParams with keytab no service should throw exception") {
-    sparkConf.set(KEYTAB, keytab)
-
-    val thrown = intercept[IllegalArgumentException] {
-      KafkaTokenUtil.getKeytabJaasParams(sparkConf)
-    }
-
-    assert(thrown.getMessage contains "Kerberos service name must be defined")
-  }
-
-  test("getTicketCacheJaasParams without service should throw exception") {
-    val thrown = intercept[IllegalArgumentException] {
-      KafkaTokenUtil.getTicketCacheJaasParams(sparkConf)
-    }
-
-    assert(thrown.getMessage contains "Kerberos service name must be defined")
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9b1f6c8b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
index 74d5ef9..7215295 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.kafka010
 
 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.security.token.{Token, TokenIdentifier}
 import org.apache.kafka.common.security.scram.ScramLoginModule
 
 import org.apache.spark.SparkConf
@@ -35,8 +34,6 @@ private[kafka010] object KafkaSecurityHelper extends Logging {
   def getTokenJaasParams(sparkConf: SparkConf): String = {
     val token = UserGroupInformation.getCurrentUser().getCredentials.getToken(
       KafkaTokenUtil.TOKEN_SERVICE)
-    val serviceName = sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)
-    require(serviceName.isDefined, "Kerberos service name must be defined")
     val username = new String(token.getIdentifier)
     val password = new String(token.getPassword)
 
@@ -45,7 +42,7 @@ private[kafka010] object KafkaSecurityHelper extends Logging {
       s"""
       |$loginModuleName required
       | tokenauth=true
-      | serviceName="${serviceName.get}"
+      | serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}"
       | username="$username"
       | password="$password";
       """.stripMargin.replace("\n", "")

http://git-wip-us.apache.org/repos/asf/spark/blob/9b1f6c8b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
index 772fe46..fd9dee3 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
@@ -26,12 +26,8 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.security.KafkaTokenUtil
 import 
org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier
-import org.apache.spark.internal.config._
 
 class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach {
-  private val keytab = "/path/to/keytab"
-  private val kerberosServiceName = "kafka"
-  private val principal = "[email protected]"
   private val tokenId = "tokenId" + UUID.randomUUID().toString
   private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString
 
@@ -76,19 +72,8 @@ class KafkaSecurityHelperSuite extends SparkFunSuite with 
BeforeAndAfterEach {
     assert(KafkaSecurityHelper.isTokenAvailable())
   }
 
-  test("getTokenJaasParams with token no service should throw exception") {
-    addTokenToUGI()
-
-    val thrown = intercept[IllegalArgumentException] {
-      KafkaSecurityHelper.getTokenJaasParams(sparkConf)
-    }
-
-    assert(thrown.getMessage contains "Kerberos service name must be defined")
-  }
-
   test("getTokenJaasParams with token should return scram module") {
     addTokenToUGI()
-    sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName)
 
     val jaasParams = KafkaSecurityHelper.getTokenJaasParams(sparkConf)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to