This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b3b3557ccbe [SPARK-39399][CORE][K8S] Fix proxy-user authentication for
Spark on k8s in cluster deploy mode
b3b3557ccbe is described below
commit b3b3557ccbe53e34e0d0dbe3d21f49a230ee621b
Author: Shrikant Prasad <[email protected]>
AuthorDate: Wed Mar 8 11:33:39 2023 +0800
[SPARK-39399][CORE][K8S] Fix proxy-user authentication for Spark on k8s in
cluster deploy mode
### What changes were proposed in this pull request?
This PR fixes an authentication failure for the proxy user on the driver side
when a Spark on K8s job accesses kerberized HDFS. It follows an approach
similar to the one used for Mesos:
https://github.com/mesosphere/spark/pull/26
### Why are the changes needed?
When we try to access kerberized HDFS through a proxy user in a Spark job
running in cluster deploy mode with the Kubernetes resource manager, we
encounter an AccessControlException. This is because authentication in the
driver is done using the tokens of the proxy user, and since the proxy user
doesn't have any delegation tokens on the driver, authentication fails.
Further details:
https://issues.apache.org/jira/browse/SPARK-25355?focusedCommentId=17532063&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17532063
https://issues.apache.org/jira/browse/SPARK-25355?focusedCommentId=17532135&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17532135
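The condition the patch uses to recognize the driver-side resubmission can be sketched in isolation. This is a minimal, dependency-free illustration rather than the code in SparkSubmit.scala: the object `ProxyUserDispatch`, the `RunMode` type, and the `choose` helper are hypothetical names, and the `submitInDriver` flag stands in for the value the patch reads from `spark.kubernetes.submitInDriver`.

```scala
object ProxyUserDispatch {
  // Hypothetical labels for the three paths doRunMain can take after the patch.
  sealed trait RunMode
  case object RunAsSparkUser extends RunMode // patch: SparkHadoopUtil.get.runAsSparkUser(...)
  case object RunAsProxyUser extends RunMode // existing: createProxyUser(...).doAs(...)
  case object RunDirectly extends RunMode    // no --proxy-user given

  // Mirrors the predicate in the diff: inside the driver container of a
  // k8s cluster-mode job, spark-submit is re-run in client mode with
  // spark.kubernetes.submitInDriver set to true.
  def isKubernetesClusterModeDriver(
      master: String,
      deployMode: String,
      submitInDriver: Boolean): Boolean =
    master.startsWith("k8s") && deployMode == "client" && submitInDriver

  // Picks the execution path for a given submission (illustrative only).
  def choose(
      proxyUser: Option[String],
      master: String,
      deployMode: String,
      submitInDriver: Boolean): RunMode =
    proxyUser match {
      case None => RunDirectly
      case Some(_) if isKubernetesClusterModeDriver(master, deployMode, submitInDriver) =>
        RunAsSparkUser
      case Some(_) => RunAsProxyUser
    }
}
```

Under these assumptions, `ProxyUserDispatch.choose(Some("alice"), "k8s://https://example.invalid:6443", "client", true)` selects `RunAsSparkUser`, while the same call with the flag set to `false` keeps the original proxy-user `doAs` path.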
### Does this PR introduce _any_ user-facing change?
Yes, users will now be able to use proxy-user to access kerberized HDFS with
Spark on K8s.
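As a rough illustration of the kind of invocation this enables, the sketch below assembles a spark-submit argument list for a cluster-mode Kubernetes job with a proxy user. The object `ProxyUserSubmitArgs`, its `build` helper, and every value passed to it are placeholders; a real submission also needs settings that are not shown here, such as the container image and the Kerberos configuration.

```scala
object ProxyUserSubmitArgs {
  // Builds the command line as a sequence of tokens. Only standard
  // spark-submit options are used; all concrete values come from the caller.
  def build(
      apiServer: String,  // e.g. "https://example.invalid:6443" (placeholder)
      proxyUser: String,  // user to impersonate (placeholder)
      mainClass: String,  // application entry point (placeholder)
      appJar: String      // application jar location (placeholder)
  ): Seq[String] =
    Seq(
      "spark-submit",
      "--master", s"k8s://$apiServer",
      "--deploy-mode", "cluster",
      "--proxy-user", proxyUser,
      "--class", mainClass,
      appJar
    )
}
```

The resulting sequence can be joined with spaces for display or handed to a process launcher.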
### How was this patch tested?
The patch was tested by:
1. Running a job that accesses kerberized HDFS with a proxy user in cluster
mode and client mode with the Kubernetes resource manager.
2. Running a job that accesses kerberized HDFS without a proxy user in cluster
mode and client mode with the Kubernetes resource manager.
3. Building and running the test GitHub Action:
https://github.com/shrprasa/spark/actions/runs/3051203625
Closes #37880 from shrprasa/proxy_user_fix.
Authored-by: Shrikant Prasad <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
.../org/apache/spark/deploy/SparkSubmit.scala | 47 +++++++++++++---------
1 file changed, 29 insertions(+), 18 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index d7443951e7f..7563b093522 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -158,24 +158,35 @@ private[spark] class SparkSubmit extends Logging {
def doRunMain(): Unit = {
if (args.proxyUser != null) {
- val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
- UserGroupInformation.getCurrentUser())
- try {
- proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
- override def run(): Unit = {
- runMain(args, uninitLog)
- }
- })
- } catch {
- case e: Exception =>
- // Hadoop's AuthorizationException suppresses the exception's stack trace, which
- // makes the message printed to the output by the JVM not very helpful. Instead,
- // detect exceptions with empty stack traces here, and treat them differently.
- if (e.getStackTrace().length == 0) {
- error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
- } else {
- throw e
- }
+ // Here we are checking for client mode because when job is submitted in cluster
+ // deploy mode with k8s resource manager, the spark submit in the driver container
+ // is done in client mode.
+ val isKubernetesClusterModeDriver = args.master.startsWith("k8s") &&
+ args.deployMode.equals("client") &&
+ args.toSparkConf().getBoolean("spark.kubernetes.submitInDriver", false)
+ if (isKubernetesClusterModeDriver) {
+ logInfo("Running driver with proxy user. Cluster manager: Kubernetes")
+ SparkHadoopUtil.get.runAsSparkUser(() => runMain(args, uninitLog))
+ } else {
+ val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
+ UserGroupInformation.getCurrentUser())
+ try {
+ proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
+ override def run(): Unit = {
+ runMain(args, uninitLog)
+ }
+ })
+ } catch {
+ case e: Exception =>
+ // Hadoop's AuthorizationException suppresses the exception's stack trace, which
+ // makes the message printed to the output by the JVM not very helpful. Instead,
+ // detect exceptions with empty stack traces here, and treat them differently.
+ if (e.getStackTrace().length == 0) {
+ error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
+ } else {
+ throw e
+ }
+ }
}
} else {
runMain(args, uninitLog)