This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new bb604e48b [Improve][Bug] dev: Kerberos authentication support in Flink
on YARN (#4005)
bb604e48b is described below
commit bb604e48b6a3bcefc362d7e87845b80b9e8eeda9
Author: TiDra <[email protected]>
AuthorDate: Fri Sep 6 23:55:48 2024 +0800
[Improve][Bug] dev: Kerberos authentication support in Flink on YARN (#4005)
* [Bug] Kerberos authentication issues may occur in Flink on YARN
* [Improve] Fixed a bug in InternalConfigHolder value retrieval (get).
---------
Co-authored-by: benjobs <[email protected]>
---
.../common/conf/InternalConfigHolder.scala | 22 ++--
.../streampark/common/util/HadoopUtils.scala | 19 ++-
.../console/core/runner/EnvInitializer.java | 2 +-
.../flink/client/impl/YarnApplicationClient.scala | 89 ++++---------
.../flink/client/impl/YarnPerJobClient.scala | 30 ++---
.../flink/client/impl/YarnSessionClient.scala | 99 ++-------------
.../flink/client/trait/YarnClientTrait.scala | 137 ++++++++++++++-------
7 files changed, 171 insertions(+), 227 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala
index e35971ff5..977baf662 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala
@@ -42,7 +42,9 @@ object InternalConfigHolder extends Logger {
new ConcurrentHashMap[String, InternalOption](initialCapacity)
/** Initialize the ConfigHub. */
- Seq(CommonConfig, K8sFlinkConfig)
+ def initConfigHub(): Unit = {
+ Seq(CommonConfig, K8sFlinkConfig)
+ }
/** Register the ConfigOption */
private[conf] def register(@Nonnull conf: InternalOption): Unit = {
@@ -65,13 +67,19 @@ object InternalConfigHolder extends Logger {
*/
@Nonnull
def get[T](@Nonnull conf: InternalOption): T = {
- confData.get(conf.key) match {
- case null =>
- SystemPropertyUtils.get(conf.key) match {
- case v if v != null => v.cast[T](conf.classType)
- case _ => conf.defaultValue.asInstanceOf[T]
+ val value = confData.get(conf.key)
+ if (value == null || value == conf.defaultValue) {
+ val v = SystemPropertyUtils.get(conf.key)
+ if (v != null) {
+ if (v != value) {
+ set(conf, v)
}
- case v => v.asInstanceOf[T]
+ v.cast[T](conf.classType)
+ } else {
+ conf.defaultValue.asInstanceOf[T]
+ }
+ } else {
+ value.toString.cast[T](conf.classType)
}
}
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
index 8ed2b807d..9262256c9 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
@@ -233,10 +233,21 @@ object HadoopUtils extends Logger {
def yarnClient: YarnClient = {
if (reusableYarnClient == null ||
!reusableYarnClient.isInState(STATE.STARTED)) {
- reusableYarnClient = YarnClient.createYarnClient
- val yarnConf = new YarnConfiguration(hadoopConf)
- reusableYarnClient.init(yarnConf)
- reusableYarnClient.start()
+ reusableYarnClient = Try {
+ getUgi().doAs(new PrivilegedAction[YarnClient]() {
+ override def run(): YarnClient = {
+ val yarnConf = new YarnConfiguration(hadoopConf);
+ val client = YarnClient.createYarnClient;
+ client.init(yarnConf);
+ client.start();
+ client
+ }
+ })
+ } match {
+ case Success(client) => client
+ case Failure(e) =>
+ throw new IllegalArgumentException(s"[StreamPark] access yarnClient
error: $e")
+ }
}
reusableYarnClient
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index c3a92bbbf..4e5ba38ac 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -89,8 +89,8 @@ public class EnvInitializer implements ApplicationRunner {
}
private void initConfig() {
-
Environment env = context.getEnvironment();
+ InternalConfigHolder.initConfigHub();
// override config from spring application.yaml
InternalConfigHolder.keys().stream()
.filter(env::containsProperty)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index d37808d79..645c6837e 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -27,17 +27,13 @@ import
org.apache.streampark.flink.client.`trait`.YarnClientTrait
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
-import org.apache.commons.lang3.StringUtils
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
import org.apache.flink.client.deployment.application.ApplicationConfiguration
import org.apache.flink.configuration._
import org.apache.flink.python.PythonOptions
-import org.apache.flink.runtime.util.HadoopUtils
+import org.apache.flink.yarn.YarnClusterDescriptor
import org.apache.flink.yarn.configuration.YarnConfigOptions
-import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.ApplicationId
-import java.security.PrivilegedAction
import java.util
import java.util.Collections
@@ -50,18 +46,6 @@ object YarnApplicationClient extends YarnClientTrait {
override def setConfig(submitRequest: SubmitRequest, flinkConfig:
Configuration): Unit = {
super.setConfig(submitRequest, flinkConfig)
- val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
- submitRequest.flinkVersion.flinkHome)
- val currentUser = UserGroupInformation.getCurrentUser
- logDebug(s"UserGroupInformation currentUser: $currentUser")
- if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
- logDebug(s"kerberos Security is Enabled...")
- val useTicketCache =
-
flinkDefaultConfiguration.get(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE)
- AssertUtils.required(
- HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache),
- s"Hadoop security with Kerberos is enabled but the login user
$currentUser does not have Kerberos credentials or delegation tokens!")
- }
val providedLibs = {
val array = ListBuffer(
submitRequest.hdfsWorkspace.flinkLib,
@@ -140,53 +124,32 @@ object YarnApplicationClient extends YarnClientTrait {
override def doSubmit(
submitRequest: SubmitRequest,
flinkConfig: Configuration): SubmitResponse = {
- var proxyUserUgi: UserGroupInformation =
UserGroupInformation.getCurrentUser
- val currentUser = UserGroupInformation.getCurrentUser
- val enableProxyState =
- !HadoopUtils.isKerberosSecurityEnabled(currentUser) && StringUtils
- .isNotEmpty(submitRequest.hadoopUser)
- if (enableProxyState) {
- proxyUserUgi = UserGroupInformation.createProxyUser(
- submitRequest.hadoopUser,
- currentUser)
- }
+ val (clusterSpecification, clusterDescriptor: YarnClusterDescriptor) =
getYarnClusterDeployDescriptor(flinkConfig, submitRequest.hadoopUser)
+ logInfo(s"""
+
|------------------------<<specification>>-------------------------
+ |$clusterSpecification
+
|------------------------------------------------------------------
+ |""".stripMargin)
- proxyUserUgi.doAs[SubmitResponse](new PrivilegedAction[SubmitResponse] {
- override def run(): SubmitResponse = {
- val clusterClientServiceLoader = new DefaultClusterClientServiceLoader
- val clientFactory =
-
clusterClientServiceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
- val clusterDescriptor =
- clientFactory.createClusterDescriptor(flinkConfig)
- val clusterSpecification =
- clientFactory.getClusterSpecification(flinkConfig)
- logInfo(s"""
-
|------------------------<<specification>>-------------------------
- |$clusterSpecification
-
|------------------------------------------------------------------
- |""".stripMargin)
-
- val applicationConfiguration =
- ApplicationConfiguration.fromConfiguration(flinkConfig)
- var applicationId: ApplicationId = null
- var jobManagerUrl: String = null
- val clusterClient = clusterDescriptor
- .deployApplicationCluster(clusterSpecification,
applicationConfiguration)
- .getClusterClient
- applicationId = clusterClient.getClusterId
- jobManagerUrl = clusterClient.getWebInterfaceURL
- logInfo(s"""
-
|-------------------------<<applicationId>>------------------------
- |Flink Job Started: applicationId: $applicationId
-
|__________________________________________________________________
- |""".stripMargin)
-
- val resp =
- SubmitResponse(applicationId.toString, flinkConfig.toMap,
jobManagerUrl = jobManagerUrl)
- closeSubmit(submitRequest, clusterClient, clusterDescriptor)
- resp
- }
- })
+ val applicationConfiguration =
+ ApplicationConfiguration.fromConfiguration(flinkConfig)
+ var applicationId: ApplicationId = null
+ var jobManagerUrl: String = null
+ val clusterClient = clusterDescriptor
+ .deployApplicationCluster(clusterSpecification, applicationConfiguration)
+ .getClusterClient
+ applicationId = clusterClient.getClusterId
+ jobManagerUrl = clusterClient.getWebInterfaceURL
+ logInfo(s"""
+
|-------------------------<<applicationId>>------------------------
+ |Flink Job Started: applicationId: $applicationId
+
|__________________________________________________________________
+ |""".stripMargin)
+
+ val resp =
+ SubmitResponse(applicationId.toString, flinkConfig.toMap, jobManagerUrl
= jobManagerUrl)
+ closeSubmit(submitRequest, clusterClient, clusterDescriptor)
+ resp
}
}
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
index 557bb9967..5e8d6dfd0 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
@@ -22,10 +22,9 @@ import
org.apache.streampark.flink.client.`trait`.YarnClientTrait
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.util.FlinkUtils
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
import org.apache.flink.client.program.PackagedProgram
import org.apache.flink.configuration.{Configuration, DeploymentOptions}
-import org.apache.flink.yarn.{YarnClusterClientFactory, YarnClusterDescriptor}
+import org.apache.flink.yarn.YarnClusterDescriptor
import org.apache.flink.yarn.configuration.YarnDeploymentTarget
import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
import org.apache.hadoop.fs.{Path => HadoopPath}
@@ -55,28 +54,16 @@ object YarnPerJobClient extends YarnClientTrait {
override def doSubmit(
submitRequest: SubmitRequest,
flinkConfig: Configuration): SubmitResponse = {
-
val flinkHome = submitRequest.flinkVersion.flinkHome
- val clusterClientServiceLoader = new DefaultClusterClientServiceLoader
- val clientFactory =
-
clusterClientServiceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
-
- val clusterDescriptor = {
- val clusterDescriptor =
- clientFactory
- .createClusterDescriptor(flinkConfig)
- .asInstanceOf[YarnClusterDescriptor]
- val flinkDistJar = FlinkUtils.getFlinkDistJar(flinkHome)
- clusterDescriptor.setLocalJarPath(new HadoopPath(flinkDistJar))
- clusterDescriptor.addShipFiles(List(new File(s"$flinkHome/lib")))
- clusterDescriptor
- }
+ val (clusterSpecification, clusterDescriptor: YarnClusterDescriptor) =
+ getYarnClusterDeployDescriptor(flinkConfig, submitRequest.hadoopUser)
+ val flinkDistJar = FlinkUtils.getFlinkDistJar(flinkHome)
+ clusterDescriptor.setLocalJarPath(new HadoopPath(flinkDistJar))
+ clusterDescriptor.addShipFiles(List(new File(s"$flinkHome/lib")))
var packagedProgram: PackagedProgram = null
val clusterClient = {
- val clusterSpecification =
- clientFactory.getClusterSpecification(flinkConfig)
logInfo(s"""
|------------------------<<specification>>-------------------------
|$clusterSpecification
@@ -119,9 +106,8 @@ object YarnPerJobClient extends YarnClientTrait {
cancelRequest: CancelRequest,
flinkConfig: Configuration): CancelResponse = {
val response = super.doCancel(cancelRequest, flinkConfig)
- val clusterClientFactory = new YarnClusterClientFactory
- val clusterDescriptor =
- clusterClientFactory.createClusterDescriptor(flinkConfig)
+ val (yarnClusterId: ApplicationId, clusterDescriptor:
YarnClusterDescriptor) =
+ getYarnClusterDescriptor(flinkConfig)
clusterDescriptor.killCluster(ApplicationId.fromString(cancelRequest.clusterId))
response
}
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index a5af099cd..6693997bd 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -17,20 +17,16 @@
package org.apache.streampark.flink.client.impl
-import org.apache.streampark.common.util.{AssertUtils, Utils}
import org.apache.streampark.common.util.Implicits._
+import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.client.`trait`.YarnClientTrait
import org.apache.streampark.flink.client.bean._
import org.apache.commons.lang3.StringUtils
-import org.apache.flink.api.common.JobID
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.configuration._
-import org.apache.flink.runtime.util.HadoopUtils
import org.apache.flink.yarn.YarnClusterDescriptor
import org.apache.flink.yarn.configuration.{YarnConfigOptions,
YarnDeploymentTarget}
-import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.{ApplicationId,
FinalApplicationStatus}
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
import org.apache.hadoop.yarn.util.ConverterUtils
@@ -60,19 +56,6 @@ object YarnSessionClient extends YarnClientTrait {
* @param flinkConfig
*/
def deployClusterConfig(deployRequest: DeployRequest, flinkConfig:
Configuration): Unit = {
- val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
- deployRequest.flinkVersion.flinkHome)
- val currentUser = UserGroupInformation.getCurrentUser
- logDebug(s"UserGroupInformation currentUser: $currentUser")
- if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
- logDebug(s"kerberos Security is Enabled...")
- val useTicketCache =
-
flinkDefaultConfiguration.get(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE)
- AssertUtils.required(
- HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache),
- s"Hadoop security with Kerberos is enabled but the login user
$currentUser does not have Kerberos credentials or delegation tokens!")
- }
-
val shipFiles = new util.ArrayList[String]()
shipFiles.add(s"${deployRequest.flinkVersion.flinkHome}/lib")
shipFiles.add(s"${deployRequest.flinkVersion.flinkHome}/plugins")
@@ -98,9 +81,7 @@ object YarnSessionClient extends YarnClientTrait {
override def doSubmit(
submitRequest: SubmitRequest,
flinkConfig: Configuration): SubmitResponse = {
- val yarnClusterDescriptor = getYarnSessionClusterDescriptor(flinkConfig)
- val clusterDescriptor = yarnClusterDescriptor._2
- val yarnClusterId: ApplicationId = yarnClusterDescriptor._1
+ val (yarnClusterId: ApplicationId, clusterDescriptor:
YarnClusterDescriptor) = getYarnClusterDescriptor(flinkConfig)
val programJobGraph =
super.getJobGraph(flinkConfig, submitRequest, submitRequest.userJarFile)
val packageProgram = programJobGraph._1
@@ -119,59 +100,20 @@ object YarnSessionClient extends YarnClientTrait {
resp
}
- private[this] def executeClientAction[O, R <: SavepointRequestTrait](
- savepointRequestTrait: R,
- flinkConfig: Configuration,
- actFunc: (JobID, ClusterClient[_]) => O): O = {
- flinkConfig
- .safeSet(YarnConfigOptions.APPLICATION_ID,
savepointRequestTrait.clusterId)
- .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
- logInfo(s"""
-
|------------------------------------------------------------------
- |Effective submit configuration: $flinkConfig
-
|------------------------------------------------------------------
- |""".stripMargin)
-
- var clusterDescriptor: YarnClusterDescriptor = null
- var client: ClusterClient[ApplicationId] = null
- try {
- val yarnClusterDescriptor = getYarnSessionClusterDescriptor(flinkConfig)
- clusterDescriptor = yarnClusterDescriptor._2
- client =
clusterDescriptor.retrieve(yarnClusterDescriptor._1).getClusterClient
- actFunc(JobID.fromHexString(savepointRequestTrait.jobId), client)
- } catch {
- case e: Exception =>
- logError(s"${savepointRequestTrait.getClass.getSimpleName} for flink
yarn session job fail")
- e.printStackTrace()
- throw e
- } finally {
- Utils.close(client, clusterDescriptor)
- }
- }
-
override def doCancel(
cancelRequest: CancelRequest,
flinkConfig: Configuration): CancelResponse = {
- executeClientAction(
- cancelRequest,
- flinkConfig,
- (jobID, clusterClient) => {
- val actionResult = super.cancelJob(cancelRequest, jobID, clusterClient)
- CancelResponse(actionResult)
- })
+ flinkConfig
+ .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
+ super.doCancel(cancelRequest, flinkConfig)
}
override def doTriggerSavepoint(
- savepointRequest: TriggerSavepointRequest,
+ request: TriggerSavepointRequest,
flinkConfig: Configuration): SavepointResponse = {
- executeClientAction(
- savepointRequest,
- flinkConfig,
- (jobID, clusterClient) => {
- val actionResult =
- super.triggerSavepoint(savepointRequest, jobID, clusterClient)
- SavepointResponse(actionResult)
- })
+ flinkConfig
+ .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
+ super.doTriggerSavepoint(request, flinkConfig)
}
def deploy(deployRequest: DeployRequest): DeployResponse = {
@@ -191,7 +133,7 @@ object YarnSessionClient extends YarnClientTrait {
val flinkConfig =
extractConfiguration(deployRequest.flinkVersion.flinkHome,
deployRequest.properties)
deployClusterConfig(deployRequest, flinkConfig)
- val yarnClusterDescriptor =
getSessionClusterDeployDescriptor(flinkConfig)
+ val yarnClusterDescriptor = getYarnClusterDeployDescriptor(flinkConfig)
clusterDescriptor = yarnClusterDescriptor._2
if (StringUtils.isNotBlank(deployRequest.clusterId)) {
try {
@@ -213,13 +155,12 @@ object YarnSessionClient extends YarnClientTrait {
logInfo("this applicationId have not managed by yarn ,need deploy
...")
}
}
- val clientProvider =
- clusterDescriptor.deploySessionCluster(yarnClusterDescriptor._1)
+ val clientProvider =
clusterDescriptor.deploySessionCluster(yarnClusterDescriptor._1)
client = clientProvider.getClusterClient
if (client.getWebInterfaceURL != null) {
DeployResponse(client.getWebInterfaceURL, client.getClusterId.toString)
} else {
- null
+ DeployResponse(error = new RuntimeException("get the cluster
getWebInterfaceURL failed."))
}
} catch {
case e: Exception =>
@@ -243,7 +184,7 @@ object YarnSessionClient extends YarnClientTrait {
})
flinkConfig.safeSet(YarnConfigOptions.APPLICATION_ID,
shutDownRequest.clusterId)
flinkConfig.safeSet(DeploymentOptions.TARGET,
YarnDeploymentTarget.SESSION.getName)
- val yarnClusterDescriptor = getSessionClusterDescriptor(flinkConfig)
+ val yarnClusterDescriptor = getYarnClusterDescriptor(flinkConfig)
clusterDescriptor = yarnClusterDescriptor._2
val shutDownState = FinalApplicationStatus.UNDEFINED.equals(
clusterDescriptor.getYarnClient
@@ -269,18 +210,4 @@ object YarnSessionClient extends YarnClientTrait {
}
}
- private[this] def getYarnSessionClusterDescriptor(
- flinkConfig: Configuration): (ApplicationId, YarnClusterDescriptor) = {
- val serviceLoader = new DefaultClusterClientServiceLoader
- val clientFactory =
- serviceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
- val yarnClusterId: ApplicationId = clientFactory.getClusterId(flinkConfig)
- require(yarnClusterId != null)
- val clusterDescriptor =
- clientFactory
- .createClusterDescriptor(flinkConfig)
- .asInstanceOf[YarnClusterDescriptor]
- (yarnClusterId, clusterDescriptor)
- }
-
}
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
index 853bc378b..e9c5d1b5e 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
@@ -17,24 +17,26 @@
package org.apache.streampark.flink.client.`trait`
-import org.apache.streampark.common.util.{AssertUtils, ExceptionUtils}
+import org.apache.streampark.common.util.{ExceptionUtils, HadoopUtils}
import org.apache.streampark.common.util.Implicits._
import org.apache.streampark.flink.client.bean._
import org.apache.flink.api.common.JobID
-import org.apache.flink.client.deployment.{ClusterDescriptor,
ClusterSpecification, DefaultClusterClientServiceLoader}
+import org.apache.flink.client.deployment.ClusterSpecification
import org.apache.flink.client.program.{ClusterClient, ClusterClientProvider}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.jobgraph.JobGraph
import org.apache.flink.util.FlinkException
import org.apache.flink.yarn.{YarnClusterClientFactory, YarnClusterDescriptor}
import org.apache.flink.yarn.configuration.YarnConfigOptions
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.ApplicationId
import java.lang.{Boolean => JavaBool}
import java.lang.reflect.Method
+import java.security.PrivilegedAction
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
/** yarn application mode submit */
trait YarnClientTrait extends FlinkClientTrait {
@@ -47,29 +49,23 @@ trait YarnClientTrait extends FlinkClientTrait {
}
private[this] def executeClientAction[R <: SavepointRequestTrait, O](
- savepointRequestTrait: R,
+ request: R,
flinkConf: Configuration,
actionFunc: (JobID, ClusterClient[_]) => O): O = {
-
- flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID,
savepointRequestTrait.clusterId)
- val clusterClientFactory = new YarnClusterClientFactory
- val applicationId = clusterClientFactory.getClusterId(flinkConf)
- AssertUtils.required(
- applicationId != null,
- "[StreamPark] getClusterClient error. No cluster id was specified.
Please specify a cluster to which you would like to connect.")
-
- val clusterDescriptor =
- clusterClientFactory.createClusterDescriptor(flinkConf)
- clusterDescriptor
- .retrieve(applicationId)
- .getClusterClient
- .autoClose(client =>
- Try(actionFunc(getJobID(savepointRequestTrait.jobId), client)).recover
{
- case e =>
- throw new FlinkException(
- s"[StreamPark] Do
${savepointRequestTrait.getClass.getSimpleName} for the job
${savepointRequestTrait.jobId} failed. " +
- s"detail: ${ExceptionUtils.stringifyException(e)}");
- }.get)
+ val jobID = getJobID(request.jobId)
+ flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId)
+ // Get the ClusterClient from the YarnClusterDescriptor
+ val (applicationId: ApplicationId, clusterDescriptor:
YarnClusterDescriptor) = getYarnClusterDescriptor(flinkConf)
+ val clusterClient =
clusterDescriptor.retrieve(applicationId).getClusterClient
+
+ Try {
+ actionFunc(jobID, clusterClient)
+ }.recover {
+ case e =>
+ throw new FlinkException(
+ s"[StreamPark] Do ${request.getClass.getSimpleName} for the job
${request.jobId} failed. " +
+ s"detail: ${ExceptionUtils.stringifyException(e)}");
+ }.get
}
override def doTriggerSavepoint(
@@ -124,27 +120,80 @@ trait YarnClientTrait extends FlinkClientTrait {
.asInstanceOf[ClusterClientProvider[ApplicationId]]
}
- private[client] def getSessionClusterDescriptor[T <:
ClusterDescriptor[ApplicationId]](
- flinkConfig: Configuration): (ApplicationId, T) = {
- val serviceLoader = new DefaultClusterClientServiceLoader
- val clientFactory =
- serviceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
- val yarnClusterId: ApplicationId = clientFactory.getClusterId(flinkConfig)
- require(yarnClusterId != null)
- val clusterDescriptor =
- clientFactory.createClusterDescriptor(flinkConfig).asInstanceOf[T]
- (yarnClusterId, clusterDescriptor)
+ /**
+ * Retrieves the YarnClusterDescriptor and the application ID.
+ *
+ * @param flinkConfig
+ * the Flink configuration
+ * @return
+ * a tuple containing the application ID and the YarnClusterDescriptor
+ */
+ private[client] def getYarnClusterDescriptor(
+ flinkConfig: Configuration,
+ user: String = ""): (ApplicationId, YarnClusterDescriptor) = {
+ Try {
+ doAsYarnClusterDescriptor[ApplicationId](
+ user,
+ () => {
+ val clientFactory = new YarnClusterClientFactory
+ // Get the cluster ID
+ val yarnClusterId: ApplicationId =
clientFactory.getClusterId(flinkConfig)
+ require(yarnClusterId != null)
+ // Create the ClusterDescriptor
+ val clusterDescriptor =
clientFactory.createClusterDescriptor(flinkConfig)
+ (yarnClusterId, clusterDescriptor)
+ })
+ } match {
+ case Success(result) => result
+ case Failure(e) =>
+ throw new IllegalArgumentException(s"[StreamPark] access
ClusterDescriptor error: $e")
+ }
+ }
+
+ /**
+ * Retrieves the ClusterSpecification and the YarnClusterDescriptor for
deployment.
+ *
+ * @param flinkConfig
+ * the Flink configuration
+ * @return
+ * a tuple containing the ClusterSpecification and the
YarnClusterDescriptor
+ */
+ private[client] def getYarnClusterDeployDescriptor(
+ flinkConfig: Configuration,
+ user: String = ""): (ClusterSpecification, YarnClusterDescriptor) = {
+ Try {
+ doAsYarnClusterDescriptor[ClusterSpecification](
+ user,
+ () => {
+ val clientFactory = new YarnClusterClientFactory
+ // Get the ClusterSpecification
+ val clusterSpecification =
clientFactory.getClusterSpecification(flinkConfig)
+ // Create the ClusterDescriptor
+ val clusterDescriptor =
clientFactory.createClusterDescriptor(flinkConfig)
+ clusterSpecification -> clusterDescriptor
+ })
+ } match {
+ case Success(result) => result
+ case Failure(e) =>
+ throw new IllegalArgumentException(s"[StreamPark] access
ClusterDescriptor error: $e")
+ }
}
- private[client] def getSessionClusterDeployDescriptor[T <:
ClusterDescriptor[ApplicationId]](
- flinkConfig: Configuration): (ClusterSpecification, T) = {
- val serviceLoader = new DefaultClusterClientServiceLoader
- val clientFactory =
- serviceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
- val clusterSpecification =
- clientFactory.getClusterSpecification(flinkConfig)
- val clusterDescriptor =
- clientFactory.createClusterDescriptor(flinkConfig).asInstanceOf[T]
- (clusterSpecification, clusterDescriptor)
+ private[this] def doAsYarnClusterDescriptor[T](
+ user: String,
+ func: () => (T, YarnClusterDescriptor)): (T, YarnClusterDescriptor) = {
+ // Wrap the operation in ugi.doAs()
+ val ugi = HadoopUtils.getUgi()
+ val finalUgi = if (user != null && user.nonEmpty && ugi.getShortUserName
!= user) UserGroupInformation.createProxyUser(user, ugi) else ugi
+
+ try {
+ finalUgi.doAs(new PrivilegedAction[(T, YarnClusterDescriptor)] {
+ override def run(): (T, YarnClusterDescriptor) = func()
+ })
+ } catch {
+ case e: Exception =>
+ throw new RuntimeException(s"[StreamPark] Error executing
YarnClusterDescriptor operation as user $user", e)
+ }
}
+
}