This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0533ec394 [Feature] User can use different hadoop-user to submit 
application (#3401)
0533ec394 is described below

commit 0533ec3949a05b1a335d1bc54bba2af10abfbdc6
Author: Kevin.Shin <[email protected]>
AuthorDate: Fri Dec 29 00:21:21 2023 +0800

    [Feature] User can use different hadoop-user to submit application (#3401)
    
    * user can use different hadoop-user to submit application
    
    * user can use different hadoop-user to submit application:do it with 
proxyuser
    
    * add missing commit file
    
    * add changes to pass tests
    
    * copy hadoop-user when copy application
    
    ---------
    
    Co-authored-by: shenk-b <[email protected]>
---
 .../main/assembly/script/upgrade/mysql/2.2.0.sql   |  3 ++
 .../console/core/entity/Application.java           |  3 ++
 .../impl/ApplicationActionServiceImpl.java         |  1 +
 .../impl/ApplicationManageServiceImpl.java         |  5 ++
 .../src/main/resources/db/schema-h2.sql            |  1 +
 .../src/api/flink/app.type.ts                      |  1 +
 .../src/locales/lang/en/flink/app.ts               |  1 +
 .../src/locales/lang/zh-CN/flink/app.ts            |  1 +
 .../src/views/flink/app/EditFlink.vue              |  1 +
 .../flink/app/hooks/useCreateAndEditSchema.ts      |  5 ++
 .../src/views/flink/app/utils/index.ts             |  1 +
 .../flink/client/bean/SubmitRequest.scala          |  1 +
 .../flink/client/impl/YarnApplicationClient.scala  | 57 ++++++++++++++++++++--
 13 files changed, 77 insertions(+), 4 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
index 79a890def..c35c13c8f 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
@@ -46,6 +46,9 @@ alter table `t_flink_sql`
 alter table `t_flink_app`
     add column `probing` tinyint default 0;
 
+alter table `t_flink_app`
+    add column `hadoop_user` varchar(64) default null;
+
 alter table `t_flink_cluster`
     add column `job_manager_url` varchar(150) default null comment 'url 
address of jobmanager' after `address`,
     add column `start_time` datetime default null comment 'start time',
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 2234fa4aa..198cfe1dc 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -187,6 +187,9 @@ public class Application implements Serializable {
   @TableField("TOTAL_TM")
   private Integer totalTM;
 
+  @TableField("HADOOP_USER")
+  private String hadoopUser;
+
   private Integer totalSlot;
   private Integer availableSlot;
   private Integer jmMemory;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 17de0e116..963a92d45 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -487,6 +487,7 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
                 ? null
                 : FlinkRestoreMode.of(appParam.getRestoreMode()),
             applicationArgs,
+            application.getHadoopUser(),
             buildResult,
             kubernetesSubmitParam,
             extraParameter);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
index 2a4dd9d2e..992ece634 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
@@ -438,6 +438,7 @@ public class ApplicationManageServiceImpl extends 
ServiceImpl<ApplicationMapper,
     newApp.setJarCheckSum(oldApp.getJarCheckSum());
     newApp.setTags(oldApp.getTags());
     newApp.setTeamId(oldApp.getTeamId());
+    newApp.setHadoopUser(oldApp.getHadoopUser());
 
     boolean saved = save(newApp);
     if (saved) {
@@ -559,7 +560,11 @@ public class ApplicationManageServiceImpl extends 
ServiceImpl<ApplicationMapper,
 
     switch (appParam.getFlinkExecutionMode()) {
       case YARN_APPLICATION:
+        application.setHadoopUser(appParam.getHadoopUser());
+        break;
       case YARN_PER_JOB:
+        application.setHadoopUser(appParam.getHadoopUser());
+        break;
       case KUBERNETES_NATIVE_APPLICATION:
         application.setFlinkClusterId(null);
         break;
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
 
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index 1af50f2d9..596801a9a 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++ 
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -96,6 +96,7 @@ create table if not exists `t_flink_app` (
   `default_mode_ingress` text ,
   `tags` varchar(500) default null,
   `probing` tinyint default 0,
+  `hadoop_user` varchar(500) default null,
   primary key(`id`)
 );
 
diff --git 
a/streampark-console/streampark-console-webapp/src/api/flink/app.type.ts 
b/streampark-console/streampark-console-webapp/src/api/flink/app.type.ts
index 71778e6cb..7fdf36c29 100644
--- a/streampark-console/streampark-console-webapp/src/api/flink/app.type.ts
+++ b/streampark-console/streampark-console-webapp/src/api/flink/app.type.ts
@@ -138,6 +138,7 @@ export interface AppListRecord {
     empty: boolean;
   };
   streamParkJob: boolean;
+  hadoopUser: string;
 }
 
 interface AppControl {
diff --git 
a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts 
b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
index f9fd462d6..e9b778af3 100644
--- 
a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
+++ 
b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
@@ -69,6 +69,7 @@ export default {
   status: 'Run Status',
   startTime: 'Start Time',
   endTime: 'End Time',
+  hadoopUser: 'Hadoop User',
   restoreModeTip:
     'restore mode is supported since flink 1.15, usually, you do not have to 
set this parameter',
   release: {
diff --git 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
 
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
index 51f72ceb4..17652ec3e 100644
--- 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
+++ 
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
@@ -68,6 +68,7 @@ export default {
   status: '运行状态',
   startTime: '启动时间',
   endTime: '结束时间',
+  hadoopUser: 'Hadoop User',
   restoreModeTip: 'flink 1.15开始支持restore模式,一般情况下不用设置该参数',
   release: {
     releaseTitle: '该应用程序的当前启动正在进行中.',
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
index d4564211e..23a6b9106 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
@@ -87,6 +87,7 @@
         args: app.args || '',
         jar: app.jar,
         description: app.description,
+        hadoopUser: app.hadoopUser,
         dynamicProperties: app.dynamicProperties,
         resolveOrder: app.resolveOrder,
         executionMode: app.executionMode,
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
index ef3346031..7e693ea1b 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
@@ -488,6 +488,11 @@ export const useCreateAndEditSchema = (
         slot: 'args',
         ifShow: ({ values }) => (edit?.mode ? true : values.jobType != 
JobTypeEnum.SQL),
       },
+      {
+        field: 'hadoopUser',
+        label: t('flink.app.hadoopUser'),
+        component: 'Input'
+      },
       {
         field: 'description',
         label: t('common.description'),
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
index 773903379..f50c5cfac 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
@@ -288,6 +288,7 @@ export function handleSubmitParams(
     restartSize: values.restartSize,
     alertId: values.alertId,
     description: values.description,
+    hadoopUser: values.hadoopUser,
     k8sNamespace: values.k8sNamespace || null,
     clusterId: values.clusterId || null,
     flinkClusterId:
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 87ec66d21..b74ecae25 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -52,6 +52,7 @@ case class SubmitRequest(
     savePoint: String,
     restoreMode: FlinkRestoreMode,
     args: String,
+    @Nullable hadoopUser: String,
     @Nullable buildResult: BuildResult,
     @Nullable k8sSubmitParam: KubernetesSubmitParam,
     @Nullable extraParameter: JavaMap[String, Any]) {
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index f4fe45761..ec0a8091b 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -26,6 +26,7 @@ import 
org.apache.streampark.flink.client.`trait`.YarnClientTrait
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
 import org.apache.flink.client.deployment.application.ApplicationConfiguration
 import org.apache.flink.client.program.ClusterClient
@@ -37,6 +38,7 @@ import org.apache.flink.yarn.configuration.YarnConfigOptions
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.records.ApplicationId
 
+import java.security.PrivilegedAction
 import java.util
 import java.util.Collections
 
@@ -140,9 +142,19 @@ object YarnApplicationClient extends YarnClientTrait {
   override def doSubmit(
       submitRequest: SubmitRequest,
       flinkConfig: Configuration): SubmitResponse = {
-    SecurityUtils.install(new SecurityConfiguration(flinkConfig))
-    SecurityUtils.getInstalledContext.runSecured(
-      () => {
+    var proxyUserUgi: UserGroupInformation = 
UserGroupInformation.getCurrentUser
+    val currentUser = UserGroupInformation.getCurrentUser
+    if (!HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
+      if (StringUtils.isNotEmpty(submitRequest.hadoopUser)) {
+        proxyUserUgi = UserGroupInformation.createProxyUser(
+          submitRequest.hadoopUser,
+          currentUser
+        )
+      }
+    }
+
+    proxyUserUgi.doAs[SubmitResponse](new PrivilegedAction[SubmitResponse] {
+      override def run(): SubmitResponse = {
         val clusterClientServiceLoader = new DefaultClusterClientServiceLoader
         val clientFactory =
           
clusterClientServiceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
@@ -174,7 +186,44 @@ object YarnApplicationClient extends YarnClientTrait {
         } finally {
           Utils.close(clusterDescriptor, clusterClient)
         }
-      })
+      }
+    })
+
+//    SecurityUtils.install(new SecurityConfiguration(flinkConfig))
+//    SecurityUtils.getInstalledContext.runSecured(
+//      () => {
+//        val clusterClientServiceLoader = new 
DefaultClusterClientServiceLoader
+//        val clientFactory =
+//          
clusterClientServiceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
+//        val clusterDescriptor = 
clientFactory.createClusterDescriptor(flinkConfig)
+//        var clusterClient: ClusterClient[ApplicationId] = null
+//        try {
+//          val clusterSpecification = 
clientFactory.getClusterSpecification(flinkConfig)
+//          logInfo(s"""
+//                     
|------------------------<<specification>>-------------------------
+//                     |$clusterSpecification
+//                     
|------------------------------------------------------------------
+//                     |""".stripMargin)
+//
+//          val applicationConfiguration = 
ApplicationConfiguration.fromConfiguration(flinkConfig)
+//          var applicationId: ApplicationId = null
+//          var jobManagerUrl: String = null
+//          clusterClient = clusterDescriptor
+//            .deployApplicationCluster(clusterSpecification, 
applicationConfiguration)
+//            .getClusterClient
+//          applicationId = clusterClient.getClusterId
+//          jobManagerUrl = clusterClient.getWebInterfaceURL
+//          logInfo(s"""
+//                     
|-------------------------<<applicationId>>------------------------
+//                     |Flink Job Started: applicationId: $applicationId
+//                     
|__________________________________________________________________
+//                     |""".stripMargin)
+//
+//          SubmitResponse(applicationId.toString, flinkConfig.toMap, 
jobManagerUrl = jobManagerUrl)
+//        } finally {
+//          Utils.close(clusterDescriptor, clusterClient)
+//        }
+//      })
   }
 
 }

Reply via email to