This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 0533ec394 [Feature] User can use different hadoop-user to submit
application (#3401)
0533ec394 is described below
commit 0533ec3949a05b1a335d1bc54bba2af10abfbdc6
Author: Kevin.Shin <[email protected]>
AuthorDate: Fri Dec 29 00:21:21 2023 +0800
[Feature] User can use different hadoop-user to submit application (#3401)
* user can use different hadoop-user to submit application
* user can use different hadoop-user to submit application:do it with
proxyuser
* add missing commit file
* add changes to pass tests
* copy hadoop-user when copy application
---------
Co-authored-by: shenk-b <[email protected]>
---
.../main/assembly/script/upgrade/mysql/2.2.0.sql | 3 ++
.../console/core/entity/Application.java | 3 ++
.../impl/ApplicationActionServiceImpl.java | 1 +
.../impl/ApplicationManageServiceImpl.java | 5 ++
.../src/main/resources/db/schema-h2.sql | 1 +
.../src/api/flink/app.type.ts | 1 +
.../src/locales/lang/en/flink/app.ts | 1 +
.../src/locales/lang/zh-CN/flink/app.ts | 1 +
.../src/views/flink/app/EditFlink.vue | 1 +
.../flink/app/hooks/useCreateAndEditSchema.ts | 5 ++
.../src/views/flink/app/utils/index.ts | 1 +
.../flink/client/bean/SubmitRequest.scala | 1 +
.../flink/client/impl/YarnApplicationClient.scala | 57 ++++++++++++++++++++--
13 files changed, 77 insertions(+), 4 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
index 79a890def..c35c13c8f 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
@@ -46,6 +46,9 @@ alter table `t_flink_sql`
alter table `t_flink_app`
add column `probing` tinyint default 0;
+alter table `t_flink_app`
+ add column `hadoop_user` varchar(64) default null;
+
alter table `t_flink_cluster`
add column `job_manager_url` varchar(150) default null comment 'url
address of jobmanager' after `address`,
add column `start_time` datetime default null comment 'start time',
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 2234fa4aa..198cfe1dc 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -187,6 +187,9 @@ public class Application implements Serializable {
@TableField("TOTAL_TM")
private Integer totalTM;
+ @TableField("HADOOP_USER")
+ private String hadoopUser;
+
private Integer totalSlot;
private Integer availableSlot;
private Integer jmMemory;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 17de0e116..963a92d45 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -487,6 +487,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
? null
: FlinkRestoreMode.of(appParam.getRestoreMode()),
applicationArgs,
+ application.getHadoopUser(),
buildResult,
kubernetesSubmitParam,
extraParameter);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
index 2a4dd9d2e..992ece634 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
@@ -438,6 +438,7 @@ public class ApplicationManageServiceImpl extends
ServiceImpl<ApplicationMapper,
newApp.setJarCheckSum(oldApp.getJarCheckSum());
newApp.setTags(oldApp.getTags());
newApp.setTeamId(oldApp.getTeamId());
+ newApp.setHadoopUser(oldApp.getHadoopUser());
boolean saved = save(newApp);
if (saved) {
@@ -559,7 +560,11 @@ public class ApplicationManageServiceImpl extends
ServiceImpl<ApplicationMapper,
switch (appParam.getFlinkExecutionMode()) {
case YARN_APPLICATION:
+ application.setHadoopUser(appParam.getHadoopUser());
+ break;
case YARN_PER_JOB:
+ application.setHadoopUser(appParam.getHadoopUser());
+ break;
case KUBERNETES_NATIVE_APPLICATION:
application.setFlinkClusterId(null);
break;
diff --git
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index 1af50f2d9..596801a9a 100644
---
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -96,6 +96,7 @@ create table if not exists `t_flink_app` (
`default_mode_ingress` text ,
`tags` varchar(500) default null,
`probing` tinyint default 0,
+ `hadoop_user` varchar(500) default null,
primary key(`id`)
);
diff --git
a/streampark-console/streampark-console-webapp/src/api/flink/app.type.ts
b/streampark-console/streampark-console-webapp/src/api/flink/app.type.ts
index 71778e6cb..7fdf36c29 100644
--- a/streampark-console/streampark-console-webapp/src/api/flink/app.type.ts
+++ b/streampark-console/streampark-console-webapp/src/api/flink/app.type.ts
@@ -138,6 +138,7 @@ export interface AppListRecord {
empty: boolean;
};
streamParkJob: boolean;
+ hadoopUser: string;
}
interface AppControl {
diff --git
a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
index f9fd462d6..e9b778af3 100644
---
a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
+++
b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
@@ -69,6 +69,7 @@ export default {
status: 'Run Status',
startTime: 'Start Time',
endTime: 'End Time',
+ hadoopUser: 'Hadoop User',
restoreModeTip:
'restore mode is supported since flink 1.15, usually, you do not have to
set this parameter',
release: {
diff --git
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
index 51f72ceb4..17652ec3e 100644
---
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
+++
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
@@ -68,6 +68,7 @@ export default {
status: '运行状态',
startTime: '启动时间',
endTime: '结束时间',
+ hadoopUser: 'Hadoop User',
restoreModeTip: 'flink 1.15开始支持restore模式,一般情况下不用设置该参数',
release: {
releaseTitle: '该应用程序的当前启动正在进行中.',
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
index d4564211e..23a6b9106 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
@@ -87,6 +87,7 @@
args: app.args || '',
jar: app.jar,
description: app.description,
+ hadoopUser: app.hadoopUser,
dynamicProperties: app.dynamicProperties,
resolveOrder: app.resolveOrder,
executionMode: app.executionMode,
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
index ef3346031..7e693ea1b 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
@@ -488,6 +488,11 @@ export const useCreateAndEditSchema = (
slot: 'args',
ifShow: ({ values }) => (edit?.mode ? true : values.jobType !=
JobTypeEnum.SQL),
},
+ {
+ field: 'hadoopUser',
+ label: t('flink.app.hadoopUser'),
+ component: 'Input'
+ },
{
field: 'description',
label: t('common.description'),
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
index 773903379..f50c5cfac 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
@@ -288,6 +288,7 @@ export function handleSubmitParams(
restartSize: values.restartSize,
alertId: values.alertId,
description: values.description,
+ hadoopUser: values.hadoopUser,
k8sNamespace: values.k8sNamespace || null,
clusterId: values.clusterId || null,
flinkClusterId:
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 87ec66d21..b74ecae25 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -52,6 +52,7 @@ case class SubmitRequest(
savePoint: String,
restoreMode: FlinkRestoreMode,
args: String,
+ @Nullable hadoopUser: String,
@Nullable buildResult: BuildResult,
@Nullable k8sSubmitParam: KubernetesSubmitParam,
@Nullable extraParameter: JavaMap[String, Any]) {
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index f4fe45761..ec0a8091b 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -26,6 +26,7 @@ import
org.apache.streampark.flink.client.`trait`.YarnClientTrait
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
+import org.apache.commons.lang3.StringUtils
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
import org.apache.flink.client.deployment.application.ApplicationConfiguration
import org.apache.flink.client.program.ClusterClient
@@ -37,6 +38,7 @@ import org.apache.flink.yarn.configuration.YarnConfigOptions
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.ApplicationId
+import java.security.PrivilegedAction
import java.util
import java.util.Collections
@@ -140,9 +142,19 @@ object YarnApplicationClient extends YarnClientTrait {
override def doSubmit(
submitRequest: SubmitRequest,
flinkConfig: Configuration): SubmitResponse = {
- SecurityUtils.install(new SecurityConfiguration(flinkConfig))
- SecurityUtils.getInstalledContext.runSecured(
- () => {
+ var proxyUserUgi: UserGroupInformation =
UserGroupInformation.getCurrentUser
+ val currentUser = UserGroupInformation.getCurrentUser
+ if (!HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
+ if (StringUtils.isNotEmpty(submitRequest.hadoopUser)) {
+ proxyUserUgi = UserGroupInformation.createProxyUser(
+ submitRequest.hadoopUser,
+ currentUser
+ )
+ }
+ }
+
+ proxyUserUgi.doAs[SubmitResponse](new PrivilegedAction[SubmitResponse] {
+ override def run(): SubmitResponse = {
val clusterClientServiceLoader = new DefaultClusterClientServiceLoader
val clientFactory =
clusterClientServiceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
@@ -174,7 +186,44 @@ object YarnApplicationClient extends YarnClientTrait {
} finally {
Utils.close(clusterDescriptor, clusterClient)
}
- })
+ }
+ })
+
+// SecurityUtils.install(new SecurityConfiguration(flinkConfig))
+// SecurityUtils.getInstalledContext.runSecured(
+// () => {
+// val clusterClientServiceLoader = new
DefaultClusterClientServiceLoader
+// val clientFactory =
+//
clusterClientServiceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
+// val clusterDescriptor =
clientFactory.createClusterDescriptor(flinkConfig)
+// var clusterClient: ClusterClient[ApplicationId] = null
+// try {
+// val clusterSpecification =
clientFactory.getClusterSpecification(flinkConfig)
+// logInfo(s"""
+//
|------------------------<<specification>>-------------------------
+// |$clusterSpecification
+//
|------------------------------------------------------------------
+// |""".stripMargin)
+//
+// val applicationConfiguration =
ApplicationConfiguration.fromConfiguration(flinkConfig)
+// var applicationId: ApplicationId = null
+// var jobManagerUrl: String = null
+// clusterClient = clusterDescriptor
+// .deployApplicationCluster(clusterSpecification,
applicationConfiguration)
+// .getClusterClient
+// applicationId = clusterClient.getClusterId
+// jobManagerUrl = clusterClient.getWebInterfaceURL
+// logInfo(s"""
+//
|-------------------------<<applicationId>>------------------------
+// |Flink Job Started: applicationId: $applicationId
+//
|__________________________________________________________________
+// |""".stripMargin)
+//
+// SubmitResponse(applicationId.toString, flinkConfig.toMap,
jobManagerUrl = jobManagerUrl)
+// } finally {
+// Utils.close(clusterDescriptor, clusterClient)
+// }
+// })
}
}