This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new e378ea292 [Improve] flink-app & spark-app id auto-increment 
improvements (#4098)
e378ea292 is described below

commit e378ea29221b1ac1b3534db7de6c076a137757b1
Author: benjobs <[email protected]>
AuthorDate: Sat Sep 28 07:26:57 2024 +0800

    [Improve] flink-app & spark-app id auto-increment improvements (#4098)
    
    * [Improve] StopRequest class-name rename to CancelRequest
    
    * [Improve]  spark & flink app-id auto-increment Improvements
    
    * [Improve] t_flink_log table rename to t_app_log
    
    * [Improve] Effective entity rename to FlinkEffective
---
 .../apache/streampark/common/conf/Workspace.scala  |  2 -
 .../main/assembly/script/schema/mysql-schema.sql   | 50 ++++++++++++++--------
 .../main/assembly/script/schema/pgsql-schema.sql   | 20 ++++-----
 .../main/assembly/script/upgrade/mysql/2.2.0.sql   | 25 +++++++++--
 .../console/core/controller/ConfigController.java  |  2 +-
 ...> FlinkApplicationBuildPipelineController.java} |  4 +-
 ...roller.java => FlinkApplicationController.java} |  6 +--
 ...java => FlinkApplicationHistoryController.java} |  2 +-
 ...Controller.java => FlinkCatalogController.java} |  6 +--
 .../console/core/controller/ProxyController.java   |  2 +-
 .../SparkApplicationBuildPipelineController.java   |  2 +-
 .../controller/SparkApplicationController.java     |  4 +-
 .../core/controller/SparkConfigController.java     |  2 +-
 .../core/controller/SparkProxyController.java      |  2 +-
 .../entity/{Effective.java => Application.java}    | 19 ++++----
 .../console/core/entity/ApplicationLog.java        | 20 +++++++--
 .../console/core/entity/FlinkApplication.java      |  2 +-
 .../entity/{Effective.java => FlinkEffective.java} |  2 +-
 .../console/core/entity/SparkApplication.java      |  6 +--
 .../console/core/enums/EngineTypeEnum.java         |  4 +-
 ...EffectiveMapper.java => ApplicationMapper.java} |  5 ++-
 ...ectiveMapper.java => FlinkEffectiveMapper.java} |  4 +-
 .../console/core/runner/EnvInitializer.java        |  1 -
 .../console/core/runner/QuickStartRunner.java      |  2 +-
 ...atalogService.java => FlinkCatalogService.java} |  4 +-
 ...tiveService.java => FlinkEffectiveService.java} |  6 +--
 .../{ => application}/AppBuildPipeService.java     |  2 +-
 .../{ => application}/ApplicationLogService.java   |  2 +-
 .../application/ApplicationService.java}           | 11 +++--
 .../FlinkApplicationBackUpService.java             |  2 +-
 .../FlinkApplicationConfigService.java             |  2 +-
 .../SparkAppBuildPipeService.java                  |  2 +-
 .../SparkApplicationBackUpService.java             |  2 +-
 .../SparkApplicationConfigService.java             |  2 +-
 .../SparkApplicationLogService.java                |  2 +-
 .../impl/AppBuildPipeServiceImpl.java              | 10 ++---
 .../impl/ApplicationLogServiceImpl.java            |  4 +-
 .../application/impl/ApplicationServiceImpl.java   | 49 +++++++++++++++++++++
 .../impl/FlinkApplicationActionServiceImpl.java    | 44 +++++++++----------
 .../impl/FlinkApplicationBackUpServiceImpl.java    | 10 ++---
 .../impl/FlinkApplicationConfigServiceImpl.java    |  8 ++--
 .../impl/FlinkApplicationManageServiceImpl.java    | 28 +++++++++---
 .../impl/SparkAppBuildPipeServiceImpl.java         |  8 ++--
 .../impl/SparkApplicationActionServiceImpl.java    | 34 +++++++--------
 .../impl/SparkApplicationBackUpServiceImpl.java    |  8 ++--
 .../impl/SparkApplicationConfigServiceImpl.java    |  4 +-
 .../impl/SparkApplicationLogServiceImpl.java       |  4 +-
 .../impl/SparkApplicationManageServiceImpl.java    | 33 +++++++++-----
 .../core/service/impl/FlinkCatalogServiceImpl.java |  4 +-
 ...iceImpl.java => FlinkEffectiveServiceImpl.java} | 43 ++++++++++---------
 .../service/impl/FlinkSavepointServiceImpl.java    |  8 ++--
 .../core/service/impl/FlinkSqlServiceImpl.java     |  6 +--
 .../core/service/impl/ProxyServiceImpl.java        |  4 +-
 .../core/service/impl/SparkSqlServiceImpl.java     |  2 +-
 .../core/task/ApplicationBackUpCleanTask.java      |  2 +-
 .../console/core/watcher/SparkAppHttpWatcher.java  |  2 +-
 .../src/main/resources/db/data-h2.sql              | 19 ++++++--
 .../src/main/resources/db/schema-h2.sql            | 46 +++++++++++++-------
 .../{EffectiveMapper.xml => ApplicationMapper.xml} |  2 +-
 .../main/resources/mapper/core/EffectiveMapper.xml |  2 +-
 ...java => FlinkApplicationManageServiceTest.java} |  2 +-
 ...rviceTest.java => FlinkCatalogServiceTest.java} |  4 +-
 ...iceTest.java => FlinkSavepointServiceTest.java} |  9 ++--
 .../streampark/spark/client/SparkClient.scala      |  8 ++--
 .../{StopRequest.scala => CancelRequest.scala}     |  2 +-
 .../{StopResponse.scala => CancelResponse.scala}   |  2 +-
 .../spark/client/SparkClientEndpoint.scala         |  4 +-
 .../streampark/spark/client/impl/YarnClient.scala  | 20 ++++-----
 .../spark/client/trait/SparkClientTrait.scala      |  8 ++--
 69 files changed, 409 insertions(+), 265 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
