This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 362ef94fa88 [SPARK-39399][CORE][K8S] Fix proxy-user authentication for 
Spark on k8s in cluster deploy mode
362ef94fa88 is described below

commit 362ef94fa88ec5fc0535e5eabbe915da4c2541c8
Author: Shrikant Prasad <[email protected]>
AuthorDate: Wed Mar 8 11:33:39 2023 +0800

    [SPARK-39399][CORE][K8S] Fix proxy-user authentication for Spark on k8s in 
cluster deploy mode
    
    ### What changes were proposed in this pull request?
    
    The PR fixes the authentication failure of the proxy user on driver side 
while accessing kerberized hdfs through spark on k8s job. It follows the 
similar approach as it was done for Mesos: 
https://github.com/mesosphere/spark/pull/26
    
     ### Why are the changes needed?
    
    When we try to access the kerberized HDFS through a proxy user in Spark Job 
running in cluster deploy mode with Kubernetes resource manager, we encounter 
AccessControlException. This is because  authentication in driver is done using 
tokens of the proxy user and since proxy user doesn't have any delegation 
tokens on driver, auth fails.
    
    Further details:
    
https://issues.apache.org/jira/browse/SPARK-25355?focusedCommentId=17532063&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17532063
    
     
https://issues.apache.org/jira/browse/SPARK-25355?focusedCommentId=17532135&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17532135
    
     ### Does this PR introduce _any_ user-facing change?
    
    Yes, user will now be able to use proxy-user to access kerberized hdfs with 
Spark on K8s.
    
    ### How was this patch tested?
    
    The patch was tested by:
    
    1. Running job which accesses kerberized hdfs with proxy user in cluster 
mode and client mode with kubernetes resource manager.
    
    2. Running job which accesses kerberized hdfs without proxy user in cluster 
mode and client mode with kubernetes resource manager.
    
    3. Build and run test github action : 
https://github.com/shrprasa/spark/actions/runs/3051203625
    
    Closes #37880 from shrprasa/proxy_user_fix.
    
    Authored-by: Shrikant Prasad <[email protected]>
    Signed-off-by: Kent Yao <[email protected]>
    (cherry picked from commit b3b3557ccbe53e34e0d0dbe3d21f49a230ee621b)
    Signed-off-by: Kent Yao <[email protected]>
---
 .../org/apache/spark/deploy/SparkSubmit.scala      | 47 +++++++++++++---------
 1 file changed, 29 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 67a601b6bfc..eff39255407 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -157,24 +157,35 @@ private[spark] class SparkSubmit extends Logging {
 
     def doRunMain(): Unit = {
       if (args.proxyUser != null) {
-        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
-          UserGroupInformation.getCurrentUser())
-        try {
-          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
-            override def run(): Unit = {
-              runMain(args, uninitLog)
-            }
-          })
-        } catch {
-          case e: Exception =>
-            // Hadoop's AuthorizationException suppresses the exception's 
stack trace, which
-            // makes the message printed to the output by the JVM not very 
helpful. Instead,
-            // detect exceptions with empty stack traces here, and treat them 
differently.
-            if (e.getStackTrace().length == 0) {
-              error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
-            } else {
-              throw e
-            }
+        // Here we are checking for client mode because when job is sumbitted 
in cluster
+        // deploy mode with k8s resource manager, the spark submit in the 
driver container
+        // is done in client mode.
+        val isKubernetesClusterModeDriver = args.master.startsWith("k8s") &&
+          args.deployMode.equals("client") &&
+          args.toSparkConf().getBoolean("spark.kubernetes.submitInDriver", 
false)
+        if (isKubernetesClusterModeDriver) {
+          logInfo("Running driver with proxy user. Cluster manager: 
Kubernetes")
+          SparkHadoopUtil.get.runAsSparkUser(() => runMain(args, uninitLog))
+        } else {
+          val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
+            UserGroupInformation.getCurrentUser())
+          try {
+            proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
+              override def run(): Unit = {
+                runMain(args, uninitLog)
+              }
+            })
+          } catch {
+            case e: Exception =>
+              // Hadoop's AuthorizationException suppresses the exception's 
stack trace, which
+              // makes the message printed to the output by the JVM not very 
helpful. Instead,
+              // detect exceptions with empty stack traces here, and treat 
them differently.
+              if (e.getStackTrace().length == 0) {
+                error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
+              } else {
+                throw e
+              }
+          }
         }
       } else {
         runMain(args, uninitLog)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to