This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
     new a0bbc4dec [Improve] yarn client improvement
a0bbc4dec is described below

commit a0bbc4decccfd960e3a1ccd2da5612bea415040a
Author: benjobs <[email protected]>
AuthorDate: Fri Aug 9 12:47:16 2024 +0800

    [Improve] yarn client improvement
---
 .../streampark/common/util/HadoopUtils.scala       |  1 -
 .../flink/client/trait/YarnClientTrait.scala       | 45 ++++++++++------------
 2 files changed, 20 insertions(+), 26 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
index 482e1a2a2..3aba6bf8a 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
@@ -232,7 +232,6 @@ object HadoopUtils extends Logger {
 
   def yarnClient: YarnClient = {
     if (reusableYarnClient == null || 
!reusableYarnClient.isInState(STATE.STARTED)) {
-      // 使用doAs方法确保以下操作以ugi的身份执行
       reusableYarnClient = Try {
         getUgi().doAs(new PrivilegedAction[YarnClient]() {
           override def run(): YarnClient = {
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
index 0f4cf020f..3e270eeef 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
@@ -22,7 +22,7 @@ import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.client.bean._
 
 import org.apache.flink.api.common.JobID
-import org.apache.flink.client.deployment.{ClusterDescriptor, 
ClusterSpecification, DefaultClusterClientServiceLoader}
+import org.apache.flink.client.deployment.ClusterSpecification
 import org.apache.flink.client.program.{ClusterClient, ClusterClientProvider}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.jobgraph.JobGraph
@@ -132,30 +132,22 @@ trait YarnClientTrait extends FlinkClientTrait {
    */
   private[client] def getYarnClusterDescriptor(
       flinkConfig: Configuration): (ApplicationId, YarnClusterDescriptor) = {
-    // Set up Kerberos authentication
-    val ugi = HadoopUtils.getUgi()
-
-    // Wrap the operation in ugi.doAs()
-    val result = Try {
-      ugi.doAs(new PrivilegedAction[(ApplicationId, YarnClusterDescriptor)] {
-        override def run(): (ApplicationId, YarnClusterDescriptor) = {
+    Try {
+      doAsYarnClusterDescriptor[ApplicationId](
+        () => {
           val clientFactory = new YarnClusterClientFactory
           // Get the cluster ID
           val yarnClusterId: ApplicationId = 
clientFactory.getClusterId(flinkConfig)
           require(yarnClusterId != null)
           // Create the ClusterDescriptor
           val clusterDescriptor = 
clientFactory.createClusterDescriptor(flinkConfig)
-
           (yarnClusterId, clusterDescriptor)
-        }
-      })
+        })
     } match {
       case Success(result) => result
       case Failure(e) =>
         throw new IllegalArgumentException(s"[StreamPark] access 
ClusterDescriptor error: $e")
     }
-
-    result
   }
 
   /**
@@ -168,28 +160,31 @@ trait YarnClientTrait extends FlinkClientTrait {
    */
   private[client] def getYarnClusterDeployDescriptor(
       flinkConfig: Configuration): (ClusterSpecification, 
YarnClusterDescriptor) = {
-    // Set up Kerberos authentication
-    val ugi = HadoopUtils.getUgi()
-
-    // Wrap the operation in ugi.doAs()
-    val result = Try {
-      ugi.doAs(new PrivilegedAction[(ClusterSpecification, 
YarnClusterDescriptor)] {
-        override def run(): (ClusterSpecification, YarnClusterDescriptor) = {
+    Try {
+      doAsYarnClusterDescriptor[ClusterSpecification](
+        () => {
           val clientFactory = new YarnClusterClientFactory
           // Get the ClusterSpecification
           val clusterSpecification = 
clientFactory.getClusterSpecification(flinkConfig)
           // Create the ClusterDescriptor
           val clusterDescriptor = 
clientFactory.createClusterDescriptor(flinkConfig)
-
-          (clusterSpecification, clusterDescriptor)
-        }
-      })
+          clusterSpecification -> clusterDescriptor
+        })
     } match {
       case Success(result) => result
       case Failure(e) =>
         throw new IllegalArgumentException(s"[StreamPark] access 
ClusterDescriptor error: $e")
     }
+  }
 
-    result
+  private[this] def doAsYarnClusterDescriptor[T](
+      func: () => (T, YarnClusterDescriptor)): (T, YarnClusterDescriptor) = {
+    // Wrap the operation in ugi.doAs()
+    HadoopUtils
+      .getUgi()
+      .doAs(new PrivilegedAction[(T, YarnClusterDescriptor)] {
+        override def run(): (T, YarnClusterDescriptor) = func()
+      })
   }
+
 }

Reply via email to the mailing list.