This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new b85b5d4e3 [Feature] Support Packaging Spark Docker Images (#4133)
b85b5d4e3 is described below
commit b85b5d4e3bbe1096ec552e102809f10d57d9a5ad
Author: lenoxzhao <[email protected]>
AuthorDate: Fri Nov 29 10:42:36 2024 +0800
[Feature] Support Packaging Spark Docker Images (#4133)
* feat: support spark docker package
* feature: support spark docker packaging
* feature: update h2 schema
---
.../streampark/common/enums/SparkDeployMode.java | 7 +-
.../src/main/assembly/script/data/mysql-data.sql | 23 +-
.../main/assembly/script/schema/mysql-schema.sql | 20 +-
.../console/core/entity/SparkApplication.java | 17 ++
.../SparkApplicationBuildPipelineServiceImpl.java | 43 ++++
.../impl/SparkApplicationManageServiceImpl.java | 3 +
.../core/service/impl/ResourceServiceImpl.java | 1 +
.../src/main/resources/db/schema-h2.sql | 3 +
.../src/locales/lang/zh-CN/spark/app.ts | 4 +-
.../flink/kubernetes/PodTemplateTool.scala | 39 ++++
.../kubernetes/model/SparkK8sPodTemplates.scala | 65 ++++++
.../flink/packer/pipeline/PipelineTypeEnum.java | 16 +-
.../packer/docker/SparkDockerfileTemplate.scala | 48 ++++
.../docker/SparkDockerfileTemplateTrait.scala | 121 ++++++++++
.../docker/SparkHadoopDockerfileTemplate.scala | 128 +++++++++++
.../flink/packer/pipeline/BuildRequest.scala | 22 +-
.../impl/SparkK8sApplicationBuildPipeline.scala | 249 +++++++++++++++++++++
.../streampark/spark/client/SparkClient.scala | 1 -
18 files changed, 787 insertions(+), 23 deletions(-)
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkDeployMode.java
b/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkDeployMode.java
index 785d77693..a45b3a523 100644
---
a/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkDeployMode.java
+++
b/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkDeployMode.java
@@ -36,7 +36,12 @@ public enum SparkDeployMode {
YARN_CLUSTER(2, "yarn-cluster"),
/** yarn client */
- YARN_CLIENT(3, "yarn-client");
+ YARN_CLIENT(3, "yarn-client"),
+
+ /** kubernetes-cluster mode */
+ KUBERNETES_NATIVE_CLUSTER(4, "kubernetes-native-cluster"),
+
+ KUBERNETES_NATIVE_CLIENT(5, "kubernetes-native-client");
private final Integer mode;
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
index c63ae9771..ecf0edd86 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
@@ -26,6 +26,10 @@ set foreign_key_checks = 0;
-- ----------------------------
insert into `t_team` values (100000, 'default', null, now(), now());
+-- ----------------------------
+-- Records of flink-app
+-- ----------------------------
+INSERT INTO `t_app` (`id`, `job_type`, `create_time`, `modify_time`) VALUES
(100000, 1, now(), now());
-- ----------------------------
-- Records of t_flink_app
@@ -51,6 +55,11 @@ insert into `t_flink_project` values (100000, 100000,
'streampark-quickstart', '
-- ----------------------------
insert into `t_flink_sql` values (100000, 100000,
'eNqlUUtPhDAQvu+vmFs1AYIHT5s94AaVqGxSSPZIKgxrY2mxrdGfb4GS3c0+LnJo6Mz36syapkmZQpk8vKbQMMt2KOFmAe5rK4Nf3yhrhCwvA1/TTDaqO61UxmooSprlT1PDGkgKEKpmwvIOjWVdP3W2zpG+JfQFHjfU46xxrVvYZuWztye1khJrqzSBFRCfjUwSYQiqt1xJJvyPcbWJp9WPCXvUoUEn0ZAVufcs0nIUjYn2L4s++YiY75eBLr+2Dnl3GYKTWRyfQKYRRR2XZxXmNvu9yh9GHAmUO/sxyMRkGNly4c714RZ7zaWtLHsX+N9NjvVrWxm99jmyvEhpOUhujmIYFI5zkCOYzYIj11a7QH7Tyz+nE8bw',
null, null, 1, 1, now());
+-- ----------------------------
+-- Records of spark-app
+-- ----------------------------
+INSERT INTO `t_app` (`id`, `job_type`, `create_time`, `modify_time`) VALUES
(100001, 2, now(), now());
+
-- ----------------------------
-- Records of t_spark_app
-- ----------------------------
@@ -58,17 +67,17 @@ insert into `t_spark_app` (
`id`, `team_id`, `job_type`, `app_type`, `app_name`, `deploy_mode`,
`resource_from`, `main_class`,
`yarn_queue`, `k8s_image_pull_policy`, `k8s_namespace`, `state`,
`option_state`, `user_id`,
`description`, `tracking`, `release`, `build`, `create_time`,
`modify_time`, `tags`)
-values (100000, 100000, 2, 4, 'Spark SQL Demo', 2, 2,
'org.apache.streampark.spark.cli.SqlClient', 'default', 0, 'default', 0, 0,
100000, 'Spark SQL Demo', 0, 1, 1, now(), now(), 'streampark,test');
+values (100001, 100000, 2, 4, 'Spark SQL Demo', 2, 2,
'org.apache.streampark.spark.cli.SqlClient', 'default', 0, 'default', 0, 0,
100000, 'Spark SQL Demo', 0, 1, 1, now(), now(), 'streampark,test');
-- ----------------------------
-- Records of t_spark_effective
-- ----------------------------
-insert into `t_spark_effective` values (100000, 100000, 4, 100000, now());
+insert into `t_spark_effective` values (100000, 100001, 4, 100000, now());
-- ----------------------------
-- Records of t_spark_sql
-- ----------------------------
-insert into `t_spark_sql` values (100000, 100000,
'eNq1jr0OgjAURnee4m4FY/oCTJVUg/KT9F7cK2kQiy2W+P6KMQ6yuDh9+YZzcjIlBUkgsSkkXCbv0N9Da0ifBgOx01cDSCqvdmsIpuu9e98kavA54EPH9ajbs+HTqIPl023gsyeN8gqlIsgrqhfmoygaiTEre2vYGliDgiW/IXvd2hdymIls0d87+5f6jxdlITOCFWxVXX5npg92MWtB',
null, null, 1, 1, now());
+insert into `t_spark_sql` values (100000, 100001,
'eNq1jr0OgjAURnee4m4FY/oCTJVUg/KT9F7cK2kQiy2W+P6KMQ6yuDh9+YZzcjIlBUkgsSkkXCbv0N9Da0ifBgOx01cDSCqvdmsIpuu9e98kavA54EPH9ajbs+HTqIPl023gsyeN8gqlIsgrqhfmoygaiTEre2vYGliDgiW/IXvd2hdymIls0d87+5f6jxdlITOCFWxVXX5npg92MWtB',
null, null, 1, 1, now());
-- ----------------------------
@@ -315,10 +324,10 @@ insert into `t_role_menu` values (100102, 100002, 140401);
insert into `t_role_menu` values (100103, 100002, 140402);
insert into `t_role_menu` values (100104, 100002, 140403);
insert into `t_role_menu` values (100105, 100002, 150000);
-insert into `t_role_menu` values (100107, 100002, 150601);
-insert into `t_role_menu` values (100108, 100002, 150602);
-insert into `t_role_menu` values (100109, 100002, 150603);
-insert into `t_role_menu` values (100110, 100002, 150604);
+insert into `t_role_menu` values (100107, 100002, 150501);
+insert into `t_role_menu` values (100108, 100002, 150502);
+insert into `t_role_menu` values (100109, 100002, 150503);
+insert into `t_role_menu` values (100110, 100002, 150504);
insert into `t_role_menu` values (100111, 100002, 150601);
insert into `t_role_menu` values (100112, 100002, 150602);
insert into `t_role_menu` values (100113, 100002, 150603);
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
index b76f90b5c..a0885a319 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
@@ -24,13 +24,14 @@ set foreign_key_checks = 0;
-- ----------------------------
-- Table structure for t_app
-- ----------------------------
-create table if not exists `t_app` (
-`id` bigint not null,
-`job_type` tinyint default null,
-`create_time` datetime default null comment 'create time',
-`modify_time` datetime default null comment 'modify time',
-primary key(`id`)
-);
+drop table if exists `t_app`;
+create table `t_app` (
+ `id` bigint not null auto_increment,
+ `job_type` tinyint default null,
+ `create_time` datetime default null comment 'create time',
+ `modify_time` datetime default null comment 'modify time',
+ primary key(`id`)
+) engine=innodb auto_increment=100000 default charset=utf8mb4
collate=utf8mb4_general_ci;
-- ----------------------------
-- Table structure for t_flink_app
@@ -605,13 +606,16 @@ create table `t_spark_app` (
`jar_check_sum` bigint default null,
`app_properties` text collate utf8mb4_general_ci comment 'Arbitrary Spark
configuration property in key=value format (e.g. spark.driver.cores=1)',
`app_args` text collate utf8mb4_general_ci comment 'Arguments passed to the
main method of your main class',
- `app_id` varchar(64) collate utf8mb4_general_ci default null comment
'(1)application_id on yarn(2)driver_pod_name on k8s',
+ `cluster_id` varchar(64) collate utf8mb4_general_ci default null comment
'(1)application_id on yarn(2)driver_pod_name on k8s',
`yarn_queue` varchar(128) collate utf8mb4_general_ci default null,
`k8s_master_url` varchar(128) collate utf8mb4_general_ci default null,
`k8s_container_image` varchar(128) collate utf8mb4_general_ci default null,
`k8s_image_pull_policy` tinyint default 1,
`k8s_service_account` varchar(64) collate utf8mb4_general_ci default null,
`k8s_namespace` varchar(64) collate utf8mb4_general_ci default null,
+ `k8s_driver_pod_template` text collate utf8mb4_general_ci,
+ `k8s_executor_pod_template` text collate utf8mb4_general_ci,
+ `k8s_hadoop_integration` tinyint default 0,
`hadoop_user` varchar(64) collate utf8mb4_general_ci default null,
`restart_size` int default null,
`restart_count` int default null,
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
index fe4c53b8b..56894baa9 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
@@ -35,6 +35,7 @@ import
org.apache.streampark.console.core.enums.SparkAppStateEnum;
import
org.apache.streampark.console.core.metrics.spark.SparkApplicationSummary;
import org.apache.streampark.console.core.util.YarnQueueLabelExpression;
import org.apache.streampark.flink.packer.maven.DependencyInfo;
+import org.apache.streampark.spark.kubernetes.model.SparkK8sPodTemplates;
import org.apache.commons.lang3.StringUtils;
@@ -139,6 +140,14 @@ public class SparkApplication extends BaseEntity {
/** k8s namespace */
private String k8sNamespace = Constants.DEFAULT;
+ /** spark kubernetes pod template */
+ private String k8sDriverPodTemplate;
+
+ private String k8sExecutorPodTemplate;
+
+ /** spark-hadoop integration on spark-k8s mode */
+ private Boolean k8sHadoopIntegration;
+
@TableField("HADOOP_USER")
private String hadoopUser;
@@ -356,6 +365,8 @@ public class SparkApplication extends BaseEntity {
switch (this.getDeployModeEnum()) {
case REMOTE:
case LOCAL:
+ case KUBERNETES_NATIVE_CLIENT:
+ case KUBERNETES_NATIVE_CLUSTER:
return getLocalAppHome();
case YARN_CLIENT:
case YARN_CLUSTER:
@@ -376,6 +387,10 @@ public class SparkApplication extends BaseEntity {
return ApplicationType.of(appType);
}
+ public SparkK8sPodTemplates getK8sPodTemplates() {
+ return SparkK8sPodTemplates.of(k8sDriverPodTemplate,
k8sExecutorPodTemplate);
+ }
+
@JsonIgnore
@SneakyThrows
@SuppressWarnings("unchecked")
@@ -466,6 +481,8 @@ public class SparkApplication extends BaseEntity {
case YARN_CLUSTER:
case YARN_CLIENT:
return StorageType.HDFS;
+ case KUBERNETES_NATIVE_CLUSTER:
+ case KUBERNETES_NATIVE_CLIENT:
case REMOTE:
return StorageType.LFS;
default:
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBuildPipelineServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBuildPipelineServiceImpl.java
index 2642dd94c..fb7221274 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBuildPipelineServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBuildPipelineServiceImpl.java
@@ -29,6 +29,7 @@ import
org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.util.JacksonUtils;
import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.console.core.bean.Dependency;
+import org.apache.streampark.console.core.bean.DockerConfig;
import org.apache.streampark.console.core.entity.ApplicationBuildPipeline;
import org.apache.streampark.console.core.entity.ApplicationLog;
import org.apache.streampark.console.core.entity.Message;
@@ -46,6 +47,7 @@ import
org.apache.streampark.console.core.enums.ResourceTypeEnum;
import
org.apache.streampark.console.core.mapper.ApplicationBuildPipelineMapper;
import org.apache.streampark.console.core.service.MessageService;
import org.apache.streampark.console.core.service.ResourceService;
+import org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.service.SparkEnvService;
import org.apache.streampark.console.core.service.SparkSqlService;
import
org.apache.streampark.console.core.service.application.ApplicationLogService;
@@ -55,6 +57,7 @@ import
org.apache.streampark.console.core.service.application.SparkApplicationIn
import
org.apache.streampark.console.core.service.application.SparkApplicationManageService;
import org.apache.streampark.console.core.util.ServiceHelper;
import org.apache.streampark.console.core.watcher.SparkAppHttpWatcher;
+import org.apache.streampark.flink.packer.docker.DockerConf;
import org.apache.streampark.flink.packer.maven.Artifact;
import org.apache.streampark.flink.packer.maven.DependencyInfo;
import org.apache.streampark.flink.packer.pipeline.BuildPipeline;
@@ -62,7 +65,9 @@ import
org.apache.streampark.flink.packer.pipeline.BuildResult;
import org.apache.streampark.flink.packer.pipeline.PipeWatcher;
import org.apache.streampark.flink.packer.pipeline.PipelineSnapshot;
import org.apache.streampark.flink.packer.pipeline.PipelineStatusEnum;
+import
org.apache.streampark.flink.packer.pipeline.SparkK8sApplicationBuildRequest;
import org.apache.streampark.flink.packer.pipeline.SparkYarnBuildRequest;
+import
org.apache.streampark.flink.packer.pipeline.impl.SparkK8sApplicationBuildPipeline;
import org.apache.streampark.flink.packer.pipeline.impl.SparkYarnBuildPipeline;
import org.apache.commons.collections.CollectionUtils;
@@ -111,6 +116,9 @@ public class SparkApplicationBuildPipelineServiceImpl
@Autowired
private MessageService messageService;
+ @Autowired
+ private SettingService settingService;
+
@Autowired
private ApplicationLogService applicationLogService;
@@ -394,12 +402,47 @@ public class SparkApplicationBuildPipelineServiceImpl
getMergedDependencyInfo(app));
log.info("Submit params to building pipeline : {}",
yarnAppRequest);
return SparkYarnBuildPipeline.of(yarnAppRequest);
+ case KUBERNETES_NATIVE_CLUSTER:
+ case KUBERNETES_NATIVE_CLIENT:
+ DockerConfig dockerConfig = settingService.getDockerConfig();
+ SparkK8sApplicationBuildRequest k8sApplicationBuildRequest =
buildSparkK8sApplicationBuildRequest(
+ app, mainClass, sparkUserJar, sparkEnv, dockerConfig);
+ log.info("Submit params to building pipeline : {}",
k8sApplicationBuildRequest);
+ return
SparkK8sApplicationBuildPipeline.of(k8sApplicationBuildRequest);
default:
throw new UnsupportedOperationException(
"Unsupported Building Application for DeployMode: " +
app.getDeployModeEnum());
}
}
+ @Nonnull
+ private SparkK8sApplicationBuildRequest
buildSparkK8sApplicationBuildRequest(
+
@Nonnull SparkApplication app,
+
String mainClass,
+
String mainJar,
+
SparkEnv sparkEnv,
+
DockerConfig dockerConfig) {
+ SparkK8sApplicationBuildRequest k8sApplicationBuildRequest = new
SparkK8sApplicationBuildRequest(
+ app.getAppName(),
+ app.getAppHome(),
+ mainClass,
+ mainJar,
+ app.getDeployModeEnum(),
+ app.getJobTypeEnum(),
+ sparkEnv.getSparkVersion(),
+ getMergedDependencyInfo(app),
+ app.getK8sNamespace(),
+ app.getK8sContainerImage(),
+ app.getK8sPodTemplates(),
+ app.getK8sHadoopIntegration() != null ?
app.getK8sHadoopIntegration() : false,
+ DockerConf.of(
+ dockerConfig.getAddress(),
+ dockerConfig.getNamespace(),
+ dockerConfig.getUsername(),
+ dockerConfig.getPassword()));
+ return k8sApplicationBuildRequest;
+ }
+
private String retrieveSparkUserJar(SparkEnv sparkEnv, SparkApplication
app) {
switch (app.getJobTypeEnum()) {
case SPARK_JAR:
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
index a998f447c..4ec68298b 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
@@ -345,6 +345,9 @@ public class SparkApplicationManageServiceImpl
newApp.setK8sImagePullPolicy(oldApp.getK8sImagePullPolicy());
newApp.setK8sServiceAccount(oldApp.getK8sServiceAccount());
newApp.setK8sNamespace(oldApp.getK8sNamespace());
+ newApp.setK8sDriverPodTemplate(oldApp.getK8sDriverPodTemplate());
+ newApp.setK8sExecutorPodTemplate(oldApp.getK8sExecutorPodTemplate());
+ newApp.setK8sHadoopIntegration(oldApp.getK8sHadoopIntegration());
newApp.setHadoopUser(oldApp.getHadoopUser());
newApp.setRestartSize(oldApp.getRestartSize());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
index 4ca4d3514..2972c2f8c 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
@@ -215,6 +215,7 @@ public class ResourceServiceImpl extends
ServiceImpl<ResourceMapper, Resource>
if (resource.getResourceType() == ResourceTypeEnum.APP) {
findResource.setMainClass(resource.getMainClass());
}
+ findResource.setResourceType(resource.getResourceType());
findResource.setDescription(resource.getDescription());
baseMapper.updateById(findResource);
}
diff --git
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index cc44fe197..d8f1b64fe 100644
---
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -577,6 +577,9 @@ create table if not exists `t_spark_app` (
`k8s_image_pull_policy` tinyint default 1,
`k8s_service_account` varchar(64) default null,
`k8s_namespace` varchar(64) default null,
+ `k8s_driver_pod_template` text,
+ `k8s_executor_pod_template` text,
+ `k8s_hadoop_integration` tinyint default 0,
`hadoop_user` varchar(64) default null,
`restart_size` int default null,
`restart_count` int default null,
diff --git
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/spark/app.ts
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/spark/app.ts
index 2c0a1381d..c0d35dc0c 100644
---
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/spark/app.ts
+++
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/spark/app.ts
@@ -19,8 +19,8 @@ export default {
runningTasks: '运行中的作业',
totalTask: 'Task 总数',
totalStage: 'Stage 总数',
- completedTask: '已进完成的作业数',
- completedStage: '完成的 Stage 总数',
+ completedTask: '已完成 Task 总数',
+ completedStage: '已完成 Stage 总数',
memory: '占用的总内存',
VCore: '占用的总 VCores',
},
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/PodTemplateTool.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/PodTemplateTool.scala
index 31c743212..6da416d8c 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/PodTemplateTool.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/PodTemplateTool.scala
@@ -18,6 +18,7 @@
package org.apache.streampark.flink.kubernetes
import org.apache.streampark.flink.kubernetes.model.K8sPodTemplates
+import org.apache.streampark.spark.kubernetes.model.SparkK8sPodTemplates
import org.apache.commons.io.FileUtils
import org.apache.commons.lang3.StringUtils
@@ -38,6 +39,12 @@ object PodTemplateTool {
val KUBERNETES_TM_POD_TEMPLATE: PodTemplateType =
PodTemplateType("kubernetes.pod-template-file.taskmanager",
"tm-pod-template.yaml")
+ val KUBERNETES_DRIVER_POD_TEMPLATE: PodTemplateType =
+ PodTemplateType("spark.kubernetes.driver.podTemplateFile",
"driver-pod-template.yaml")
+
+ val KUBERNETES_EXECUTOR_POD_TEMPLATE: PodTemplateType =
+ PodTemplateType("spark.kubernetes.executor.podTemplateFile",
"executor-pod-template.yaml")
+
/**
* Prepare kubernetes pod template file to buildWorkspace direactory.
*
@@ -72,6 +79,38 @@ object PodTemplateTool {
K8sPodTemplateFiles(podTempleMap.toMap)
}
+ /**
+ * Prepare kubernetes pod template file to buildWorkspace directory.
+ *
+ * @param buildWorkspace
+ * project workspace dir of spark job
+ * @param podTemplates
+ * spark kubernetes pod templates
+ * @return
+ * Map[k8s pod template option, template file output path]
+ */
+ def preparePodTemplateFiles(
+ buildWorkspace: String,
+ podTemplates: SparkK8sPodTemplates): K8sPodTemplateFiles = {
+ val workspaceDir = new File(buildWorkspace)
+ if (!workspaceDir.exists()) {
+ workspaceDir.mkdir()
+ }
+
+ val podTempleMap = mutable.Map[String, String]()
+ val outputTmplContent = (tmplContent: String, podTmpl: PodTemplateType) =>
{
+ if (StringUtils.isNotBlank(tmplContent)) {
+ val outputPath = s"$buildWorkspace/${podTmpl.fileName}"
+ val outputFile = new File(outputPath)
+ FileUtils.write(outputFile, tmplContent, "UTF-8")
+ podTempleMap += (podTmpl.key -> outputPath)
+ }
+ }
+
+ outputTmplContent(podTemplates.driverPodTemplate,
KUBERNETES_DRIVER_POD_TEMPLATE)
+ outputTmplContent(podTemplates.executorPodTemplate,
KUBERNETES_EXECUTOR_POD_TEMPLATE)
+ K8sPodTemplateFiles(podTempleMap.toMap)
+ }
}
/**
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/SparkK8sPodTemplates.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/SparkK8sPodTemplates.scala
new file mode 100644
index 000000000..81d61440f
--- /dev/null
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/SparkK8sPodTemplates.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.spark.kubernetes.model
+
+import org.apache.streampark.common.util.Utils
+
+import scala.util.Try
+
+/** Pod template for Spark k8s cluster */
+case class SparkK8sPodTemplates(
+ driverPodTemplate: String = "",
+ executorPodTemplate: String = "") {
+
+ def nonEmpty: Boolean = Option(driverPodTemplate).exists(_.trim.nonEmpty) ||
+ Option(executorPodTemplate).exists(_.trim.nonEmpty)
+
+ def isEmpty: Boolean = !nonEmpty
+
+ override def hashCode(): Int =
+ Utils.hashCode(driverPodTemplate, executorPodTemplate)
+
+ override def equals(obj: Any): Boolean = {
+ obj match {
+ case that: SparkK8sPodTemplates =>
+ Try(driverPodTemplate.trim).getOrElse("") ==
Try(that.driverPodTemplate.trim)
+ .getOrElse("") &&
+ Try(executorPodTemplate.trim).getOrElse("") ==
Try(that.executorPodTemplate.trim)
+ .getOrElse("")
+ case _ => false
+ }
+ }
+
+}
+
+object SparkK8sPodTemplates {
+
+ def empty: SparkK8sPodTemplates = new SparkK8sPodTemplates()
+
+ def of(driverPodTemplate: String, executorPodTemplate: String):
SparkK8sPodTemplates =
+ SparkK8sPodTemplates(safeGet(driverPodTemplate),
safeGet(executorPodTemplate))
+
+ private[this] def safeGet(content: String): String = {
+ content match {
+ case null => ""
+ case x if x.trim.isEmpty => ""
+ case x => x
+ }
+ }
+
+}
diff --git
a/streampark-flink/streampark-flink-packer/src/main/java/org/apache/streampark/flink/packer/pipeline/PipelineTypeEnum.java
b/streampark-flink/streampark-flink-packer/src/main/java/org/apache/streampark/flink/packer/pipeline/PipelineTypeEnum.java
index 4eb9786b3..025412ca9 100644
---
a/streampark-flink/streampark-flink-packer/src/main/java/org/apache/streampark/flink/packer/pipeline/PipelineTypeEnum.java
+++
b/streampark-flink/streampark-flink-packer/src/main/java/org/apache/streampark/flink/packer/pipeline/PipelineTypeEnum.java
@@ -89,7 +89,21 @@ public enum PipelineTypeEnum {
.put(2, "Resolve maven dependencies")
.put(3, "upload jar to yarn.provided.lib.dirs")
.build(),
- SimpleBuildResponse.class);
+ SimpleBuildResponse.class),
+
+ SPARK_NATIVE_K8S_APPLICATION(
+ 7,
+ "spark native kubernetes application mode task building pipeline",
+ ImmutableMap.<Integer, String>builder()
+ .put(1, "Create building workspace")
+ .put(2, "Export kubernetes pod template")
+ .put(3, "Prepare spark job jar")
+ .put(4, "Export spark app dockerfile")
+ .put(5, "Pull spark app base docker image")
+ .put(6, "Build spark app docker image")
+ .put(7, "Push spark app docker image")
+ .build(),
+ DockerImageBuildResponse.class);
private final Integer code;
/** short description of pipeline type. */
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/SparkDockerfileTemplate.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/SparkDockerfileTemplate.scala
new file mode 100644
index 000000000..c4f9b14a6
--- /dev/null
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/SparkDockerfileTemplate.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.packer.docker
+
+/**
+ * Base spark docker file image template.
+ *
+ * @param workspacePath
+ * Path of dockerfile workspace, it should be a directory.
+ * @param sparkBaseImage
+ * Spark base docker image name, see https://hub.docker.com/r/apache/spark.
+ * @param sparkMainJarPath
+ * Path of spark job main jar which would copy to $SPARK_HOME/usrlib/
+ * @param sparkExtraLibPaths
+ * Path of additional spark lib path which would copy to $SPARK_HOME/lib/
+ */
+case class SparkDockerfileTemplate(
+ workspacePath: String,
+ sparkBaseImage: String,
+ sparkMainJarPath: String,
+ sparkExtraLibPaths: Set[String])
+ extends SparkDockerfileTemplateTrait {
+
+ /** offer content of DockerFile */
+ override def offerDockerfileContent: String = {
+ s"""FROM $sparkBaseImage
+ |USER root
+ |RUN mkdir -p $SPARK_HOME/usrlib
+ |COPY $mainJarName $SPARK_HOME/usrlib/$mainJarName
+ |COPY $extraLibName $SPARK_HOME/lib/
+ |""".stripMargin
+ }
+}
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/SparkDockerfileTemplateTrait.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/SparkDockerfileTemplateTrait.scala
new file mode 100644
index 000000000..729aca65b
--- /dev/null
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/SparkDockerfileTemplateTrait.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.packer.docker
+
+import org.apache.streampark.common.constants.Constants
+import org.apache.streampark.common.fs.LfsOperator
+
+import org.apache.commons.io.FileUtils
+
+import java.io.File
+import java.nio.file.{Path, Paths}
+
+/** Spark image dockerfile template. */
+trait SparkDockerfileTemplateTrait {
+
+ /** Path of dockerfile workspace, it should be a directory. */
+ def workspacePath: String
+
+ /** Spark base docker image name, see https://hub.docker.com/r/apache/spark
*/
+ def sparkBaseImage: String
+
+ /** Path of spark job main jar which would copy to $SPARK_HOME/usrlib/ */
+ def sparkMainJarPath: String
+
+ /** Path of additional spark lib path which would copy to $SPARK_HOME/lib/ */
+ def sparkExtraLibPaths: Set[String]
+
+ /** Offer content of DockerFile. */
+ def offerDockerfileContent: String
+
+ /** Startup spark main jar path inner Docker */
+ def innerMainJarPath: String = s"local:///opt/spark/usrlib/$mainJarName"
+
+ /** output dockerfile name */
+ protected val DEFAULT_DOCKER_FILE_NAME = "Dockerfile"
+ protected val SPARK_LIB_PATH = "lib"
+ protected val SPARK_HOME: String = "$SPARK_HOME"
+
+ /** Dockerfile building workspace. */
+ lazy val workspace: Path = {
+ val path = Paths.get(workspacePath).toAbsolutePath
+ if (!LfsOperator.exists(workspacePath)) LfsOperator.mkdirs(workspacePath)
+ path
+ }
+
+ /**
+ * spark main jar name, the main jar would copy from `sparkMainJarPath` to
+ * `workspacePath/mainJarName.jar`.
+ */
+ lazy val mainJarName: String = {
+ val mainJarPath = Paths.get(sparkMainJarPath).toAbsolutePath
+ if (mainJarPath.getParent != workspace) {
+ LfsOperator.copy(
+ mainJarPath.toString,
+ s"${workspace.toString}/${mainJarPath.getFileName.toString}")
+ }
+ mainJarPath.getFileName.toString
+ }
+
+ /**
+ * spark extra jar lib, the jar file in `sparkExtraLibPaths` would be copied
into
+ * `SPARK_LIB_PATH`.
+ */
+ lazy val extraLibName: String = {
+ LfsOperator.mkCleanDirs(s"${workspace.toString}/$SPARK_LIB_PATH")
+ sparkExtraLibPaths
+ .map(new File(_))
+ .filter(_.exists())
+ .filter(_.getName.endsWith(Constants.JAR_SUFFIX))
+ .flatMap {
+ case f if f.isDirectory =>
+ f.listFiles
+ .filter(_.isFile)
+ .filter(_.getName.endsWith(Constants.JAR_SUFFIX))
+ .map(_.getAbsolutePath)
+ case f if f.isFile => Array(f.getAbsolutePath)
+ }
+ .foreach(LfsOperator.copy(_, s"${workspace.toString}/$SPARK_LIB_PATH"))
+ SPARK_LIB_PATH
+ }
+
+ /**
+ * write content of DockerFile to outputPath, the output dockerfile name is
"dockerfile".
+ *
+ * @return
+ * File Object for actual output Dockerfile
+ */
+ def writeDockerfile: File = {
+ val output = new File(s"$workspacePath/$DEFAULT_DOCKER_FILE_NAME")
+ FileUtils.write(output, offerDockerfileContent, "UTF-8")
+ output
+ }
+
+ /**
+ * write content of DockerFile to outputPath using specified output
dockerfile name.
+ *
+ * @return
+ * File Object for actual output Dockerfile
+ */
+ def writeDockerfile(dockerfileName: String): File = {
+ val output = new File(s"$workspacePath/$dockerfileName")
+ FileUtils.write(output, offerDockerfileContent, "UTF-8")
+ output
+ }
+
+}
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/SparkHadoopDockerfileTemplate.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/SparkHadoopDockerfileTemplate.scala
new file mode 100644
index 000000000..9bc3d0f84
--- /dev/null
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/SparkHadoopDockerfileTemplate.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.packer.docker
+
+import org.apache.streampark.common.fs.LfsOperator
+import org.apache.streampark.common.util.HadoopConfigUtils
+
+import javax.annotation.Nullable
+
+import java.nio.file.Paths
+
+/**
+ * spark-hadoop integration docker image template.
+ *
+ * @param workspacePath
+ * Path of dockerfile workspace, it should be a directory.
+ * @param sparkBaseImage
+ * Spark base docker image name, see https://hub.docker.com/_/spark.
+ * @param sparkMainJarPath
+ * Path of spark job main jar which would copy to $SPARK_HOME/usrlib/
+ * @param sparkExtraLibPaths
+ * Path of additional spark lib path which would copy to $SPARK_HOME/lib/
+ * @param hadoopConfDirPath
+ * Path of hadoop conf directory.
+ * @param hiveConfDirPath
+ * Path of hive conf directory.
+ */
+case class SparkHadoopDockerfileTemplate(
+ workspacePath: String,
+ sparkBaseImage: String,
+ sparkMainJarPath: String,
+ sparkExtraLibPaths: Set[String],
+ @Nullable hadoopConfDirPath: String,
+ @Nullable hiveConfDirPath: String)
+ extends SparkDockerfileTemplateTrait {
+
+ val hadoopConfDir: String =
+ workspace
+ .relativize(Paths.get(Option(hadoopConfDirPath).getOrElse("")))
+ .toString
+
+ val hiveConfDir: String =
+ workspace
+ .relativize(Paths.get(Option(hiveConfDirPath).getOrElse("")))
+ .toString
+
+ /** offer content of DockerFile */
+ override def offerDockerfileContent: String = {
+ var dockerfile =
+ s"""FROM $sparkBaseImage
+ |RUN mkdir -p $SPARK_HOME/usrlib
+ |""".stripMargin
+ if (hadoopConfDir.nonEmpty) {
+ dockerfile +=
+ s"""
+ |COPY $hadoopConfDir /opt/hadoop-conf
+ |ENV HADOOP_CONF_DIR /opt/hadoop-conf
+ |""".stripMargin
+ }
+ if (hiveConfDir.nonEmpty) {
+ dockerfile +=
+ s"""
+ |COPY $hiveConfDir /opt/hive-conf
+ |ENV HIVE_CONF_DIR /opt/hive-conf
+ |""".stripMargin
+ }
+ dockerfile +=
+ s"""
+ |COPY $extraLibName $SPARK_HOME/lib/
+ |COPY $mainJarName $SPARK_HOME/usrlib/$mainJarName
+ |""".stripMargin
+ dockerfile
+ }
+
+}
+
+object SparkHadoopDockerfileTemplate {
+
+ /** Use relevant system variables as the value of hadoopConfDirPath,
hiveConfDirPath. */
+ def fromSystemHadoopConf(
+ workspacePath: String,
+ sparkBaseImage: String,
+ sparkMainJarPath: String,
+ sparkExtraLibPaths: Set[String]): SparkHadoopDockerfileTemplate = {
+ // get hadoop and hive config directory from system and copy to
workspacePath
+ val hadoopConfDir = HadoopConfigUtils.getSystemHadoopConfDir match {
+ case None => ""
+ case hadoopConf if !LfsOperator.exists(hadoopConf.get) => ""
+ case hadoopConf =>
+ val dstDir = s"$workspacePath/hadoop-conf"
+ LfsOperator.mkCleanDirs(dstDir)
+ LfsOperator.copyDir(hadoopConf.get, dstDir)
+ dstDir
+ }
+ val hiveConfDir = HadoopConfigUtils.getSystemHiveConfDir match {
+ case None => ""
+ case hiveConf if !LfsOperator.exists(hiveConf.get) => ""
+ case hiveConf =>
+ val dstDir = s"$workspacePath/hive-conf"
+ LfsOperator.mkCleanDirs(dstDir)
+ LfsOperator.copyDir(hiveConf.get, dstDir)
+ dstDir
+ }
+ SparkHadoopDockerfileTemplate(
+ workspacePath,
+ sparkBaseImage,
+ sparkMainJarPath,
+ sparkExtraLibPaths,
+ hadoopConfDir,
+ hiveConfDir)
+ }
+
+}
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
index 76a40a9ac..ed3393889 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
@@ -17,11 +17,12 @@
package org.apache.streampark.flink.packer.pipeline
-import org.apache.streampark.common.conf.{FlinkVersion, Workspace}
+import org.apache.streampark.common.conf.{FlinkVersion, SparkVersion,
Workspace}
import org.apache.streampark.common.enums.{FlinkDeployMode, FlinkJobType,
SparkDeployMode, SparkJobType}
-import org.apache.streampark.flink.kubernetes.model.K8sPodTemplates
+import org.apache.streampark.flink.kubernetes.model.{K8sPodTemplates =>
FlinkK8sPodTemplates}
import org.apache.streampark.flink.packer.docker.DockerConf
import org.apache.streampark.flink.packer.maven.DependencyInfo
+import org.apache.streampark.spark.kubernetes.model.SparkK8sPodTemplates
import scala.collection.mutable.ArrayBuffer
@@ -96,7 +97,7 @@ case class FlinkK8sApplicationBuildRequest(
clusterId: String,
k8sNamespace: String,
flinkBaseImage: String,
- flinkPodTemplate: K8sPodTemplates,
+ flinkPodTemplate: FlinkK8sPodTemplates,
integrateWithHadoop: Boolean = false,
dockerConfig: DockerConf,
ingressTemplate: String)
@@ -131,3 +132,18 @@ case class SparkYarnBuildRequest(
jobType: SparkJobType,
deployMode: SparkDeployMode,
dependencyInfo: DependencyInfo) extends BuildParam
+
+case class SparkK8sApplicationBuildRequest(
+ appName: String,
+ workspace: String,
+ mainClass: String,
+ mainJar: String,
+ deployMode: SparkDeployMode,
+ jobType: SparkJobType,
+ sparkVersion: SparkVersion,
+ dependencyInfo: DependencyInfo,
+ k8sNamespace: String,
+ sparkBaseImage: String,
+ sparkPodTemplate: SparkK8sPodTemplates,
+ integrateWithHadoop: Boolean = false,
+ dockerConfig: DockerConf) extends BuildParam
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/SparkK8sApplicationBuildPipeline.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/SparkK8sApplicationBuildPipeline.scala
new file mode 100644
index 000000000..d90e8d758
--- /dev/null
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/SparkK8sApplicationBuildPipeline.scala
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.packer.pipeline.impl
+
+import org.apache.streampark.common.fs.LfsOperator
+import org.apache.streampark.common.util.ThreadUtils
+import org.apache.streampark.flink.kubernetes.PodTemplateTool
+import org.apache.streampark.flink.packer.docker._
+import org.apache.streampark.flink.packer.pipeline._
+import org.apache.streampark.flink.packer.pipeline.BuildPipeline.executor
+
+import com.github.dockerjava.api.command.PushImageCmd
+import com.github.dockerjava.core.command.{HackBuildImageCmd,
HackPullImageCmd, HackPushImageCmd}
+import com.google.common.collect.Sets
+import org.apache.commons.lang3.StringUtils
+
+import java.io.File
+import java.nio.file.Paths
+import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
+
+import scala.concurrent.{ExecutionContext, Future}
+
+/** Building pipeline for Spark kubernetes-native application mode */
+class SparkK8sApplicationBuildPipeline(request:
SparkK8sApplicationBuildRequest)
+ extends BuildPipeline {
+
+ override def pipeType: PipelineTypeEnum =
+ PipelineTypeEnum.SPARK_NATIVE_K8S_APPLICATION
+
+ private var dockerProcessWatcher: DockerProgressWatcher =
+ new SilentDockerProgressWatcher
+
+ // non-thread-safe
+ private val dockerProcess = new DockerResolveProgress(
+ DockerPullProgress.empty(),
+ DockerBuildProgress.empty(),
+ DockerPushProgress.empty())
+
+ override protected def offerBuildParam: SparkK8sApplicationBuildRequest =
+ request
+
+ def registerDockerProgressWatcher(watcher: DockerProgressWatcher): Unit = {
+ dockerProcessWatcher = watcher
+ }
+
+ @throws[Throwable]
+ override protected def buildProcess(): DockerImageBuildResponse = {
+
+ // Step-1: init build workspace of spark job
+ // the sub workspace dir like: APP_WORKSPACE/k8s-namespace/
+ val buildWorkspace =
+ execStep(1) {
+ val buildWorkspace =
+ s"${request.workspace}/${request.k8sNamespace}"
+ LfsOperator.mkCleanDirs(buildWorkspace)
+ logInfo(s"Recreate building workspace: $buildWorkspace")
+ buildWorkspace
+ }.getOrElse(throw getError.exception)
+
+ // Step-2: export k8s pod template files
+ val podTemplatePaths = request.sparkPodTemplate match {
+ case podTemplate if podTemplate.isEmpty =>
+ skipStep(2)
+ Map[String, String]()
+ case podTemplate =>
+ execStep(2) {
+ val podTemplateFiles =
+ PodTemplateTool
+ .preparePodTemplateFiles(buildWorkspace, podTemplate)
+ .tmplFiles
+ logInfo(s"Export spark podTemplates:
${podTemplateFiles.values.mkString(",")}")
+ podTemplateFiles
+ }.getOrElse(throw getError.exception)
+ }
+
+ // Step-3: prepare spark job jar
+ val (mainJarPath, extJarLibs) =
+ execStep(3) {
+ val mainJarName = Paths.get(request.mainJar).getFileName
+ val mainJarPath = s"$buildWorkspace/$mainJarName"
+ LfsOperator.copy(request.mainJar, mainJarPath)
+ logInfo(s"Prepared spark job jar: $mainJarPath")
+ mainJarPath -> Set[String]()
+ }.getOrElse(throw getError.exception)
+
+ // Step-4: generate and Export spark image dockerfiles
+ val (dockerfile, dockerFileTemplate) =
+ execStep(4) {
+ val dockerFileTemplate = {
+ if (request.integrateWithHadoop) {
+ SparkHadoopDockerfileTemplate.fromSystemHadoopConf(
+ buildWorkspace,
+ request.sparkBaseImage,
+ mainJarPath,
+ extJarLibs)
+ } else {
+ SparkDockerfileTemplate(
+ buildWorkspace,
+ request.sparkBaseImage,
+ mainJarPath,
+ extJarLibs)
+ }
+ }
+ val dockerFile = dockerFileTemplate.writeDockerfile
+ logInfo(
+ s"Output spark dockerfile: ${dockerFile.getAbsolutePath}, content:
\n${dockerFileTemplate.offerDockerfileContent}")
+ dockerFile -> dockerFileTemplate
+ }.getOrElse(throw getError.exception)
+
+ val dockerConf = request.dockerConfig
+ val baseImageTag = request.sparkBaseImage.trim
+ val pushImageTag = {
+ if (request.k8sNamespace.isEmpty || request.appName.isEmpty) {
+ throw new IllegalArgumentException("k8sNamespace or appName cannot be
empty")
+ }
+ val expectedImageTag =
+ s"streampark-sparkjob-${request.k8sNamespace}-${request.appName}"
+ compileTag(expectedImageTag, dockerConf.registerAddress,
dockerConf.imageNamespace)
+ }
+
+ // Step-5: pull spark base image
+ execStep(5) {
+ usingDockerClient {
+ dockerClient =>
+ val pullImageCmd = {
+ // when the register address prefix is explicitly identified on
base image tag,
+ // the user's pre-saved docker register auth info would be used.
+ val pullImageCmdState =
+ dockerConf.registerAddress != null && !baseImageTag.startsWith(
+ dockerConf.registerAddress)
+ if (pullImageCmdState) {
+ dockerClient.pullImageCmd(baseImageTag)
+ } else {
+ dockerClient
+ .pullImageCmd(baseImageTag)
+ .withAuthConfig(dockerConf.toAuthConf)
+ }
+ }
+ val pullCmdCallback = pullImageCmd
+ .asInstanceOf[HackPullImageCmd]
+ .start(watchDockerPullProcess {
+ pullRsp =>
+ dockerProcess.pull.update(pullRsp)
+
Future(dockerProcessWatcher.onDockerPullProgressChange(dockerProcess.pull.snapshot))
+ })
+ pullCmdCallback.awaitCompletion
+ logInfo(s"Already pulled docker image from remote register,
imageTag=$baseImageTag")
+ }(err => throw new Exception(s"Pull docker image failed,
imageTag=$baseImageTag", err))
+ }.getOrElse(throw getError.exception)
+
+ // Step-6: build spark image
+ execStep(6) {
+ usingDockerClient {
+ dockerClient =>
+ val buildImageCmd = dockerClient
+ .buildImageCmd()
+ .withBaseDirectory(new File(buildWorkspace))
+ .withDockerfile(dockerfile)
+ .withTags(Sets.newHashSet(pushImageTag))
+
+ val buildCmdCallback = buildImageCmd
+ .asInstanceOf[HackBuildImageCmd]
+ .start(watchDockerBuildStep {
+ buildStep =>
+ dockerProcess.build.update(buildStep)
+ Future(
+
dockerProcessWatcher.onDockerBuildProgressChange(dockerProcess.build.snapshot))
+ })
+ val imageId = buildCmdCallback.awaitImageId
+ logInfo(s"Built docker image, imageId=$imageId,
imageTag=$pushImageTag")
+ }(err => throw new Exception(s"Build docker image failed.
tag=$pushImageTag", err))
+ }.getOrElse(throw getError.exception)
+
+ // Step-7: push spark image
+ execStep(7) {
+ usingDockerClient {
+ dockerClient =>
+ val pushCmd: PushImageCmd = dockerClient
+ .pushImageCmd(pushImageTag)
+ .withAuthConfig(dockerConf.toAuthConf)
+
+ val pushCmdCallback = pushCmd
+ .asInstanceOf[HackPushImageCmd]
+ .start(watchDockerPushProcess {
+ pushRsp =>
+ dockerProcess.push.update(pushRsp)
+
Future(dockerProcessWatcher.onDockerPushProgressChange(dockerProcess.push.snapshot))
+ })
+ pushCmdCallback.awaitCompletion
+ logInfo(s"Already pushed docker image, imageTag=$pushImageTag")
+ }(err => throw new Exception(s"Push docker image failed.
tag=$pushImageTag", err))
+ }.getOrElse(throw getError.exception)
+
+ DockerImageBuildResponse(
+ buildWorkspace,
+ pushImageTag,
+ podTemplatePaths,
+ dockerFileTemplate.innerMainJarPath)
+ }
+
+ /** compile image tag with namespace and remote address. */
+ private[this] def compileTag(
+ tag: String,
+ registerAddress: String,
+ imageNamespace: String): String = {
+ var tagName = if (tag.contains("/")) tag else s"$imageNamespace/$tag"
+ val addRegisterAddressState =
+ StringUtils.isNotBlank(registerAddress) &&
!tagName.startsWith(registerAddress)
+ if (addRegisterAddressState) {
+ tagName = s"$registerAddress/$tagName"
+ }
+ tagName.toLowerCase
+ }
+
+}
+
+object SparkK8sApplicationBuildPipeline {
+
+ val execPool = new ThreadPoolExecutor(
+ Runtime.getRuntime.availableProcessors * 5,
+ Runtime.getRuntime.availableProcessors() * 10,
+ 60L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue[Runnable](2048),
+ ThreadUtils.threadFactory("streampark-docker-progress-watcher-executor"),
+ new ThreadPoolExecutor.DiscardOldestPolicy)
+
+ implicit val executor: ExecutionContext =
+ ExecutionContext.fromExecutorService(execPool)
+
+ def of(request: SparkK8sApplicationBuildRequest):
SparkK8sApplicationBuildPipeline =
+ new SparkK8sApplicationBuildPipeline(request)
+
+}
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala
index 32d09b856..2e198d075 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala
@@ -63,5 +63,4 @@ object SparkClient extends Logger {
}
})
}
-
}