index c8ce1dd95..cc9161f5e 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
@@ -109,8 +109,6 @@ case class Workspace(storageType: StorageType) {
 
   lazy val APP_WORKSPACE = s"$WORKSPACE/workspace"
 
-  lazy val SPARK_APP_WORKSPACE = s"$WORKSPACE/spark-workspace"
-
   lazy val APP_FLINK = s"$WORKSPACE/flink"
 
   lazy val APP_SPARK = s"$WORKSPACE/spark"
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
index 09c6620c6..e054d46c7 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
@@ -22,27 +22,22 @@ set names utf8mb4;
 set foreign_key_checks = 0;
 
 -- ----------------------------
--- table structure for t_flink_app_backup
+-- Table structure for t_app
 -- ----------------------------
-drop table if exists `t_flink_app_backup`;
-create table `t_flink_app_backup` (
-  `id` bigint not null auto_increment,
-  `app_id` bigint default null,
-  `sql_id` bigint default null,
-  `config_id` bigint default null,
-  `version` int default null,
-  `path` varchar(128) collate utf8mb4_general_ci default null,
-  `description` varchar(255) collate utf8mb4_general_ci default null,
-  `create_time` datetime default null comment 'create time',
-  primary key (`id`) using btree
-) engine=innodb auto_increment=100000 default charset=utf8mb4 
collate=utf8mb4_general_ci;
+create table if not exists `t_app` (
+`id` bigint not null,
+`job_type` tinyint default null,
+`create_time` datetime default null comment 'create time',
+`modify_time` datetime default null comment 'modify time',
+primary key(`id`)
+);
 
 -- ----------------------------
 -- Table structure for t_flink_app
 -- ----------------------------
 drop table if exists `t_flink_app`;
 create table `t_flink_app` (
-  `id` bigint not null auto_increment,
+  `id` bigint not null,
   `team_id` bigint not null,
   `job_type` tinyint default null,
   `deploy_mode` tinyint default null,
@@ -110,6 +105,23 @@ create table `t_flink_app` (
 ) engine=innodb auto_increment=100000 default charset=utf8mb4 
collate=utf8mb4_general_ci;
 
 
+-- ----------------------------
+-- table structure for t_flink_app_backup
+-- ----------------------------
+drop table if exists `t_flink_app_backup`;
+create table `t_flink_app_backup` (
+`id` bigint not null,
+`app_id` bigint default null,
+`sql_id` bigint default null,
+`config_id` bigint default null,
+`version` int default null,
+`path` varchar(128) collate utf8mb4_general_ci default null,
+`description` varchar(255) collate utf8mb4_general_ci default null,
+`create_time` datetime default null comment 'create time',
+primary key (`id`) using btree
+) engine=innodb auto_increment=100000 default charset=utf8mb4 
collate=utf8mb4_general_ci;
+
+
 -- ----------------------------
 -- table structure for t_flink_config
 -- ----------------------------
@@ -161,14 +173,14 @@ create table `t_flink_env` (
 
 
 -- ----------------------------
--- table structure for t_flink_log
+-- table structure for t_app_log
 -- ----------------------------
-drop table if exists `t_flink_log`;
-create table `t_flink_log` (
+drop table if exists `t_app_log`;
+create table `t_app_log` (
   `id` bigint not null auto_increment,
   `app_id` bigint default null,
-  `yarn_app_id` varchar(64) collate utf8mb4_general_ci default null,
-  `job_manager_url` varchar(255) collate utf8mb4_general_ci default null,
+  `cluster_id` varchar(64) collate utf8mb4_general_ci default null,
+  `tracking_url` varchar(255) collate utf8mb4_general_ci default null,
   `success` tinyint default null,
   `exception` text collate utf8mb4_general_ci,
   `option_time` datetime default null,
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
index 6b2df02a1..dc1bf538a 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
@@ -39,7 +39,7 @@ drop table if exists "public"."t_app_build_pipe";
 drop table if exists "public"."t_flink_app_backup";
 drop table if exists "public"."t_alert_config";
 drop table if exists "public"."t_access_token";
-drop table if exists "public"."t_flink_log";
+drop table if exists "public"."t_app_log";
 drop table if exists "public"."t_team";
 drop table if exists "public"."t_variable";
 drop table if exists "public"."t_external_link";
@@ -66,7 +66,7 @@ drop sequence if exists 
"public"."streampark_t_distributed_task_id_seq";
 drop sequence if exists "public"."streampark_t_flink_app_backup_id_seq";
 drop sequence if exists "public"."streampark_t_alert_config_id_seq";
 drop sequence if exists "public"."streampark_t_access_token_id_seq";
-drop sequence if exists "public"."streampark_t_flink_log_id_seq";
+drop sequence if exists "public"."streampark_t_app_log_id_seq";
 drop sequence if exists "public"."streampark_t_team_id_seq";
 drop sequence if exists "public"."streampark_t_variable_id_seq";
 drop sequence if exists "public"."streampark_t_external_link_id_seq";
@@ -192,7 +192,7 @@ create sequence "public"."streampark_t_flink_app_id_seq"
     increment 1 start 10000 cache 1 minvalue 10000 maxvalue 
9223372036854775807;
 
 create table "public"."t_flink_app" (
-  "id" int8 not null default 
nextval('streampark_t_flink_app_id_seq'::regclass),
+  "id" int8 not null,
   "team_id" int8,
   "job_type" int2,
   "deploy_mode" int2,
@@ -403,16 +403,16 @@ create index "un_env_name" on "public"."t_flink_env" 
using btree (
 
 
 -- ----------------------------
--- table structure for t_flink_log
+-- table structure for t_app_log
 -- ----------------------------
-create sequence "public"."streampark_t_flink_log_id_seq"
+create sequence "public"."streampark_t_app_log_id_seq"
     increment 1 start 10000 cache 1 minvalue 10000 maxvalue 
9223372036854775807;
 
-create table "public"."t_flink_log" (
-  "id" int8 not null default 
nextval('streampark_t_flink_log_id_seq'::regclass),
+create table "public"."t_app_log" (
+  "id" int8 not null default nextval('streampark_t_app_log_id_seq'::regclass),
   "app_id" int8,
-  "yarn_app_id" varchar(64) collate "pg_catalog"."default",
-  "job_manager_url" varchar(255) collate "pg_catalog"."default",
+  "cluster_id" varchar(64) collate "pg_catalog"."default",
+  "tracking_url" varchar(255) collate "pg_catalog"."default",
   "success" boolean,
   "exception" text collate "pg_catalog"."default",
   "option_time" timestamp(6),
@@ -420,7 +420,7 @@ create table "public"."t_flink_log" (
   "user_id" int8
 )
 ;
-alter table "public"."t_flink_log" add constraint "t_flink_log_pkey" primary 
key ("id");
+alter table "public"."t_app_log" add constraint "t_app_log_pkey" primary key 
("id");
 
 
 -- ----------------------------
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
index 6ba50d281..997146f81 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
@@ -20,14 +20,33 @@ use streampark;
 set names utf8mb4;
 set foreign_key_checks = 0;
 
-alter table t_app_backup rename to t_flink_app_backup;
+
+-- ----------------------------
+-- Table structure for t_app
+-- ----------------------------
+create table if not exists `t_app` (
+`id` bigint not null,
+`job_type` tinyint default null,
+`create_time` datetime default null comment 'create time',
+`modify_time` datetime default null comment 'modify time',
+primary key(`id`)
+);
+
 
 alter table `t_flink_app`
-    add column `k8s_name` varchar(63) collate utf8mb4_general_ci default null,
+    modify column `id` not null;
+add column `k8s_name` varchar(63) collate utf8mb4_general_ci default null,
     -- modify_time change with duration #3188
     modify column `modify_time` datetime not null default current_timestamp 
comment 'modify time';
 
-alter table `t_flink_log`
+alter table t_app_backup rename to t_flink_app_backup;
+
+alter table t_flink_log rename to t_app_log;
+
+alter table `t_app_log`
+    change column `yarn_app_id` `cluster_id` varchar(64) default null,
+    change column `job_manager_url` `tracking_url` varchar(255) default null,
+    add column `job_type` tinyint default null,
     add column `user_id` bigint default null comment 'operator user id';
 
 alter table `t_flink_project`
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ConfigController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ConfigController.java
index dd63d21aa..7622e92f2 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ConfigController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ConfigController.java
@@ -22,7 +22,7 @@ import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.domain.RestResponse;
 import org.apache.streampark.console.core.entity.FlinkApplication;
 import org.apache.streampark.console.core.entity.FlinkApplicationConfig;
-import 
org.apache.streampark.console.core.service.FlinkApplicationConfigService;
+import 
org.apache.streampark.console.core.service.application.FlinkApplicationConfigService;
 
 import org.apache.shiro.authz.annotation.RequiresPermissions;
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationBuildPipelineController.java
similarity index 95%
rename from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
rename to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationBuildPipelineController.java
index ada06d148..211b9c3aa 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationBuildPipelineController.java
@@ -21,7 +21,7 @@ import org.apache.streampark.console.base.domain.RestResponse;
 import org.apache.streampark.console.core.annotation.Permission;
 import org.apache.streampark.console.core.bean.AppBuildDockerResolvedDetail;
 import org.apache.streampark.console.core.entity.AppBuildPipeline;
-import org.apache.streampark.console.core.service.AppBuildPipeService;
+import 
org.apache.streampark.console.core.service.application.AppBuildPipeService;
 import org.apache.streampark.flink.packer.pipeline.DockerResolvedSnapshot;
 import org.apache.streampark.flink.packer.pipeline.PipelineTypeEnum;
 
@@ -42,7 +42,7 @@ import java.util.Optional;
 @Validated
 @RestController
 @RequestMapping("flink/pipe")
-public class ApplicationBuildPipelineController {
+public class FlinkApplicationBuildPipelineController {
 
     @Autowired
     private AppBuildPipeService appBuildPipeService;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationController.java
similarity index 98%
rename from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
rename to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationController.java
index 3d9252731..8474cf698 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationController.java
@@ -28,10 +28,10 @@ import 
org.apache.streampark.console.core.entity.ApplicationLog;
 import org.apache.streampark.console.core.entity.FlinkApplication;
 import org.apache.streampark.console.core.entity.FlinkApplicationBackUp;
 import org.apache.streampark.console.core.enums.AppExistsStateEnum;
-import org.apache.streampark.console.core.service.ApplicationLogService;
-import 
org.apache.streampark.console.core.service.FlinkApplicationBackUpService;
 import org.apache.streampark.console.core.service.ResourceService;
+import 
org.apache.streampark.console.core.service.application.ApplicationLogService;
 import 
org.apache.streampark.console.core.service.application.FlinkApplicationActionService;
+import 
org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService;
 import 
org.apache.streampark.console.core.service.application.FlinkApplicationInfoService;
 import 
org.apache.streampark.console.core.service.application.FlinkApplicationManageService;
 
@@ -55,7 +55,7 @@ import java.util.Map;
 @Validated
 @RestController
 @RequestMapping("flink/app")
-public class ApplicationController {
+public class FlinkApplicationController {
 
     @Autowired
     private FlinkApplicationManageService applicationManageService;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationHistoryController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationHistoryController.java
similarity index 98%
rename from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationHistoryController.java
rename to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationHistoryController.java
index 04cea4eda..b4e93c402 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationHistoryController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationHistoryController.java
@@ -37,7 +37,7 @@ import java.util.List;
 @Validated
 @RestController
 @RequestMapping("flink/history")
-public class ApplicationHistoryController {
+public class FlinkApplicationHistoryController {
 
     @Autowired
     private FlinkApplicationInfoService applicationInfoService;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/CatalogController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkCatalogController.java
similarity index 95%
rename from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/CatalogController.java
rename to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkCatalogController.java
index 78847211f..2817c2ecc 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/CatalogController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkCatalogController.java
@@ -21,7 +21,7 @@ import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.domain.RestResponse;
 import org.apache.streampark.console.core.annotation.Permission;
 import org.apache.streampark.console.core.bean.FlinkCatalogParams;
-import org.apache.streampark.console.core.service.CatalogService;
+import org.apache.streampark.console.core.service.FlinkCatalogService;
 import org.apache.streampark.console.core.util.ServiceHelper;
 
 import org.apache.shiro.authz.annotation.RequiresPermissions;
@@ -40,10 +40,10 @@ import java.io.IOException;
 @Validated
 @RestController
 @RequestMapping("flink/catalog")
-public class CatalogController {
+public class FlinkCatalogController {
 
     @Autowired
-    CatalogService catalogService;
+    FlinkCatalogService catalogService;
 
     @Permission(team = "#catalog.teamId")
     @PostMapping("create")
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java
index 2b586a1a6..d5504d5ac 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java
@@ -21,8 +21,8 @@ import 
org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.core.entity.ApplicationLog;
 import org.apache.streampark.console.core.entity.FlinkApplication;
 import org.apache.streampark.console.core.enums.UserTypeEnum;
-import org.apache.streampark.console.core.service.ApplicationLogService;
 import org.apache.streampark.console.core.service.ProxyService;
+import 
org.apache.streampark.console.core.service.application.ApplicationLogService;
 import 
org.apache.streampark.console.core.service.application.FlinkApplicationManageService;
 import org.apache.streampark.console.core.util.ServiceHelper;
 import org.apache.streampark.console.system.entity.Member;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationBuildPipelineController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationBuildPipelineController.java
index 260a55d53..dfaadf8c6 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationBuildPipelineController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationBuildPipelineController.java
@@ -20,7 +20,7 @@ package org.apache.streampark.console.core.controller;
 import org.apache.streampark.console.base.domain.RestResponse;
 import org.apache.streampark.console.core.annotation.Permission;
 import org.apache.streampark.console.core.entity.AppBuildPipeline;
-import org.apache.streampark.console.core.service.SparkAppBuildPipeService;
+import 
org.apache.streampark.console.core.service.application.SparkAppBuildPipeService;
 
 import org.apache.shiro.authz.annotation.RequiresPermissions;
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java
index bc3043699..2e181883d 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java
@@ -27,11 +27,11 @@ import 
org.apache.streampark.console.core.entity.FlinkApplicationBackUp;
 import org.apache.streampark.console.core.entity.SparkApplication;
 import org.apache.streampark.console.core.entity.SparkApplicationLog;
 import org.apache.streampark.console.core.enums.AppExistsStateEnum;
-import 
org.apache.streampark.console.core.service.FlinkApplicationBackUpService;
 import org.apache.streampark.console.core.service.ResourceService;
-import org.apache.streampark.console.core.service.SparkApplicationLogService;
+import 
org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService;
 import 
org.apache.streampark.console.core.service.application.SparkApplicationActionService;
 import 
org.apache.streampark.console.core.service.application.SparkApplicationInfoService;
+import 
org.apache.streampark.console.core.service.application.SparkApplicationLogService;
 import 
org.apache.streampark.console.core.service.application.SparkApplicationManageService;
 
 import org.apache.shiro.authz.annotation.RequiresPermissions;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkConfigController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkConfigController.java
index cc58bb216..7d76ca356 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkConfigController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkConfigController.java
@@ -22,7 +22,7 @@ import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.domain.RestResponse;
 import org.apache.streampark.console.core.entity.SparkApplication;
 import org.apache.streampark.console.core.entity.SparkApplicationConfig;
-import 
org.apache.streampark.console.core.service.SparkApplicationConfigService;
+import 
org.apache.streampark.console.core.service.application.SparkApplicationConfigService;
 
 import org.apache.shiro.authz.annotation.RequiresPermissions;
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkProxyController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkProxyController.java
index c7ad83062..fa68395ea 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkProxyController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkProxyController.java
@@ -22,7 +22,7 @@ import 
org.apache.streampark.console.core.entity.SparkApplication;
 import org.apache.streampark.console.core.entity.SparkApplicationLog;
 import org.apache.streampark.console.core.enums.UserTypeEnum;
 import org.apache.streampark.console.core.service.ProxyService;
-import org.apache.streampark.console.core.service.SparkApplicationLogService;
+import 
org.apache.streampark.console.core.service.application.SparkApplicationLogService;
 import 
org.apache.streampark.console.core.service.application.SparkApplicationManageService;
 import org.apache.streampark.console.core.util.ServiceHelper;
 import org.apache.streampark.console.system.entity.Member;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Effective.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
similarity index 79%
copy from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Effective.java
copy to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index b379bd355..6239fa2a8 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Effective.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -17,33 +17,30 @@
 
 package org.apache.streampark.console.core.entity;
 
-import org.apache.streampark.console.core.enums.EffectiveTypeEnum;
-
 import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 
+import java.io.Serializable;
 import java.util.Date;
 
 @Data
-@TableName("t_flink_effective")
+@TableName("t_app")
 @Slf4j
-public class Effective {
+public class Application implements Serializable {
 
     @TableId(type = IdType.AUTO)
     private Long id;
-
-    private Long appId;
     /**
-     * 1) config <br>
-     * 2) flink Sql<br>
+     * 1: flink job
+     * 2: spark job
      */
-    private Integer targetType;
+    private Integer jobType;
 
-    private Long targetId;
     private Date createTime;
 
-    private transient EffectiveTypeEnum effectiveType;
+    private Date modifyTime;
+
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationLog.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationLog.java
index a6e1ed92a..574a320f1 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationLog.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationLog.java
@@ -26,22 +26,34 @@ import lombok.extern.slf4j.Slf4j;
 import java.util.Date;
 
 @Data
-@TableName("t_flink_log")
+@TableName("t_app_log")
 @Slf4j
 public class ApplicationLog {
 
     @TableId(type = IdType.AUTO)
     private Long id;
+
     /** appId */
     private Long appId;
-    /** applicationId */
-    private String yarnAppId;
+
+    /**
+     * 1: flink
+     * 2: spark
+     */
+    private Integer jobType;
+
+    /** clusterId */
+    private String clusterId;
+
     /** The address of the jobmanager, that is, the direct access address of 
the Flink web UI */
-    private String jobManagerUrl;
+    private String trackingUrl;
+
     /** start status */
     private Boolean success;
+
     /** option name */
     private Integer optionName;
+
     /** option time */
     private Date optionTime;
     /** exception at the start */
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java
index 12f40f408..382f71fef 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java
@@ -66,7 +66,7 @@ import java.util.Optional;
 @Slf4j
 public class FlinkApplication implements Serializable {
 
-    @TableId(type = IdType.AUTO)
+    @TableId(type = IdType.INPUT)
     private Long id;
 
     private Long teamId;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Effective.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEffective.java
similarity index 98%
rename from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Effective.java
rename to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEffective.java
index b379bd355..bfb806441 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Effective.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEffective.java
@@ -30,7 +30,7 @@ import java.util.Date;
 @Data
 @TableName("t_flink_effective")
 @Slf4j
-public class Effective {
+public class FlinkEffective {
 
     @TableId(type = IdType.AUTO)
     private Long id;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
index 8cc8abccf..da6421401 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
@@ -59,7 +59,7 @@ import java.util.Objects;
 @Slf4j
 public class SparkApplication extends BaseEntity {
 
-    @TableId(type = IdType.AUTO)
+    @TableId(type = IdType.INPUT)
     private Long id;
 
     private Long teamId;
@@ -336,14 +336,14 @@ public class SparkApplication extends BaseEntity {
 
     @JsonIgnore
     public String getLocalAppHome() {
-        String path = String.format("%s/%s", 
Workspace.local().SPARK_APP_WORKSPACE(), id.toString());
+        String path = String.format("%s/%s", 
Workspace.local().APP_WORKSPACE(), id.toString());
         log.info("local appHome:{}", path);
         return path;
     }
 
     @JsonIgnore
     public String getRemoteAppHome() {
-        String path = String.format("%s/%s", 
Workspace.remote().SPARK_APP_WORKSPACE(), id.toString());
+        String path = String.format("%s/%s", 
Workspace.remote().APP_WORKSPACE(), id.toString());
         log.info("remote appHome:{}", path);
         return path;
     }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/EngineTypeEnum.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/EngineTypeEnum.java
index 3b15ce6cb..2033e2471 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/EngineTypeEnum.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/EngineTypeEnum.java
@@ -27,10 +27,10 @@ import java.util.Arrays;
 public enum EngineTypeEnum {
 
     /** Apache Flink: activated by default */
-    FLINK(0),
+    FLINK(1),
 
     /** Apache Spark */
-    SPARK(1);
+    SPARK(2);
 
     @EnumValue
     private final int code;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/EffectiveMapper.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
similarity index 87%
copy from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/EffectiveMapper.java
copy to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
index b3815425f..48820fe3b 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/EffectiveMapper.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
@@ -17,9 +17,10 @@
 
 package org.apache.streampark.console.core.mapper;
 
-import org.apache.streampark.console.core.entity.Effective;
+import org.apache.streampark.console.core.entity.Application;
 
 import com.baomidou.mybatisplus.core.mapper.BaseMapper;
 
-public interface EffectiveMapper extends BaseMapper<Effective> {
+public interface ApplicationMapper extends BaseMapper<Application> {
+
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/EffectiveMapper.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkEffectiveMapper.java
similarity index 86%
copy from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/EffectiveMapper.java
copy to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkEffectiveMapper.java
index b3815425f..b16e6fa83 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/EffectiveMapper.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkEffectiveMapper.java
@@ -17,9 +17,9 @@
 
 package org.apache.streampark.console.core.mapper;
 
-import org.apache.streampark.console.core.entity.Effective;
+import org.apache.streampark.console.core.entity.FlinkEffective;
 
 import com.baomidou.mybatisplus.core.mapper.BaseMapper;
 
-public interface EffectiveMapper extends BaseMapper<Effective> {
+public interface FlinkEffectiveMapper extends BaseMapper<FlinkEffective> {
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index c2d026bad..4e5ba38ac 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -147,7 +147,6 @@ public class EnvInitializer implements ApplicationRunner {
         Arrays.asList(
             workspace.APP_UPLOADS(),
             workspace.APP_WORKSPACE(),
-            workspace.SPARK_APP_WORKSPACE(),
             workspace.APP_BACKUPS(),
             workspace.APP_SAVEPOINTS(),
             workspace.APP_PYTHON(),
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java
index afd50f1ad..0e04a1ab9 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java
@@ -24,10 +24,10 @@ import 
org.apache.streampark.console.core.entity.FlinkApplication;
 import org.apache.streampark.console.core.entity.FlinkCluster;
 import org.apache.streampark.console.core.entity.FlinkEnv;
 import org.apache.streampark.console.core.entity.FlinkSql;
-import org.apache.streampark.console.core.service.AppBuildPipeService;
 import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.FlinkEnvService;
 import org.apache.streampark.console.core.service.FlinkSqlService;
+import 
org.apache.streampark.console.core.service.application.AppBuildPipeService;
 import 
org.apache.streampark.console.core.service.application.FlinkApplicationManageService;
 
 import lombok.extern.slf4j.Slf4j;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/CatalogService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkCatalogService.java
similarity index 94%
rename from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/CatalogService.java
rename to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkCatalogService.java
index c19d467b4..160ae952f 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/CatalogService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkCatalogService.java
@@ -24,8 +24,8 @@ import org.apache.streampark.console.core.entity.FlinkCatalog;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.service.IService;
 
-/** This interface is use to managed catalog */
-public interface CatalogService extends IService<FlinkCatalog> {
+/** This interface is used to managed catalog */
+public interface FlinkCatalogService extends IService<FlinkCatalog> {
 
     /**
      * Create Catalog
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/EffectiveService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEffectiveService.java
similarity index 85%
rename from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/EffectiveService.java
rename to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEffectiveService.java
index 46d87db3a..1cd38cb1c 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/EffectiveService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEffectiveService.java
@@ -17,16 +17,16 @@
 
 package org.apache.streampark.console.core.service;
 
-import org.apache.streampark.console.core.entity.Effective;
+import org.apache.streampark.console.core.entity.FlinkEffective;
 import org.apache.streampark.console.core.enums.EffectiveTypeEnum;
 
 import com.baomidou.mybatisplus.extension.service.IService;
 
-public interface EffectiveService extends IService<Effective> {
+public interface FlinkEffectiveService extends IService<FlinkEffective> {
 
     void remove(Long appId, EffectiveTypeEnum config);
 
-    Effective get(Long appId, EffectiveTypeEnum config);
+    FlinkEffective get(Long appId, EffectiveTypeEnum config);
 
     void saveOrUpdate(Long appId, EffectiveTypeEnum type, Long id);
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/AppBuildPipeService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/AppBuildPipeService.java
similarity index 97%
rename from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/AppBuildPipeService.java
rename to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/AppBuildPipeService.java
index c73c63ca0..a4cdacc3d 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/AppBuildPipeService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/AppBuildPipeService.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.console.core.service;
+package org.apache.streampark.console.core.service.application;
 
 import org.apache.streampark.console.core.entity.AppBuildPipeline;
 import org.apache.streampark.flink.packer.pipeline.DockerResolvedSnapshot;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationLogService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationLogService.java
similarity index 96%
rename from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationLogService.java
rename to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationLogService.java
index cb8fb1eac..17a068b3a 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationLogService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationLogService.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.console.core.service;
+package org.apache.streampark.console.core.service.application;
 
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.core.entity.ApplicationLog;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/EffectiveMapper.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationService.java
similarity index 68%
rename from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/EffectiveMapper.java
rename to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationService.java
index b3815425f..e6404ad20 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/EffectiveMapper.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationService.java
@@ -15,11 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.console.core.mapper;
+package org.apache.streampark.console.core.service.application;
 
-import org.apache.streampark.console.core.entity.Effective;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.enums.EngineTypeEnum;
 
-import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.baomidou.mybatisplus.extension.service.IService;
 
-public interface EffectiveMapper extends BaseMapper<Effective> {
+public interface ApplicationService extends IService<Application> {
+
+    Application create(EngineTypeEnum engineTypeEnum);
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkApplicationBackUpService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/FlinkApplicationBackUpService.java
similarity index 98%
rename from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkApplicationBackUpService.java
rename to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/FlinkApplicationBackUpService.java
index cafe8376f..29cc0de3c 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkApplicationBackUpService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/FlinkApplicationBackUpService.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.console.core.service;
+package org.apache.streampark.console.core.service.application;
 
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.InternalException;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkApplicationConfigService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/FlinkApplicationConfigService.java
similarity index 98%
rename from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkApplicationConfigService.java
rename to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/FlinkApplicationConfigService.java
index 7850b2aa5..a13843a2e 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkApplicationConfigService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/FlinkApplicationConfigService.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.console.core.service;
+package org.apache.streampark.console.core.service.application;
 
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.core.entity.FlinkApplication;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkAppBuildPipeService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkAppBuildPipeService.java
similarity index 97%
rename from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkAppBuildPipeService.java
rename to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkAppBuildPipeService.java
index ee2d2ae64..25ea3cb11 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkAppBuildPipeService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkAppBuildPipeService.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.console.core.service;
+package org.apache.streampark.console.core.service.application;
 
 import org.apache.streampark.console.core.entity.AppBuildPipeline;
 import org.apache.streampark.flink.packer.pipeline.PipelineStatusEnum;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationBackUpService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationBackUpService.java
similarity index 98%
rename from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationBackUpService.java
rename to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationBackUpService.java
index 1c3f2f92e..5b46fc53d 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationBackUpService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationBackUpService.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.console.core.service;
+package org.apache.streampark.console.core.service.application;
 
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.InternalException;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationConfigService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationConfigService.java
similarity index 98%
rename from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationConfigService.java
rename to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationConfigService.java
index b289910d4..7745b97a3 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationConfigService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationConfigService.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.console.core.service;
+package org.apache.streampark.console.core.service.application;
 
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.core.entity.SparkApplication;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationLogService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationLogService.java
similarity index 96%
rename from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationLogService.java
rename to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationLogService.java
index 6edee09ac..7c3687cbc 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationLogService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationLogService.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.console.core.service;
+package org.apache.streampark.console.core.service.application;
 
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.core.entity.SparkApplicationLog;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/AppBuildPipeServiceImpl.java
similarity index 98%
rename from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
rename to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/AppBuildPipeServiceImpl.java
index bc17e47b0..3434e67db 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/AppBuildPipeServiceImpl.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.console.core.service.impl;
+package org.apache.streampark.console.core.service.application.impl;
 
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.constants.Constants;
@@ -45,16 +45,16 @@ import 
org.apache.streampark.console.core.enums.OptionStateEnum;
 import org.apache.streampark.console.core.enums.ReleaseStateEnum;
 import org.apache.streampark.console.core.enums.ResourceTypeEnum;
 import 
org.apache.streampark.console.core.mapper.ApplicationBuildPipelineMapper;
-import org.apache.streampark.console.core.service.AppBuildPipeService;
-import org.apache.streampark.console.core.service.ApplicationLogService;
-import 
org.apache.streampark.console.core.service.FlinkApplicationBackUpService;
-import 
org.apache.streampark.console.core.service.FlinkApplicationConfigService;
 import org.apache.streampark.console.core.service.FlinkEnvService;
 import org.apache.streampark.console.core.service.FlinkSqlService;
 import org.apache.streampark.console.core.service.MessageService;
 import org.apache.streampark.console.core.service.ResourceService;
 import org.apache.streampark.console.core.service.SettingService;
+import 
org.apache.streampark.console.core.service.application.AppBuildPipeService;
+import 
org.apache.streampark.console.core.service.application.ApplicationLogService;
 import 
org.apache.streampark.console.core.service.application.FlinkApplicationActionService;
+import 
org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService;
+import 
org.apache.streampark.console.core.service.application.FlinkApplicationConfigService;
 import 
org.apache.streampark.console.core.service.application.FlinkApplicationInfoService;
 import 
org.apache.streampark.console.core.service.application.FlinkApplicationManageService;
 import org.apache.streampark.console.core.util.ServiceHelper;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationLogServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationLogServiceImpl.java
similarity index 94%
rename from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationLogServiceImpl.java
rename to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationLogServiceImpl.java
index 3ac1563b4..feef8cc0c 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationLogServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationLogServiceImpl.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.console.core.service.impl;
+package org.apache.streampark.console.core.service.application.impl;
 
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
 import org.apache.streampark.console.core.entity.ApplicationLog;
 import org.apache.streampark.console.core.mapper.ApplicationLogMapper;
-import org.apache.streampark.console.core.service.ApplicationLogService;
+import 
org.apache.streampark.console.core.service.application.ApplicationLogService;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationServiceImpl.java
new file mode 100644
index 000000000..866ceab92
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationServiceImpl.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service.application.impl;
+
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.enums.EngineTypeEnum;
+import org.apache.streampark.console.core.mapper.ApplicationMapper;
+import 
org.apache.streampark.console.core.service.application.ApplicationService;
+
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.Date;
+
+@Slf4j
+@Service
+@Transactional(propagation = Propagation.SUPPORTS, readOnly = true, 
rollbackFor = Exception.class)
+public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, 
Application>
+    implements
+        ApplicationService {
+
+    @Override
+    public Application create(EngineTypeEnum engineTypeEnum) {
+        Application application = new Application();
+        application.setJobType(engineTypeEnum.getCode());
+        application.setCreateTime(new Date());
+        application.setModifyTime(new Date());
+        save(application);
+        return application;
+    }
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
index 451fee22c..582d80316 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
@@ -54,11 +54,7 @@ import 
org.apache.streampark.console.core.enums.OperationEnum;
 import org.apache.streampark.console.core.enums.OptionStateEnum;
 import org.apache.streampark.console.core.enums.ReleaseStateEnum;
 import org.apache.streampark.console.core.mapper.FlinkApplicationMapper;
-import org.apache.streampark.console.core.service.AppBuildPipeService;
-import org.apache.streampark.console.core.service.ApplicationLogService;
 import org.apache.streampark.console.core.service.DistributedTaskService;
-import 
org.apache.streampark.console.core.service.FlinkApplicationBackUpService;
-import 
org.apache.streampark.console.core.service.FlinkApplicationConfigService;
 import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.FlinkEnvService;
 import org.apache.streampark.console.core.service.FlinkSqlService;
@@ -66,7 +62,11 @@ import 
org.apache.streampark.console.core.service.ResourceService;
 import org.apache.streampark.console.core.service.SavepointService;
 import org.apache.streampark.console.core.service.SettingService;
 import org.apache.streampark.console.core.service.VariableService;
+import 
org.apache.streampark.console.core.service.application.AppBuildPipeService;
+import 
org.apache.streampark.console.core.service.application.ApplicationLogService;
 import 
org.apache.streampark.console.core.service.application.FlinkApplicationActionService;
+import 
org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService;
+import 
org.apache.streampark.console.core.service.application.FlinkApplicationConfigService;
 import 
org.apache.streampark.console.core.service.application.FlinkApplicationInfoService;
 import 
org.apache.streampark.console.core.service.application.FlinkApplicationManageService;
 import org.apache.streampark.console.core.util.ServiceHelper;
@@ -272,9 +272,9 @@ public class FlinkApplicationActionServiceImpl extends 
ServiceImpl<FlinkApplicat
         ApplicationLog applicationLog = new ApplicationLog();
         applicationLog.setOptionName(OperationEnum.CANCEL.getValue());
         applicationLog.setAppId(application.getId());
-        applicationLog.setJobManagerUrl(application.getJobManagerUrl());
+        applicationLog.setTrackingUrl(application.getJobManagerUrl());
         applicationLog.setOptionTime(new Date());
-        applicationLog.setYarnAppId(application.getClusterId());
+        applicationLog.setClusterId(application.getClusterId());
         applicationLog.setUserId(ServiceHelper.getUserId());
 
         if (appParam.getRestoreOrTriggerSavepoint()) {
@@ -528,44 +528,44 @@ public class FlinkApplicationActionServiceImpl extends 
ServiceImpl<FlinkApplicat
                                    FlinkApplication appParam,
                                    SubmitResponse response,
                                    ApplicationLog applicationLog,
-                                   FlinkApplication application) {
+                                   FlinkApplication flinkApplication) {
         applicationLog.setSuccess(true);
         if (response.flinkConfig() != null) {
             String jmMemory = 
response.flinkConfig().get(ConfigKeys.KEY_FLINK_JM_PROCESS_MEMORY());
             if (jmMemory != null) {
-                
application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes());
+                
flinkApplication.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes());
             }
             String tmMemory = 
response.flinkConfig().get(ConfigKeys.KEY_FLINK_TM_PROCESS_MEMORY());
             if (tmMemory != null) {
-                
application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes());
+                
flinkApplication.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes());
             }
         }
         if (StringUtils.isNoneEmpty(response.jobId())) {
-            application.setJobId(response.jobId());
+            flinkApplication.setJobId(response.jobId());
         }
 
-        if (FlinkDeployMode.isYarnMode(application.getDeployMode())) {
-            application.setClusterId(response.clusterId());
-            applicationLog.setYarnAppId(response.clusterId());
+        if (FlinkDeployMode.isYarnMode(flinkApplication.getDeployMode())) {
+            flinkApplication.setClusterId(response.clusterId());
+            applicationLog.setClusterId(response.clusterId());
         }
 
         if (StringUtils.isNoneEmpty(response.jobManagerUrl())) {
-            application.setJobManagerUrl(response.jobManagerUrl());
-            applicationLog.setJobManagerUrl(response.jobManagerUrl());
+            flinkApplication.setJobManagerUrl(response.jobManagerUrl());
+            applicationLog.setTrackingUrl(response.jobManagerUrl());
         }
-        applicationLog.setYarnAppId(response.clusterId());
-        application.setStartTime(new Date());
-        application.setEndTime(null);
+        applicationLog.setClusterId(response.clusterId());
+        flinkApplication.setStartTime(new Date());
+        flinkApplication.setEndTime(null);
 
         // if start completed, will be added task to tracking queue
-        if (application.isKubernetesModeJob()) {
-            processForK8sApp(application, applicationLog);
+        if (flinkApplication.isKubernetesModeJob()) {
+            processForK8sApp(flinkApplication, applicationLog);
         } else {
             FlinkAppHttpWatcher.setOptionState(appParam.getId(), 
OptionStateEnum.STARTING);
-            FlinkAppHttpWatcher.doWatching(application);
+            FlinkAppHttpWatcher.doWatching(flinkApplication);
         }
         // update app
-        updateById(application);
+        updateById(flinkApplication);
         // save log
         applicationLogService.save(applicationLog);
     }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkApplicationBackUpServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackUpServiceImpl.java
similarity index 96%
rename from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkApplicationBackUpServiceImpl.java
rename to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackUpServiceImpl.java
index fdbc9a794..b3c0982d3 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkApplicationBackUpServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackUpServiceImpl.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.console.core.service.impl;
+package org.apache.streampark.console.core.service.application.impl;
 
 import org.apache.streampark.common.fs.FsOperator;
 import org.apache.streampark.console.base.domain.RestRequest;
@@ -29,10 +29,10 @@ import org.apache.streampark.console.core.entity.FlinkSql;
 import org.apache.streampark.console.core.enums.EffectiveTypeEnum;
 import org.apache.streampark.console.core.enums.ReleaseStateEnum;
 import org.apache.streampark.console.core.mapper.FlinkApplicationBackUpMapper;
-import org.apache.streampark.console.core.service.EffectiveService;
-import 
org.apache.streampark.console.core.service.FlinkApplicationBackUpService;
-import 
org.apache.streampark.console.core.service.FlinkApplicationConfigService;
+import org.apache.streampark.console.core.service.FlinkEffectiveService;
 import org.apache.streampark.console.core.service.FlinkSqlService;
+import 
org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService;
+import 
org.apache.streampark.console.core.service.application.FlinkApplicationConfigService;
 import 
org.apache.streampark.console.core.service.application.FlinkApplicationManageService;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@@ -62,7 +62,7 @@ public class FlinkApplicationBackUpServiceImpl
     private FlinkApplicationConfigService configService;
 
     @Autowired
-    private EffectiveService effectiveService;
+    private FlinkEffectiveService effectiveService;
 
     @Autowired
     private FlinkSqlService flinkSqlService;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkApplicationConfigServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java
similarity index 97%
rename from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkApplicationConfigServiceImpl.java
rename to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java
index e9abadf0a..2f8a02dbc 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkApplicationConfigServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.console.core.service.impl;
+package org.apache.streampark.console.core.service.application.impl;
 
 import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.common.util.Utils;
@@ -27,8 +27,8 @@ import 
org.apache.streampark.console.core.entity.FlinkApplicationConfig;
 import org.apache.streampark.console.core.enums.ConfigFileTypeEnum;
 import org.apache.streampark.console.core.enums.EffectiveTypeEnum;
 import org.apache.streampark.console.core.mapper.FlinkApplicationConfigMapper;
-import org.apache.streampark.console.core.service.EffectiveService;
-import 
org.apache.streampark.console.core.service.FlinkApplicationConfigService;
+import org.apache.streampark.console.core.service.FlinkEffectiveService;
+import 
org.apache.streampark.console.core.service.application.FlinkApplicationConfigService;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
@@ -64,7 +64,7 @@ public class FlinkApplicationConfigServiceImpl
     private ResourceLoader resourceLoader;
 
     @Autowired
-    private EffectiveService effectiveService;
+    private FlinkEffectiveService effectiveService;
 
     @Override
     public synchronized void create(FlinkApplication appParam, Boolean latest) 
{
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java
index 974cfeb2c..684b04dd5 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java
@@ -29,6 +29,7 @@ import 
org.apache.streampark.console.base.mybatis.pager.MybatisPager;
 import org.apache.streampark.console.base.util.ObjectUtils;
 import org.apache.streampark.console.base.util.WebUtils;
 import org.apache.streampark.console.core.bean.AppControl;
+import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.FlinkApplication;
 import org.apache.streampark.console.core.entity.FlinkApplicationConfig;
 import org.apache.streampark.console.core.entity.FlinkCluster;
@@ -36,22 +37,24 @@ import org.apache.streampark.console.core.entity.FlinkSql;
 import org.apache.streampark.console.core.entity.Resource;
 import org.apache.streampark.console.core.enums.CandidateTypeEnum;
 import org.apache.streampark.console.core.enums.ChangeTypeEnum;
+import org.apache.streampark.console.core.enums.EngineTypeEnum;
 import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
 import org.apache.streampark.console.core.enums.OptionStateEnum;
 import org.apache.streampark.console.core.enums.ReleaseStateEnum;
 import org.apache.streampark.console.core.mapper.FlinkApplicationMapper;
-import org.apache.streampark.console.core.service.AppBuildPipeService;
-import org.apache.streampark.console.core.service.ApplicationLogService;
-import org.apache.streampark.console.core.service.EffectiveService;
-import 
org.apache.streampark.console.core.service.FlinkApplicationBackUpService;
-import 
org.apache.streampark.console.core.service.FlinkApplicationConfigService;
 import org.apache.streampark.console.core.service.FlinkClusterService;
+import org.apache.streampark.console.core.service.FlinkEffectiveService;
 import org.apache.streampark.console.core.service.FlinkSqlService;
 import org.apache.streampark.console.core.service.ProjectService;
 import org.apache.streampark.console.core.service.ResourceService;
 import org.apache.streampark.console.core.service.SavepointService;
 import org.apache.streampark.console.core.service.SettingService;
 import org.apache.streampark.console.core.service.YarnQueueService;
+import 
org.apache.streampark.console.core.service.application.AppBuildPipeService;
+import 
org.apache.streampark.console.core.service.application.ApplicationLogService;
+import 
org.apache.streampark.console.core.service.application.ApplicationService;
+import 
org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService;
+import 
org.apache.streampark.console.core.service.application.FlinkApplicationConfigService;
 import 
org.apache.streampark.console.core.service.application.FlinkApplicationManageService;
 import org.apache.streampark.console.core.util.ServiceHelper;
 import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
@@ -104,6 +107,9 @@ public class FlinkApplicationManageServiceImpl extends 
ServiceImpl<FlinkApplicat
     @Autowired
     private ProjectService projectService;
 
+    @Autowired
+    private ApplicationService applicationService;
+
     @Autowired
     private FlinkApplicationBackUpService backUpService;
 
@@ -120,7 +126,7 @@ public class FlinkApplicationManageServiceImpl extends 
ServiceImpl<FlinkApplicat
     private SavepointService savepointService;
 
     @Autowired
-    private EffectiveService effectiveService;
+    private FlinkEffectiveService effectiveService;
 
     @Autowired
     private SettingService settingService;
@@ -348,7 +354,12 @@ public class FlinkApplicationManageServiceImpl extends 
ServiceImpl<FlinkApplicat
             
appParam.setJarCheckSum(org.apache.commons.io.FileUtils.checksumCRC32(new 
File(jarPath)));
         }
 
-        if (save(appParam)) {
+        // 1) save application
+        Application application = 
applicationService.create(EngineTypeEnum.FLINK);
+        appParam.setId(application.getId());
+
+        boolean saveSuccess = save(appParam);
+        if (saveSuccess) {
             if (appParam.isFlinkSqlJobOrPyFlinkJob()) {
                 FlinkSql flinkSql = new FlinkSql(appParam);
                 flinkSqlService.create(flinkSql);
@@ -429,6 +440,9 @@ public class FlinkApplicationManageServiceImpl extends 
ServiceImpl<FlinkApplicat
         newApp.setTeamId(persist.getTeamId());
         newApp.setDependency(persist.getDependency());
 
+        Application application = 
applicationService.create(EngineTypeEnum.FLINK);
+        newApp.setId(application.getId());
+
         boolean saved = save(newApp);
         if (saved) {
             if (newApp.isFlinkSqlJob()) {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkAppBuildPipeServiceImpl.java
similarity index 98%
rename from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
rename to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkAppBuildPipeServiceImpl.java
index 315886161..063a96c0c 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkAppBuildPipeServiceImpl.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.console.core.service.impl;
+package org.apache.streampark.console.core.service.application.impl;
 
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.constants.Constants;
@@ -45,12 +45,12 @@ import 
org.apache.streampark.console.core.enums.ResourceTypeEnum;
 import 
org.apache.streampark.console.core.mapper.ApplicationBuildPipelineMapper;
 import org.apache.streampark.console.core.service.MessageService;
 import org.apache.streampark.console.core.service.ResourceService;
-import org.apache.streampark.console.core.service.SparkAppBuildPipeService;
-import 
org.apache.streampark.console.core.service.SparkApplicationConfigService;
-import org.apache.streampark.console.core.service.SparkApplicationLogService;
 import org.apache.streampark.console.core.service.SparkEnvService;
 import org.apache.streampark.console.core.service.SparkSqlService;
+import 
org.apache.streampark.console.core.service.application.SparkAppBuildPipeService;
+import 
org.apache.streampark.console.core.service.application.SparkApplicationConfigService;
 import 
org.apache.streampark.console.core.service.application.SparkApplicationInfoService;
+import 
org.apache.streampark.console.core.service.application.SparkApplicationLogService;
 import 
org.apache.streampark.console.core.service.application.SparkApplicationManageService;
 import org.apache.streampark.console.core.util.ServiceHelper;
 import org.apache.streampark.console.core.watcher.SparkAppHttpWatcher;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
index a6dce8bbb..4812ebbc1 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
@@ -45,21 +45,21 @@ import 
org.apache.streampark.console.core.enums.SparkOperationEnum;
 import org.apache.streampark.console.core.enums.SparkOptionStateEnum;
 import org.apache.streampark.console.core.mapper.SparkApplicationMapper;
 import org.apache.streampark.console.core.service.ResourceService;
-import org.apache.streampark.console.core.service.SparkAppBuildPipeService;
-import 
org.apache.streampark.console.core.service.SparkApplicationConfigService;
-import org.apache.streampark.console.core.service.SparkApplicationLogService;
 import org.apache.streampark.console.core.service.SparkEnvService;
 import org.apache.streampark.console.core.service.SparkSqlService;
 import org.apache.streampark.console.core.service.VariableService;
+import 
org.apache.streampark.console.core.service.application.SparkAppBuildPipeService;
 import 
org.apache.streampark.console.core.service.application.SparkApplicationActionService;
+import 
org.apache.streampark.console.core.service.application.SparkApplicationConfigService;
 import 
org.apache.streampark.console.core.service.application.SparkApplicationInfoService;
+import 
org.apache.streampark.console.core.service.application.SparkApplicationLogService;
 import org.apache.streampark.console.core.util.ServiceHelper;
 import org.apache.streampark.console.core.watcher.SparkAppHttpWatcher;
 import org.apache.streampark.flink.packer.pipeline.BuildResult;
 import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse;
 import org.apache.streampark.spark.client.SparkClient;
-import org.apache.streampark.spark.client.bean.StopRequest;
-import org.apache.streampark.spark.client.bean.StopResponse;
+import org.apache.streampark.spark.client.bean.CancelRequest;
+import org.apache.streampark.spark.client.bean.CancelResponse;
 import org.apache.streampark.spark.client.bean.SubmitRequest;
 import org.apache.streampark.spark.client.bean.SubmitResponse;
 
@@ -129,9 +129,9 @@ public class SparkApplicationActionServiceImpl
     @Autowired
     private ResourceService resourceService;
 
-    private final Map<Long, CompletableFuture<SubmitResponse>> startFutureMap 
= new ConcurrentHashMap<>();
+    private final Map<Long, CompletableFuture<SubmitResponse>> 
startJobFutureMap = new ConcurrentHashMap<>();
 
-    private final Map<Long, CompletableFuture<StopResponse>> stopFutureMap = 
new ConcurrentHashMap<>();
+    private final Map<Long, CompletableFuture<CancelResponse>> 
cancelJobFutureMap = new ConcurrentHashMap<>();
 
     @Override
     public void revoke(Long appId) throws ApplicationException {
@@ -167,8 +167,8 @@ public class SparkApplicationActionServiceImpl
 
     @Override
     public void forcedStop(Long id) {
-        CompletableFuture<SubmitResponse> startFuture = 
startFutureMap.remove(id);
-        CompletableFuture<StopResponse> stopFuture = stopFutureMap.remove(id);
+        CompletableFuture<SubmitResponse> startFuture = 
startJobFutureMap.remove(id);
+        CompletableFuture<CancelResponse> stopFuture = 
cancelJobFutureMap.remove(id);
         SparkApplication application = this.baseMapper.selectApp(id);
         if (startFuture != null) {
             startFuture.cancel(true);
@@ -205,21 +205,21 @@ public class SparkApplicationActionServiceImpl
 
         Map<String, String> stopProper = new HashMap<>();
 
-        StopRequest stopRequest =
-            new StopRequest(
+        CancelRequest stopRequest =
+            new CancelRequest(
                 application.getId(),
                 sparkEnv.getSparkVersion(),
                 SparkDeployMode.of(application.getDeployMode()),
                 stopProper,
                 application.getAppId());
 
-        CompletableFuture<StopResponse> stopFuture =
-            CompletableFuture.supplyAsync(() -> SparkClient.stop(stopRequest), 
executorService);
+        CompletableFuture<CancelResponse> stopFuture =
+            CompletableFuture.supplyAsync(() -> 
SparkClient.cancel(stopRequest), executorService);
 
-        stopFutureMap.put(application.getId(), stopFuture);
+        cancelJobFutureMap.put(application.getId(), stopFuture);
         stopFuture.whenComplete(
             (cancelResponse, throwable) -> {
-                stopFutureMap.remove(application.getId());
+                cancelJobFutureMap.remove(application.getId());
                 if (throwable != null) {
                     String exception = 
ExceptionUtils.stringifyException(throwable);
                     applicationLog.setException(exception);
@@ -329,11 +329,11 @@ public class SparkApplicationActionServiceImpl
         CompletableFuture<SubmitResponse> future = CompletableFuture
             .supplyAsync(() -> SparkClient.submit(submitRequest), 
executorService);
 
-        startFutureMap.put(application.getId(), future);
+        startJobFutureMap.put(application.getId(), future);
         future.whenComplete(
             (response, throwable) -> {
                 // 1) remove Future
-                startFutureMap.remove(application.getId());
+                startJobFutureMap.remove(application.getId());
 
                 // 2) exception
                 if (throwable != null) {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationBackUpServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBackUpServiceImpl.java
similarity index 97%
rename from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationBackUpServiceImpl.java
rename to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBackUpServiceImpl.java
index 0009738af..50d8923e9 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationBackUpServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBackUpServiceImpl.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.console.core.service.impl;
+package org.apache.streampark.console.core.service.application.impl;
 
 import org.apache.streampark.common.fs.FsOperator;
 import org.apache.streampark.console.base.domain.RestRequest;
@@ -29,10 +29,10 @@ import org.apache.streampark.console.core.entity.SparkSql;
 import org.apache.streampark.console.core.enums.EffectiveTypeEnum;
 import org.apache.streampark.console.core.enums.ReleaseStateEnum;
 import org.apache.streampark.console.core.mapper.SparkApplicationBackUpMapper;
-import 
org.apache.streampark.console.core.service.SparkApplicationBackUpService;
-import 
org.apache.streampark.console.core.service.SparkApplicationConfigService;
 import org.apache.streampark.console.core.service.SparkEffectiveService;
 import org.apache.streampark.console.core.service.SparkSqlService;
+import 
org.apache.streampark.console.core.service.application.SparkApplicationBackUpService;
+import 
org.apache.streampark.console.core.service.application.SparkApplicationConfigService;
 import 
org.apache.streampark.console.core.service.application.SparkApplicationManageService;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@@ -143,7 +143,7 @@ public class SparkApplicationBackUpServiceImpl
         if (!backUpPages.getRecords().isEmpty()) {
             SparkApplicationBackUp backup = backUpPages.getRecords().get(0);
             String path = backup.getPath();
-            appParam.getFsOperator().move(path, 
appParam.getWorkspace().SPARK_APP_WORKSPACE());
+            appParam.getFsOperator().move(path, 
appParam.getWorkspace().APP_WORKSPACE());
             super.removeById(backup.getId());
         }
     }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationConfigServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationConfigServiceImpl.java
similarity index 98%
rename from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationConfigServiceImpl.java
rename to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationConfigServiceImpl.java
index c8dd9bd8c..3d00aa68c 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationConfigServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationConfigServiceImpl.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.console.core.service.impl;
+package org.apache.streampark.console.core.service.application.impl;
 
 import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.common.util.Utils;
@@ -27,8 +27,8 @@ import 
org.apache.streampark.console.core.entity.SparkApplicationConfig;
 import org.apache.streampark.console.core.enums.ConfigFileTypeEnum;
 import org.apache.streampark.console.core.enums.EffectiveTypeEnum;
 import org.apache.streampark.console.core.mapper.SparkApplicationConfigMapper;
-import 
org.apache.streampark.console.core.service.SparkApplicationConfigService;
 import org.apache.streampark.console.core.service.SparkEffectiveService;
+import 
org.apache.streampark.console.core.service.application.SparkApplicationConfigService;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationLogServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationLogServiceImpl.java
similarity index 94%
rename from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationLogServiceImpl.java
rename to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationLogServiceImpl.java
index cb6e338af..34ce31cd3 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationLogServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationLogServiceImpl.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.console.core.service.impl;
+package org.apache.streampark.console.core.service.application.impl;
 
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
 import org.apache.streampark.console.core.entity.SparkApplicationLog;
 import org.apache.streampark.console.core.mapper.SparkApplicationLogMapper;
-import org.apache.streampark.console.core.service.SparkApplicationLogService;
+import 
org.apache.streampark.console.core.service.application.SparkApplicationLogService;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
index 112ed1cf5..8f3554543 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
@@ -28,26 +28,28 @@ import 
org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
 import org.apache.streampark.console.base.util.WebUtils;
 import org.apache.streampark.console.core.bean.AppControl;
+import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.Resource;
 import org.apache.streampark.console.core.entity.SparkApplication;
 import org.apache.streampark.console.core.entity.SparkApplicationConfig;
 import org.apache.streampark.console.core.entity.SparkSql;
 import org.apache.streampark.console.core.enums.CandidateTypeEnum;
 import org.apache.streampark.console.core.enums.ChangeTypeEnum;
+import org.apache.streampark.console.core.enums.EngineTypeEnum;
 import org.apache.streampark.console.core.enums.OptionStateEnum;
 import org.apache.streampark.console.core.enums.ReleaseStateEnum;
 import org.apache.streampark.console.core.enums.SparkAppStateEnum;
 import org.apache.streampark.console.core.mapper.SparkApplicationMapper;
-import org.apache.streampark.console.core.service.AppBuildPipeService;
 import org.apache.streampark.console.core.service.ProjectService;
 import org.apache.streampark.console.core.service.ResourceService;
-import org.apache.streampark.console.core.service.SettingService;
-import 
org.apache.streampark.console.core.service.SparkApplicationBackUpService;
-import 
org.apache.streampark.console.core.service.SparkApplicationConfigService;
-import org.apache.streampark.console.core.service.SparkApplicationLogService;
 import org.apache.streampark.console.core.service.SparkEffectiveService;
 import org.apache.streampark.console.core.service.SparkSqlService;
 import org.apache.streampark.console.core.service.YarnQueueService;
+import 
org.apache.streampark.console.core.service.application.AppBuildPipeService;
+import 
org.apache.streampark.console.core.service.application.ApplicationService;
+import 
org.apache.streampark.console.core.service.application.SparkApplicationBackUpService;
+import 
org.apache.streampark.console.core.service.application.SparkApplicationConfigService;
+import 
org.apache.streampark.console.core.service.application.SparkApplicationLogService;
 import 
org.apache.streampark.console.core.service.application.SparkApplicationManageService;
 import org.apache.streampark.console.core.util.ServiceHelper;
 import org.apache.streampark.flink.packer.pipeline.PipelineStatusEnum;
@@ -95,6 +97,9 @@ public class SparkApplicationManageServiceImpl
     @Autowired
     private ProjectService projectService;
 
+    @Autowired
+    private ApplicationService applicationService;
+
     @Autowired
     private SparkApplicationBackUpService backUpService;
 
@@ -110,9 +115,6 @@ public class SparkApplicationManageServiceImpl
     @Autowired
     private SparkEffectiveService effectiveService;
 
-    @Autowired
-    private SettingService settingService;
-
     @Autowired
     private AppBuildPipeService appBuildPipeService;
 
@@ -194,9 +196,9 @@ public class SparkApplicationManageServiceImpl
         try {
             application
                 .getFsOperator()
-                
.delete(application.getWorkspace().SPARK_APP_WORKSPACE().concat("/").concat(appId.toString()));
+                
.delete(application.getWorkspace().APP_WORKSPACE().concat("/").concat(appId.toString()));
             // try to delete yarn-application, and leave no trouble.
-            String path = 
Workspace.of(StorageType.HDFS).SPARK_APP_WORKSPACE().concat("/").concat(appId.toString());
+            String path = 
Workspace.of(StorageType.HDFS).APP_WORKSPACE().concat("/").concat(appId.toString());
             if (HdfsOperator.exists(path)) {
                 HdfsOperator.delete(path);
             }
@@ -286,7 +288,13 @@ public class SparkApplicationManageServiceImpl
             
appParam.setJarCheckSum(org.apache.commons.io.FileUtils.checksumCRC32(new 
File(jarPath)));
         }
 
-        if (save(appParam)) {
+        // 1) save application
+        Application application = 
applicationService.create(EngineTypeEnum.SPARK);
+        appParam.setId(application.getId());
+
+        boolean saveSuccess = save(appParam);
+
+        if (saveSuccess) {
             if (appParam.isSparkSqlJob()) {
                 SparkSql sparkSql = new SparkSql(appParam);
                 sparkSqlService.create(sparkSql);
@@ -352,6 +360,9 @@ public class SparkApplicationManageServiceImpl
         newApp.setModifyTime(newApp.getCreateTime());
         newApp.setTags(oldApp.getTags());
 
+        Application application = 
applicationService.create(EngineTypeEnum.SPARK);
+        newApp.setId(application.getId());
+
         boolean saved = save(newApp);
         if (saved) {
             if (newApp.isSparkSqlJob()) {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkCatalogServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkCatalogServiceImpl.java
index 9cc1e12c6..b173cb276 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkCatalogServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkCatalogServiceImpl.java
@@ -24,7 +24,7 @@ import 
org.apache.streampark.console.base.mybatis.pager.MybatisPager;
 import org.apache.streampark.console.core.bean.FlinkCatalogParams;
 import org.apache.streampark.console.core.entity.FlinkCatalog;
 import org.apache.streampark.console.core.mapper.FlinkCatalogMapper;
-import org.apache.streampark.console.core.service.CatalogService;
+import org.apache.streampark.console.core.service.FlinkCatalogService;
 
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@@ -46,7 +46,7 @@ import java.util.regex.Pattern;
 @Transactional(propagation = Propagation.SUPPORTS, rollbackFor = 
Exception.class)
 public class FlinkCatalogServiceImpl extends ServiceImpl<FlinkCatalogMapper, 
FlinkCatalog>
     implements
-        CatalogService {
+        FlinkCatalogService {
 
     private static final String CATALOG_REGEX = 
"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$";
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/EffectiveServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEffectiveServiceImpl.java
similarity index 59%
rename from 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/EffectiveServiceImpl.java
rename to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEffectiveServiceImpl.java
index d8b5f4352..bdeb8c0e2 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/EffectiveServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEffectiveServiceImpl.java
@@ -17,10 +17,10 @@
 
 package org.apache.streampark.console.core.service.impl;
 
-import org.apache.streampark.console.core.entity.Effective;
+import org.apache.streampark.console.core.entity.FlinkEffective;
 import org.apache.streampark.console.core.enums.EffectiveTypeEnum;
-import org.apache.streampark.console.core.mapper.EffectiveMapper;
-import org.apache.streampark.console.core.service.EffectiveService;
+import org.apache.streampark.console.core.mapper.FlinkEffectiveMapper;
+import org.apache.streampark.console.core.service.FlinkEffectiveService;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
@@ -35,34 +35,34 @@ import java.util.Date;
 @Slf4j
 @Service
 @Transactional(propagation = Propagation.SUPPORTS, readOnly = true, 
rollbackFor = Exception.class)
-public class EffectiveServiceImpl extends ServiceImpl<EffectiveMapper, 
Effective>
+public class FlinkEffectiveServiceImpl extends 
ServiceImpl<FlinkEffectiveMapper, FlinkEffective>
     implements
-        EffectiveService {
+        FlinkEffectiveService {
 
     @Override
     public void remove(Long appId, EffectiveTypeEnum effectiveTypeEnum) {
-        LambdaQueryWrapper<Effective> queryWrapper = new 
LambdaQueryWrapper<Effective>()
-            .eq(Effective::getAppId, appId)
-            .eq(Effective::getTargetType, effectiveTypeEnum.getType());
+        LambdaQueryWrapper<FlinkEffective> queryWrapper = new 
LambdaQueryWrapper<FlinkEffective>()
+            .eq(FlinkEffective::getAppId, appId)
+            .eq(FlinkEffective::getTargetType, effectiveTypeEnum.getType());
         baseMapper.delete(queryWrapper);
     }
 
     @Override
-    public Effective get(Long appId, EffectiveTypeEnum effectiveTypeEnum) {
-        LambdaQueryWrapper<Effective> queryWrapper = new 
LambdaQueryWrapper<Effective>()
-            .eq(Effective::getAppId, appId)
-            .eq(Effective::getTargetType, effectiveTypeEnum.getType());
+    public FlinkEffective get(Long appId, EffectiveTypeEnum effectiveTypeEnum) 
{
+        LambdaQueryWrapper<FlinkEffective> queryWrapper = new 
LambdaQueryWrapper<FlinkEffective>()
+            .eq(FlinkEffective::getAppId, appId)
+            .eq(FlinkEffective::getTargetType, effectiveTypeEnum.getType());
         return this.getOne(queryWrapper);
     }
 
     @Override
     public void saveOrUpdate(Long appId, EffectiveTypeEnum type, Long id) {
-        LambdaQueryWrapper<Effective> queryWrapper = new 
LambdaQueryWrapper<Effective>()
-            .eq(Effective::getAppId, appId)
-            .eq(Effective::getTargetType, type.getType());
+        LambdaQueryWrapper<FlinkEffective> queryWrapper = new 
LambdaQueryWrapper<FlinkEffective>()
+            .eq(FlinkEffective::getAppId, appId)
+            .eq(FlinkEffective::getTargetType, type.getType());
         long count = count(queryWrapper);
         if (count == 0) {
-            Effective effective = new Effective();
+            FlinkEffective effective = new FlinkEffective();
             effective.setAppId(appId);
             effective.setTargetType(type.getType());
             effective.setTargetId(id);
@@ -70,16 +70,17 @@ public class EffectiveServiceImpl extends 
ServiceImpl<EffectiveMapper, Effective
             save(effective);
         } else {
             update(
-                new LambdaUpdateWrapper<Effective>()
-                    .eq(Effective::getAppId, appId)
-                    .eq(Effective::getTargetType, type.getType())
-                    .set(Effective::getTargetId, id));
+                new LambdaUpdateWrapper<FlinkEffective>()
+                    .eq(FlinkEffective::getAppId, appId)
+                    .eq(FlinkEffective::getTargetType, type.getType())
+                    .set(FlinkEffective::getTargetId, id));
         }
     }
 
     @Override
     public void removeByAppId(Long appId) {
-        LambdaQueryWrapper<Effective> queryWrapper = new 
LambdaQueryWrapper<Effective>().eq(Effective::getAppId, appId);
+        LambdaQueryWrapper<FlinkEffective> queryWrapper =
+            new 
LambdaQueryWrapper<FlinkEffective>().eq(FlinkEffective::getAppId, appId);
         this.remove(queryWrapper);
     }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java
index e075e5da4..4191d316d 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java
@@ -35,11 +35,11 @@ import 
org.apache.streampark.console.core.enums.CheckPointTypeEnum;
 import org.apache.streampark.console.core.enums.OperationEnum;
 import org.apache.streampark.console.core.enums.OptionStateEnum;
 import org.apache.streampark.console.core.mapper.FlinkSavepointMapper;
-import org.apache.streampark.console.core.service.ApplicationLogService;
-import 
org.apache.streampark.console.core.service.FlinkApplicationConfigService;
 import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.FlinkEnvService;
 import org.apache.streampark.console.core.service.SavepointService;
+import 
org.apache.streampark.console.core.service.application.ApplicationLogService;
+import 
org.apache.streampark.console.core.service.application.FlinkApplicationConfigService;
 import 
org.apache.streampark.console.core.service.application.FlinkApplicationManageService;
 import org.apache.streampark.console.core.util.ServiceHelper;
 import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
@@ -191,9 +191,9 @@ public class FlinkSavepointServiceImpl extends 
ServiceImpl<FlinkSavepointMapper,
         ApplicationLog applicationLog = new ApplicationLog();
         applicationLog.setOptionName(OperationEnum.SAVEPOINT.getValue());
         applicationLog.setAppId(application.getId());
-        applicationLog.setJobManagerUrl(application.getJobManagerUrl());
+        applicationLog.setTrackingUrl(application.getJobManagerUrl());
         applicationLog.setOptionTime(new Date());
-        applicationLog.setYarnAppId(application.getClusterId());
+        applicationLog.setClusterId(application.getClusterId());
         applicationLog.setUserId(ServiceHelper.getUserId());
         return applicationLog;
     }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
index c95cb3b7c..45048e5e2 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
@@ -28,10 +28,10 @@ import org.apache.streampark.console.core.entity.FlinkSql;
 import org.apache.streampark.console.core.enums.CandidateTypeEnum;
 import org.apache.streampark.console.core.enums.EffectiveTypeEnum;
 import org.apache.streampark.console.core.mapper.FlinkSqlMapper;
-import org.apache.streampark.console.core.service.EffectiveService;
-import 
org.apache.streampark.console.core.service.FlinkApplicationBackUpService;
+import org.apache.streampark.console.core.service.FlinkEffectiveService;
 import org.apache.streampark.console.core.service.FlinkEnvService;
 import org.apache.streampark.console.core.service.FlinkSqlService;
+import 
org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService;
 import org.apache.streampark.flink.core.FlinkSqlValidationResult;
 import org.apache.streampark.flink.proxy.FlinkShimsProxy;
 
@@ -58,7 +58,7 @@ public class FlinkSqlServiceImpl extends 
ServiceImpl<FlinkSqlMapper, FlinkSql>
         FlinkSqlService {
 
     @Autowired
-    private EffectiveService effectiveService;
+    private FlinkEffectiveService effectiveService;
 
     @Autowired
     private FlinkApplicationBackUpService backUpService;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
index 590b373c2..4b492fc08 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
@@ -131,7 +131,7 @@ public class ProxyServiceImpl implements ProxyService {
     @Override
     public ResponseEntity<?> proxyYarn(HttpServletRequest request, 
ApplicationLog log) throws Exception {
         ResponseEntity.BodyBuilder builder = 
ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
-        String yarnId = log.getYarnAppId();
+        String yarnId = log.getClusterId();
         if (StringUtils.isBlank(yarnId)) {
             return builder.body("The yarn application id is null.");
         }
@@ -158,7 +158,7 @@ public class ProxyServiceImpl implements ProxyService {
     public ResponseEntity<?> proxyHistory(HttpServletRequest request, 
ApplicationLog log) throws Exception {
         ResponseEntity.BodyBuilder builder = 
ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
 
-        String url = log.getJobManagerUrl();
+        String url = log.getTrackingUrl();
         if (StringUtils.isBlank(url)) {
             return builder.body("The jobManager url is null.");
         }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkSqlServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkSqlServiceImpl.java
index 046826880..cf157007f 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkSqlServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkSqlServiceImpl.java
@@ -28,10 +28,10 @@ import org.apache.streampark.console.core.entity.SparkSql;
 import org.apache.streampark.console.core.enums.CandidateTypeEnum;
 import org.apache.streampark.console.core.enums.EffectiveTypeEnum;
 import org.apache.streampark.console.core.mapper.SparkSqlMapper;
-import 
org.apache.streampark.console.core.service.SparkApplicationBackUpService;
 import org.apache.streampark.console.core.service.SparkEffectiveService;
 import org.apache.streampark.console.core.service.SparkEnvService;
 import org.apache.streampark.console.core.service.SparkSqlService;
+import 
org.apache.streampark.console.core.service.application.SparkApplicationBackUpService;
 import org.apache.streampark.spark.client.proxy.SparkShimsProxy;
 import org.apache.streampark.spark.core.util.SparkSqlValidationResult;
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ApplicationBackUpCleanTask.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ApplicationBackUpCleanTask.java
index 22fa38e0f..b75848b66 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ApplicationBackUpCleanTask.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ApplicationBackUpCleanTask.java
@@ -18,7 +18,7 @@
 package org.apache.streampark.console.core.task;
 
 import org.apache.streampark.console.core.entity.FlinkApplicationBackUp;
-import 
org.apache.streampark.console.core.service.FlinkApplicationBackUpService;
+import 
org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
index 8e7c15476..b2c1ed083 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
@@ -28,11 +28,11 @@ import 
org.apache.streampark.console.core.enums.StopFromEnum;
 import org.apache.streampark.console.core.metrics.spark.Job;
 import 
org.apache.streampark.console.core.metrics.spark.SparkApplicationSummary;
 import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
-import org.apache.streampark.console.core.service.SparkApplicationLogService;
 import org.apache.streampark.console.core.service.SparkEnvService;
 import org.apache.streampark.console.core.service.alert.AlertService;
 import 
org.apache.streampark.console.core.service.application.SparkApplicationActionService;
 import 
org.apache.streampark.console.core.service.application.SparkApplicationInfoService;
+import 
org.apache.streampark.console.core.service.application.SparkApplicationLogService;
 import 
org.apache.streampark.console.core.service.application.SparkApplicationManageService;
 import org.apache.streampark.console.core.utils.AlertTemplateUtils;
 
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
 
b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
index f8459eee8..3c082209a 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
+++ 
b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
@@ -21,6 +21,12 @@
 insert into `t_team` values (100000, 'default', 'The default team', now(), 
now());
 insert into `t_team` values (100001, 'test', 'The test team', now(), now());
 
+-- ----------------------------
+-- Records of flink-app
+-- ----------------------------
+INSERT INTO `t_app` (`id`, `job_type`, `create_time`, `modify_time`)
+VALUES (100000, 1, now(), now());
+
 -- ----------------------------
 -- Records of t_flink_app
 -- ----------------------------
@@ -45,6 +51,13 @@ insert into `t_flink_project` values (100000, 100000, 
'streampark-quickstart', '
 -- ----------------------------
 insert into `t_flink_sql` values (100000, 100000, 
'eNqlUUtPhDAQvu+vmFs1AYIHT5s94AaVqGxSSPZIKgxrY2mxrdGfb4GS3c0+LnJo6Mz36syapkmZQpk8vKbQMMt2KOFmAe5rK4Nf3yhrhCwvA1/TTDaqO61UxmooSprlT1PDGkgKEKpmwvIOjWVdP3W2zpG+JfQFHjfU46xxrVvYZuWztye1khJrqzSBFRCfjUwSYQiqt1xJJvyPcbWJp9WPCXvUoUEn0ZAVufcs0nIUjYn2L4s++YiY75eBLr+2Dnl3GYKTWRyfQKYRRR2XZxXmNvu9yh9GHAmUO/sxyMRkGNly4c714RZ7zaWtLHsX+N9NjvVrWxm99jmyvEhpOUhujmIYFI5zkCOYzYIj11a7QH7Tyz+nE8bw',
 null, null, 1, 1, now());
 
+
+-- ----------------------------
+-- Records of spark-app
+-- ----------------------------
+INSERT INTO `t_app` (`id`, `job_type`, `create_time`, `modify_time`)
+VALUES (100001, 2, now(), now());
+
 -- ----------------------------
 -- Records of t_spark_app
 -- ----------------------------
@@ -52,17 +65,17 @@ insert into `t_spark_app` (
      `id`, `team_id`, `job_type`, `app_type`, `app_name`, `deploy_mode`, 
`resource_from`, `main_class`,
      `yarn_queue`, `k8s_image_pull_policy`, `k8s_namespace`, `state`, 
`option_state`, `user_id`,
      `description`, `tracking`, `release`, `build`, `create_time`, 
`modify_time`, `tags`)
-values (100000, 100000, 2, 4, 'Spark SQL Demo', 2, 2, 
'org.apache.streampark.spark.cli.SqlClient', 'default', 0, 'default', 0, 0, 
100000, 'Spark SQL Demo', 0, 1, 1, now(), now(), 'streampark,test');
+values (100001, 100000, 2, 4, 'Spark SQL Demo', 2, 2, 
'org.apache.streampark.spark.cli.SqlClient', 'default', 0, 'default', 0, 0, 
100000, 'Spark SQL Demo', 0, 1, 1, now(), now(), 'streampark,test');
 
 -- ----------------------------
 -- Records of t_spark_effective
 -- ----------------------------
-insert into `t_spark_effective` values (100000, 100000, 4, 100000, now());
+insert into `t_spark_effective` values (100000, 100001, 4, 100000, now());
 
 -- ----------------------------
 -- Records of t_spark_sql
 -- ----------------------------
-insert into `t_spark_sql` values (100000, 100000, 
'eNq1jr0OgjAURnee4m4FY/oCTJVUg/KT9F7cK2kQiy2W+P6KMQ6yuDh9+YZzcjIlBUkgsSkkXCbv0N9Da0ifBgOx01cDSCqvdmsIpuu9e98kavA54EPH9ajbs+HTqIPl023gsyeN8gqlIsgrqhfmoygaiTEre2vYGliDgiW/IXvd2hdymIls0d87+5f6jxdlITOCFWxVXX5npg92MWtB',
 null, null, 1, 1, now());
+insert into `t_spark_sql` values (100000, 100001, 
'eNq1jr0OgjAURnee4m4FY/oCTJVUg/KT9F7cK2kQiy2W+P6KMQ6yuDh9+YZzcjIlBUkgsSkkXCbv0N9Da0ifBgOx01cDSCqvdmsIpuu9e98kavA54EPH9ajbs+HTqIPl023gsyeN8gqlIsgrqhfmoygaiTEre2vYGliDgiW/IXvd2hdymIls0d87+5f6jxdlITOCFWxVXX5npg92MWtB',
 null, null, 1, 1, now());
 
 -- ----------------------------
 -- Records of t_menu
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
 
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index b427e28d1..dd2de3986 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++ 
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -16,19 +16,14 @@
  */
 
 -- ----------------------------
--- Table structure for t_flink_app_backup
+-- Table structure for t_app
 -- ----------------------------
-create table if not exists `t_flink_app_backup` (
-  `id` bigint generated by default as identity not null,
-  `app_id` bigint default null,
-  `sql_id` bigint default null,
-  `config_id` bigint default null,
-  `version` int default null,
-  `path` varchar(128)  default null,
-  `description` varchar(255) default null,
-  `create_time` datetime default null comment 'create time',
-  `modify_time` datetime default null comment 'modify time',
-  primary key(`id`)
+create table if not exists `t_app` (
+`id` bigint generated by default as identity not null,
+`job_type` tinyint default null,
+`create_time` datetime default null comment 'create time',
+`modify_time` datetime default null comment 'modify time',
+primary key(`id`)
 );
 
 -- ----------------------------
@@ -99,6 +94,24 @@ create table if not exists `t_flink_app` (
   primary key(`id`)
 );
 
+
+-- ----------------------------
+-- Table structure for t_flink_app_backup
+-- ----------------------------
+create table if not exists `t_flink_app_backup` (
+`id` bigint generated by default as identity not null,
+`app_id` bigint default null,
+`sql_id` bigint default null,
+`config_id` bigint default null,
+`version` int default null,
+`path` varchar(128)  default null,
+`description` varchar(255) default null,
+`create_time` datetime default null comment 'create time',
+`modify_time` datetime default null comment 'modify time',
+primary key(`id`)
+);
+
+
 -- ----------------------------
 -- Table structure for t_flink_config
 -- ----------------------------
@@ -145,13 +158,14 @@ create table if not exists `t_flink_env` (
 
 
 -- ----------------------------
--- Table structure for t_flink_log
+-- Table structure for t_app_log
 -- ----------------------------
-create table if not exists `t_flink_log` (
+create table if not exists `t_app_log` (
   `id` bigint generated by default as identity not null,
   `app_id` bigint default null,
-  `yarn_app_id` varchar(64)  default null,
-  `job_manager_url` varchar(255)  default null,
+  `job_type` tinyint default null,
+  `cluster_id` varchar(64)  default null,
+  `tracking_url` varchar(255)  default null,
   `success` tinyint default null,
   `exception` text ,
   `option_time` datetime default null,
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/EffectiveMapper.xml
 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
similarity index 92%
copy from 
streampark-console/streampark-console-service/src/main/resources/mapper/core/EffectiveMapper.xml
copy to 
streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index 59a6ac14a..85569bf7f 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/EffectiveMapper.xml
+++ 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -16,6 +16,6 @@
   ~ limitations under the License.
   -->
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" 
"http://mybatis.org/dtd/mybatis-3-mapper.dtd";>
-<mapper namespace="org.apache.streampark.console.core.mapper.EffectiveMapper">
+<mapper 
namespace="org.apache.streampark.console.core.mapper.ApplicationMapper">
 
 </mapper>
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/EffectiveMapper.xml
 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/EffectiveMapper.xml
index 59a6ac14a..2dae562b2 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/EffectiveMapper.xml
+++ 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/EffectiveMapper.xml
@@ -16,6 +16,6 @@
   ~ limitations under the License.
   -->
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" 
"http://mybatis.org/dtd/mybatis-3-mapper.dtd";>
-<mapper namespace="org.apache.streampark.console.core.mapper.EffectiveMapper">
+<mapper 
namespace="org.apache.streampark.console.core.mapper.FlinkEffectiveMapper">
 
 </mapper>
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceTest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkApplicationManageServiceTest.java
similarity index 98%
rename from 
streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceTest.java
rename to 
streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkApplicationManageServiceTest.java
index d199f4de9..5724ecee9 100644
--- 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceTest.java
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkApplicationManageServiceTest.java
@@ -37,7 +37,7 @@ import java.util.Date;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** org.apache.streampark.console.core.service.ApplicationServiceUnitTest. */
-class ApplicationManageServiceTest extends SpringUnitTestBase {
+class FlinkApplicationManageServiceTest extends SpringUnitTestBase {
 
     @Autowired
     private FlinkApplicationManageService applicationManageService;
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/CatalogServiceTest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkCatalogServiceTest.java
similarity index 97%
rename from 
streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/CatalogServiceTest.java
rename to 
streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkCatalogServiceTest.java
index 23decd3bd..b6089adf9 100644
--- 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/CatalogServiceTest.java
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkCatalogServiceTest.java
@@ -32,10 +32,10 @@ import 
org.springframework.beans.factory.annotation.Autowired;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** CatalogService Tests */
-public class CatalogServiceTest extends SpringUnitTestBase {
+public class FlinkCatalogServiceTest extends SpringUnitTestBase {
 
     @Autowired
-    private CatalogService catalogService;
+    private FlinkCatalogService catalogService;
 
     @AfterEach
     void cleanTestRecordsInDatabase() {
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavepointServiceTest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java
similarity index 96%
rename from 
streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavepointServiceTest.java
rename to 
streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java
index 833faa4ab..66f43f940 100644
--- 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavepointServiceTest.java
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java
@@ -22,12 +22,13 @@ import org.apache.streampark.common.enums.FlinkDeployMode;
 import org.apache.streampark.common.enums.FlinkJobType;
 import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.console.SpringUnitTestBase;
-import org.apache.streampark.console.core.entity.Effective;
 import org.apache.streampark.console.core.entity.FlinkApplication;
 import org.apache.streampark.console.core.entity.FlinkApplicationConfig;
+import org.apache.streampark.console.core.entity.FlinkEffective;
 import org.apache.streampark.console.core.entity.FlinkEnv;
 import org.apache.streampark.console.core.enums.ConfigFileTypeEnum;
 import org.apache.streampark.console.core.enums.EffectiveTypeEnum;
+import 
org.apache.streampark.console.core.service.application.FlinkApplicationConfigService;
 import 
org.apache.streampark.console.core.service.application.FlinkApplicationManageService;
 import 
org.apache.streampark.console.core.service.impl.FlinkSavepointServiceImpl;
 
@@ -47,7 +48,7 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
  * FlinkSavepointServiceImpl} of {@link
  * SavepointService}.
  */
-class SavepointServiceTest extends SpringUnitTestBase {
+class FlinkSavepointServiceTest extends SpringUnitTestBase {
 
     @Autowired
     private SavepointService savepointService;
@@ -56,7 +57,7 @@ class SavepointServiceTest extends SpringUnitTestBase {
     private FlinkApplicationConfigService configService;
 
     @Autowired
-    private EffectiveService effectiveService;
+    private FlinkEffectiveService effectiveService;
 
     @Autowired
     private FlinkEnvService flinkEnvService;
@@ -137,7 +138,7 @@ class SavepointServiceTest extends SpringUnitTestBase {
                     + String.format("%s=%s", CHECKPOINTING_INTERVAL.key(),
                         "3min")));
         configService.updateById(appCfg);
-        Effective effective = new Effective();
+        FlinkEffective effective = new FlinkEffective();
         effective.setTargetId(appCfg.getId());
         effective.setAppId(appId);
         effective.setTargetType(EffectiveTypeEnum.CONFIG.getType());
diff --git 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala
 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala
index 0abc5f4c9..32d09b856 100644
--- 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala
+++ 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala
@@ -32,15 +32,15 @@ object SparkClient extends Logger {
   private[this] val SUBMIT_REQUEST =
     "org.apache.streampark.spark.client.bean.SubmitRequest" -> "submit"
 
-  private[this] val STOP_REQUEST =
-    "org.apache.streampark.spark.client.bean.StopRequest" -> "stop"
+  private[this] val CANCEL_REQUEST =
+    "org.apache.streampark.spark.client.bean.CancelRequest" -> "cancel"
 
   def submit(submitRequest: SubmitRequest): SubmitResponse = {
     proxy[SubmitResponse](submitRequest, submitRequest.sparkVersion, 
SUBMIT_REQUEST)
   }
 
-  def stop(stopRequest: StopRequest): StopResponse = {
-    proxy[StopResponse](stopRequest, stopRequest.sparkVersion, STOP_REQUEST)
+  def cancel(stopRequest: CancelRequest): CancelResponse = {
+    proxy[CancelResponse](stopRequest, stopRequest.sparkVersion, 
CANCEL_REQUEST)
   }
 
   private[this] def proxy[T: ClassTag](
diff --git 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopRequest.scala
 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelRequest.scala
similarity index 97%
rename from 
streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopRequest.scala
rename to 
streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelRequest.scala
index 6440d9f43..22a2b4e08 100644
--- 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopRequest.scala
+++ 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelRequest.scala
@@ -23,7 +23,7 @@ import org.apache.streampark.common.util.Implicits.JavaMap
 
 import javax.annotation.Nullable
 
-case class StopRequest(
+case class CancelRequest(
     id: Long,
     sparkVersion: SparkVersion,
     deployMode: SparkDeployMode,
diff --git 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopResponse.scala
 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala
similarity index 94%
rename from 
streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopResponse.scala
rename to 
streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala
index 42b480534..7b22a362f 100644
--- 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopResponse.scala
+++ 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala
@@ -17,4 +17,4 @@
 
 package org.apache.streampark.spark.client.bean
 
-case class StopResponse(savePoint: String)
+case class CancelResponse(savePoint: String)
diff --git 
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala
 
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala
index 104dd9ff6..d68f93596 100644
--- 
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala
+++ 
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala
@@ -39,9 +39,9 @@ object SparkClientEndpoint {
     }
   }
 
-  def stop(stopRequest: StopRequest): StopResponse = {
+  def cancel(stopRequest: CancelRequest): CancelResponse = {
     clients.get(stopRequest.deployMode) match {
-      case Some(client) => client.stop(stopRequest)
+      case Some(client) => client.cancel(stopRequest)
       case _ =>
         throw new UnsupportedOperationException(
           s"Unsupported ${stopRequest.deployMode} spark stop.")
diff --git 
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
 
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
index d26ea8098..007b80d06 100644
--- 
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
+++ 
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.spark.client.impl
 
-import 
org.apache.streampark.common.conf.ConfigKeys.{KEY_SPARK_YARN_AM_NODE_LABEL, 
KEY_SPARK_YARN_EXECUTOR_NODE_LABEL, KEY_SPARK_YARN_QUEUE, 
KEY_SPARK_YARN_QUEUE_LABEL, KEY_SPARK_YARN_QUEUE_NAME}
+import org.apache.streampark.common.conf.ConfigKeys._
 import org.apache.streampark.common.enums.SparkDeployMode
 import org.apache.streampark.common.util.{HadoopUtils, YarnUtils}
 import org.apache.streampark.common.util.Implicits._
@@ -37,22 +37,22 @@ object YarnClient extends SparkClientTrait {
 
   private lazy val sparkHandles = new ConcurrentHashMap[String, 
SparkAppHandle]()
 
-  override def doStop(stopRequest: StopRequest): StopResponse = {
-    val sparkAppHandle = sparkHandles.remove(stopRequest.appId)
+  override def doCancel(cancelRequest: CancelRequest): CancelResponse = {
+    val sparkAppHandle = sparkHandles.remove(cancelRequest.appId)
     if (sparkAppHandle != null) {
       Try(sparkAppHandle.stop()) match {
         case Success(_) =>
-          logger.info(s"[StreamPark][Spark][YarnClient] spark job: 
${stopRequest.appId} is stopped successfully.")
-          StopResponse(null)
+          logger.info(s"[StreamPark][Spark][YarnClient] spark job: 
${cancelRequest.appId} is stopped successfully.")
+          CancelResponse(null)
         case Failure(e) =>
           logger.error("[StreamPark][Spark][YarnClient] sparkAppHandle kill 
failed. Try kill by yarn", e)
-          yarnKill(stopRequest.appId)
-          StopResponse(null)
+          yarnKill(cancelRequest.appId)
+          CancelResponse(null)
       }
     } else {
-      logger.warn(s"[StreamPark][Spark][YarnClient] spark job: 
${stopRequest.appId} is not existed. Try kill by yarn")
-      yarnKill(stopRequest.appId)
-      StopResponse(null)
+      logger.warn(s"[StreamPark][Spark][YarnClient] spark job: 
${cancelRequest.appId} is not existed. Try kill by yarn")
+      yarnKill(cancelRequest.appId)
+      CancelResponse(null)
     }
   }
 
diff --git 
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
 
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
index b13bdef1d..746de6cd1 100644
--- 
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
+++ 
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
@@ -60,24 +60,24 @@ trait SparkClientTrait extends Logger {
   def setConfig(submitRequest: SubmitRequest): Unit
 
   @throws[Exception]
-  def stop(stopRequest: StopRequest): StopResponse = {
+  def cancel(stopRequest: CancelRequest): CancelResponse = {
     logInfo(
       s"""
-         |----------------------------------------- spark job stop 
----------------------------------
+         |----------------------------------------- spark job cancel 
----------------------------------
          |     userSparkHome     : ${stopRequest.sparkVersion.sparkHome}
          |     sparkVersion      : ${stopRequest.sparkVersion.version}
          |     appId             : ${stopRequest.appId}
          
|-------------------------------------------------------------------------------------------
          |""".stripMargin)
 
-    doStop(stopRequest)
+    doCancel(stopRequest)
   }
 
   @throws[Exception]
   def doSubmit(submitRequest: SubmitRequest): SubmitResponse
 
   @throws[Exception]
-  def doStop(stopRequest: StopRequest): StopResponse
+  def doCancel(cancelRequest: CancelRequest): CancelResponse
 
   private def prepareConfig(submitRequest: SubmitRequest): Unit = {
     // 1) filter illegal configuration key


Reply via email to