This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
     new 2deb82998 [Bug] Fix: Kerberos authentication issues may occur in Flink 
on YARN (#3951)
2deb82998 is described below

commit 2deb829982a53e53b802140f3d4e39b9c94441f9
Author: TiDra <[email protected]>
AuthorDate: Fri Aug 9 11:55:39 2024 +0800

    [Bug] Fix: Kerberos authentication issues may occur in Flink on YARN (#3951)
    
    * [Bug] Fix: Kerberos authentication issues may occur in Flink on YARN mode.
    
    * [Bug] Fix: Kerberos authentication issues may occur in Flink on YARN and 
remove unused code
    
    * change the code comments to English
---
 .../streampark/common/util/HadoopUtils.scala       |  20 +++-
 .../flink/client/impl/YarnApplicationClient.scala  |  23 +----
 .../flink/client/impl/YarnPerJobClient.scala       |  23 ++---
 .../flink/client/impl/YarnSessionClient.scala      |  43 ++-------
 .../flink/client/trait/YarnClientTrait.scala       | 103 +++++++++++++++------
 5 files changed, 111 insertions(+), 101 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
index d0417fafe..482e1a2a2 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
@@ -232,10 +232,22 @@ object HadoopUtils extends Logger {
 
   def yarnClient: YarnClient = {
     if (reusableYarnClient == null || 
!reusableYarnClient.isInState(STATE.STARTED)) {
-      reusableYarnClient = YarnClient.createYarnClient
-      val yarnConf = new YarnConfiguration(hadoopConf)
-      reusableYarnClient.init(yarnConf)
-      reusableYarnClient.start()
+      // Use doAs to ensure the following operations are executed with the ugi identity
+      reusableYarnClient = Try {
+        getUgi().doAs(new PrivilegedAction[YarnClient]() {
+          override def run(): YarnClient = {
+            val yarnConf = new YarnConfiguration(hadoopConf);
+            val client = YarnClient.createYarnClient;
+            client.init(yarnConf);
+            client.start();
+            client
+          }
+        })
+      } match {
+        case Success(client) => client
+        case Failure(e) =>
+          throw new IllegalArgumentException(s"[StreamPark] access yarnClient 
error: $e")
+      }
     }
     reusableYarnClient
   }
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index 68571e9af..cc05b1e25 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -24,11 +24,11 @@ import 
org.apache.streampark.flink.client.`trait`.YarnClientTrait
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
 
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
 import org.apache.flink.client.deployment.application.ApplicationConfiguration
 import org.apache.flink.configuration._
 import org.apache.flink.runtime.security.{SecurityConfiguration, SecurityUtils}
 import org.apache.flink.runtime.util.HadoopUtils
+import org.apache.flink.yarn.YarnClusterDescriptor
 import org.apache.flink.yarn.configuration.{YarnConfigOptions, 
YarnDeploymentTarget}
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.records.ApplicationId
@@ -46,19 +46,6 @@ object YarnApplicationClient extends YarnClientTrait {
 
   override def setConfig(submitRequest: SubmitRequest, flinkConfig: 
Configuration): Unit = {
     super.setConfig(submitRequest, flinkConfig)
-    val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
-      submitRequest.flinkVersion.flinkHome)
-    val currentUser = UserGroupInformation.getCurrentUser
-    logDebug(s"UserGroupInformation currentUser: $currentUser")
-    if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
-      logDebug(s"kerberos Security is Enabled...")
-      val useTicketCache =
-        
flinkDefaultConfiguration.get(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE)
-      if (!HadoopUtils.areKerberosCredentialsValid(currentUser, 
useTicketCache)) {
-        throw new RuntimeException(
-          s"Hadoop security with Kerberos is enabled but the login user 
$currentUser does not have Kerberos credentials or delegation tokens!")
-      }
-    }
     val providedLibs = {
       val array = ListBuffer(
         submitRequest.hdfsWorkspace.flinkLib,
@@ -98,11 +85,8 @@ object YarnApplicationClient extends YarnClientTrait {
     SecurityUtils.install(new SecurityConfiguration(flinkConfig))
     SecurityUtils.getInstalledContext.runSecured(new Callable[SubmitResponse] {
       override def call(): SubmitResponse = {
-        val clusterClientServiceLoader = new DefaultClusterClientServiceLoader
-        val clientFactory =
-          
clusterClientServiceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
-
-        val clusterSpecification = 
clientFactory.getClusterSpecification(flinkConfig)
+        val (clusterSpecification, clusterDescriptor: YarnClusterDescriptor) =
+          getYarnClusterDeployDescriptor(flinkConfig)
         logInfo(s"""
                    
|------------------------<<specification>>-------------------------
                    |$clusterSpecification
@@ -110,7 +94,6 @@ object YarnApplicationClient extends YarnClientTrait {
                    |""".stripMargin)
 
         val applicationConfiguration = 
ApplicationConfiguration.fromConfiguration(flinkConfig)
-        val clusterDescriptor = 
clientFactory.createClusterDescriptor(flinkConfig)
         val clusterClient = clusterDescriptor
           .deployApplicationCluster(clusterSpecification, 
applicationConfiguration)
           .getClusterClient
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
index 87123263f..75e6df2e7 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
@@ -21,7 +21,6 @@ import 
org.apache.streampark.flink.client.`trait`.YarnClientTrait
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.util.FlinkUtils
 
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
 import org.apache.flink.client.program.PackagedProgram
 import org.apache.flink.configuration.{Configuration, DeploymentOptions}
 import org.apache.flink.yarn.{YarnClusterClientFactory, YarnClusterDescriptor}
@@ -59,22 +58,14 @@ object YarnPerJobClient extends YarnClientTrait {
 
     val flinkHome = submitRequest.flinkVersion.flinkHome
 
-    val clusterClientServiceLoader = new DefaultClusterClientServiceLoader
-    val clientFactory =
-      
clusterClientServiceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
-
-    val clusterDescriptor = {
-      val clusterDescriptor =
-        
clientFactory.createClusterDescriptor(flinkConfig).asInstanceOf[YarnClusterDescriptor]
-      val flinkDistJar = FlinkUtils.getFlinkDistJar(flinkHome)
-      clusterDescriptor.setLocalJarPath(new HadoopPath(flinkDistJar))
-      clusterDescriptor.addShipFiles(List(new File(s"$flinkHome/lib")))
-      clusterDescriptor
-    }
+    val (clusterSpecification, clusterDescriptor: YarnClusterDescriptor) =
+      getYarnClusterDeployDescriptor(flinkConfig)
+    val flinkDistJar = FlinkUtils.getFlinkDistJar(flinkHome)
+    clusterDescriptor.setLocalJarPath(new HadoopPath(flinkDistJar))
+    clusterDescriptor.addShipFiles(List(new File(s"$flinkHome/lib")))
 
     var packagedProgram: PackagedProgram = null
     val clusterClient = {
-      val clusterSpecification = 
clientFactory.getClusterSpecification(flinkConfig)
       logInfo(s"""
                  
|------------------------<<specification>>-------------------------
                  |$clusterSpecification
@@ -116,8 +107,8 @@ object YarnPerJobClient extends YarnClientTrait {
     flinkConf
       .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.PER_JOB.getName)
     val response = super.doCancel(cancelRequest, flinkConf)
-    val clusterClientFactory = new YarnClusterClientFactory
-    val clusterDescriptor = 
clusterClientFactory.createClusterDescriptor(flinkConf)
+    val (yarnClusterId: ApplicationId, clusterDescriptor: 
YarnClusterDescriptor) =
+      getYarnClusterDescriptor(flinkConf)
     
clusterDescriptor.killCluster(ApplicationId.fromString(cancelRequest.clusterId))
     response
   }
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index faaa6e87a..e9afbf1d6 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -22,7 +22,7 @@ import 
org.apache.streampark.flink.client.`trait`.YarnClientTrait
 import org.apache.streampark.flink.client.bean._
 
 import org.apache.commons.lang3.StringUtils
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
+import org.apache.flink.client.deployment.ClusterSpecification
 import org.apache.flink.client.program.ClusterClient
 import org.apache.flink.configuration._
 import org.apache.flink.runtime.util.HadoopUtils
@@ -61,21 +61,6 @@ object YarnSessionClient extends YarnClientTrait {
   private def deployClusterConfig(
       deployRequest: DeployRequest,
       flinkConfig: Configuration): Unit = {
-
-    val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
-      deployRequest.flinkVersion.flinkHome)
-    val currentUser = UserGroupInformation.getCurrentUser
-    logDebug(s"UserGroupInformation currentUser: $currentUser")
-    if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
-      logDebug(s"kerberos Security is Enabled...")
-      val useTicketCache =
-        
flinkDefaultConfiguration.get(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE)
-      if (!HadoopUtils.areKerberosCredentialsValid(currentUser, 
useTicketCache)) {
-        throw new RuntimeException(
-          s"Hadoop security with Kerberos is enabled but the login user 
$currentUser does not have Kerberos credentials or delegation tokens!")
-      }
-    }
-
     val shipFiles = new util.ArrayList[String]()
     shipFiles.add(s"${deployRequest.flinkVersion.flinkHome}/lib")
     shipFiles.add(s"${deployRequest.flinkVersion.flinkHome}/plugins")
@@ -105,9 +90,8 @@ object YarnSessionClient extends YarnClientTrait {
   override def doSubmit(
       submitRequest: SubmitRequest,
       flinkConfig: Configuration): SubmitResponse = {
-    val yarnClusterDescriptor = getYarnSessionClusterDescriptor(flinkConfig)
-    val clusterDescriptor = yarnClusterDescriptor._2
-    val yarnClusterId: ApplicationId = yarnClusterDescriptor._1
+    val (yarnClusterId: ApplicationId, clusterDescriptor: 
YarnClusterDescriptor) =
+      getYarnClusterDescriptor(flinkConfig)
     val programJobGraph = super.getJobGraph(flinkConfig, submitRequest, 
submitRequest.userJarFile)
     val packageProgram = programJobGraph._1
     val jobGraph = programJobGraph._2
@@ -160,7 +144,8 @@ object YarnSessionClient extends YarnClientTrait {
 
       deployClusterConfig(deployRequest, flinkConfig)
 
-      val yarnClusterDescriptor = 
getSessionClusterDeployDescriptor(flinkConfig)
+      val yarnClusterDescriptor = getYarnClusterDeployDescriptor(flinkConfig)
+      val clusterSpecification: ClusterSpecification = yarnClusterDescriptor._1
       clusterDescriptor = yarnClusterDescriptor._2
       if (StringUtils.isNotBlank(deployRequest.clusterId)) {
         try {
@@ -181,7 +166,7 @@ object YarnSessionClient extends YarnClientTrait {
           case e: Exception => return DeployResponse(error = e)
         }
       }
-      val clientProvider = 
clusterDescriptor.deploySessionCluster(yarnClusterDescriptor._1)
+      val clientProvider = 
clusterDescriptor.deploySessionCluster(clusterSpecification)
       client = clientProvider.getClusterClient
       if (client.getWebInterfaceURL != null) {
         DeployResponse(
@@ -210,7 +195,8 @@ object YarnSessionClient extends YarnClientTrait {
           })
       flinkConfig.safeSet(YarnConfigOptions.APPLICATION_ID, 
shutDownRequest.clusterId)
       flinkConfig.safeSet(DeploymentOptions.TARGET, 
YarnDeploymentTarget.SESSION.getName)
-      val yarnClusterDescriptor = getSessionClusterDescriptor(flinkConfig)
+      val yarnClusterDescriptor = getYarnClusterDescriptor(flinkConfig)
+      val applicationId: ApplicationId = yarnClusterDescriptor._1
       clusterDescriptor = yarnClusterDescriptor._2
       if (
         FinalApplicationStatus.UNDEFINED.equals(
@@ -218,7 +204,7 @@ object YarnSessionClient extends YarnClientTrait {
             
.getApplicationReport(ApplicationId.fromString(shutDownRequest.clusterId))
             .getFinalApplicationStatus)
       ) {
-        val clientProvider = 
clusterDescriptor.retrieve(yarnClusterDescriptor._1)
+        val clientProvider = clusterDescriptor.retrieve(applicationId)
         client = clientProvider.getClusterClient
         client.shutDownCluster()
       }
@@ -233,15 +219,4 @@ object YarnSessionClient extends YarnClientTrait {
     }
   }
 
-  private[this] def getYarnSessionClusterDescriptor(
-      flinkConfig: Configuration): (ApplicationId, YarnClusterDescriptor) = {
-    val serviceLoader = new DefaultClusterClientServiceLoader
-    val clientFactory = 
serviceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
-    val yarnClusterId: ApplicationId = clientFactory.getClusterId(flinkConfig)
-    require(yarnClusterId != null)
-    val clusterDescriptor =
-      
clientFactory.createClusterDescriptor(flinkConfig).asInstanceOf[YarnClusterDescriptor]
-    (yarnClusterId, clusterDescriptor)
-  }
-
 }
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
index 4a19067e4..0f4cf020f 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.flink.client.`trait`
 
+import org.apache.streampark.common.util.HadoopUtils
 import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.client.bean._
 
@@ -32,8 +33,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationId
 
 import java.lang.{Boolean => JavaBool}
 import java.lang.reflect.Method
+import java.security.PrivilegedAction
 
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
 
 /** yarn application mode submit */
 trait YarnClientTrait extends FlinkClientTrait {
@@ -50,17 +52,12 @@ trait YarnClientTrait extends FlinkClientTrait {
       flinkConf: Configuration,
       actionFunc: (JobID, ClusterClient[_]) => O): O = {
     val jobID = getJobID(request.jobId)
-    val clusterClient = {
-      flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId)
-      val clusterClientFactory = new YarnClusterClientFactory
-      val applicationId = clusterClientFactory.getClusterId(flinkConf)
-      if (applicationId == null) {
-        throw new FlinkException(
-          "[StreamPark] getClusterClient error. No cluster id was specified. 
Please specify a cluster to which you would like to connect.")
-      }
-      val clusterDescriptor = 
clusterClientFactory.createClusterDescriptor(flinkConf)
-      clusterDescriptor.retrieve(applicationId).getClusterClient
-    }
+    flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId)
+    // Get the ClusterClient from the YarnClusterDescriptor
+    val (applicationId, clusterDescriptor: YarnClusterDescriptor) = 
getYarnClusterDescriptor(
+      flinkConf)
+    val clusterClient = 
clusterDescriptor.retrieve(applicationId).getClusterClient
+
     Try {
       actionFunc(jobID, clusterClient)
     }.recover {
@@ -125,22 +122,74 @@ trait YarnClientTrait extends FlinkClientTrait {
       .asInstanceOf[ClusterClientProvider[ApplicationId]]
   }
 
-  private[client] def getSessionClusterDescriptor[T <: 
ClusterDescriptor[ApplicationId]](
-      flinkConfig: Configuration): (ApplicationId, T) = {
-    val serviceLoader = new DefaultClusterClientServiceLoader
-    val clientFactory = 
serviceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
-    val yarnClusterId: ApplicationId = clientFactory.getClusterId(flinkConfig)
-    require(yarnClusterId != null)
-    val clusterDescriptor = 
clientFactory.createClusterDescriptor(flinkConfig).asInstanceOf[T]
-    (yarnClusterId, clusterDescriptor)
+  /**
+   * Retrieves the YarnClusterDescriptor and the application ID.
+   *
+   * @param flinkConfig
+   *   the Flink configuration
+   * @return
+   *   a tuple containing the application ID and the YarnClusterDescriptor
+   */
+  private[client] def getYarnClusterDescriptor(
+      flinkConfig: Configuration): (ApplicationId, YarnClusterDescriptor) = {
+    // Set up Kerberos authentication
+    val ugi = HadoopUtils.getUgi()
+
+    // Wrap the operation in ugi.doAs()
+    val result = Try {
+      ugi.doAs(new PrivilegedAction[(ApplicationId, YarnClusterDescriptor)] {
+        override def run(): (ApplicationId, YarnClusterDescriptor) = {
+          val clientFactory = new YarnClusterClientFactory
+          // Get the cluster ID
+          val yarnClusterId: ApplicationId = 
clientFactory.getClusterId(flinkConfig)
+          require(yarnClusterId != null)
+          // Create the ClusterDescriptor
+          val clusterDescriptor = 
clientFactory.createClusterDescriptor(flinkConfig)
+
+          (yarnClusterId, clusterDescriptor)
+        }
+      })
+    } match {
+      case Success(result) => result
+      case Failure(e) =>
+        throw new IllegalArgumentException(s"[StreamPark] access 
ClusterDescriptor error: $e")
+    }
+
+    result
   }
 
-  private[client] def getSessionClusterDeployDescriptor[T <: 
ClusterDescriptor[ApplicationId]](
-      flinkConfig: Configuration): (ClusterSpecification, T) = {
-    val serviceLoader = new DefaultClusterClientServiceLoader
-    val clientFactory = 
serviceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
-    val clusterSpecification = 
clientFactory.getClusterSpecification(flinkConfig)
-    val clusterDescriptor = 
clientFactory.createClusterDescriptor(flinkConfig).asInstanceOf[T]
-    (clusterSpecification, clusterDescriptor)
+  /**
+   * Retrieves the ClusterSpecification and the YarnClusterDescriptor for 
deployment.
+   *
+   * @param flinkConfig
+   *   the Flink configuration
+   * @return
+   *   a tuple containing the ClusterSpecification and the 
YarnClusterDescriptor
+   */
+  private[client] def getYarnClusterDeployDescriptor(
+      flinkConfig: Configuration): (ClusterSpecification, 
YarnClusterDescriptor) = {
+    // Set up Kerberos authentication
+    val ugi = HadoopUtils.getUgi()
+
+    // Wrap the operation in ugi.doAs()
+    val result = Try {
+      ugi.doAs(new PrivilegedAction[(ClusterSpecification, 
YarnClusterDescriptor)] {
+        override def run(): (ClusterSpecification, YarnClusterDescriptor) = {
+          val clientFactory = new YarnClusterClientFactory
+          // Get the ClusterSpecification
+          val clusterSpecification = 
clientFactory.getClusterSpecification(flinkConfig)
+          // Create the ClusterDescriptor
+          val clusterDescriptor = 
clientFactory.createClusterDescriptor(flinkConfig)
+
+          (clusterSpecification, clusterDescriptor)
+        }
+      })
+    } match {
+      case Success(result) => result
+      case Failure(e) =>
+        throw new IllegalArgumentException(s"[StreamPark] access 
ClusterDescriptor error: $e")
+    }
+
+    result
   }
 }

Reply via email to