This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new b85b5d4e3 [Feature] Support Packaging Spark Docker Images (#4133)
b85b5d4e3 is described below

commit b85b5d4e3bbe1096ec552e102809f10d57d9a5ad
Author: lenoxzhao <[email protected]>
AuthorDate: Fri Nov 29 10:42:36 2024 +0800

    [Feature] Support Packaging Spark Docker Images (#4133)
    
    * feat: support spark docker package
    
    * feature: support spark docker packaging
    
    * feature: update h2 schema
---
 .../streampark/common/enums/SparkDeployMode.java   |   7 +-
 .../src/main/assembly/script/data/mysql-data.sql   |  23 +-
 .../main/assembly/script/schema/mysql-schema.sql   |  20 +-
 .../console/core/entity/SparkApplication.java      |  17 ++
 .../SparkApplicationBuildPipelineServiceImpl.java  |  43 ++++
 .../impl/SparkApplicationManageServiceImpl.java    |   3 +
 .../core/service/impl/ResourceServiceImpl.java     |   1 +
 .../src/main/resources/db/schema-h2.sql            |   3 +
 .../src/locales/lang/zh-CN/spark/app.ts            |   4 +-
 .../flink/kubernetes/PodTemplateTool.scala         |  39 ++++
 .../kubernetes/model/SparkK8sPodTemplates.scala    |  65 ++++++
 .../flink/packer/pipeline/PipelineTypeEnum.java    |  16 +-
 .../packer/docker/SparkDockerfileTemplate.scala    |  48 ++++
 .../docker/SparkDockerfileTemplateTrait.scala      | 121 ++++++++++
 .../docker/SparkHadoopDockerfileTemplate.scala     | 128 +++++++++++
 .../flink/packer/pipeline/BuildRequest.scala       |  22 +-
 .../impl/SparkK8sApplicationBuildPipeline.scala    | 249 +++++++++++++++++++++
 .../streampark/spark/client/SparkClient.scala      |   1 -
 18 files changed, 787 insertions(+), 23 deletions(-)

diff --git 
a/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkDeployMode.java
 
b/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkDeployMode.java
index 785d77693..a45b3a523 100644
--- 
a/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkDeployMode.java
+++ 
b/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkDeployMode.java
@@ -36,7 +36,12 @@ public enum SparkDeployMode {
     YARN_CLUSTER(2, "yarn-cluster"),
 
     /** yarn client */
-    YARN_CLIENT(3, "yarn-client");
+    YARN_CLIENT(3, "yarn-client"),
+
+    /** kubernetes cluster mode */
+    KUBERNETES_NATIVE_CLUSTER(4, "kubernetes-native-cluster"),
+
+    KUBERNETES_NATIVE_CLIENT(5, "kubernetes-native-client");
 
     private final Integer mode;
 
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
index c63ae9771..ecf0edd86 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
@@ -26,6 +26,10 @@ set foreign_key_checks = 0;
 -- ----------------------------
 insert into `t_team` values (100000, 'default', null, now(), now());
 
+-- ----------------------------
+-- Records of flink-app
+-- ----------------------------
+INSERT INTO `t_app` (`id`, `job_type`, `create_time`, `modify_time`) VALUES 
(100000, 1, now(), now());
 
 -- ----------------------------
 -- Records of t_flink_app
@@ -51,6 +55,11 @@ insert into `t_flink_project` values (100000, 100000, 
'streampark-quickstart', '
 -- ----------------------------
 insert into `t_flink_sql` values (100000, 100000, 
'eNqlUUtPhDAQvu+vmFs1AYIHT5s94AaVqGxSSPZIKgxrY2mxrdGfb4GS3c0+LnJo6Mz36syapkmZQpk8vKbQMMt2KOFmAe5rK4Nf3yhrhCwvA1/TTDaqO61UxmooSprlT1PDGkgKEKpmwvIOjWVdP3W2zpG+JfQFHjfU46xxrVvYZuWztye1khJrqzSBFRCfjUwSYQiqt1xJJvyPcbWJp9WPCXvUoUEn0ZAVufcs0nIUjYn2L4s++YiY75eBLr+2Dnl3GYKTWRyfQKYRRR2XZxXmNvu9yh9GHAmUO/sxyMRkGNly4c714RZ7zaWtLHsX+N9NjvVrWxm99jmyvEhpOUhujmIYFI5zkCOYzYIj11a7QH7Tyz+nE8bw',
 null, null, 1, 1, now());
 
+-- ----------------------------
+-- Records of spark-app
+-- ----------------------------
+INSERT INTO `t_app` (`id`, `job_type`, `create_time`, `modify_time`) VALUES 
(100001, 2, now(), now());
+
 -- ----------------------------
 -- Records of t_spark_app
 -- ----------------------------
@@ -58,17 +67,17 @@ insert into `t_spark_app` (
      `id`, `team_id`, `job_type`, `app_type`, `app_name`, `deploy_mode`, 
`resource_from`, `main_class`,
      `yarn_queue`, `k8s_image_pull_policy`, `k8s_namespace`, `state`, 
`option_state`, `user_id`,
      `description`, `tracking`, `release`, `build`, `create_time`, 
`modify_time`, `tags`)
-values (100000, 100000, 2, 4, 'Spark SQL Demo', 2, 2, 
'org.apache.streampark.spark.cli.SqlClient', 'default', 0, 'default', 0, 0, 
100000, 'Spark SQL Demo', 0, 1, 1, now(), now(), 'streampark,test');
+values (100001, 100000, 2, 4, 'Spark SQL Demo', 2, 2, 
'org.apache.streampark.spark.cli.SqlClient', 'default', 0, 'default', 0, 0, 
100000, 'Spark SQL Demo', 0, 1, 1, now(), now(), 'streampark,test');
 
 -- ----------------------------
 -- Records of t_spark_effective
 -- ----------------------------
-insert into `t_spark_effective` values (100000, 100000, 4, 100000, now());
+insert into `t_spark_effective` values (100000, 100001, 4, 100000, now());
 
 -- ----------------------------
 -- Records of t_spark_sql
 -- ----------------------------
-insert into `t_spark_sql` values (100000, 100000, 
'eNq1jr0OgjAURnee4m4FY/oCTJVUg/KT9F7cK2kQiy2W+P6KMQ6yuDh9+YZzcjIlBUkgsSkkXCbv0N9Da0ifBgOx01cDSCqvdmsIpuu9e98kavA54EPH9ajbs+HTqIPl023gsyeN8gqlIsgrqhfmoygaiTEre2vYGliDgiW/IXvd2hdymIls0d87+5f6jxdlITOCFWxVXX5npg92MWtB',
 null, null, 1, 1, now());
+insert into `t_spark_sql` values (100000, 100001, 
'eNq1jr0OgjAURnee4m4FY/oCTJVUg/KT9F7cK2kQiy2W+P6KMQ6yuDh9+YZzcjIlBUkgsSkkXCbv0N9Da0ifBgOx01cDSCqvdmsIpuu9e98kavA54EPH9ajbs+HTqIPl023gsyeN8gqlIsgrqhfmoygaiTEre2vYGliDgiW/IXvd2hdymIls0d87+5f6jxdlITOCFWxVXX5npg92MWtB',
 null, null, 1, 1, now());
 
 
 -- ----------------------------
@@ -315,10 +324,10 @@ insert into `t_role_menu` values (100102, 100002, 140401);
 insert into `t_role_menu` values (100103, 100002, 140402);
 insert into `t_role_menu` values (100104, 100002, 140403);
 insert into `t_role_menu` values (100105, 100002, 150000);
-insert into `t_role_menu` values (100107, 100002, 150601);
-insert into `t_role_menu` values (100108, 100002, 150602);
-insert into `t_role_menu` values (100109, 100002, 150603);
-insert into `t_role_menu` values (100110, 100002, 150604);
+insert into `t_role_menu` values (100107, 100002, 150501);
+insert into `t_role_menu` values (100108, 100002, 150502);
+insert into `t_role_menu` values (100109, 100002, 150503);
+insert into `t_role_menu` values (100110, 100002, 150504);
 insert into `t_role_menu` values (100111, 100002, 150601);
 insert into `t_role_menu` values (100112, 100002, 150602);
 insert into `t_role_menu` values (100113, 100002, 150603);
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
index b76f90b5c..a0885a319 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
@@ -24,13 +24,14 @@ set foreign_key_checks = 0;
 -- ----------------------------
 -- Table structure for t_app
 -- ----------------------------
-create table if not exists `t_app` (
-`id` bigint not null,
-`job_type` tinyint default null,
-`create_time` datetime default null comment 'create time',
-`modify_time` datetime default null comment 'modify time',
-primary key(`id`)
-);
+drop table if exists `t_app`;
+create table `t_app` (
+    `id` bigint not null auto_increment,
+    `job_type` tinyint default null,
+    `create_time` datetime default null comment 'create time',
+    `modify_time` datetime default null comment 'modify time',
+    primary key(`id`)
+) engine=innodb auto_increment=100000 default charset=utf8mb4 
collate=utf8mb4_general_ci;
 
 -- ----------------------------
 -- Table structure for t_flink_app
@@ -605,13 +606,16 @@ create table `t_spark_app` (
   `jar_check_sum` bigint default null,
   `app_properties` text collate utf8mb4_general_ci comment 'Arbitrary Spark 
configuration property in key=value format (e.g. spark.driver.cores=1)',
   `app_args` text collate utf8mb4_general_ci comment 'Arguments passed to the 
main method of your main class',
-  `app_id` varchar(64) collate utf8mb4_general_ci default null comment 
'(1)application_id on yarn(2)driver_pod_name on k8s',
+  `cluster_id` varchar(64) collate utf8mb4_general_ci default null comment 
'(1)application_id on yarn(2)driver_pod_name on k8s',
   `yarn_queue` varchar(128) collate utf8mb4_general_ci default null,
   `k8s_master_url` varchar(128) collate utf8mb4_general_ci default null,
   `k8s_container_image` varchar(128) collate utf8mb4_general_ci default null,
   `k8s_image_pull_policy` tinyint default 1,
   `k8s_service_account` varchar(64) collate utf8mb4_general_ci default null,
   `k8s_namespace` varchar(64) collate utf8mb4_general_ci default null,
+  `k8s_driver_pod_template` text collate utf8mb4_general_ci,
+  `k8s_executor_pod_template` text collate utf8mb4_general_ci,
+  `k8s_hadoop_integration` tinyint default 0,
   `hadoop_user` varchar(64) collate utf8mb4_general_ci default null,
   `restart_size` int default null,
   `restart_count` int default null,
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
index fe4c53b8b..56894baa9 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
@@ -35,6 +35,7 @@ import 
org.apache.streampark.console.core.enums.SparkAppStateEnum;
 import 
org.apache.streampark.console.core.metrics.spark.SparkApplicationSummary;
 import org.apache.streampark.console.core.util.YarnQueueLabelExpression;
 import org.apache.streampark.flink.packer.maven.DependencyInfo;
+import org.apache.streampark.spark.kubernetes.model.SparkK8sPodTemplates;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -139,6 +140,14 @@ public class SparkApplication extends BaseEntity {
     /** k8s namespace */
     private String k8sNamespace = Constants.DEFAULT;
 
+    /** spark kubernetes pod template */
+    private String k8sDriverPodTemplate;
+
+    private String k8sExecutorPodTemplate;
+
+    /** spark-hadoop integration on spark-k8s mode */
+    private Boolean k8sHadoopIntegration;
+
     @TableField("HADOOP_USER")
     private String hadoopUser;
 
@@ -356,6 +365,8 @@ public class SparkApplication extends BaseEntity {
         switch (this.getDeployModeEnum()) {
             case REMOTE:
             case LOCAL:
+            case KUBERNETES_NATIVE_CLIENT:
+            case KUBERNETES_NATIVE_CLUSTER:
                 return getLocalAppHome();
             case YARN_CLIENT:
             case YARN_CLUSTER:
@@ -376,6 +387,10 @@ public class SparkApplication extends BaseEntity {
         return ApplicationType.of(appType);
     }
 
+    public SparkK8sPodTemplates getK8sPodTemplates() {
+        return SparkK8sPodTemplates.of(k8sDriverPodTemplate, 
k8sExecutorPodTemplate);
+    }
+
     @JsonIgnore
     @SneakyThrows
     @SuppressWarnings("unchecked")
@@ -466,6 +481,8 @@ public class SparkApplication extends BaseEntity {
             case YARN_CLUSTER:
             case YARN_CLIENT:
                 return StorageType.HDFS;
+            case KUBERNETES_NATIVE_CLUSTER:
+            case KUBERNETES_NATIVE_CLIENT:
             case REMOTE:
                 return StorageType.LFS;
             default:
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBuildPipelineServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBuildPipelineServiceImpl.java
index 2642dd94c..fb7221274 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBuildPipelineServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBuildPipelineServiceImpl.java
@@ -29,6 +29,7 @@ import 
org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.util.JacksonUtils;
 import org.apache.streampark.console.base.util.WebUtils;
 import org.apache.streampark.console.core.bean.Dependency;
+import org.apache.streampark.console.core.bean.DockerConfig;
 import org.apache.streampark.console.core.entity.ApplicationBuildPipeline;
 import org.apache.streampark.console.core.entity.ApplicationLog;
 import org.apache.streampark.console.core.entity.Message;
@@ -46,6 +47,7 @@ import 
org.apache.streampark.console.core.enums.ResourceTypeEnum;
 import 
org.apache.streampark.console.core.mapper.ApplicationBuildPipelineMapper;
 import org.apache.streampark.console.core.service.MessageService;
 import org.apache.streampark.console.core.service.ResourceService;
+import org.apache.streampark.console.core.service.SettingService;
 import org.apache.streampark.console.core.service.SparkEnvService;
 import org.apache.streampark.console.core.service.SparkSqlService;
 import 
org.apache.streampark.console.core.service.application.ApplicationLogService;
@@ -55,6 +57,7 @@ import 
org.apache.streampark.console.core.service.application.SparkApplicationIn
 import 
org.apache.streampark.console.core.service.application.SparkApplicationManageService;
 import org.apache.streampark.console.core.util.ServiceHelper;
 import org.apache.streampark.console.core.watcher.SparkAppHttpWatcher;
+import org.apache.streampark.flink.packer.docker.DockerConf;
 import org.apache.streampark.flink.packer.maven.Artifact;
 import org.apache.streampark.flink.packer.maven.DependencyInfo;
 import org.apache.streampark.flink.packer.pipeline.BuildPipeline;
@@ -62,7 +65,9 @@ import 
org.apache.streampark.flink.packer.pipeline.BuildResult;
 import org.apache.streampark.flink.packer.pipeline.PipeWatcher;
 import org.apache.streampark.flink.packer.pipeline.PipelineSnapshot;
 import org.apache.streampark.flink.packer.pipeline.PipelineStatusEnum;
+import 
org.apache.streampark.flink.packer.pipeline.SparkK8sApplicationBuildRequest;
 import org.apache.streampark.flink.packer.pipeline.SparkYarnBuildRequest;
+import 
org.apache.streampark.flink.packer.pipeline.impl.SparkK8sApplicationBuildPipeline;
 import org.apache.streampark.flink.packer.pipeline.impl.SparkYarnBuildPipeline;
 
 import org.apache.commons.collections.CollectionUtils;
@@ -111,6 +116,9 @@ public class SparkApplicationBuildPipelineServiceImpl
     @Autowired
     private MessageService messageService;
 
+    @Autowired
+    private SettingService settingService;
+
     @Autowired
     private ApplicationLogService applicationLogService;
 
@@ -394,12 +402,47 @@ public class SparkApplicationBuildPipelineServiceImpl
                     getMergedDependencyInfo(app));
                 log.info("Submit params to building pipeline : {}", 
yarnAppRequest);
                 return SparkYarnBuildPipeline.of(yarnAppRequest);
+            case KUBERNETES_NATIVE_CLUSTER:
+            case KUBERNETES_NATIVE_CLIENT:
+                DockerConfig dockerConfig = settingService.getDockerConfig();
+                SparkK8sApplicationBuildRequest k8sApplicationBuildRequest = 
buildSparkK8sApplicationBuildRequest(
+                    app, mainClass, sparkUserJar, sparkEnv, dockerConfig);
+                log.info("Submit params to building pipeline : {}", 
k8sApplicationBuildRequest);
+                return 
SparkK8sApplicationBuildPipeline.of(k8sApplicationBuildRequest);
             default:
                 throw new UnsupportedOperationException(
                     "Unsupported Building Application for DeployMode: " + 
app.getDeployModeEnum());
         }
     }
 
+    @Nonnull
+    private SparkK8sApplicationBuildRequest 
buildSparkK8sApplicationBuildRequest(
+                                                                               
  @Nonnull SparkApplication app,
+                                                                               
  String mainClass,
+                                                                               
  String mainJar,
+                                                                               
  SparkEnv sparkEnv,
+                                                                               
  DockerConfig dockerConfig) {
+        SparkK8sApplicationBuildRequest k8sApplicationBuildRequest = new 
SparkK8sApplicationBuildRequest(
+            app.getAppName(),
+            app.getAppHome(),
+            mainClass,
+            mainJar,
+            app.getDeployModeEnum(),
+            app.getJobTypeEnum(),
+            sparkEnv.getSparkVersion(),
+            getMergedDependencyInfo(app),
+            app.getK8sNamespace(),
+            app.getK8sContainerImage(),
+            app.getK8sPodTemplates(),
+            app.getK8sHadoopIntegration() != null ? 
app.getK8sHadoopIntegration() : false,
+            DockerConf.of(
+                dockerConfig.getAddress(),
+                dockerConfig.getNamespace(),
+                dockerConfig.getUsername(),
+                dockerConfig.getPassword()));
+        return k8sApplicationBuildRequest;
+    }
+
     private String retrieveSparkUserJar(SparkEnv sparkEnv, SparkApplication 
app) {
         switch (app.getJobTypeEnum()) {
             case SPARK_JAR:
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
index a998f447c..4ec68298b 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
@@ -345,6 +345,9 @@ public class SparkApplicationManageServiceImpl
         newApp.setK8sImagePullPolicy(oldApp.getK8sImagePullPolicy());
         newApp.setK8sServiceAccount(oldApp.getK8sServiceAccount());
         newApp.setK8sNamespace(oldApp.getK8sNamespace());
+        newApp.setK8sDriverPodTemplate(oldApp.getK8sDriverPodTemplate());
+        newApp.setK8sExecutorPodTemplate(oldApp.getK8sExecutorPodTemplate());
+        newApp.setK8sHadoopIntegration(oldApp.getK8sHadoopIntegration());
 
         newApp.setHadoopUser(oldApp.getHadoopUser());
         newApp.setRestartSize(oldApp.getRestartSize());
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
index 4ca4d3514..2972c2f8c 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
@@ -215,6 +215,7 @@ public class ResourceServiceImpl extends 
ServiceImpl<ResourceMapper, Resource>
         if (resource.getResourceType() == ResourceTypeEnum.APP) {
             findResource.setMainClass(resource.getMainClass());
         }
+        findResource.setResourceType(resource.getResourceType());
         findResource.setDescription(resource.getDescription());
         baseMapper.updateById(findResource);
     }
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
 
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index cc44fe197..d8f1b64fe 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++ 
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -577,6 +577,9 @@ create table if not exists `t_spark_app` (
   `k8s_image_pull_policy` tinyint default 1,
   `k8s_service_account` varchar(64) default null,
   `k8s_namespace` varchar(64) default null,
+  `k8s_driver_pod_template` text,
+  `k8s_executor_pod_template` text,
+  `k8s_hadoop_integration` tinyint default 0,
   `hadoop_user` varchar(64) default null,
   `restart_size` int default null,
   `restart_count` int default null,
diff --git 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/spark/app.ts
 
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/spark/app.ts
index 2c0a1381d..c0d35dc0c 100644
--- 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/spark/app.ts
+++ 
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/spark/app.ts
@@ -19,8 +19,8 @@ export default {
     runningTasks: '运行中的作业',
     totalTask: 'Task 总数',
     totalStage: 'Stage 总数',
-    completedTask: '已进完成的作业数',
-    completedStage: '完成的 Stage 总数',
+    completedTask: '已完成 Task 总数',
+    completedStage: '已完成 Stage 总数',
     memory: '占用的总内存',
     VCore: '占用的总 VCores',
   },
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/PodTemplateTool.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/PodTemplateTool.scala
index 31c743212..6da416d8c 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/PodTemplateTool.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/PodTemplateTool.scala
@@ -18,6 +18,7 @@
 package org.apache.streampark.flink.kubernetes
 
 import org.apache.streampark.flink.kubernetes.model.K8sPodTemplates
+import org.apache.streampark.spark.kubernetes.model.SparkK8sPodTemplates
 
 import org.apache.commons.io.FileUtils
 import org.apache.commons.lang3.StringUtils
@@ -38,6 +39,12 @@ object PodTemplateTool {
   val KUBERNETES_TM_POD_TEMPLATE: PodTemplateType =
     PodTemplateType("kubernetes.pod-template-file.taskmanager", 
"tm-pod-template.yaml")
 
+  val KUBERNETES_DRIVER_POD_TEMPLATE: PodTemplateType =
+    PodTemplateType("spark.kubernetes.driver.podTemplateFile", 
"driver-pod-template.yaml")
+
+  val KUBERNETES_EXECUTOR_POD_TEMPLATE: PodTemplateType =
+    PodTemplateType("spark.kubernetes.executor.podTemplateFile", 
"executor-pod-template.yaml")
+
   /**
    * Prepare kubernetes pod template file to buildWorkspace direactory.
    *
@@ -72,6 +79,38 @@ object PodTemplateTool {
     K8sPodTemplateFiles(podTempleMap.toMap)
   }
 
+  /**
+   * Prepare kubernetes pod template file to buildWorkspace directory.
+   *
+   * @param buildWorkspace
+   *   project workspace dir of spark job
+   * @param podTemplates
+   *   spark kubernetes pod templates
+   * @return
+   *   Map[k8s pod template option, template file output path]
+   */
+  def preparePodTemplateFiles(
+      buildWorkspace: String,
+      podTemplates: SparkK8sPodTemplates): K8sPodTemplateFiles = {
+    val workspaceDir = new File(buildWorkspace)
+    if (!workspaceDir.exists()) {
+      workspaceDir.mkdir()
+    }
+
+    val podTempleMap = mutable.Map[String, String]()
+    val outputTmplContent = (tmplContent: String, podTmpl: PodTemplateType) => 
{
+      if (StringUtils.isNotBlank(tmplContent)) {
+        val outputPath = s"$buildWorkspace/${podTmpl.fileName}"
+        val outputFile = new File(outputPath)
+        FileUtils.write(outputFile, tmplContent, "UTF-8")
+        podTempleMap += (podTmpl.key -> outputPath)
+      }
+    }
+
+    outputTmplContent(podTemplates.driverPodTemplate, 
KUBERNETES_DRIVER_POD_TEMPLATE)
+    outputTmplContent(podTemplates.executorPodTemplate, 
KUBERNETES_EXECUTOR_POD_TEMPLATE)
+    K8sPodTemplateFiles(podTempleMap.toMap)
+  }
 }
 
 /**
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/SparkK8sPodTemplates.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/SparkK8sPodTemplates.scala
new file mode 100644
index 000000000..81d61440f
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/SparkK8sPodTemplates.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.spark.kubernetes.model
+
+import org.apache.streampark.common.util.Utils
+
+import scala.util.Try
+
+/** Pod template for Spark k8s cluster */
+case class SparkK8sPodTemplates(
+    driverPodTemplate: String = "",
+    executorPodTemplate: String = "") {
+
+  def nonEmpty: Boolean = Option(driverPodTemplate).exists(_.trim.nonEmpty) ||
+    Option(executorPodTemplate).exists(_.trim.nonEmpty)
+
+  def isEmpty: Boolean = !nonEmpty
+
+  override def hashCode(): Int =
+    Utils.hashCode(driverPodTemplate, executorPodTemplate)
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case that: SparkK8sPodTemplates =>
+        Try(driverPodTemplate.trim).getOrElse("") == 
Try(that.driverPodTemplate.trim)
+          .getOrElse("") &&
+        Try(executorPodTemplate.trim).getOrElse("") == 
Try(that.executorPodTemplate.trim)
+          .getOrElse("")
+      case _ => false
+    }
+  }
+
+}
+
+object SparkK8sPodTemplates {
+
+  def empty: SparkK8sPodTemplates = new SparkK8sPodTemplates()
+
+  def of(driverPodTemplate: String, executorPodTemplate: String): 
SparkK8sPodTemplates =
+    SparkK8sPodTemplates(safeGet(driverPodTemplate), 
safeGet(executorPodTemplate))
+
+  private[this] def safeGet(content: String): String = {
+    content match {
+      case null => ""
+      case x if x.trim.isEmpty => ""
+      case x => x
+    }
+  }
+
+}
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/java/org/apache/streampark/flink/packer/pipeline/PipelineTypeEnum.java
 
b/streampark-flink/streampark-flink-packer/src/main/java/org/apache/streampark/flink/packer/pipeline/PipelineTypeEnum.java
index 4eb9786b3..025412ca9 100644
--- 
a/streampark-flink/streampark-flink-packer/src/main/java/org/apache/streampark/flink/packer/pipeline/PipelineTypeEnum.java
+++ 
b/streampark-flink/streampark-flink-packer/src/main/java/org/apache/streampark/flink/packer/pipeline/PipelineTypeEnum.java
@@ -89,7 +89,21 @@ public enum PipelineTypeEnum {
             .put(2, "Resolve maven dependencies")
             .put(3, "upload jar to yarn.provided.lib.dirs")
             .build(),
-        SimpleBuildResponse.class);
+        SimpleBuildResponse.class),
+
+    SPARK_NATIVE_K8S_APPLICATION(
+        7,
+        "spark native kubernetes application mode task building pipeline",
+        ImmutableMap.<Integer, String>builder()
+            .put(1, "Create building workspace")
+            .put(2, "Export kubernetes pod template")
+            .put(3, "Prepare spark job jar")
+            .put(4, "Export spark app dockerfile")
+            .put(5, "Pull spark app base docker image")
+            .put(6, "Build spark app docker image")
+            .put(7, "Push spark app docker image")
+            .build(),
+        DockerImageBuildResponse.class);
 
     private final Integer code;
     /** short description of pipeline type. */
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/SparkDockerfileTemplate.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/SparkDockerfileTemplate.scala
new file mode 100644
index 000000000..c4f9b14a6
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/SparkDockerfileTemplate.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.packer.docker
+
+/**
+ * Base Spark Dockerfile image template.
+ *
+ * @param workspacePath
+ *   Path of dockerfile workspace, it should be a directory.
+ * @param sparkBaseImage
+ *   Spark base docker image name, see https://hub.docker.com/r/apache/spark.
+ * @param sparkMainJarPath
+ *   Path of spark job main jar which would copy to $SPARK_HOME/usrlib/
+ * @param sparkExtraLibPaths
+ *   Path of additional spark lib path which would copy to $SPARK_HOME/lib/
+ */
+case class SparkDockerfileTemplate(
+    workspacePath: String,
+    sparkBaseImage: String,
+    sparkMainJarPath: String,
+    sparkExtraLibPaths: Set[String])
+  extends SparkDockerfileTemplateTrait {
+
+  /** offer content of DockerFile */
+  override def offerDockerfileContent: String = {
+    s"""FROM $sparkBaseImage
+       |USER root
+       |RUN mkdir -p $SPARK_HOME/usrlib
+       |COPY $mainJarName $SPARK_HOME/usrlib/$mainJarName
+       |COPY $extraLibName $SPARK_HOME/lib/
+       |""".stripMargin
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/SparkDockerfileTemplateTrait.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/SparkDockerfileTemplateTrait.scala
new file mode 100644
index 000000000..729aca65b
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/SparkDockerfileTemplateTrait.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.packer.docker
+
+import org.apache.streampark.common.constants.Constants
+import org.apache.streampark.common.fs.LfsOperator
+
+import org.apache.commons.io.FileUtils
+
+import java.io.File
+import java.nio.file.{Path, Paths}
+
+/** Spark image dockerfile template. */
+trait SparkDockerfileTemplateTrait {
+
+  /** Path of dockerfile workspace, it should be a directory. */
+  def workspacePath: String
+
+  /** Spark base docker image name, see https://hub.docker.com/r/apache/spark 
*/
+  def sparkBaseImage: String
+
+  /** Path of spark job main jar which would copy to $SPARK_HOME/usrlib/ */
+  def sparkMainJarPath: String
+
+  /** Path of additional spark lib path which would copy to $SPARK_HOME/lib/ */
+  def sparkExtraLibPaths: Set[String]
+
+  /** Offer content of DockerFile. */
+  def offerDockerfileContent: String
+
+  /** Path of the spark job main jar inside the Docker image, used at job startup */
+  def innerMainJarPath: String = s"local:///opt/spark/usrlib/$mainJarName"
+
+  /** output dockerfile name */
+  protected val DEFAULT_DOCKER_FILE_NAME = "Dockerfile"
+  protected val SPARK_LIB_PATH = "lib"
+  protected val SPARK_HOME: String = "$SPARK_HOME"
+
+  /** Dockerfile building workspace. */
+  lazy val workspace: Path = {
+    val path = Paths.get(workspacePath).toAbsolutePath
+    if (!LfsOperator.exists(workspacePath)) LfsOperator.mkdirs(workspacePath)
+    path
+  }
+
+  /**
+   * spark main jar name, the main jar would copy from `sparkMainJarPath` to
+   * `workspacePath/mainJarName.jar`.
+   */
+  lazy val mainJarName: String = {
+    val mainJarPath = Paths.get(sparkMainJarPath).toAbsolutePath
+    if (mainJarPath.getParent != workspace) {
+      LfsOperator.copy(
+        mainJarPath.toString,
+        s"${workspace.toString}/${mainJarPath.getFileName.toString}")
+    }
+    mainJarPath.getFileName.toString
+  }
+
+  /**
+   * spark extra jar lib, the jar file in `sparkExtraLibPaths` would be copied 
into
+   * `SPARK_LIB_PATH`.
+   */
+  lazy val extraLibName: String = {
+    LfsOperator.mkCleanDirs(s"${workspace.toString}/$SPARK_LIB_PATH")
+    sparkExtraLibPaths
+      .map(new File(_))
+      .filter(_.exists())
+      .filter(_.getName.endsWith(Constants.JAR_SUFFIX))
+      .flatMap {
+        case f if f.isDirectory =>
+          f.listFiles
+            .filter(_.isFile)
+            .filter(_.getName.endsWith(Constants.JAR_SUFFIX))
+            .map(_.getAbsolutePath)
+        case f if f.isFile => Array(f.getAbsolutePath)
+      }
+      .foreach(LfsOperator.copy(_, s"${workspace.toString}/$SPARK_LIB_PATH"))
+    SPARK_LIB_PATH
+  }
+
+  /**
+   * write content of DockerFile to outputPath, the output dockerfile name is 
"Dockerfile".
+   *
+   * @return
+   *   File Object for actual output Dockerfile
+   */
+  def writeDockerfile: File = {
+    val output = new File(s"$workspacePath/$DEFAULT_DOCKER_FILE_NAME")
+    FileUtils.write(output, offerDockerfileContent, "UTF-8")
+    output
+  }
+
+  /**
+   * write content of DockerFile to outputPath using specified output 
dockerfile name.
+   *
+   * @return
+   *   File Object for actual output Dockerfile
+   */
+  def writeDockerfile(dockerfileName: String): File = {
+    val output = new File(s"$workspacePath/$dockerfileName")
+    FileUtils.write(output, offerDockerfileContent, "UTF-8")
+    output
+  }
+
+}
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/SparkHadoopDockerfileTemplate.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/SparkHadoopDockerfileTemplate.scala
new file mode 100644
index 000000000..9bc3d0f84
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/SparkHadoopDockerfileTemplate.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.packer.docker
+
+import org.apache.streampark.common.fs.LfsOperator
+import org.apache.streampark.common.util.HadoopConfigUtils
+
+import javax.annotation.Nullable
+
+import java.nio.file.Paths
+
+/**
+ * spark-hadoop integration docker image template.
+ *
+ * @param workspacePath
+ *   Path of dockerfile workspace, it should be a directory.
+ * @param sparkBaseImage
+ *   Spark base docker image name, see https://hub.docker.com/r/apache/spark.
+ * @param sparkMainJarPath
+ *   Path of spark job main jar which would copy to $SPARK_HOME/usrlib/
+ * @param sparkExtraLibPaths
+ *   Path of additional spark lib path which would copy to $SPARK_HOME/lib/
+ * @param hadoopConfDirPath
+ *   Path of hadoop conf directory.
+ * @param hiveConfDirPath
+ *   Path of hive conf directory.
+ */
+case class SparkHadoopDockerfileTemplate(
+    workspacePath: String,
+    sparkBaseImage: String,
+    sparkMainJarPath: String,
+    sparkExtraLibPaths: Set[String],
+    @Nullable hadoopConfDirPath: String,
+    @Nullable hiveConfDirPath: String)
+  extends SparkDockerfileTemplateTrait {
+
+  val hadoopConfDir: String =
+    workspace
+      .relativize(Paths.get(Option(hadoopConfDirPath).getOrElse("")))
+      .toString
+
+  val hiveConfDir: String =
+    workspace
+      .relativize(Paths.get(Option(hiveConfDirPath).getOrElse("")))
+      .toString
+
+  /** offer content of DockerFile */
+  override def offerDockerfileContent: String = {
+    var dockerfile =
+      s"""FROM $sparkBaseImage
+         |RUN mkdir -p $SPARK_HOME/usrlib
+         |""".stripMargin
+    if (hadoopConfDir.nonEmpty) {
+      dockerfile +=
+        s"""
+           |COPY $hadoopConfDir /opt/hadoop-conf
+           |ENV HADOOP_CONF_DIR /opt/hadoop-conf
+           |""".stripMargin
+    }
+    if (hiveConfDir.nonEmpty) {
+      dockerfile +=
+        s"""
+           |COPY $hiveConfDir /opt/hive-conf
+           |ENV HIVE_CONF_DIR /opt/hive-conf
+           |""".stripMargin
+    }
+    dockerfile +=
+      s"""
+         |COPY $extraLibName $SPARK_HOME/lib/
+         |COPY $mainJarName $SPARK_HOME/usrlib/$mainJarName
+         |""".stripMargin
+    dockerfile
+  }
+
+}
+
+object SparkHadoopDockerfileTemplate {
+
+  /** Use relevant system variables as the value of hadoopConfDirPath, 
hiveConfDirPath. */
+  def fromSystemHadoopConf(
+      workspacePath: String,
+      sparkBaseImage: String,
+      sparkMainJarPath: String,
+      sparkExtraLibPaths: Set[String]): SparkHadoopDockerfileTemplate = {
+    // get hadoop and hive config directory from system and copy to 
workspacePath
+    val hadoopConfDir = HadoopConfigUtils.getSystemHadoopConfDir match {
+      case None => ""
+      case hadoopConf if !LfsOperator.exists(hadoopConf.get) => ""
+      case hadoopConf =>
+        val dstDir = s"$workspacePath/hadoop-conf"
+        LfsOperator.mkCleanDirs(dstDir)
+        LfsOperator.copyDir(hadoopConf.get, dstDir)
+        dstDir
+    }
+    val hiveConfDir = HadoopConfigUtils.getSystemHiveConfDir match {
+      case None => ""
+      case hiveConf if !LfsOperator.exists(hiveConf.get) => ""
+      case hiveConf =>
+        val dstDir = s"$workspacePath/hive-conf"
+        LfsOperator.mkCleanDirs(dstDir)
+        LfsOperator.copyDir(hiveConf.get, dstDir)
+        dstDir
+    }
+    SparkHadoopDockerfileTemplate(
+      workspacePath,
+      sparkBaseImage,
+      sparkMainJarPath,
+      sparkExtraLibPaths,
+      hadoopConfDir,
+      hiveConfDir)
+  }
+
+}
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
index 76a40a9ac..ed3393889 100644
--- 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala
@@ -17,11 +17,12 @@
 
 package org.apache.streampark.flink.packer.pipeline
 
-import org.apache.streampark.common.conf.{FlinkVersion, Workspace}
+import org.apache.streampark.common.conf.{FlinkVersion, SparkVersion, 
Workspace}
 import org.apache.streampark.common.enums.{FlinkDeployMode, FlinkJobType, 
SparkDeployMode, SparkJobType}
-import org.apache.streampark.flink.kubernetes.model.K8sPodTemplates
+import org.apache.streampark.flink.kubernetes.model.{K8sPodTemplates => 
FlinkK8sPodTemplates}
 import org.apache.streampark.flink.packer.docker.DockerConf
 import org.apache.streampark.flink.packer.maven.DependencyInfo
+import org.apache.streampark.spark.kubernetes.model.SparkK8sPodTemplates
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -96,7 +97,7 @@ case class FlinkK8sApplicationBuildRequest(
     clusterId: String,
     k8sNamespace: String,
     flinkBaseImage: String,
-    flinkPodTemplate: K8sPodTemplates,
+    flinkPodTemplate: FlinkK8sPodTemplates,
     integrateWithHadoop: Boolean = false,
     dockerConfig: DockerConf,
     ingressTemplate: String)
@@ -131,3 +132,18 @@ case class SparkYarnBuildRequest(
     jobType: SparkJobType,
     deployMode: SparkDeployMode,
     dependencyInfo: DependencyInfo) extends BuildParam
+
+case class SparkK8sApplicationBuildRequest(
+    appName: String,
+    workspace: String,
+    mainClass: String,
+    mainJar: String,
+    deployMode: SparkDeployMode,
+    jobType: SparkJobType,
+    sparkVersion: SparkVersion,
+    dependencyInfo: DependencyInfo,
+    k8sNamespace: String,
+    sparkBaseImage: String,
+    sparkPodTemplate: SparkK8sPodTemplates,
+    integrateWithHadoop: Boolean = false,
+    dockerConfig: DockerConf) extends BuildParam
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/SparkK8sApplicationBuildPipeline.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/SparkK8sApplicationBuildPipeline.scala
new file mode 100644
index 000000000..d90e8d758
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/SparkK8sApplicationBuildPipeline.scala
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.packer.pipeline.impl
+
+import org.apache.streampark.common.fs.LfsOperator
+import org.apache.streampark.common.util.ThreadUtils
+import org.apache.streampark.flink.kubernetes.PodTemplateTool
+import org.apache.streampark.flink.packer.docker._
+import org.apache.streampark.flink.packer.pipeline._
+import org.apache.streampark.flink.packer.pipeline.BuildPipeline.executor
+
+import com.github.dockerjava.api.command.PushImageCmd
+import com.github.dockerjava.core.command.{HackBuildImageCmd, 
HackPullImageCmd, HackPushImageCmd}
+import com.google.common.collect.Sets
+import org.apache.commons.lang3.StringUtils
+
+import java.io.File
+import java.nio.file.Paths
+import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
+
+import scala.concurrent.{ExecutionContext, Future}
+
+/** Building pipeline for Spark kubernetes-native application mode */
+class SparkK8sApplicationBuildPipeline(request: 
SparkK8sApplicationBuildRequest)
+  extends BuildPipeline {
+
+  override def pipeType: PipelineTypeEnum =
+    PipelineTypeEnum.SPARK_NATIVE_K8S_APPLICATION
+
+  private var dockerProcessWatcher: DockerProgressWatcher =
+    new SilentDockerProgressWatcher
+
+  // non-thread-safe
+  private val dockerProcess = new DockerResolveProgress(
+    DockerPullProgress.empty(),
+    DockerBuildProgress.empty(),
+    DockerPushProgress.empty())
+
+  override protected def offerBuildParam: SparkK8sApplicationBuildRequest =
+    request
+
+  def registerDockerProgressWatcher(watcher: DockerProgressWatcher): Unit = {
+    dockerProcessWatcher = watcher
+  }
+
+  @throws[Throwable]
+  override protected def buildProcess(): DockerImageBuildResponse = {
+
+    // Step-1: init build workspace of spark job
+    // the sub workspace dir like: APP_WORKSPACE/k8s-namespace/
+    val buildWorkspace =
+      execStep(1) {
+        val buildWorkspace =
+          s"${request.workspace}/${request.k8sNamespace}"
+        LfsOperator.mkCleanDirs(buildWorkspace)
+        logInfo(s"Recreate building workspace: $buildWorkspace")
+        buildWorkspace
+      }.getOrElse(throw getError.exception)
+
+    // Step-2: export k8s pod template files
+    val podTemplatePaths = request.sparkPodTemplate match {
+      case podTemplate if podTemplate.isEmpty =>
+        skipStep(2)
+        Map[String, String]()
+      case podTemplate =>
+        execStep(2) {
+          val podTemplateFiles =
+            PodTemplateTool
+              .preparePodTemplateFiles(buildWorkspace, podTemplate)
+              .tmplFiles
+          logInfo(s"Export spark podTemplates: 
${podTemplateFiles.values.mkString(",")}")
+          podTemplateFiles
+        }.getOrElse(throw getError.exception)
+    }
+
+    // Step-3: prepare spark job jar
+    val (mainJarPath, extJarLibs) =
+      execStep(3) {
+        val mainJarName = Paths.get(request.mainJar).getFileName
+        val mainJarPath = s"$buildWorkspace/$mainJarName"
+        LfsOperator.copy(request.mainJar, mainJarPath)
+        logInfo(s"Prepared spark job jar: $mainJarPath")
+        mainJarPath -> Set[String]()
+      }.getOrElse(throw getError.exception)
+
+    // Step-4: generate and Export spark image dockerfiles
+    val (dockerfile, dockerFileTemplate) =
+      execStep(4) {
+        val dockerFileTemplate = {
+          if (request.integrateWithHadoop) {
+            SparkHadoopDockerfileTemplate.fromSystemHadoopConf(
+              buildWorkspace,
+              request.sparkBaseImage,
+              mainJarPath,
+              extJarLibs)
+          } else {
+            SparkDockerfileTemplate(
+              buildWorkspace,
+              request.sparkBaseImage,
+              mainJarPath,
+              extJarLibs)
+          }
+        }
+        val dockerFile = dockerFileTemplate.writeDockerfile
+        logInfo(
+          s"Output spark dockerfile: ${dockerFile.getAbsolutePath}, content: 
\n${dockerFileTemplate.offerDockerfileContent}")
+        dockerFile -> dockerFileTemplate
+      }.getOrElse(throw getError.exception)
+
+    val dockerConf = request.dockerConfig
+    val baseImageTag = request.sparkBaseImage.trim
+    val pushImageTag = {
+      if (request.k8sNamespace.isEmpty || request.appName.isEmpty) {
+        throw new IllegalArgumentException("k8sNamespace or appName cannot be 
empty")
+      }
+      val expectedImageTag =
+        s"streampark-sparkjob-${request.k8sNamespace}-${request.appName}"
+      compileTag(expectedImageTag, dockerConf.registerAddress, 
dockerConf.imageNamespace)
+    }
+
+    // Step-5: pull spark base image
+    execStep(5) {
+      usingDockerClient {
+        dockerClient =>
+          val pullImageCmd = {
+            // when the register address prefix is explicitly identified on 
base image tag,
+            // the user's pre-saved docker register auth info would be used.
+            val pullImageCmdState =
+              dockerConf.registerAddress != null && !baseImageTag.startsWith(
+                dockerConf.registerAddress)
+            if (pullImageCmdState) {
+              dockerClient.pullImageCmd(baseImageTag)
+            } else {
+              dockerClient
+                .pullImageCmd(baseImageTag)
+                .withAuthConfig(dockerConf.toAuthConf)
+            }
+          }
+          val pullCmdCallback = pullImageCmd
+            .asInstanceOf[HackPullImageCmd]
+            .start(watchDockerPullProcess {
+              pullRsp =>
+                dockerProcess.pull.update(pullRsp)
+                
Future(dockerProcessWatcher.onDockerPullProgressChange(dockerProcess.pull.snapshot))
+            })
+          pullCmdCallback.awaitCompletion
+          logInfo(s"Already pulled docker image from remote register, 
imageTag=$baseImageTag")
+      }(err => throw new Exception(s"Pull docker image failed, 
imageTag=$baseImageTag", err))
+    }.getOrElse(throw getError.exception)
+
+    // Step-6: build spark image
+    execStep(6) {
+      usingDockerClient {
+        dockerClient =>
+          val buildImageCmd = dockerClient
+            .buildImageCmd()
+            .withBaseDirectory(new File(buildWorkspace))
+            .withDockerfile(dockerfile)
+            .withTags(Sets.newHashSet(pushImageTag))
+
+          val buildCmdCallback = buildImageCmd
+            .asInstanceOf[HackBuildImageCmd]
+            .start(watchDockerBuildStep {
+              buildStep =>
+                dockerProcess.build.update(buildStep)
+                Future(
+                  
dockerProcessWatcher.onDockerBuildProgressChange(dockerProcess.build.snapshot))
+            })
+          val imageId = buildCmdCallback.awaitImageId
+          logInfo(s"Built docker image, imageId=$imageId, 
imageTag=$pushImageTag")
+      }(err => throw new Exception(s"Build docker image failed. 
tag=$pushImageTag", err))
+    }.getOrElse(throw getError.exception)
+
+    // Step-7: push spark image
+    execStep(7) {
+      usingDockerClient {
+        dockerClient =>
+          val pushCmd: PushImageCmd = dockerClient
+            .pushImageCmd(pushImageTag)
+            .withAuthConfig(dockerConf.toAuthConf)
+
+          val pushCmdCallback = pushCmd
+            .asInstanceOf[HackPushImageCmd]
+            .start(watchDockerPushProcess {
+              pushRsp =>
+                dockerProcess.push.update(pushRsp)
+                
Future(dockerProcessWatcher.onDockerPushProgressChange(dockerProcess.push.snapshot))
+            })
+          pushCmdCallback.awaitCompletion
+          logInfo(s"Already pushed docker image, imageTag=$pushImageTag")
+      }(err => throw new Exception(s"Push docker image failed. 
tag=$pushImageTag", err))
+    }.getOrElse(throw getError.exception)
+
+    DockerImageBuildResponse(
+      buildWorkspace,
+      pushImageTag,
+      podTemplatePaths,
+      dockerFileTemplate.innerMainJarPath)
+  }
+
+  /** compile image tag with namespace and remote address. */
+  private[this] def compileTag(
+      tag: String,
+      registerAddress: String,
+      imageNamespace: String): String = {
+    var tagName = if (tag.contains("/")) tag else s"$imageNamespace/$tag"
+    val addRegisterAddressState =
+      StringUtils.isNotBlank(registerAddress) && 
!tagName.startsWith(registerAddress)
+    if (addRegisterAddressState) {
+      tagName = s"$registerAddress/$tagName"
+    }
+    tagName.toLowerCase
+  }
+
+}
+
+object SparkK8sApplicationBuildPipeline {
+
+  val execPool = new ThreadPoolExecutor(
+    Runtime.getRuntime.availableProcessors * 5,
+    Runtime.getRuntime.availableProcessors() * 10,
+    60L,
+    TimeUnit.SECONDS,
+    new LinkedBlockingQueue[Runnable](2048),
+    ThreadUtils.threadFactory("streampark-docker-progress-watcher-executor"),
+    new ThreadPoolExecutor.DiscardOldestPolicy)
+
+  implicit val executor: ExecutionContext =
+    ExecutionContext.fromExecutorService(execPool)
+
+  def of(request: SparkK8sApplicationBuildRequest): 
SparkK8sApplicationBuildPipeline =
+    new SparkK8sApplicationBuildPipeline(request)
+
+}
diff --git 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala
 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala
index 32d09b856..2e198d075 100644
--- 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala
+++ 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala
@@ -63,5 +63,4 @@ object SparkClient extends Logger {
         }
       })
   }
-
 }

Reply via email to