This is an automated email from the ASF dual-hosted git repository.
vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 94adffa [SPARK-27270][SS] Add Kafka dynamic JAAS authentication debug
possibility
94adffa is described below
commit 94adffa8b160c0f0317df9675d0a1534e5f804cd
Author: Gabor Somogyi <[email protected]>
AuthorDate: Thu Apr 11 16:39:40 2019 -0700
[SPARK-27270][SS] Add Kafka dynamic JAAS authentication debug possibility
## What changes were proposed in this pull request?
`Krb5LoginModule` supports debug parameter which is not yet supported from
Spark side. This configuration makes it easier to debug authentication issues
against Kafka.
In this PR `Krb5LoginModule` debug flag controlled by either
`sun.security.krb5.debug` or `com.ibm.security.krb5.Krb5Debug`.
Additionally found some hardcoded values like `ssl.truststore.location`,
etc... which could be error prone if Kafka changes it so in such cases Kafka
define used.
## How was this patch tested?
Existing + additional unit tests + on cluster.
Closes #24204 from gaborgsomogyi/SPARK-27270.
Authored-by: Gabor Somogyi <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
---
.../org/apache/spark/kafka010/KafkaTokenUtil.scala | 24 +++++++++----
.../spark/kafka010/KafkaTokenUtilSuite.scala | 40 +++++++++++++---------
2 files changed, 42 insertions(+), 22 deletions(-)
diff --git
a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
index e5604f2..e0825e5 100644
---
a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
+++
b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.security.token.{Token,
TokenIdentifier}
import
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AdminClient,
CreateDelegationTokenOptions}
-import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.config.{SaslConfigs, SslConfigs}
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT,
SASL_SSL, SSL}
import org.apache.kafka.common.security.scram.ScramLoginModule
@@ -136,22 +136,22 @@ private[spark] object KafkaTokenUtil extends Logging {
private def setTrustStoreProperties(sparkConf: SparkConf, properties:
ju.Properties): Unit = {
sparkConf.get(Kafka.TRUSTSTORE_LOCATION).foreach { truststoreLocation =>
- properties.put("ssl.truststore.location", truststoreLocation)
+ properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
truststoreLocation)
}
sparkConf.get(Kafka.TRUSTSTORE_PASSWORD).foreach { truststorePassword =>
- properties.put("ssl.truststore.password", truststorePassword)
+ properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
truststorePassword)
}
}
private def setKeyStoreProperties(sparkConf: SparkConf, properties:
ju.Properties): Unit = {
sparkConf.get(Kafka.KEYSTORE_LOCATION).foreach { keystoreLocation =>
- properties.put("ssl.keystore.location", keystoreLocation)
+ properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreLocation)
}
sparkConf.get(Kafka.KEYSTORE_PASSWORD).foreach { keystorePassword =>
- properties.put("ssl.keystore.password", keystorePassword)
+ properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword)
}
sparkConf.get(Kafka.KEY_PASSWORD).foreach { keyPassword =>
- properties.put("ssl.key.password", keyPassword)
+ properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword)
}
}
@@ -159,6 +159,7 @@ private[spark] object KafkaTokenUtil extends Logging {
val params =
s"""
|${getKrb5LoginModuleName} required
+ | debug=${isGlobalKrbDebugEnabled()}
| useKeyTab=true
| serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}"
| keyTab="${sparkConf.get(KEYTAB).get}"
@@ -175,6 +176,7 @@ private[spark] object KafkaTokenUtil extends Logging {
val params =
s"""
|${getKrb5LoginModuleName} required
+ | debug=${isGlobalKrbDebugEnabled()}
| useTicketCache=true
| serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}";
""".stripMargin.replace("\n", "")
@@ -194,6 +196,16 @@ private[spark] object KafkaTokenUtil extends Logging {
}
}
+ private def isGlobalKrbDebugEnabled(): Boolean = {
+ if (System.getProperty("java.vendor").contains("IBM")) {
+ val debug = System.getenv("com.ibm.security.krb5.Krb5Debug")
+ debug != null && debug.equalsIgnoreCase("all")
+ } else {
+ val debug = System.getenv("sun.security.krb5.debug")
+ debug != null && debug.equalsIgnoreCase("true")
+ }
+ }
+
private def printToken(token: DelegationToken): Unit = {
if (log.isDebugEnabled) {
val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
diff --git
a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
index 0a5af1d..763f8db 100644
---
a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
+++
b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
@@ -21,7 +21,7 @@ import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.config.{SaslConfigs, SslConfigs}
import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT,
SASL_SSL, SSL}
import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -83,11 +83,11 @@ class KafkaTokenUtilSuite extends SparkFunSuite with
KafkaDelegationTokenTest {
=== bootStrapServers)
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
=== SASL_PLAINTEXT.name)
- assert(!adminClientProperties.containsKey("ssl.truststore.location"))
- assert(!adminClientProperties.containsKey("ssl.truststore.password"))
- assert(!adminClientProperties.containsKey("ssl.keystore.location"))
- assert(!adminClientProperties.containsKey("ssl.keystore.password"))
- assert(!adminClientProperties.containsKey("ssl.key.password"))
+
assert(!adminClientProperties.containsKey(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
+
assert(!adminClientProperties.containsKey(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG))
+
assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+
assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG))
+
assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEY_PASSWORD_CONFIG))
}
test("createAdminClientProperties with SASL_SSL protocol should include
truststore config") {
@@ -105,11 +105,13 @@ class KafkaTokenUtilSuite extends SparkFunSuite with
KafkaDelegationTokenTest {
=== bootStrapServers)
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
=== SASL_SSL.name)
- assert(adminClientProperties.get("ssl.truststore.location") ===
trustStoreLocation)
- assert(adminClientProperties.get("ssl.truststore.password") ===
trustStorePassword)
- assert(!adminClientProperties.containsKey("ssl.keystore.location"))
- assert(!adminClientProperties.containsKey("ssl.keystore.password"))
- assert(!adminClientProperties.containsKey("ssl.key.password"))
+ assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)
+ === trustStoreLocation)
+ assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)
+ === trustStorePassword)
+
assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+
assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG))
+
assert(!adminClientProperties.containsKey(SslConfigs.SSL_KEY_PASSWORD_CONFIG))
}
test("createAdminClientProperties with SSL protocol should include keystore
and truststore " +
@@ -128,11 +130,13 @@ class KafkaTokenUtilSuite extends SparkFunSuite with
KafkaDelegationTokenTest {
=== bootStrapServers)
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
=== SSL.name)
- assert(adminClientProperties.get("ssl.truststore.location") ===
trustStoreLocation)
- assert(adminClientProperties.get("ssl.truststore.password") ===
trustStorePassword)
- assert(adminClientProperties.get("ssl.keystore.location") ===
keyStoreLocation)
- assert(adminClientProperties.get("ssl.keystore.password") ===
keyStorePassword)
- assert(adminClientProperties.get("ssl.key.password") === keyPassword)
+ assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)
+ === trustStoreLocation)
+ assert(adminClientProperties.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)
+ === trustStorePassword)
+ assert(adminClientProperties.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)
=== keyStoreLocation)
+ assert(adminClientProperties.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)
=== keyStorePassword)
+ assert(adminClientProperties.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG) ===
keyPassword)
}
test("createAdminClientProperties with global config should not set dynamic
jaas config") {
@@ -165,7 +169,10 @@ class KafkaTokenUtilSuite extends SparkFunSuite with
KafkaDelegationTokenTest {
assert(adminClientProperties.containsKey(SaslConfigs.SASL_MECHANISM))
val saslJaasConfig =
adminClientProperties.getProperty(SaslConfigs.SASL_JAAS_CONFIG)
assert(saslJaasConfig.contains("Krb5LoginModule required"))
+ assert(saslJaasConfig.contains(s"debug="))
assert(saslJaasConfig.contains("useKeyTab=true"))
+ assert(saslJaasConfig.contains(s"""keyTab="$keytab""""))
+ assert(saslJaasConfig.contains(s"""principal="$principal""""))
}
test("createAdminClientProperties without keytab should set ticket cache
dynamic jaas config") {
@@ -181,6 +188,7 @@ class KafkaTokenUtilSuite extends SparkFunSuite with
KafkaDelegationTokenTest {
assert(adminClientProperties.containsKey(SaslConfigs.SASL_MECHANISM))
val saslJaasConfig =
adminClientProperties.getProperty(SaslConfigs.SASL_JAAS_CONFIG)
assert(saslJaasConfig.contains("Krb5LoginModule required"))
+ assert(saslJaasConfig.contains(s"debug="))
assert(saslJaasConfig.contains("useTicketCache=true"))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]