This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
new a0bbc4dec [Improve] yarn client improvement
a0bbc4dec is described below
commit a0bbc4decccfd960e3a1ccd2da5612bea415040a
Author: benjobs <[email protected]>
AuthorDate: Fri Aug 9 12:47:16 2024 +0800
[Improve] yarn client improvement
---
.../streampark/common/util/HadoopUtils.scala | 1 -
.../flink/client/trait/YarnClientTrait.scala | 45 ++++++++++------------
2 files changed, 20 insertions(+), 26 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
index 482e1a2a2..3aba6bf8a 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
@@ -232,7 +232,6 @@ object HadoopUtils extends Logger {
def yarnClient: YarnClient = {
if (reusableYarnClient == null ||
!reusableYarnClient.isInState(STATE.STARTED)) {
- // 使用doAs方法确保以下操作以ugi的身份执行
reusableYarnClient = Try {
getUgi().doAs(new PrivilegedAction[YarnClient]() {
override def run(): YarnClient = {
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
index 0f4cf020f..3e270eeef 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
@@ -22,7 +22,7 @@ import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.client.bean._
import org.apache.flink.api.common.JobID
-import org.apache.flink.client.deployment.{ClusterDescriptor, ClusterSpecification, DefaultClusterClientServiceLoader}
+import org.apache.flink.client.deployment.ClusterSpecification
import org.apache.flink.client.program.{ClusterClient, ClusterClientProvider}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.jobgraph.JobGraph
@@ -132,30 +132,22 @@ trait YarnClientTrait extends FlinkClientTrait {
*/
private[client] def getYarnClusterDescriptor(
flinkConfig: Configuration): (ApplicationId, YarnClusterDescriptor) = {
- // Set up Kerberos authentication
- val ugi = HadoopUtils.getUgi()
-
- // Wrap the operation in ugi.doAs()
- val result = Try {
- ugi.doAs(new PrivilegedAction[(ApplicationId, YarnClusterDescriptor)] {
- override def run(): (ApplicationId, YarnClusterDescriptor) = {
+ Try {
+ doAsYarnClusterDescriptor[ApplicationId](
+ () => {
val clientFactory = new YarnClusterClientFactory
// Get the cluster ID
val yarnClusterId: ApplicationId =
clientFactory.getClusterId(flinkConfig)
require(yarnClusterId != null)
// Create the ClusterDescriptor
val clusterDescriptor =
clientFactory.createClusterDescriptor(flinkConfig)
-
(yarnClusterId, clusterDescriptor)
- }
- })
+ })
} match {
case Success(result) => result
case Failure(e) =>
throw new IllegalArgumentException(s"[StreamPark] access ClusterDescriptor error: $e")
}
-
- result
}
/**
@@ -168,28 +160,31 @@ trait YarnClientTrait extends FlinkClientTrait {
*/
private[client] def getYarnClusterDeployDescriptor(
flinkConfig: Configuration): (ClusterSpecification, YarnClusterDescriptor) = {
- // Set up Kerberos authentication
- val ugi = HadoopUtils.getUgi()
-
- // Wrap the operation in ugi.doAs()
- val result = Try {
- ugi.doAs(new PrivilegedAction[(ClusterSpecification, YarnClusterDescriptor)] {
- override def run(): (ClusterSpecification, YarnClusterDescriptor) = {
+ Try {
+ doAsYarnClusterDescriptor[ClusterSpecification](
+ () => {
val clientFactory = new YarnClusterClientFactory
// Get the ClusterSpecification
val clusterSpecification =
clientFactory.getClusterSpecification(flinkConfig)
// Create the ClusterDescriptor
val clusterDescriptor =
clientFactory.createClusterDescriptor(flinkConfig)
-
- (clusterSpecification, clusterDescriptor)
- }
- })
+ clusterSpecification -> clusterDescriptor
+ })
} match {
case Success(result) => result
case Failure(e) =>
throw new IllegalArgumentException(s"[StreamPark] access ClusterDescriptor error: $e")
}
+ }
- result
+ private[this] def doAsYarnClusterDescriptor[T](
+ func: () => (T, YarnClusterDescriptor)): (T, YarnClusterDescriptor) = {
+ // Wrap the operation in ugi.doAs()
+ HadoopUtils
+ .getUgi()
+ .doAs(new PrivilegedAction[(T, YarnClusterDescriptor)] {
+ override def run(): (T, YarnClusterDescriptor) = func()
+ })
}
+
}