This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
new 2deb82998 [Bug] Fix: Kerberos authentication issues may occur in Flink
on YARN (#3951)
2deb82998 is described below
commit 2deb829982a53e53b802140f3d4e39b9c94441f9
Author: TiDra <[email protected]>
AuthorDate: Fri Aug 9 11:55:39 2024 +0800
[Bug] Fix: Kerberos authentication issues may occur in Flink on YARN (#3951)
* [Bug] Fix: Kerberos authentication issues may occur in Flink on YARN mode.
* [Bug] Fix: Kerberos authentication issues may occur in Flink on YARN and
remove unused code
* change the code comments to English
---
.../streampark/common/util/HadoopUtils.scala | 20 +++-
.../flink/client/impl/YarnApplicationClient.scala | 23 +----
.../flink/client/impl/YarnPerJobClient.scala | 23 ++---
.../flink/client/impl/YarnSessionClient.scala | 43 ++-------
.../flink/client/trait/YarnClientTrait.scala | 103 +++++++++++++++------
5 files changed, 111 insertions(+), 101 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
index d0417fafe..482e1a2a2 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
@@ -232,10 +232,22 @@ object HadoopUtils extends Logger {
def yarnClient: YarnClient = {
if (reusableYarnClient == null ||
!reusableYarnClient.isInState(STATE.STARTED)) {
- reusableYarnClient = YarnClient.createYarnClient
- val yarnConf = new YarnConfiguration(hadoopConf)
- reusableYarnClient.init(yarnConf)
- reusableYarnClient.start()
+ // Use the doAs method to ensure the following operations are executed with the ugi's identity
+ reusableYarnClient = Try {
+ getUgi().doAs(new PrivilegedAction[YarnClient]() {
+ override def run(): YarnClient = {
+ val yarnConf = new YarnConfiguration(hadoopConf);
+ val client = YarnClient.createYarnClient;
+ client.init(yarnConf);
+ client.start();
+ client
+ }
+ })
+ } match {
+ case Success(client) => client
+ case Failure(e) =>
+ throw new IllegalArgumentException(s"[StreamPark] access yarnClient
error: $e")
+ }
}
reusableYarnClient
}
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index 68571e9af..cc05b1e25 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -24,11 +24,11 @@ import
org.apache.streampark.flink.client.`trait`.YarnClientTrait
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
import org.apache.flink.client.deployment.application.ApplicationConfiguration
import org.apache.flink.configuration._
import org.apache.flink.runtime.security.{SecurityConfiguration, SecurityUtils}
import org.apache.flink.runtime.util.HadoopUtils
+import org.apache.flink.yarn.YarnClusterDescriptor
import org.apache.flink.yarn.configuration.{YarnConfigOptions,
YarnDeploymentTarget}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.ApplicationId
@@ -46,19 +46,6 @@ object YarnApplicationClient extends YarnClientTrait {
override def setConfig(submitRequest: SubmitRequest, flinkConfig:
Configuration): Unit = {
super.setConfig(submitRequest, flinkConfig)
- val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
- submitRequest.flinkVersion.flinkHome)
- val currentUser = UserGroupInformation.getCurrentUser
- logDebug(s"UserGroupInformation currentUser: $currentUser")
- if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
- logDebug(s"kerberos Security is Enabled...")
- val useTicketCache =
-
flinkDefaultConfiguration.get(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE)
- if (!HadoopUtils.areKerberosCredentialsValid(currentUser,
useTicketCache)) {
- throw new RuntimeException(
- s"Hadoop security with Kerberos is enabled but the login user
$currentUser does not have Kerberos credentials or delegation tokens!")
- }
- }
val providedLibs = {
val array = ListBuffer(
submitRequest.hdfsWorkspace.flinkLib,
@@ -98,11 +85,8 @@ object YarnApplicationClient extends YarnClientTrait {
SecurityUtils.install(new SecurityConfiguration(flinkConfig))
SecurityUtils.getInstalledContext.runSecured(new Callable[SubmitResponse] {
override def call(): SubmitResponse = {
- val clusterClientServiceLoader = new DefaultClusterClientServiceLoader
- val clientFactory =
-
clusterClientServiceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
-
- val clusterSpecification =
clientFactory.getClusterSpecification(flinkConfig)
+ val (clusterSpecification, clusterDescriptor: YarnClusterDescriptor) =
+ getYarnClusterDeployDescriptor(flinkConfig)
logInfo(s"""
|------------------------<<specification>>-------------------------
|$clusterSpecification
@@ -110,7 +94,6 @@ object YarnApplicationClient extends YarnClientTrait {
|""".stripMargin)
val applicationConfiguration =
ApplicationConfiguration.fromConfiguration(flinkConfig)
- val clusterDescriptor =
clientFactory.createClusterDescriptor(flinkConfig)
val clusterClient = clusterDescriptor
.deployApplicationCluster(clusterSpecification,
applicationConfiguration)
.getClusterClient
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
index 87123263f..75e6df2e7 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
@@ -21,7 +21,6 @@ import
org.apache.streampark.flink.client.`trait`.YarnClientTrait
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.util.FlinkUtils
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
import org.apache.flink.client.program.PackagedProgram
import org.apache.flink.configuration.{Configuration, DeploymentOptions}
import org.apache.flink.yarn.{YarnClusterClientFactory, YarnClusterDescriptor}
@@ -59,22 +58,14 @@ object YarnPerJobClient extends YarnClientTrait {
val flinkHome = submitRequest.flinkVersion.flinkHome
- val clusterClientServiceLoader = new DefaultClusterClientServiceLoader
- val clientFactory =
-
clusterClientServiceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
-
- val clusterDescriptor = {
- val clusterDescriptor =
-
clientFactory.createClusterDescriptor(flinkConfig).asInstanceOf[YarnClusterDescriptor]
- val flinkDistJar = FlinkUtils.getFlinkDistJar(flinkHome)
- clusterDescriptor.setLocalJarPath(new HadoopPath(flinkDistJar))
- clusterDescriptor.addShipFiles(List(new File(s"$flinkHome/lib")))
- clusterDescriptor
- }
+ val (clusterSpecification, clusterDescriptor: YarnClusterDescriptor) =
+ getYarnClusterDeployDescriptor(flinkConfig)
+ val flinkDistJar = FlinkUtils.getFlinkDistJar(flinkHome)
+ clusterDescriptor.setLocalJarPath(new HadoopPath(flinkDistJar))
+ clusterDescriptor.addShipFiles(List(new File(s"$flinkHome/lib")))
var packagedProgram: PackagedProgram = null
val clusterClient = {
- val clusterSpecification =
clientFactory.getClusterSpecification(flinkConfig)
logInfo(s"""
|------------------------<<specification>>-------------------------
|$clusterSpecification
@@ -116,8 +107,8 @@ object YarnPerJobClient extends YarnClientTrait {
flinkConf
.safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.PER_JOB.getName)
val response = super.doCancel(cancelRequest, flinkConf)
- val clusterClientFactory = new YarnClusterClientFactory
- val clusterDescriptor =
clusterClientFactory.createClusterDescriptor(flinkConf)
+ val (yarnClusterId: ApplicationId, clusterDescriptor:
YarnClusterDescriptor) =
+ getYarnClusterDescriptor(flinkConf)
clusterDescriptor.killCluster(ApplicationId.fromString(cancelRequest.clusterId))
response
}
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index faaa6e87a..e9afbf1d6 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -22,7 +22,7 @@ import
org.apache.streampark.flink.client.`trait`.YarnClientTrait
import org.apache.streampark.flink.client.bean._
import org.apache.commons.lang3.StringUtils
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
+import org.apache.flink.client.deployment.ClusterSpecification
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.configuration._
import org.apache.flink.runtime.util.HadoopUtils
@@ -61,21 +61,6 @@ object YarnSessionClient extends YarnClientTrait {
private def deployClusterConfig(
deployRequest: DeployRequest,
flinkConfig: Configuration): Unit = {
-
- val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
- deployRequest.flinkVersion.flinkHome)
- val currentUser = UserGroupInformation.getCurrentUser
- logDebug(s"UserGroupInformation currentUser: $currentUser")
- if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
- logDebug(s"kerberos Security is Enabled...")
- val useTicketCache =
-
flinkDefaultConfiguration.get(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE)
- if (!HadoopUtils.areKerberosCredentialsValid(currentUser,
useTicketCache)) {
- throw new RuntimeException(
- s"Hadoop security with Kerberos is enabled but the login user
$currentUser does not have Kerberos credentials or delegation tokens!")
- }
- }
-
val shipFiles = new util.ArrayList[String]()
shipFiles.add(s"${deployRequest.flinkVersion.flinkHome}/lib")
shipFiles.add(s"${deployRequest.flinkVersion.flinkHome}/plugins")
@@ -105,9 +90,8 @@ object YarnSessionClient extends YarnClientTrait {
override def doSubmit(
submitRequest: SubmitRequest,
flinkConfig: Configuration): SubmitResponse = {
- val yarnClusterDescriptor = getYarnSessionClusterDescriptor(flinkConfig)
- val clusterDescriptor = yarnClusterDescriptor._2
- val yarnClusterId: ApplicationId = yarnClusterDescriptor._1
+ val (yarnClusterId: ApplicationId, clusterDescriptor:
YarnClusterDescriptor) =
+ getYarnClusterDescriptor(flinkConfig)
val programJobGraph = super.getJobGraph(flinkConfig, submitRequest,
submitRequest.userJarFile)
val packageProgram = programJobGraph._1
val jobGraph = programJobGraph._2
@@ -160,7 +144,8 @@ object YarnSessionClient extends YarnClientTrait {
deployClusterConfig(deployRequest, flinkConfig)
- val yarnClusterDescriptor =
getSessionClusterDeployDescriptor(flinkConfig)
+ val yarnClusterDescriptor = getYarnClusterDeployDescriptor(flinkConfig)
+ val clusterSpecification: ClusterSpecification = yarnClusterDescriptor._1
clusterDescriptor = yarnClusterDescriptor._2
if (StringUtils.isNotBlank(deployRequest.clusterId)) {
try {
@@ -181,7 +166,7 @@ object YarnSessionClient extends YarnClientTrait {
case e: Exception => return DeployResponse(error = e)
}
}
- val clientProvider =
clusterDescriptor.deploySessionCluster(yarnClusterDescriptor._1)
+ val clientProvider =
clusterDescriptor.deploySessionCluster(clusterSpecification)
client = clientProvider.getClusterClient
if (client.getWebInterfaceURL != null) {
DeployResponse(
@@ -210,7 +195,8 @@ object YarnSessionClient extends YarnClientTrait {
})
flinkConfig.safeSet(YarnConfigOptions.APPLICATION_ID,
shutDownRequest.clusterId)
flinkConfig.safeSet(DeploymentOptions.TARGET,
YarnDeploymentTarget.SESSION.getName)
- val yarnClusterDescriptor = getSessionClusterDescriptor(flinkConfig)
+ val yarnClusterDescriptor = getYarnClusterDescriptor(flinkConfig)
+ val applicationId: ApplicationId = yarnClusterDescriptor._1
clusterDescriptor = yarnClusterDescriptor._2
if (
FinalApplicationStatus.UNDEFINED.equals(
@@ -218,7 +204,7 @@ object YarnSessionClient extends YarnClientTrait {
.getApplicationReport(ApplicationId.fromString(shutDownRequest.clusterId))
.getFinalApplicationStatus)
) {
- val clientProvider =
clusterDescriptor.retrieve(yarnClusterDescriptor._1)
+ val clientProvider = clusterDescriptor.retrieve(applicationId)
client = clientProvider.getClusterClient
client.shutDownCluster()
}
@@ -233,15 +219,4 @@ object YarnSessionClient extends YarnClientTrait {
}
}
- private[this] def getYarnSessionClusterDescriptor(
- flinkConfig: Configuration): (ApplicationId, YarnClusterDescriptor) = {
- val serviceLoader = new DefaultClusterClientServiceLoader
- val clientFactory =
serviceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
- val yarnClusterId: ApplicationId = clientFactory.getClusterId(flinkConfig)
- require(yarnClusterId != null)
- val clusterDescriptor =
-
clientFactory.createClusterDescriptor(flinkConfig).asInstanceOf[YarnClusterDescriptor]
- (yarnClusterId, clusterDescriptor)
- }
-
}
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
index 4a19067e4..0f4cf020f 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
@@ -17,6 +17,7 @@
package org.apache.streampark.flink.client.`trait`
+import org.apache.streampark.common.util.HadoopUtils
import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.client.bean._
@@ -32,8 +33,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationId
import java.lang.{Boolean => JavaBool}
import java.lang.reflect.Method
+import java.security.PrivilegedAction
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
/** yarn application mode submit */
trait YarnClientTrait extends FlinkClientTrait {
@@ -50,17 +52,12 @@ trait YarnClientTrait extends FlinkClientTrait {
flinkConf: Configuration,
actionFunc: (JobID, ClusterClient[_]) => O): O = {
val jobID = getJobID(request.jobId)
- val clusterClient = {
- flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId)
- val clusterClientFactory = new YarnClusterClientFactory
- val applicationId = clusterClientFactory.getClusterId(flinkConf)
- if (applicationId == null) {
- throw new FlinkException(
- "[StreamPark] getClusterClient error. No cluster id was specified.
Please specify a cluster to which you would like to connect.")
- }
- val clusterDescriptor =
clusterClientFactory.createClusterDescriptor(flinkConf)
- clusterDescriptor.retrieve(applicationId).getClusterClient
- }
+ flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId)
+ // Get the ClusterClient from the YarnClusterDescriptor
+ val (applicationId, clusterDescriptor: YarnClusterDescriptor) =
getYarnClusterDescriptor(
+ flinkConf)
+ val clusterClient =
clusterDescriptor.retrieve(applicationId).getClusterClient
+
Try {
actionFunc(jobID, clusterClient)
}.recover {
@@ -125,22 +122,74 @@ trait YarnClientTrait extends FlinkClientTrait {
.asInstanceOf[ClusterClientProvider[ApplicationId]]
}
- private[client] def getSessionClusterDescriptor[T <:
ClusterDescriptor[ApplicationId]](
- flinkConfig: Configuration): (ApplicationId, T) = {
- val serviceLoader = new DefaultClusterClientServiceLoader
- val clientFactory =
serviceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
- val yarnClusterId: ApplicationId = clientFactory.getClusterId(flinkConfig)
- require(yarnClusterId != null)
- val clusterDescriptor =
clientFactory.createClusterDescriptor(flinkConfig).asInstanceOf[T]
- (yarnClusterId, clusterDescriptor)
+ /**
+ * Retrieves the YarnClusterDescriptor and the application ID.
+ *
+ * @param flinkConfig
+ * the Flink configuration
+ * @return
+ * a tuple containing the application ID and the YarnClusterDescriptor
+ */
+ private[client] def getYarnClusterDescriptor(
+ flinkConfig: Configuration): (ApplicationId, YarnClusterDescriptor) = {
+ // Set up Kerberos authentication
+ val ugi = HadoopUtils.getUgi()
+
+ // Wrap the operation in ugi.doAs()
+ val result = Try {
+ ugi.doAs(new PrivilegedAction[(ApplicationId, YarnClusterDescriptor)] {
+ override def run(): (ApplicationId, YarnClusterDescriptor) = {
+ val clientFactory = new YarnClusterClientFactory
+ // Get the cluster ID
+ val yarnClusterId: ApplicationId =
clientFactory.getClusterId(flinkConfig)
+ require(yarnClusterId != null)
+ // Create the ClusterDescriptor
+ val clusterDescriptor =
clientFactory.createClusterDescriptor(flinkConfig)
+
+ (yarnClusterId, clusterDescriptor)
+ }
+ })
+ } match {
+ case Success(result) => result
+ case Failure(e) =>
+ throw new IllegalArgumentException(s"[StreamPark] access
ClusterDescriptor error: $e")
+ }
+
+ result
}
- private[client] def getSessionClusterDeployDescriptor[T <:
ClusterDescriptor[ApplicationId]](
- flinkConfig: Configuration): (ClusterSpecification, T) = {
- val serviceLoader = new DefaultClusterClientServiceLoader
- val clientFactory =
serviceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
- val clusterSpecification =
clientFactory.getClusterSpecification(flinkConfig)
- val clusterDescriptor =
clientFactory.createClusterDescriptor(flinkConfig).asInstanceOf[T]
- (clusterSpecification, clusterDescriptor)
+ /**
+ * Retrieves the ClusterSpecification and the YarnClusterDescriptor for
deployment.
+ *
+ * @param flinkConfig
+ * the Flink configuration
+ * @return
+ * a tuple containing the ClusterSpecification and the
YarnClusterDescriptor
+ */
+ private[client] def getYarnClusterDeployDescriptor(
+ flinkConfig: Configuration): (ClusterSpecification,
YarnClusterDescriptor) = {
+ // Set up Kerberos authentication
+ val ugi = HadoopUtils.getUgi()
+
+ // Wrap the operation in ugi.doAs()
+ val result = Try {
+ ugi.doAs(new PrivilegedAction[(ClusterSpecification,
YarnClusterDescriptor)] {
+ override def run(): (ClusterSpecification, YarnClusterDescriptor) = {
+ val clientFactory = new YarnClusterClientFactory
+ // Get the ClusterSpecification
+ val clusterSpecification =
clientFactory.getClusterSpecification(flinkConfig)
+ // Create the ClusterDescriptor
+ val clusterDescriptor =
clientFactory.createClusterDescriptor(flinkConfig)
+
+ (clusterSpecification, clusterDescriptor)
+ }
+ })
+ } match {
+ case Success(result) => result
+ case Failure(e) =>
+ throw new IllegalArgumentException(s"[StreamPark] access
ClusterDescriptor error: $e")
+ }
+
+ result
}
}