This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 94adffa [SPARK-27270][SS] Add Kafka dynamic JAAS authentication debug possibility 94adffa is described below commit 94adffa8b160c0f0317df9675d0a1534e5f804cd Author: Gabor Somogyi <gabor.g.somo...@gmail.com> AuthorDate: Thu Apr 11 16:39:40 2019 -0700 [SPARK-27270][SS] Add Kafka dynamic JAAS authentication debug possibility ## What changes were proposed in this pull request? `Krb5LoginModule` supports a `debug` parameter, which is not yet supported on the Spark side. This configuration makes it easier to debug authentication issues against Kafka. In this PR, the `Krb5LoginModule` debug flag is controlled by either `sun.security.krb5.debug` or `com.ibm.security.krb5.Krb5Debug`. Additionally, some hardcoded values such as `ssl.truststore.location` (and similar keys) were found, which could be error-prone if Kafka changes them, so in such cases the constants defined by Kafka are used instead. ## How was this patch tested? Existing and additional unit tests, plus testing on a cluster. Closes #24204 from gaborgsomogyi/SPARK-27270. 
Authored-by: Gabor Somogyi <gabor.g.somo...@gmail.com> Signed-off-by: Marcelo Vanzin <van...@cloudera.com> --- .../org/apache/spark/kafka010/KafkaTokenUtil.scala | 24 +++++++++---- .../spark/kafka010/KafkaTokenUtilSuite.scala | 40 +++++++++++++--------- 2 files changed, 42 insertions(+), 22 deletions(-) diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala index e5604f2..e0825e5 100644 --- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala +++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.security.token.{Token, TokenIdentifier} import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} -import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.config.{SaslConfigs, SslConfigs} import org.apache.kafka.common.security.JaasContext import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} import org.apache.kafka.common.security.scram.ScramLoginModule @@ -136,22 +136,22 @@ private[spark] object KafkaTokenUtil extends Logging { private def setTrustStoreProperties(sparkConf: SparkConf, properties: ju.Properties): Unit = { sparkConf.get(Kafka.TRUSTSTORE_LOCATION).foreach { truststoreLocation => - properties.put("ssl.truststore.location", truststoreLocation) + properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation) } sparkConf.get(Kafka.TRUSTSTORE_PASSWORD).foreach { truststorePassword => - properties.put("ssl.truststore.password", truststorePassword) + properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword) } } private 
def setKeyStoreProperties(sparkConf: SparkConf, properties: ju.Properties): Unit = { sparkConf.get(Kafka.KEYSTORE_LOCATION).foreach { keystoreLocation => - properties.put("ssl.keystore.location", keystoreLocation) + properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreLocation) } sparkConf.get(Kafka.KEYSTORE_PASSWORD).foreach { keystorePassword => - properties.put("ssl.keystore.password", keystorePassword) + properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword) } sparkConf.get(Kafka.KEY_PASSWORD).foreach { keyPassword => - properties.put("ssl.key.password", keyPassword) + properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword) } } @@ -159,6 +159,7 @@ private[spark] object KafkaTokenUtil extends Logging { val params = s""" |${getKrb5LoginModuleName} required + | debug=${isGlobalKrbDebugEnabled()} | useKeyTab=true | serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}" | keyTab="${sparkConf.get(KEYTAB).get}" @@ -175,6 +176,7 @@ private[spark] object KafkaTokenUtil extends Logging { val params = s""" |${getKrb5LoginModuleName} required + | debug=${isGlobalKrbDebugEnabled()} | useTicketCache=true | serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}"; """.stripMargin.replace("\n", "") @@ -194,6 +196,16 @@ private[spark] object KafkaTokenUtil extends Logging { } } + private def isGlobalKrbDebugEnabled(): Boolean = { + if (System.getProperty("java.vendor").contains("IBM")) { + val debug = System.getenv("com.ibm.security.krb5.Krb5Debug") + debug != null && debug.equalsIgnoreCase("all") + } else { + val debug = System.getenv("sun.security.krb5.debug") + debug != null && debug.equalsIgnoreCase("true") + } + } + private def printToken(token: DelegationToken): Unit = { if (log.isDebugEnabled) { val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm") diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala 
b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala index 0a5af1d..763f8db 100644 --- a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala +++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala @@ -21,7 +21,7 @@ import java.security.PrivilegedExceptionAction import org.apache.hadoop.security.UserGroupInformation import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.config.{SaslConfigs, SslConfigs} import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} import org.apache.spark.{SparkConf, SparkFunSuite} @@ -83,11 +83,11 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest { === bootStrapServers) assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) === SASL_PLAINTEXT.name) - assert(!adminClientProperties.containsKey("ssl.truststore.location")) - assert(!adminClientProperties.containsKey("ssl.truststore.password")) - assert(!adminClientProperties.containsKey("ssl.keystore.location")) - assert(!adminClientProperties.containsKey("ssl.keystore.password")) - assert(!adminClientProperties.containsKey("ssl.key.password")) + assert(!adminClientProperties.containsKey(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)) + assert(!adminClientProperties.containsKey(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)) + assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) + assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)) + assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEY_PASSWORD_CONFIG)) } test("createAdminClientProperties with SASL_SSL protocol should include truststore config") { @@ -105,11 +105,13 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest { === 
bootStrapServers) assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) === SASL_SSL.name) - assert(adminClientProperties.get("ssl.truststore.location") === trustStoreLocation) - assert(adminClientProperties.get("ssl.truststore.password") === trustStorePassword) - assert(!adminClientProperties.containsKey("ssl.keystore.location")) - assert(!adminClientProperties.containsKey("ssl.keystore.password")) - assert(!adminClientProperties.containsKey("ssl.key.password")) + assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG) + === trustStoreLocation) + assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG) + === trustStorePassword) + assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) + assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)) + assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEY_PASSWORD_CONFIG)) } test("createAdminClientProperties with SSL protocol should include keystore and truststore " + @@ -128,11 +130,13 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest { === bootStrapServers) assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) === SSL.name) - assert(adminClientProperties.get("ssl.truststore.location") === trustStoreLocation) - assert(adminClientProperties.get("ssl.truststore.password") === trustStorePassword) - assert(adminClientProperties.get("ssl.keystore.location") === keyStoreLocation) - assert(adminClientProperties.get("ssl.keystore.password") === keyStorePassword) - assert(adminClientProperties.get("ssl.key.password") === keyPassword) + assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG) + === trustStoreLocation) + assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG) + === trustStorePassword) + assert(adminClientProperties.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG) === keyStoreLocation) + 
assert(adminClientProperties.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG) === keyStorePassword) + assert(adminClientProperties.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG) === keyPassword) } test("createAdminClientProperties with global config should not set dynamic jaas config") { @@ -165,7 +169,10 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest { assert(adminClientProperties.containsKey(SaslConfigs.SASL_MECHANISM)) val saslJaasConfig = adminClientProperties.getProperty(SaslConfigs.SASL_JAAS_CONFIG) assert(saslJaasConfig.contains("Krb5LoginModule required")) + assert(saslJaasConfig.contains(s"debug=")) assert(saslJaasConfig.contains("useKeyTab=true")) + assert(saslJaasConfig.contains(s"""keyTab="$keytab"""")) + assert(saslJaasConfig.contains(s"""principal="$principal"""")) } test("createAdminClientProperties without keytab should set ticket cache dynamic jaas config") { @@ -181,6 +188,7 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest { assert(adminClientProperties.containsKey(SaslConfigs.SASL_MECHANISM)) val saslJaasConfig = adminClientProperties.getProperty(SaslConfigs.SASL_JAAS_CONFIG) assert(saslJaasConfig.contains("Krb5LoginModule required")) + assert(saslJaasConfig.contains(s"debug=")) assert(saslJaasConfig.contains("useTicketCache=true")) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org