This is an automated email from the ASF dual-hosted git repository.
vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5ca45e8 [SPARK-26592][SS] Throw exception when kafka delegation token
tried to obtain with proxy user
5ca45e8 is described below
commit 5ca45e8a3db75907e8699e7dff63accbff5bfae5
Author: Gabor Somogyi <[email protected]>
AuthorDate: Tue Jan 15 10:00:01 2019 -0800
[SPARK-26592][SS] Throw exception when kafka delegation token tried to
obtain with proxy user
## What changes were proposed in this pull request?
Kafka does not yet support obtaining a delegation token on behalf of a proxy
user. This must be disabled until https://issues.apache.org/jira/browse/KAFKA-6945
is implemented.
With this PR, an exception is thrown when a Kafka delegation token is requested while running as a proxy user.
## How was this patch tested?
Additional unit test.
Closes #23511 from gaborgsomogyi/SPARK-26592.
Authored-by: Gabor Somogyi <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
---
.../apache/spark/deploy/security/KafkaTokenUtil.scala | 14 +++++++++++++-
.../spark/deploy/security/KafkaTokenUtilSuite.scala | 19 ++++++++++++++++++-
2 files changed, 31 insertions(+), 2 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala
b/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala
index aec0f72..f363853 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala
@@ -17,12 +17,13 @@
package org.apache.spark.deploy.security
-import java.{ util => ju }
+import java.{util => ju}
import java.text.SimpleDateFormat
import scala.util.control.NonFatal
import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
import org.apache.kafka.clients.CommonClientConfigs
@@ -33,6 +34,7 @@ import
org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, S
import org.apache.kafka.common.security.token.delegation.DelegationToken
import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
@@ -45,6 +47,8 @@ private[spark] object KafkaTokenUtil extends Logging {
}
private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <:
TokenIdentifier], Long) = {
+ checkProxyUser()
+
val adminClient =
AdminClient.create(createAdminClientProperties(sparkConf))
val createDelegationTokenOptions = new CreateDelegationTokenOptions()
val createResult =
adminClient.createDelegationToken(createDelegationTokenOptions)
@@ -59,6 +63,14 @@ private[spark] object KafkaTokenUtil extends Logging {
), token.tokenInfo.expiryTimestamp)
}
+ private[security] def checkProxyUser(): Unit = {
+ val currentUser = UserGroupInformation.getCurrentUser()
+ // Obtaining delegation token for proxy user is planned but not yet
implemented
+ // See https://issues.apache.org/jira/browse/KAFKA-6945
+ require(!SparkHadoopUtil.get.isProxyUser(currentUser), "Obtaining
delegation token for proxy " +
+ "user is not yet supported.")
+ }
+
private[security] def createAdminClientProperties(sparkConf: SparkConf):
ju.Properties = {
val adminClientProperties = new ju.Properties
diff --git
a/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala
index 18aa537..daa7e54 100644
---
a/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala
+++
b/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala
@@ -17,9 +17,11 @@
package org.apache.spark.deploy.security
-import java.{ util => ju }
+import java.{util => ju}
+import java.security.PrivilegedExceptionAction
import javax.security.auth.login.{AppConfigurationEntry, Configuration}
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT,
SASL_SSL, SSL}
@@ -78,6 +80,21 @@ class KafkaTokenUtilSuite extends SparkFunSuite with
BeforeAndAfterEach {
Configuration.setConfiguration(null)
}
+ test("checkProxyUser with proxy current user should throw exception") {
+ val realUser = UserGroupInformation.createUserForTesting("realUser",
Array())
+ UserGroupInformation.createProxyUserForTesting("proxyUser", realUser,
Array()).doAs(
+ new PrivilegedExceptionAction[Unit]() {
+ override def run(): Unit = {
+ val thrown = intercept[IllegalArgumentException] {
+ KafkaTokenUtil.checkProxyUser()
+ }
+ assert(thrown.getMessage contains
+ "Obtaining delegation token for proxy user is not yet supported.")
+ }
+ }
+ )
+ }
+
test("createAdminClientProperties without bootstrap servers should throw
exception") {
val thrown = intercept[IllegalArgumentException] {
KafkaTokenUtil.createAdminClientProperties(sparkConf)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]