This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new bb604e48b [Improve][Bug] dev: Kerberos authentication support in Flink on YARN (#4005)
bb604e48b is described below

commit bb604e48b6a3bcefc362d7e87845b80b9e8eeda9
Author: TiDra <[email protected]>
AuthorDate: Fri Sep 6 23:55:48 2024 +0800

    [Improve][Bug] dev: Kerberos authentication support in Flink on YARN (#4005)
    
    * [Bug] Kerberos authentication issues may occur in Flink on YARN
    
    * [Improve] configHolder get value bug fixed.
    
    ---------
    
    Co-authored-by: benjobs <[email protected]>
---
 .../common/conf/InternalConfigHolder.scala         |  22 ++--
 .../streampark/common/util/HadoopUtils.scala       |  19 ++-
 .../console/core/runner/EnvInitializer.java        |   2 +-
 .../flink/client/impl/YarnApplicationClient.scala  |  89 ++++---------
 .../flink/client/impl/YarnPerJobClient.scala       |  30 ++---
 .../flink/client/impl/YarnSessionClient.scala      |  99 ++-------------
 .../flink/client/trait/YarnClientTrait.scala       | 137 ++++++++++++++-------
 7 files changed, 171 insertions(+), 227 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala
index e35971ff5..977baf662 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala
@@ -42,7 +42,9 @@ object InternalConfigHolder extends Logger {
     new ConcurrentHashMap[String, InternalOption](initialCapacity)
 
   /** Initialize the ConfigHub. */
-  Seq(CommonConfig, K8sFlinkConfig)
+  def initConfigHub(): Unit = {
+    Seq(CommonConfig, K8sFlinkConfig)
+  }
 
   /** Register the ConfigOption */
   private[conf] def register(@Nonnull conf: InternalOption): Unit = {
@@ -65,13 +67,19 @@ object InternalConfigHolder extends Logger {
    */
   @Nonnull
   def get[T](@Nonnull conf: InternalOption): T = {
-    confData.get(conf.key) match {
-      case null =>
-        SystemPropertyUtils.get(conf.key) match {
-          case v if v != null => v.cast[T](conf.classType)
-          case _ => conf.defaultValue.asInstanceOf[T]
+    val value = confData.get(conf.key)
+    if (value == null || value == conf.defaultValue) {
+      val v = SystemPropertyUtils.get(conf.key)
+      if (v != null) {
+        if (v != value) {
+          set(conf, v)
         }
-      case v => v.asInstanceOf[T]
+        v.cast[T](conf.classType)
+      } else {
+        conf.defaultValue.asInstanceOf[T]
+      }
+    } else {
+      value.toString.cast[T](conf.classType)
     }
   }
 
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
index 8ed2b807d..9262256c9 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
@@ -233,10 +233,21 @@ object HadoopUtils extends Logger {
 
   def yarnClient: YarnClient = {
     if (reusableYarnClient == null || 
!reusableYarnClient.isInState(STATE.STARTED)) {
-      reusableYarnClient = YarnClient.createYarnClient
-      val yarnConf = new YarnConfiguration(hadoopConf)
-      reusableYarnClient.init(yarnConf)
-      reusableYarnClient.start()
+      reusableYarnClient = Try {
+        getUgi().doAs(new PrivilegedAction[YarnClient]() {
+          override def run(): YarnClient = {
+            val yarnConf = new YarnConfiguration(hadoopConf);
+            val client = YarnClient.createYarnClient;
+            client.init(yarnConf);
+            client.start();
+            client
+          }
+        })
+      } match {
+        case Success(client) => client
+        case Failure(e) =>
+          throw new IllegalArgumentException(s"[StreamPark] access yarnClient 
error: $e")
+      }
     }
     reusableYarnClient
   }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index c3a92bbbf..4e5ba38ac 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -89,8 +89,8 @@ public class EnvInitializer implements ApplicationRunner {
     }
 
     private void initConfig() {
-
         Environment env = context.getEnvironment();
+        InternalConfigHolder.initConfigHub();
         // override config from spring application.yaml
         InternalConfigHolder.keys().stream()
             .filter(env::containsProperty)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index d37808d79..645c6837e 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -27,17 +27,13 @@ import 
org.apache.streampark.flink.client.`trait`.YarnClientTrait
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
 
-import org.apache.commons.lang3.StringUtils
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
 import org.apache.flink.client.deployment.application.ApplicationConfiguration
 import org.apache.flink.configuration._
 import org.apache.flink.python.PythonOptions
-import org.apache.flink.runtime.util.HadoopUtils
+import org.apache.flink.yarn.YarnClusterDescriptor
 import org.apache.flink.yarn.configuration.YarnConfigOptions
-import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.records.ApplicationId
 
-import java.security.PrivilegedAction
 import java.util
 import java.util.Collections
 
@@ -50,18 +46,6 @@ object YarnApplicationClient extends YarnClientTrait {
 
   override def setConfig(submitRequest: SubmitRequest, flinkConfig: 
Configuration): Unit = {
     super.setConfig(submitRequest, flinkConfig)
-    val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
-      submitRequest.flinkVersion.flinkHome)
-    val currentUser = UserGroupInformation.getCurrentUser
-    logDebug(s"UserGroupInformation currentUser: $currentUser")
-    if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
-      logDebug(s"kerberos Security is Enabled...")
-      val useTicketCache =
-        
flinkDefaultConfiguration.get(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE)
-      AssertUtils.required(
-        HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache),
-        s"Hadoop security with Kerberos is enabled but the login user 
$currentUser does not have Kerberos credentials or delegation tokens!")
-    }
     val providedLibs = {
       val array = ListBuffer(
         submitRequest.hdfsWorkspace.flinkLib,
@@ -140,53 +124,32 @@ object YarnApplicationClient extends YarnClientTrait {
   override def doSubmit(
       submitRequest: SubmitRequest,
       flinkConfig: Configuration): SubmitResponse = {
-    var proxyUserUgi: UserGroupInformation = 
UserGroupInformation.getCurrentUser
-    val currentUser = UserGroupInformation.getCurrentUser
-    val enableProxyState =
-      !HadoopUtils.isKerberosSecurityEnabled(currentUser) && StringUtils
-        .isNotEmpty(submitRequest.hadoopUser)
-    if (enableProxyState) {
-      proxyUserUgi = UserGroupInformation.createProxyUser(
-        submitRequest.hadoopUser,
-        currentUser)
-    }
+    val (clusterSpecification, clusterDescriptor: YarnClusterDescriptor) = 
getYarnClusterDeployDescriptor(flinkConfig, submitRequest.hadoopUser)
+    logInfo(s"""
+               
|------------------------<<specification>>-------------------------
+               |$clusterSpecification
+               
|------------------------------------------------------------------
+               |""".stripMargin)
 
-    proxyUserUgi.doAs[SubmitResponse](new PrivilegedAction[SubmitResponse] {
-      override def run(): SubmitResponse = {
-        val clusterClientServiceLoader = new DefaultClusterClientServiceLoader
-        val clientFactory =
-          
clusterClientServiceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
-        val clusterDescriptor =
-          clientFactory.createClusterDescriptor(flinkConfig)
-        val clusterSpecification =
-          clientFactory.getClusterSpecification(flinkConfig)
-        logInfo(s"""
-                   
|------------------------<<specification>>-------------------------
-                   |$clusterSpecification
-                   
|------------------------------------------------------------------
-                   |""".stripMargin)
-
-        val applicationConfiguration =
-          ApplicationConfiguration.fromConfiguration(flinkConfig)
-        var applicationId: ApplicationId = null
-        var jobManagerUrl: String = null
-        val clusterClient = clusterDescriptor
-          .deployApplicationCluster(clusterSpecification, 
applicationConfiguration)
-          .getClusterClient
-        applicationId = clusterClient.getClusterId
-        jobManagerUrl = clusterClient.getWebInterfaceURL
-        logInfo(s"""
-                   
|-------------------------<<applicationId>>------------------------
-                   |Flink Job Started: applicationId: $applicationId
-                   
|__________________________________________________________________
-                   |""".stripMargin)
-
-        val resp =
-          SubmitResponse(applicationId.toString, flinkConfig.toMap, 
jobManagerUrl = jobManagerUrl)
-        closeSubmit(submitRequest, clusterClient, clusterDescriptor)
-        resp
-      }
-    })
+    val applicationConfiguration =
+      ApplicationConfiguration.fromConfiguration(flinkConfig)
+    var applicationId: ApplicationId = null
+    var jobManagerUrl: String = null
+    val clusterClient = clusterDescriptor
+      .deployApplicationCluster(clusterSpecification, applicationConfiguration)
+      .getClusterClient
+    applicationId = clusterClient.getClusterId
+    jobManagerUrl = clusterClient.getWebInterfaceURL
+    logInfo(s"""
+               
|-------------------------<<applicationId>>------------------------
+               |Flink Job Started: applicationId: $applicationId
+               
|__________________________________________________________________
+               |""".stripMargin)
+
+    val resp =
+      SubmitResponse(applicationId.toString, flinkConfig.toMap, jobManagerUrl 
= jobManagerUrl)
+    closeSubmit(submitRequest, clusterClient, clusterDescriptor)
+    resp
   }
 
 }
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
index 557bb9967..5e8d6dfd0 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
@@ -22,10 +22,9 @@ import 
org.apache.streampark.flink.client.`trait`.YarnClientTrait
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.util.FlinkUtils
 
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
 import org.apache.flink.client.program.PackagedProgram
 import org.apache.flink.configuration.{Configuration, DeploymentOptions}
-import org.apache.flink.yarn.{YarnClusterClientFactory, YarnClusterDescriptor}
+import org.apache.flink.yarn.YarnClusterDescriptor
 import org.apache.flink.yarn.configuration.YarnDeploymentTarget
 import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
 import org.apache.hadoop.fs.{Path => HadoopPath}
@@ -55,28 +54,16 @@ object YarnPerJobClient extends YarnClientTrait {
   override def doSubmit(
       submitRequest: SubmitRequest,
       flinkConfig: Configuration): SubmitResponse = {
-
     val flinkHome = submitRequest.flinkVersion.flinkHome
 
-    val clusterClientServiceLoader = new DefaultClusterClientServiceLoader
-    val clientFactory =
-      
clusterClientServiceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
-
-    val clusterDescriptor = {
-      val clusterDescriptor =
-        clientFactory
-          .createClusterDescriptor(flinkConfig)
-          .asInstanceOf[YarnClusterDescriptor]
-      val flinkDistJar = FlinkUtils.getFlinkDistJar(flinkHome)
-      clusterDescriptor.setLocalJarPath(new HadoopPath(flinkDistJar))
-      clusterDescriptor.addShipFiles(List(new File(s"$flinkHome/lib")))
-      clusterDescriptor
-    }
+    val (clusterSpecification, clusterDescriptor: YarnClusterDescriptor) =
+      getYarnClusterDeployDescriptor(flinkConfig, submitRequest.hadoopUser)
+    val flinkDistJar = FlinkUtils.getFlinkDistJar(flinkHome)
+    clusterDescriptor.setLocalJarPath(new HadoopPath(flinkDistJar))
+    clusterDescriptor.addShipFiles(List(new File(s"$flinkHome/lib")))
 
     var packagedProgram: PackagedProgram = null
     val clusterClient = {
-      val clusterSpecification =
-        clientFactory.getClusterSpecification(flinkConfig)
       logInfo(s"""
                  
|------------------------<<specification>>-------------------------
                  |$clusterSpecification
@@ -119,9 +106,8 @@ object YarnPerJobClient extends YarnClientTrait {
       cancelRequest: CancelRequest,
       flinkConfig: Configuration): CancelResponse = {
     val response = super.doCancel(cancelRequest, flinkConfig)
-    val clusterClientFactory = new YarnClusterClientFactory
-    val clusterDescriptor =
-      clusterClientFactory.createClusterDescriptor(flinkConfig)
+    val (yarnClusterId: ApplicationId, clusterDescriptor: 
YarnClusterDescriptor) =
+      getYarnClusterDescriptor(flinkConfig)
     
clusterDescriptor.killCluster(ApplicationId.fromString(cancelRequest.clusterId))
     response
   }
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index a5af099cd..6693997bd 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -17,20 +17,16 @@
 
 package org.apache.streampark.flink.client.impl
 
-import org.apache.streampark.common.util.{AssertUtils, Utils}
 import org.apache.streampark.common.util.Implicits._
+import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.client.`trait`.YarnClientTrait
 import org.apache.streampark.flink.client.bean._
 
 import org.apache.commons.lang3.StringUtils
-import org.apache.flink.api.common.JobID
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
 import org.apache.flink.client.program.ClusterClient
 import org.apache.flink.configuration._
-import org.apache.flink.runtime.util.HadoopUtils
 import org.apache.flink.yarn.YarnClusterDescriptor
 import org.apache.flink.yarn.configuration.{YarnConfigOptions, 
YarnDeploymentTarget}
-import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.records.{ApplicationId, 
FinalApplicationStatus}
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
 import org.apache.hadoop.yarn.util.ConverterUtils
@@ -60,19 +56,6 @@ object YarnSessionClient extends YarnClientTrait {
    * @param flinkConfig
    */
   def deployClusterConfig(deployRequest: DeployRequest, flinkConfig: 
Configuration): Unit = {
-    val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
-      deployRequest.flinkVersion.flinkHome)
-    val currentUser = UserGroupInformation.getCurrentUser
-    logDebug(s"UserGroupInformation currentUser: $currentUser")
-    if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
-      logDebug(s"kerberos Security is Enabled...")
-      val useTicketCache =
-        
flinkDefaultConfiguration.get(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE)
-      AssertUtils.required(
-        HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache),
-        s"Hadoop security with Kerberos is enabled but the login user 
$currentUser does not have Kerberos credentials or delegation tokens!")
-    }
-
     val shipFiles = new util.ArrayList[String]()
     shipFiles.add(s"${deployRequest.flinkVersion.flinkHome}/lib")
     shipFiles.add(s"${deployRequest.flinkVersion.flinkHome}/plugins")
@@ -98,9 +81,7 @@ object YarnSessionClient extends YarnClientTrait {
   override def doSubmit(
       submitRequest: SubmitRequest,
       flinkConfig: Configuration): SubmitResponse = {
-    val yarnClusterDescriptor = getYarnSessionClusterDescriptor(flinkConfig)
-    val clusterDescriptor = yarnClusterDescriptor._2
-    val yarnClusterId: ApplicationId = yarnClusterDescriptor._1
+    val (yarnClusterId: ApplicationId, clusterDescriptor: 
YarnClusterDescriptor) = getYarnClusterDescriptor(flinkConfig)
     val programJobGraph =
       super.getJobGraph(flinkConfig, submitRequest, submitRequest.userJarFile)
     val packageProgram = programJobGraph._1
@@ -119,59 +100,20 @@ object YarnSessionClient extends YarnClientTrait {
     resp
   }
 
-  private[this] def executeClientAction[O, R <: SavepointRequestTrait](
-      savepointRequestTrait: R,
-      flinkConfig: Configuration,
-      actFunc: (JobID, ClusterClient[_]) => O): O = {
-    flinkConfig
-      .safeSet(YarnConfigOptions.APPLICATION_ID, 
savepointRequestTrait.clusterId)
-      .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
-    logInfo(s"""
-               
|------------------------------------------------------------------
-               |Effective submit configuration: $flinkConfig
-               
|------------------------------------------------------------------
-               |""".stripMargin)
-
-    var clusterDescriptor: YarnClusterDescriptor = null
-    var client: ClusterClient[ApplicationId] = null
-    try {
-      val yarnClusterDescriptor = getYarnSessionClusterDescriptor(flinkConfig)
-      clusterDescriptor = yarnClusterDescriptor._2
-      client = 
clusterDescriptor.retrieve(yarnClusterDescriptor._1).getClusterClient
-      actFunc(JobID.fromHexString(savepointRequestTrait.jobId), client)
-    } catch {
-      case e: Exception =>
-        logError(s"${savepointRequestTrait.getClass.getSimpleName} for flink 
yarn session job fail")
-        e.printStackTrace()
-        throw e
-    } finally {
-      Utils.close(client, clusterDescriptor)
-    }
-  }
-
   override def doCancel(
       cancelRequest: CancelRequest,
       flinkConfig: Configuration): CancelResponse = {
-    executeClientAction(
-      cancelRequest,
-      flinkConfig,
-      (jobID, clusterClient) => {
-        val actionResult = super.cancelJob(cancelRequest, jobID, clusterClient)
-        CancelResponse(actionResult)
-      })
+    flinkConfig
+      .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
+    super.doCancel(cancelRequest, flinkConfig)
   }
 
   override def doTriggerSavepoint(
-      savepointRequest: TriggerSavepointRequest,
+      request: TriggerSavepointRequest,
       flinkConfig: Configuration): SavepointResponse = {
-    executeClientAction(
-      savepointRequest,
-      flinkConfig,
-      (jobID, clusterClient) => {
-        val actionResult =
-          super.triggerSavepoint(savepointRequest, jobID, clusterClient)
-        SavepointResponse(actionResult)
-      })
+    flinkConfig
+      .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
+    super.doTriggerSavepoint(request, flinkConfig)
   }
 
   def deploy(deployRequest: DeployRequest): DeployResponse = {
@@ -191,7 +133,7 @@ object YarnSessionClient extends YarnClientTrait {
       val flinkConfig =
         extractConfiguration(deployRequest.flinkVersion.flinkHome, 
deployRequest.properties)
       deployClusterConfig(deployRequest, flinkConfig)
-      val yarnClusterDescriptor = 
getSessionClusterDeployDescriptor(flinkConfig)
+      val yarnClusterDescriptor = getYarnClusterDeployDescriptor(flinkConfig)
       clusterDescriptor = yarnClusterDescriptor._2
       if (StringUtils.isNotBlank(deployRequest.clusterId)) {
         try {
@@ -213,13 +155,12 @@ object YarnSessionClient extends YarnClientTrait {
             logInfo("this applicationId have not managed by yarn ,need deploy 
...")
         }
       }
-      val clientProvider =
-        clusterDescriptor.deploySessionCluster(yarnClusterDescriptor._1)
+      val clientProvider = 
clusterDescriptor.deploySessionCluster(yarnClusterDescriptor._1)
       client = clientProvider.getClusterClient
       if (client.getWebInterfaceURL != null) {
         DeployResponse(client.getWebInterfaceURL, client.getClusterId.toString)
       } else {
-        null
+        DeployResponse(error = new RuntimeException("get the cluster 
getWebInterfaceURL failed."))
       }
     } catch {
       case e: Exception =>
@@ -243,7 +184,7 @@ object YarnSessionClient extends YarnClientTrait {
         })
       flinkConfig.safeSet(YarnConfigOptions.APPLICATION_ID, 
shutDownRequest.clusterId)
       flinkConfig.safeSet(DeploymentOptions.TARGET, 
YarnDeploymentTarget.SESSION.getName)
-      val yarnClusterDescriptor = getSessionClusterDescriptor(flinkConfig)
+      val yarnClusterDescriptor = getYarnClusterDescriptor(flinkConfig)
       clusterDescriptor = yarnClusterDescriptor._2
       val shutDownState = FinalApplicationStatus.UNDEFINED.equals(
         clusterDescriptor.getYarnClient
@@ -269,18 +210,4 @@ object YarnSessionClient extends YarnClientTrait {
     }
   }
 
-  private[this] def getYarnSessionClusterDescriptor(
-      flinkConfig: Configuration): (ApplicationId, YarnClusterDescriptor) = {
-    val serviceLoader = new DefaultClusterClientServiceLoader
-    val clientFactory =
-      serviceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
-    val yarnClusterId: ApplicationId = clientFactory.getClusterId(flinkConfig)
-    require(yarnClusterId != null)
-    val clusterDescriptor =
-      clientFactory
-        .createClusterDescriptor(flinkConfig)
-        .asInstanceOf[YarnClusterDescriptor]
-    (yarnClusterId, clusterDescriptor)
-  }
-
 }
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
index 853bc378b..e9c5d1b5e 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
@@ -17,24 +17,26 @@
 
 package org.apache.streampark.flink.client.`trait`
 
-import org.apache.streampark.common.util.{AssertUtils, ExceptionUtils}
+import org.apache.streampark.common.util.{ExceptionUtils, HadoopUtils}
 import org.apache.streampark.common.util.Implicits._
 import org.apache.streampark.flink.client.bean._
 
 import org.apache.flink.api.common.JobID
-import org.apache.flink.client.deployment.{ClusterDescriptor, 
ClusterSpecification, DefaultClusterClientServiceLoader}
+import org.apache.flink.client.deployment.ClusterSpecification
 import org.apache.flink.client.program.{ClusterClient, ClusterClientProvider}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.util.FlinkException
 import org.apache.flink.yarn.{YarnClusterClientFactory, YarnClusterDescriptor}
 import org.apache.flink.yarn.configuration.YarnConfigOptions
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.records.ApplicationId
 
 import java.lang.{Boolean => JavaBool}
 import java.lang.reflect.Method
+import java.security.PrivilegedAction
 
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
 
 /** yarn application mode submit */
 trait YarnClientTrait extends FlinkClientTrait {
@@ -47,29 +49,23 @@ trait YarnClientTrait extends FlinkClientTrait {
   }
 
   private[this] def executeClientAction[R <: SavepointRequestTrait, O](
-      savepointRequestTrait: R,
+      request: R,
       flinkConf: Configuration,
       actionFunc: (JobID, ClusterClient[_]) => O): O = {
-
-    flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, 
savepointRequestTrait.clusterId)
-    val clusterClientFactory = new YarnClusterClientFactory
-    val applicationId = clusterClientFactory.getClusterId(flinkConf)
-    AssertUtils.required(
-      applicationId != null,
-      "[StreamPark] getClusterClient error. No cluster id was specified. 
Please specify a cluster to which you would like to connect.")
-
-    val clusterDescriptor =
-      clusterClientFactory.createClusterDescriptor(flinkConf)
-    clusterDescriptor
-      .retrieve(applicationId)
-      .getClusterClient
-      .autoClose(client =>
-        Try(actionFunc(getJobID(savepointRequestTrait.jobId), client)).recover 
{
-          case e =>
-            throw new FlinkException(
-              s"[StreamPark] Do 
${savepointRequestTrait.getClass.getSimpleName} for the job 
${savepointRequestTrait.jobId} failed. " +
-                s"detail: ${ExceptionUtils.stringifyException(e)}");
-        }.get)
+    val jobID = getJobID(request.jobId)
+    flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId)
+    // Get the ClusterClient from the YarnClusterDescriptor
+    val (applicationId: ApplicationId, clusterDescriptor: 
YarnClusterDescriptor) = getYarnClusterDescriptor(flinkConf)
+    val clusterClient = 
clusterDescriptor.retrieve(applicationId).getClusterClient
+
+    Try {
+      actionFunc(jobID, clusterClient)
+    }.recover {
+      case e =>
+        throw new FlinkException(
+          s"[StreamPark] Do ${request.getClass.getSimpleName} for the job 
${request.jobId} failed. " +
+            s"detail: ${ExceptionUtils.stringifyException(e)}");
+    }.get
   }
 
   override def doTriggerSavepoint(
@@ -124,27 +120,80 @@ trait YarnClientTrait extends FlinkClientTrait {
       .asInstanceOf[ClusterClientProvider[ApplicationId]]
   }
 
-  private[client] def getSessionClusterDescriptor[T <: 
ClusterDescriptor[ApplicationId]](
-      flinkConfig: Configuration): (ApplicationId, T) = {
-    val serviceLoader = new DefaultClusterClientServiceLoader
-    val clientFactory =
-      serviceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
-    val yarnClusterId: ApplicationId = clientFactory.getClusterId(flinkConfig)
-    require(yarnClusterId != null)
-    val clusterDescriptor =
-      clientFactory.createClusterDescriptor(flinkConfig).asInstanceOf[T]
-    (yarnClusterId, clusterDescriptor)
+  /**
+   * Retrieves the YarnClusterDescriptor and the application ID.
+   *
+   * @param flinkConfig
+   *   the Flink configuration
+   * @return
+   *   a tuple containing the application ID and the YarnClusterDescriptor
+   */
+  private[client] def getYarnClusterDescriptor(
+      flinkConfig: Configuration,
+      user: String = ""): (ApplicationId, YarnClusterDescriptor) = {
+    Try {
+      doAsYarnClusterDescriptor[ApplicationId](
+        user,
+        () => {
+          val clientFactory = new YarnClusterClientFactory
+          // Get the cluster ID
+          val yarnClusterId: ApplicationId = 
clientFactory.getClusterId(flinkConfig)
+          require(yarnClusterId != null)
+          // Create the ClusterDescriptor
+          val clusterDescriptor = 
clientFactory.createClusterDescriptor(flinkConfig)
+          (yarnClusterId, clusterDescriptor)
+        })
+    } match {
+      case Success(result) => result
+      case Failure(e) =>
+        throw new IllegalArgumentException(s"[StreamPark] access 
ClusterDescriptor error: $e")
+    }
+  }
+
+  /**
+   * Retrieves the ClusterSpecification and the YarnClusterDescriptor for 
deployment.
+   *
+   * @param flinkConfig
+   *   the Flink configuration
+   * @return
+   *   a tuple containing the ClusterSpecification and the 
YarnClusterDescriptor
+   */
+  private[client] def getYarnClusterDeployDescriptor(
+      flinkConfig: Configuration,
+      user: String = ""): (ClusterSpecification, YarnClusterDescriptor) = {
+    Try {
+      doAsYarnClusterDescriptor[ClusterSpecification](
+        user,
+        () => {
+          val clientFactory = new YarnClusterClientFactory
+          // Get the ClusterSpecification
+          val clusterSpecification = 
clientFactory.getClusterSpecification(flinkConfig)
+          // Create the ClusterDescriptor
+          val clusterDescriptor = 
clientFactory.createClusterDescriptor(flinkConfig)
+          clusterSpecification -> clusterDescriptor
+        })
+    } match {
+      case Success(result) => result
+      case Failure(e) =>
+        throw new IllegalArgumentException(s"[StreamPark] access 
ClusterDescriptor error: $e")
+    }
   }
 
-  private[client] def getSessionClusterDeployDescriptor[T <: 
ClusterDescriptor[ApplicationId]](
-      flinkConfig: Configuration): (ClusterSpecification, T) = {
-    val serviceLoader = new DefaultClusterClientServiceLoader
-    val clientFactory =
-      serviceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
-    val clusterSpecification =
-      clientFactory.getClusterSpecification(flinkConfig)
-    val clusterDescriptor =
-      clientFactory.createClusterDescriptor(flinkConfig).asInstanceOf[T]
-    (clusterSpecification, clusterDescriptor)
+  private[this] def doAsYarnClusterDescriptor[T](
+      user: String,
+      func: () => (T, YarnClusterDescriptor)): (T, YarnClusterDescriptor) = {
+    // Wrap the operation in ugi.doAs()
+    val ugi = HadoopUtils.getUgi()
+    val finalUgi = if (user != null && user.nonEmpty && ugi.getShortUserName 
!= user) UserGroupInformation.createProxyUser(user, ugi) else ugi
+
+    try {
+      finalUgi.doAs(new PrivilegedAction[(T, YarnClusterDescriptor)] {
+        override def run(): (T, YarnClusterDescriptor) = func()
+      })
+    } catch {
+      case e: Exception =>
+        throw new RuntimeException(s"[StreamPark] Error executing 
YarnClusterDescriptor operation as user $user", e)
+    }
   }
+
 }

Reply via email to