This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b3b3557ccbe [SPARK-39399][CORE][K8S] Fix proxy-user authentication for Spark on k8s in cluster deploy mode
b3b3557ccbe is described below

commit b3b3557ccbe53e34e0d0dbe3d21f49a230ee621b
Author: Shrikant Prasad <[email protected]>
AuthorDate: Wed Mar 8 11:33:39 2023 +0800

    [SPARK-39399][CORE][K8S] Fix proxy-user authentication for Spark on k8s in cluster deploy mode
    
    ### What changes were proposed in this pull request?
    
    This PR fixes the proxy-user authentication failure on the driver side when a Spark on K8s job accesses kerberized HDFS. It follows an approach similar to the one used for Mesos: https://github.com/mesosphere/spark/pull/26
    
    ### Why are the changes needed?
    
    When we try to access kerberized HDFS through a proxy user in a Spark job running in cluster deploy mode with the Kubernetes resource manager, we encounter an AccessControlException. This is because authentication in the driver is done using the proxy user's tokens, and since the proxy user has no delegation tokens on the driver, authentication fails.
    
    Further details:
    
    https://issues.apache.org/jira/browse/SPARK-25355?focusedCommentId=17532063&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17532063
    
    https://issues.apache.org/jira/browse/SPARK-25355?focusedCommentId=17532135&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17532135
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, users will now be able to use proxy-user to access kerberized HDFS with Spark on K8s.
    
    ### How was this patch tested?
    
    The patch was tested by:
    
    1. Running a job that accesses kerberized HDFS with a proxy user, in cluster mode and in client mode, with the Kubernetes resource manager.
    
    2. Running a job that accesses kerberized HDFS without a proxy user, in cluster mode and in client mode, with the Kubernetes resource manager.
    
    3. Building and running the test GitHub Action: https://github.com/shrprasa/spark/actions/runs/3051203625
    
    Closes #37880 from shrprasa/proxy_user_fix.
    
    Authored-by: Shrikant Prasad <[email protected]>
    Signed-off-by: Kent Yao <[email protected]>
---
 .../org/apache/spark/deploy/SparkSubmit.scala      | 47 +++++++++++++---------
 1 file changed, 29 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index d7443951e7f..7563b093522 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -158,24 +158,35 @@ private[spark] class SparkSubmit extends Logging {
 
     def doRunMain(): Unit = {
       if (args.proxyUser != null) {
-        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
-          UserGroupInformation.getCurrentUser())
-        try {
-          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
-            override def run(): Unit = {
-              runMain(args, uninitLog)
-            }
-          })
-        } catch {
-          case e: Exception =>
-            // Hadoop's AuthorizationException suppresses the exception's stack trace, which
-            // makes the message printed to the output by the JVM not very helpful. Instead,
-            // detect exceptions with empty stack traces here, and treat them differently.
-            if (e.getStackTrace().length == 0) {
-              error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
-            } else {
-              throw e
-            }
+        // Here we are checking for client mode because when job is sumbitted in cluster
+        // deploy mode with k8s resource manager, the spark submit in the driver container
+        // is done in client mode.
+        val isKubernetesClusterModeDriver = args.master.startsWith("k8s") &&
+          args.deployMode.equals("client") &&
+          args.toSparkConf().getBoolean("spark.kubernetes.submitInDriver", false)
+        if (isKubernetesClusterModeDriver) {
+          logInfo("Running driver with proxy user. Cluster manager: Kubernetes")
+          SparkHadoopUtil.get.runAsSparkUser(() => runMain(args, uninitLog))
+        } else {
+          val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
+            UserGroupInformation.getCurrentUser())
+          try {
+            proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
+              override def run(): Unit = {
+                runMain(args, uninitLog)
+              }
+            })
+          } catch {
+            case e: Exception =>
+              // Hadoop's AuthorizationException suppresses the exception's stack trace, which
+              // makes the message printed to the output by the JVM not very helpful. Instead,
+              // detect exceptions with empty stack traces here, and treat them differently.
+              if (e.getStackTrace().length == 0) {
+                error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
+              } else {
+                throw e
+              }
+          }
         }
       } else {
         runMain(args, uninitLog)
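
The branching that the patch adds to `doRunMain` can be read as a three-way decision. The following is a minimal, dependency-free sketch of that decision only, not the code from the commit: `SubmitArgs`, `RunStrategy`, and `chooseStrategy` are hypothetical names introduced for illustration, and the plain `Map` lookup is a simplified stand-in for `SparkConf.getBoolean` (it treats any value other than "true" as false).

```scala
object ProxyUserDispatch {
  // Hypothetical stand-in for the fields of the submit arguments that the patch reads.
  final case class SubmitArgs(
      master: String,
      deployMode: String,
      proxyUser: Option[String],
      conf: Map[String, String])

  // Hypothetical labels for the three code paths in doRunMain.
  sealed trait RunStrategy
  case object RunDirectly extends RunStrategy     // no proxy user: runMain(args, uninitLog)
  case object RunAsSparkUser extends RunStrategy  // SparkHadoopUtil.get.runAsSparkUser(...)
  case object RunAsProxyUser extends RunStrategy  // createProxyUser(...).doAs(...)

  // Mirrors the predicate in the diff: a cluster-mode submission on Kubernetes
  // re-runs spark-submit inside the driver container in client mode, with
  // spark.kubernetes.submitInDriver set to true.
  def isKubernetesClusterModeDriver(args: SubmitArgs): Boolean =
    args.master.startsWith("k8s") &&
      args.deployMode == "client" &&
      // Assumption: simplified boolean parsing; anything other than "true" counts as false.
      args.conf.get("spark.kubernetes.submitInDriver").exists(_.trim.equalsIgnoreCase("true"))

  def chooseStrategy(args: SubmitArgs): RunStrategy =
    if (args.proxyUser.isEmpty) RunDirectly
    else if (isKubernetesClusterModeDriver(args)) RunAsSparkUser
    else RunAsProxyUser
}
```

Under these assumptions, a driver-side submission with master `k8s://...`, deploy mode `client`, a proxy user, and `spark.kubernetes.submitInDriver=true` maps to `RunAsSparkUser`, while the same arguments without that flag keep the original `RunAsProxyUser` path.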

