This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new e378ea292 [Improve] flink-app & spark-app id auto-increment
improvements (#4098)
e378ea292 is described below
commit e378ea29221b1ac1b3534db7de6c076a137757b1
Author: benjobs <[email protected]>
AuthorDate: Sat Sep 28 07:26:57 2024 +0800
[Improve] flink-app & spark-app id auto-increment improvements (#4098)
* [Improve] StopRequest class-name rename to CancelRequest
* [Improve] spark & flink app-id auto-increment Improvements
* [Improve] t_flink_log table rename to t_app_log
* [Improve] Effective entity rename to FlinkEffective
---
.../apache/streampark/common/conf/Workspace.scala | 2 -
.../main/assembly/script/schema/mysql-schema.sql | 50 ++++++++++++++--------
.../main/assembly/script/schema/pgsql-schema.sql | 20 ++++-----
.../main/assembly/script/upgrade/mysql/2.2.0.sql | 25 +++++++++--
.../console/core/controller/ConfigController.java | 2 +-
...> FlinkApplicationBuildPipelineController.java} | 4 +-
...roller.java => FlinkApplicationController.java} | 6 +--
...java => FlinkApplicationHistoryController.java} | 2 +-
...Controller.java => FlinkCatalogController.java} | 6 +--
.../console/core/controller/ProxyController.java | 2 +-
.../SparkApplicationBuildPipelineController.java | 2 +-
.../controller/SparkApplicationController.java | 4 +-
.../core/controller/SparkConfigController.java | 2 +-
.../core/controller/SparkProxyController.java | 2 +-
.../entity/{Effective.java => Application.java} | 19 ++++----
.../console/core/entity/ApplicationLog.java | 20 +++++++--
.../console/core/entity/FlinkApplication.java | 2 +-
.../entity/{Effective.java => FlinkEffective.java} | 2 +-
.../console/core/entity/SparkApplication.java | 6 +--
.../console/core/enums/EngineTypeEnum.java | 4 +-
...EffectiveMapper.java => ApplicationMapper.java} | 5 ++-
...ectiveMapper.java => FlinkEffectiveMapper.java} | 4 +-
.../console/core/runner/EnvInitializer.java | 1 -
.../console/core/runner/QuickStartRunner.java | 2 +-
...atalogService.java => FlinkCatalogService.java} | 4 +-
...tiveService.java => FlinkEffectiveService.java} | 6 +--
.../{ => application}/AppBuildPipeService.java | 2 +-
.../{ => application}/ApplicationLogService.java | 2 +-
.../application/ApplicationService.java} | 11 +++--
.../FlinkApplicationBackUpService.java | 2 +-
.../FlinkApplicationConfigService.java | 2 +-
.../SparkAppBuildPipeService.java | 2 +-
.../SparkApplicationBackUpService.java | 2 +-
.../SparkApplicationConfigService.java | 2 +-
.../SparkApplicationLogService.java | 2 +-
.../impl/AppBuildPipeServiceImpl.java | 10 ++---
.../impl/ApplicationLogServiceImpl.java | 4 +-
.../application/impl/ApplicationServiceImpl.java | 49 +++++++++++++++++++++
.../impl/FlinkApplicationActionServiceImpl.java | 44 +++++++++----------
.../impl/FlinkApplicationBackUpServiceImpl.java | 10 ++---
.../impl/FlinkApplicationConfigServiceImpl.java | 8 ++--
.../impl/FlinkApplicationManageServiceImpl.java | 28 +++++++++---
.../impl/SparkAppBuildPipeServiceImpl.java | 8 ++--
.../impl/SparkApplicationActionServiceImpl.java | 34 +++++++--------
.../impl/SparkApplicationBackUpServiceImpl.java | 8 ++--
.../impl/SparkApplicationConfigServiceImpl.java | 4 +-
.../impl/SparkApplicationLogServiceImpl.java | 4 +-
.../impl/SparkApplicationManageServiceImpl.java | 33 +++++++++-----
.../core/service/impl/FlinkCatalogServiceImpl.java | 4 +-
...iceImpl.java => FlinkEffectiveServiceImpl.java} | 43 ++++++++++---------
.../service/impl/FlinkSavepointServiceImpl.java | 8 ++--
.../core/service/impl/FlinkSqlServiceImpl.java | 6 +--
.../core/service/impl/ProxyServiceImpl.java | 4 +-
.../core/service/impl/SparkSqlServiceImpl.java | 2 +-
.../core/task/ApplicationBackUpCleanTask.java | 2 +-
.../console/core/watcher/SparkAppHttpWatcher.java | 2 +-
.../src/main/resources/db/data-h2.sql | 19 ++++++--
.../src/main/resources/db/schema-h2.sql | 46 +++++++++++++-------
.../{EffectiveMapper.xml => ApplicationMapper.xml} | 2 +-
.../main/resources/mapper/core/EffectiveMapper.xml | 2 +-
...java => FlinkApplicationManageServiceTest.java} | 2 +-
...rviceTest.java => FlinkCatalogServiceTest.java} | 4 +-
...iceTest.java => FlinkSavepointServiceTest.java} | 9 ++--
.../streampark/spark/client/SparkClient.scala | 8 ++--
.../{StopRequest.scala => CancelRequest.scala} | 2 +-
.../{StopResponse.scala => CancelResponse.scala} | 2 +-
.../spark/client/SparkClientEndpoint.scala | 4 +-
.../streampark/spark/client/impl/YarnClient.scala | 20 ++++-----
.../spark/client/trait/SparkClientTrait.scala | 8 ++--
69 files changed, 409 insertions(+), 265 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
index c8ce1dd95..cc9161f5e 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
@@ -109,8 +109,6 @@ case class Workspace(storageType: StorageType) {
lazy val APP_WORKSPACE = s"$WORKSPACE/workspace"
- lazy val SPARK_APP_WORKSPACE = s"$WORKSPACE/spark-workspace"
-
lazy val APP_FLINK = s"$WORKSPACE/flink"
lazy val APP_SPARK = s"$WORKSPACE/spark"
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
index 09c6620c6..e054d46c7 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
@@ -22,27 +22,22 @@ set names utf8mb4;
set foreign_key_checks = 0;
-- ----------------------------
--- table structure for t_flink_app_backup
+-- Table structure for t_app
-- ----------------------------
-drop table if exists `t_flink_app_backup`;
-create table `t_flink_app_backup` (
- `id` bigint not null auto_increment,
- `app_id` bigint default null,
- `sql_id` bigint default null,
- `config_id` bigint default null,
- `version` int default null,
- `path` varchar(128) collate utf8mb4_general_ci default null,
- `description` varchar(255) collate utf8mb4_general_ci default null,
- `create_time` datetime default null comment 'create time',
- primary key (`id`) using btree
-) engine=innodb auto_increment=100000 default charset=utf8mb4
collate=utf8mb4_general_ci;
+create table if not exists `t_app` (
+`id` bigint not null,
+`job_type` tinyint default null,
+`create_time` datetime default null comment 'create time',
+`modify_time` datetime default null comment 'modify time',
+primary key(`id`)
+);
-- ----------------------------
-- Table structure for t_flink_app
-- ----------------------------
drop table if exists `t_flink_app`;
create table `t_flink_app` (
- `id` bigint not null auto_increment,
+ `id` bigint not null,
`team_id` bigint not null,
`job_type` tinyint default null,
`deploy_mode` tinyint default null,
@@ -110,6 +105,23 @@ create table `t_flink_app` (
) engine=innodb auto_increment=100000 default charset=utf8mb4
collate=utf8mb4_general_ci;
+-- ----------------------------
+-- table structure for t_flink_app_backup
+-- ----------------------------
+drop table if exists `t_flink_app_backup`;
+create table `t_flink_app_backup` (
+`id` bigint not null,
+`app_id` bigint default null,
+`sql_id` bigint default null,
+`config_id` bigint default null,
+`version` int default null,
+`path` varchar(128) collate utf8mb4_general_ci default null,
+`description` varchar(255) collate utf8mb4_general_ci default null,
+`create_time` datetime default null comment 'create time',
+primary key (`id`) using btree
+) engine=innodb auto_increment=100000 default charset=utf8mb4
collate=utf8mb4_general_ci;
+
+
-- ----------------------------
-- table structure for t_flink_config
-- ----------------------------
@@ -161,14 +173,14 @@ create table `t_flink_env` (
-- ----------------------------
--- table structure for t_flink_log
+-- table structure for t_app_log
-- ----------------------------
-drop table if exists `t_flink_log`;
-create table `t_flink_log` (
+drop table if exists `t_app_log`;
+create table `t_app_log` (
`id` bigint not null auto_increment,
`app_id` bigint default null,
- `yarn_app_id` varchar(64) collate utf8mb4_general_ci default null,
- `job_manager_url` varchar(255) collate utf8mb4_general_ci default null,
+ `cluster_id` varchar(64) collate utf8mb4_general_ci default null,
+ `tracking_url` varchar(255) collate utf8mb4_general_ci default null,
`success` tinyint default null,
`exception` text collate utf8mb4_general_ci,
`option_time` datetime default null,
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
index 6b2df02a1..dc1bf538a 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
@@ -39,7 +39,7 @@ drop table if exists "public"."t_app_build_pipe";
drop table if exists "public"."t_flink_app_backup";
drop table if exists "public"."t_alert_config";
drop table if exists "public"."t_access_token";
-drop table if exists "public"."t_flink_log";
+drop table if exists "public"."t_app_log";
drop table if exists "public"."t_team";
drop table if exists "public"."t_variable";
drop table if exists "public"."t_external_link";
@@ -66,7 +66,7 @@ drop sequence if exists
"public"."streampark_t_distributed_task_id_seq";
drop sequence if exists "public"."streampark_t_flink_app_backup_id_seq";
drop sequence if exists "public"."streampark_t_alert_config_id_seq";
drop sequence if exists "public"."streampark_t_access_token_id_seq";
-drop sequence if exists "public"."streampark_t_flink_log_id_seq";
+drop sequence if exists "public"."streampark_t_app_log_id_seq";
drop sequence if exists "public"."streampark_t_team_id_seq";
drop sequence if exists "public"."streampark_t_variable_id_seq";
drop sequence if exists "public"."streampark_t_external_link_id_seq";
@@ -192,7 +192,7 @@ create sequence "public"."streampark_t_flink_app_id_seq"
increment 1 start 10000 cache 1 minvalue 10000 maxvalue
9223372036854775807;
create table "public"."t_flink_app" (
- "id" int8 not null default
nextval('streampark_t_flink_app_id_seq'::regclass),
+ "id" int8 not null,
"team_id" int8,
"job_type" int2,
"deploy_mode" int2,
@@ -403,16 +403,16 @@ create index "un_env_name" on "public"."t_flink_env"
using btree (
-- ----------------------------
--- table structure for t_flink_log
+-- table structure for t_app_log
-- ----------------------------
-create sequence "public"."streampark_t_flink_log_id_seq"
+create sequence "public"."streampark_t_app_log_id_seq"
increment 1 start 10000 cache 1 minvalue 10000 maxvalue
9223372036854775807;
-create table "public"."t_flink_log" (
- "id" int8 not null default
nextval('streampark_t_flink_log_id_seq'::regclass),
+create table "public"."t_app_log" (
+ "id" int8 not null default nextval('streampark_t_app_log_id_seq'::regclass),
"app_id" int8,
- "yarn_app_id" varchar(64) collate "pg_catalog"."default",
- "job_manager_url" varchar(255) collate "pg_catalog"."default",
+ "cluster_id" varchar(64) collate "pg_catalog"."default",
+ "tracking_url" varchar(255) collate "pg_catalog"."default",
"success" boolean,
"exception" text collate "pg_catalog"."default",
"option_time" timestamp(6),
@@ -420,7 +420,7 @@ create table "public"."t_flink_log" (
"user_id" int8
)
;
-alter table "public"."t_flink_log" add constraint "t_flink_log_pkey" primary
key ("id");
+alter table "public"."t_app_log" add constraint "t_app_log_pkey" primary key
("id");
-- ----------------------------
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
index 6ba50d281..997146f81 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
@@ -20,14 +20,33 @@ use streampark;
set names utf8mb4;
set foreign_key_checks = 0;
-alter table t_app_backup rename to t_flink_app_backup;
+
+-- ----------------------------
+-- Table structure for t_app
+-- ----------------------------
+create table if not exists `t_app` (
+`id` bigint not null,
+`job_type` tinyint default null,
+`create_time` datetime default null comment 'create time',
+`modify_time` datetime default null comment 'modify time',
+primary key(`id`)
+);
+
alter table `t_flink_app`
- add column `k8s_name` varchar(63) collate utf8mb4_general_ci default null,
+ modify column `id` not null;
+add column `k8s_name` varchar(63) collate utf8mb4_general_ci default null,
-- modify_time change with duration #3188
modify column `modify_time` datetime not null default current_timestamp
comment 'modify time';
-alter table `t_flink_log`
+alter table t_app_backup rename to t_flink_app_backup;
+
+alter table t_flink_log rename to t_app_log;
+
+alter table `t_app_log`
+ change column `yarn_app_id` `cluster_id` varchar(64) default null,
+ change column `job_manager_url` `tracking_url` varchar(255) default null,
+ add column `job_type` tinyint default null,
add column `user_id` bigint default null comment 'operator user id';
alter table `t_flink_project`
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ConfigController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ConfigController.java
index dd63d21aa..7622e92f2 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ConfigController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ConfigController.java
@@ -22,7 +22,7 @@ import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.core.entity.FlinkApplication;
import org.apache.streampark.console.core.entity.FlinkApplicationConfig;
-import
org.apache.streampark.console.core.service.FlinkApplicationConfigService;
+import
org.apache.streampark.console.core.service.application.FlinkApplicationConfigService;
import org.apache.shiro.authz.annotation.RequiresPermissions;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationBuildPipelineController.java
similarity index 95%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationBuildPipelineController.java
index ada06d148..211b9c3aa 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationBuildPipelineController.java
@@ -21,7 +21,7 @@ import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.core.annotation.Permission;
import org.apache.streampark.console.core.bean.AppBuildDockerResolvedDetail;
import org.apache.streampark.console.core.entity.AppBuildPipeline;
-import org.apache.streampark.console.core.service.AppBuildPipeService;
+import
org.apache.streampark.console.core.service.application.AppBuildPipeService;
import org.apache.streampark.flink.packer.pipeline.DockerResolvedSnapshot;
import org.apache.streampark.flink.packer.pipeline.PipelineTypeEnum;
@@ -42,7 +42,7 @@ import java.util.Optional;
@Validated
@RestController
@RequestMapping("flink/pipe")
-public class ApplicationBuildPipelineController {
+public class FlinkApplicationBuildPipelineController {
@Autowired
private AppBuildPipeService appBuildPipeService;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationController.java
similarity index 98%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationController.java
index 3d9252731..8474cf698 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationController.java
@@ -28,10 +28,10 @@ import
org.apache.streampark.console.core.entity.ApplicationLog;
import org.apache.streampark.console.core.entity.FlinkApplication;
import org.apache.streampark.console.core.entity.FlinkApplicationBackUp;
import org.apache.streampark.console.core.enums.AppExistsStateEnum;
-import org.apache.streampark.console.core.service.ApplicationLogService;
-import
org.apache.streampark.console.core.service.FlinkApplicationBackUpService;
import org.apache.streampark.console.core.service.ResourceService;
+import
org.apache.streampark.console.core.service.application.ApplicationLogService;
import
org.apache.streampark.console.core.service.application.FlinkApplicationActionService;
+import
org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService;
import
org.apache.streampark.console.core.service.application.FlinkApplicationInfoService;
import
org.apache.streampark.console.core.service.application.FlinkApplicationManageService;
@@ -55,7 +55,7 @@ import java.util.Map;
@Validated
@RestController
@RequestMapping("flink/app")
-public class ApplicationController {
+public class FlinkApplicationController {
@Autowired
private FlinkApplicationManageService applicationManageService;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationHistoryController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationHistoryController.java
similarity index 98%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationHistoryController.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationHistoryController.java
index 04cea4eda..b4e93c402 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationHistoryController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkApplicationHistoryController.java
@@ -37,7 +37,7 @@ import java.util.List;
@Validated
@RestController
@RequestMapping("flink/history")
-public class ApplicationHistoryController {
+public class FlinkApplicationHistoryController {
@Autowired
private FlinkApplicationInfoService applicationInfoService;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/CatalogController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkCatalogController.java
similarity index 95%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/CatalogController.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkCatalogController.java
index 78847211f..2817c2ecc 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/CatalogController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkCatalogController.java
@@ -21,7 +21,7 @@ import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.core.annotation.Permission;
import org.apache.streampark.console.core.bean.FlinkCatalogParams;
-import org.apache.streampark.console.core.service.CatalogService;
+import org.apache.streampark.console.core.service.FlinkCatalogService;
import org.apache.streampark.console.core.util.ServiceHelper;
import org.apache.shiro.authz.annotation.RequiresPermissions;
@@ -40,10 +40,10 @@ import java.io.IOException;
@Validated
@RestController
@RequestMapping("flink/catalog")
-public class CatalogController {
+public class FlinkCatalogController {
@Autowired
- CatalogService catalogService;
+ FlinkCatalogService catalogService;
@Permission(team = "#catalog.teamId")
@PostMapping("create")
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java
index 2b586a1a6..d5504d5ac 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java
@@ -21,8 +21,8 @@ import
org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.core.entity.ApplicationLog;
import org.apache.streampark.console.core.entity.FlinkApplication;
import org.apache.streampark.console.core.enums.UserTypeEnum;
-import org.apache.streampark.console.core.service.ApplicationLogService;
import org.apache.streampark.console.core.service.ProxyService;
+import
org.apache.streampark.console.core.service.application.ApplicationLogService;
import
org.apache.streampark.console.core.service.application.FlinkApplicationManageService;
import org.apache.streampark.console.core.util.ServiceHelper;
import org.apache.streampark.console.system.entity.Member;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationBuildPipelineController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationBuildPipelineController.java
index 260a55d53..dfaadf8c6 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationBuildPipelineController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationBuildPipelineController.java
@@ -20,7 +20,7 @@ package org.apache.streampark.console.core.controller;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.core.annotation.Permission;
import org.apache.streampark.console.core.entity.AppBuildPipeline;
-import org.apache.streampark.console.core.service.SparkAppBuildPipeService;
+import
org.apache.streampark.console.core.service.application.SparkAppBuildPipeService;
import org.apache.shiro.authz.annotation.RequiresPermissions;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java
index bc3043699..2e181883d 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkApplicationController.java
@@ -27,11 +27,11 @@ import
org.apache.streampark.console.core.entity.FlinkApplicationBackUp;
import org.apache.streampark.console.core.entity.SparkApplication;
import org.apache.streampark.console.core.entity.SparkApplicationLog;
import org.apache.streampark.console.core.enums.AppExistsStateEnum;
-import
org.apache.streampark.console.core.service.FlinkApplicationBackUpService;
import org.apache.streampark.console.core.service.ResourceService;
-import org.apache.streampark.console.core.service.SparkApplicationLogService;
+import
org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService;
import
org.apache.streampark.console.core.service.application.SparkApplicationActionService;
import
org.apache.streampark.console.core.service.application.SparkApplicationInfoService;
+import
org.apache.streampark.console.core.service.application.SparkApplicationLogService;
import
org.apache.streampark.console.core.service.application.SparkApplicationManageService;
import org.apache.shiro.authz.annotation.RequiresPermissions;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkConfigController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkConfigController.java
index cc58bb216..7d76ca356 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkConfigController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkConfigController.java
@@ -22,7 +22,7 @@ import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.core.entity.SparkApplication;
import org.apache.streampark.console.core.entity.SparkApplicationConfig;
-import
org.apache.streampark.console.core.service.SparkApplicationConfigService;
+import
org.apache.streampark.console.core.service.application.SparkApplicationConfigService;
import org.apache.shiro.authz.annotation.RequiresPermissions;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkProxyController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkProxyController.java
index c7ad83062..fa68395ea 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkProxyController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkProxyController.java
@@ -22,7 +22,7 @@ import
org.apache.streampark.console.core.entity.SparkApplication;
import org.apache.streampark.console.core.entity.SparkApplicationLog;
import org.apache.streampark.console.core.enums.UserTypeEnum;
import org.apache.streampark.console.core.service.ProxyService;
-import org.apache.streampark.console.core.service.SparkApplicationLogService;
+import
org.apache.streampark.console.core.service.application.SparkApplicationLogService;
import
org.apache.streampark.console.core.service.application.SparkApplicationManageService;
import org.apache.streampark.console.core.util.ServiceHelper;
import org.apache.streampark.console.system.entity.Member;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Effective.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
similarity index 79%
copy from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Effective.java
copy to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index b379bd355..6239fa2a8 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Effective.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -17,33 +17,30 @@
package org.apache.streampark.console.core.entity;
-import org.apache.streampark.console.core.enums.EffectiveTypeEnum;
-
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
+import java.io.Serializable;
import java.util.Date;
@Data
-@TableName("t_flink_effective")
+@TableName("t_app")
@Slf4j
-public class Effective {
+public class Application implements Serializable {
@TableId(type = IdType.AUTO)
private Long id;
-
- private Long appId;
/**
- * 1) config <br>
- * 2) flink Sql<br>
+ * 1: flink job
+ * 2: spark job
*/
- private Integer targetType;
+ private Integer jobType;
- private Long targetId;
private Date createTime;
- private transient EffectiveTypeEnum effectiveType;
+ private Date modifyTime;
+
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationLog.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationLog.java
index a6e1ed92a..574a320f1 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationLog.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationLog.java
@@ -26,22 +26,34 @@ import lombok.extern.slf4j.Slf4j;
import java.util.Date;
@Data
-@TableName("t_flink_log")
+@TableName("t_app_log")
@Slf4j
public class ApplicationLog {
@TableId(type = IdType.AUTO)
private Long id;
+
/** appId */
private Long appId;
- /** applicationId */
- private String yarnAppId;
+
+ /**
+ * 1: flink
+ * 2: spark
+ */
+ private Integer jobType;
+
+ /** clusterId */
+ private String clusterId;
+
/** The address of the jobmanager, that is, the direct access address of
the Flink web UI */
- private String jobManagerUrl;
+ private String trackingUrl;
+
/** start status */
private Boolean success;
+
/** option name */
private Integer optionName;
+
/** option time */
private Date optionTime;
/** exception at the start */
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java
index 12f40f408..382f71fef 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java
@@ -66,7 +66,7 @@ import java.util.Optional;
@Slf4j
public class FlinkApplication implements Serializable {
- @TableId(type = IdType.AUTO)
+ @TableId(type = IdType.INPUT)
private Long id;
private Long teamId;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Effective.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEffective.java
similarity index 98%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Effective.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEffective.java
index b379bd355..bfb806441 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Effective.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEffective.java
@@ -30,7 +30,7 @@ import java.util.Date;
@Data
@TableName("t_flink_effective")
@Slf4j
-public class Effective {
+public class FlinkEffective {
@TableId(type = IdType.AUTO)
private Long id;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
index 8cc8abccf..da6421401 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
@@ -59,7 +59,7 @@ import java.util.Objects;
@Slf4j
public class SparkApplication extends BaseEntity {
- @TableId(type = IdType.AUTO)
+ @TableId(type = IdType.INPUT)
private Long id;
private Long teamId;
@@ -336,14 +336,14 @@ public class SparkApplication extends BaseEntity {
@JsonIgnore
public String getLocalAppHome() {
- String path = String.format("%s/%s",
Workspace.local().SPARK_APP_WORKSPACE(), id.toString());
+ String path = String.format("%s/%s",
Workspace.local().APP_WORKSPACE(), id.toString());
log.info("local appHome:{}", path);
return path;
}
@JsonIgnore
public String getRemoteAppHome() {
- String path = String.format("%s/%s",
Workspace.remote().SPARK_APP_WORKSPACE(), id.toString());
+ String path = String.format("%s/%s",
Workspace.remote().APP_WORKSPACE(), id.toString());
log.info("remote appHome:{}", path);
return path;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/EngineTypeEnum.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/EngineTypeEnum.java
index 3b15ce6cb..2033e2471 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/EngineTypeEnum.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/EngineTypeEnum.java
@@ -27,10 +27,10 @@ import java.util.Arrays;
public enum EngineTypeEnum {
/** Apache Flink: activated by default */
- FLINK(0),
+ FLINK(1),
/** Apache Spark */
- SPARK(1);
+ SPARK(2);
@EnumValue
private final int code;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/EffectiveMapper.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
similarity index 87%
copy from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/EffectiveMapper.java
copy to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
index b3815425f..48820fe3b 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/EffectiveMapper.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
@@ -17,9 +17,10 @@
package org.apache.streampark.console.core.mapper;
-import org.apache.streampark.console.core.entity.Effective;
+import org.apache.streampark.console.core.entity.Application;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-public interface EffectiveMapper extends BaseMapper<Effective> {
+public interface ApplicationMapper extends BaseMapper<Application> {
+
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/EffectiveMapper.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkEffectiveMapper.java
similarity index 86%
copy from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/EffectiveMapper.java
copy to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkEffectiveMapper.java
index b3815425f..b16e6fa83 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/EffectiveMapper.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkEffectiveMapper.java
@@ -17,9 +17,9 @@
package org.apache.streampark.console.core.mapper;
-import org.apache.streampark.console.core.entity.Effective;
+import org.apache.streampark.console.core.entity.FlinkEffective;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-public interface EffectiveMapper extends BaseMapper<Effective> {
+public interface FlinkEffectiveMapper extends BaseMapper<FlinkEffective> {
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index c2d026bad..4e5ba38ac 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -147,7 +147,6 @@ public class EnvInitializer implements ApplicationRunner {
Arrays.asList(
workspace.APP_UPLOADS(),
workspace.APP_WORKSPACE(),
- workspace.SPARK_APP_WORKSPACE(),
workspace.APP_BACKUPS(),
workspace.APP_SAVEPOINTS(),
workspace.APP_PYTHON(),
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java
index afd50f1ad..0e04a1ab9 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java
@@ -24,10 +24,10 @@ import
org.apache.streampark.console.core.entity.FlinkApplication;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.entity.FlinkEnv;
import org.apache.streampark.console.core.entity.FlinkSql;
-import org.apache.streampark.console.core.service.AppBuildPipeService;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
import org.apache.streampark.console.core.service.FlinkSqlService;
+import
org.apache.streampark.console.core.service.application.AppBuildPipeService;
import
org.apache.streampark.console.core.service.application.FlinkApplicationManageService;
import lombok.extern.slf4j.Slf4j;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/CatalogService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkCatalogService.java
similarity index 94%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/CatalogService.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkCatalogService.java
index c19d467b4..160ae952f 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/CatalogService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkCatalogService.java
@@ -24,8 +24,8 @@ import org.apache.streampark.console.core.entity.FlinkCatalog;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.IService;
-/** This interface is use to managed catalog */
-public interface CatalogService extends IService<FlinkCatalog> {
+/** This interface is used to managed catalog */
+public interface FlinkCatalogService extends IService<FlinkCatalog> {
/**
* Create Catalog
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/EffectiveService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEffectiveService.java
similarity index 85%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/EffectiveService.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEffectiveService.java
index 46d87db3a..1cd38cb1c 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/EffectiveService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEffectiveService.java
@@ -17,16 +17,16 @@
package org.apache.streampark.console.core.service;
-import org.apache.streampark.console.core.entity.Effective;
+import org.apache.streampark.console.core.entity.FlinkEffective;
import org.apache.streampark.console.core.enums.EffectiveTypeEnum;
import com.baomidou.mybatisplus.extension.service.IService;
-public interface EffectiveService extends IService<Effective> {
+public interface FlinkEffectiveService extends IService<FlinkEffective> {
void remove(Long appId, EffectiveTypeEnum config);
- Effective get(Long appId, EffectiveTypeEnum config);
+ FlinkEffective get(Long appId, EffectiveTypeEnum config);
void saveOrUpdate(Long appId, EffectiveTypeEnum type, Long id);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/AppBuildPipeService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/AppBuildPipeService.java
similarity index 97%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/AppBuildPipeService.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/AppBuildPipeService.java
index c73c63ca0..a4cdacc3d 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/AppBuildPipeService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/AppBuildPipeService.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.streampark.console.core.service;
+package org.apache.streampark.console.core.service.application;
import org.apache.streampark.console.core.entity.AppBuildPipeline;
import org.apache.streampark.flink.packer.pipeline.DockerResolvedSnapshot;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationLogService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationLogService.java
similarity index 96%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationLogService.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationLogService.java
index cb8fb1eac..17a068b3a 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationLogService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationLogService.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.streampark.console.core.service;
+package org.apache.streampark.console.core.service.application;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.core.entity.ApplicationLog;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/EffectiveMapper.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationService.java
similarity index 68%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/EffectiveMapper.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationService.java
index b3815425f..e6404ad20 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/EffectiveMapper.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationService.java
@@ -15,11 +15,14 @@
* limitations under the License.
*/
-package org.apache.streampark.console.core.mapper;
+package org.apache.streampark.console.core.service.application;
-import org.apache.streampark.console.core.entity.Effective;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.enums.EngineTypeEnum;
-import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.baomidou.mybatisplus.extension.service.IService;
-public interface EffectiveMapper extends BaseMapper<Effective> {
+public interface ApplicationService extends IService<Application> {
+
+ Application create(EngineTypeEnum engineTypeEnum);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkApplicationBackUpService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/FlinkApplicationBackUpService.java
similarity index 98%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkApplicationBackUpService.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/FlinkApplicationBackUpService.java
index cafe8376f..29cc0de3c 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkApplicationBackUpService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/FlinkApplicationBackUpService.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.streampark.console.core.service;
+package org.apache.streampark.console.core.service.application;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.exception.InternalException;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkApplicationConfigService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/FlinkApplicationConfigService.java
similarity index 98%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkApplicationConfigService.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/FlinkApplicationConfigService.java
index 7850b2aa5..a13843a2e 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkApplicationConfigService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/FlinkApplicationConfigService.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.streampark.console.core.service;
+package org.apache.streampark.console.core.service.application;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.core.entity.FlinkApplication;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkAppBuildPipeService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkAppBuildPipeService.java
similarity index 97%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkAppBuildPipeService.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkAppBuildPipeService.java
index ee2d2ae64..25ea3cb11 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkAppBuildPipeService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkAppBuildPipeService.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.streampark.console.core.service;
+package org.apache.streampark.console.core.service.application;
import org.apache.streampark.console.core.entity.AppBuildPipeline;
import org.apache.streampark.flink.packer.pipeline.PipelineStatusEnum;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationBackUpService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationBackUpService.java
similarity index 98%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationBackUpService.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationBackUpService.java
index 1c3f2f92e..5b46fc53d 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationBackUpService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationBackUpService.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.streampark.console.core.service;
+package org.apache.streampark.console.core.service.application;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.exception.InternalException;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationConfigService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationConfigService.java
similarity index 98%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationConfigService.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationConfigService.java
index b289910d4..7745b97a3 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationConfigService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationConfigService.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.streampark.console.core.service;
+package org.apache.streampark.console.core.service.application;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.core.entity.SparkApplication;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationLogService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationLogService.java
similarity index 96%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationLogService.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationLogService.java
index 6edee09ac..7c3687cbc 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkApplicationLogService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationLogService.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.streampark.console.core.service;
+package org.apache.streampark.console.core.service.application;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.core.entity.SparkApplicationLog;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/AppBuildPipeServiceImpl.java
similarity index 98%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/AppBuildPipeServiceImpl.java
index bc17e47b0..3434e67db 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/AppBuildPipeServiceImpl.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.streampark.console.core.service.impl;
+package org.apache.streampark.console.core.service.application.impl;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.constants.Constants;
@@ -45,16 +45,16 @@ import
org.apache.streampark.console.core.enums.OptionStateEnum;
import org.apache.streampark.console.core.enums.ReleaseStateEnum;
import org.apache.streampark.console.core.enums.ResourceTypeEnum;
import
org.apache.streampark.console.core.mapper.ApplicationBuildPipelineMapper;
-import org.apache.streampark.console.core.service.AppBuildPipeService;
-import org.apache.streampark.console.core.service.ApplicationLogService;
-import
org.apache.streampark.console.core.service.FlinkApplicationBackUpService;
-import
org.apache.streampark.console.core.service.FlinkApplicationConfigService;
import org.apache.streampark.console.core.service.FlinkEnvService;
import org.apache.streampark.console.core.service.FlinkSqlService;
import org.apache.streampark.console.core.service.MessageService;
import org.apache.streampark.console.core.service.ResourceService;
import org.apache.streampark.console.core.service.SettingService;
+import
org.apache.streampark.console.core.service.application.AppBuildPipeService;
+import
org.apache.streampark.console.core.service.application.ApplicationLogService;
import
org.apache.streampark.console.core.service.application.FlinkApplicationActionService;
+import
org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService;
+import
org.apache.streampark.console.core.service.application.FlinkApplicationConfigService;
import
org.apache.streampark.console.core.service.application.FlinkApplicationInfoService;
import
org.apache.streampark.console.core.service.application.FlinkApplicationManageService;
import org.apache.streampark.console.core.util.ServiceHelper;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationLogServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationLogServiceImpl.java
similarity index 94%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationLogServiceImpl.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationLogServiceImpl.java
index 3ac1563b4..feef8cc0c 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationLogServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationLogServiceImpl.java
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.streampark.console.core.service.impl;
+package org.apache.streampark.console.core.service.application.impl;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
import org.apache.streampark.console.core.entity.ApplicationLog;
import org.apache.streampark.console.core.mapper.ApplicationLogMapper;
-import org.apache.streampark.console.core.service.ApplicationLogService;
+import
org.apache.streampark.console.core.service.application.ApplicationLogService;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationServiceImpl.java
new file mode 100644
index 000000000..866ceab92
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationServiceImpl.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service.application.impl;
+
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.enums.EngineTypeEnum;
+import org.apache.streampark.console.core.mapper.ApplicationMapper;
+import
org.apache.streampark.console.core.service.application.ApplicationService;
+
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.Date;
+
+@Slf4j
+@Service
+@Transactional(propagation = Propagation.SUPPORTS, readOnly = true,
rollbackFor = Exception.class)
+public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper,
Application>
+ implements
+ ApplicationService {
+
+ @Override
+ public Application create(EngineTypeEnum engineTypeEnum) {
+ Application application = new Application();
+ application.setJobType(engineTypeEnum.getCode());
+ application.setCreateTime(new Date());
+ application.setModifyTime(new Date());
+ save(application);
+ return application;
+ }
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
index 451fee22c..582d80316 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
@@ -54,11 +54,7 @@ import
org.apache.streampark.console.core.enums.OperationEnum;
import org.apache.streampark.console.core.enums.OptionStateEnum;
import org.apache.streampark.console.core.enums.ReleaseStateEnum;
import org.apache.streampark.console.core.mapper.FlinkApplicationMapper;
-import org.apache.streampark.console.core.service.AppBuildPipeService;
-import org.apache.streampark.console.core.service.ApplicationLogService;
import org.apache.streampark.console.core.service.DistributedTaskService;
-import
org.apache.streampark.console.core.service.FlinkApplicationBackUpService;
-import
org.apache.streampark.console.core.service.FlinkApplicationConfigService;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
import org.apache.streampark.console.core.service.FlinkSqlService;
@@ -66,7 +62,11 @@ import
org.apache.streampark.console.core.service.ResourceService;
import org.apache.streampark.console.core.service.SavepointService;
import org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.service.VariableService;
+import
org.apache.streampark.console.core.service.application.AppBuildPipeService;
+import
org.apache.streampark.console.core.service.application.ApplicationLogService;
import
org.apache.streampark.console.core.service.application.FlinkApplicationActionService;
+import
org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService;
+import
org.apache.streampark.console.core.service.application.FlinkApplicationConfigService;
import
org.apache.streampark.console.core.service.application.FlinkApplicationInfoService;
import
org.apache.streampark.console.core.service.application.FlinkApplicationManageService;
import org.apache.streampark.console.core.util.ServiceHelper;
@@ -272,9 +272,9 @@ public class FlinkApplicationActionServiceImpl extends
ServiceImpl<FlinkApplicat
ApplicationLog applicationLog = new ApplicationLog();
applicationLog.setOptionName(OperationEnum.CANCEL.getValue());
applicationLog.setAppId(application.getId());
- applicationLog.setJobManagerUrl(application.getJobManagerUrl());
+ applicationLog.setTrackingUrl(application.getJobManagerUrl());
applicationLog.setOptionTime(new Date());
- applicationLog.setYarnAppId(application.getClusterId());
+ applicationLog.setClusterId(application.getClusterId());
applicationLog.setUserId(ServiceHelper.getUserId());
if (appParam.getRestoreOrTriggerSavepoint()) {
@@ -528,44 +528,44 @@ public class FlinkApplicationActionServiceImpl extends
ServiceImpl<FlinkApplicat
FlinkApplication appParam,
SubmitResponse response,
ApplicationLog applicationLog,
- FlinkApplication application) {
+ FlinkApplication flinkApplication) {
applicationLog.setSuccess(true);
if (response.flinkConfig() != null) {
String jmMemory =
response.flinkConfig().get(ConfigKeys.KEY_FLINK_JM_PROCESS_MEMORY());
if (jmMemory != null) {
-
application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes());
+
flinkApplication.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes());
}
String tmMemory =
response.flinkConfig().get(ConfigKeys.KEY_FLINK_TM_PROCESS_MEMORY());
if (tmMemory != null) {
-
application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes());
+
flinkApplication.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes());
}
}
if (StringUtils.isNoneEmpty(response.jobId())) {
- application.setJobId(response.jobId());
+ flinkApplication.setJobId(response.jobId());
}
- if (FlinkDeployMode.isYarnMode(application.getDeployMode())) {
- application.setClusterId(response.clusterId());
- applicationLog.setYarnAppId(response.clusterId());
+ if (FlinkDeployMode.isYarnMode(flinkApplication.getDeployMode())) {
+ flinkApplication.setClusterId(response.clusterId());
+ applicationLog.setClusterId(response.clusterId());
}
if (StringUtils.isNoneEmpty(response.jobManagerUrl())) {
- application.setJobManagerUrl(response.jobManagerUrl());
- applicationLog.setJobManagerUrl(response.jobManagerUrl());
+ flinkApplication.setJobManagerUrl(response.jobManagerUrl());
+ applicationLog.setTrackingUrl(response.jobManagerUrl());
}
- applicationLog.setYarnAppId(response.clusterId());
- application.setStartTime(new Date());
- application.setEndTime(null);
+ applicationLog.setClusterId(response.clusterId());
+ flinkApplication.setStartTime(new Date());
+ flinkApplication.setEndTime(null);
// if start completed, will be added task to tracking queue
- if (application.isKubernetesModeJob()) {
- processForK8sApp(application, applicationLog);
+ if (flinkApplication.isKubernetesModeJob()) {
+ processForK8sApp(flinkApplication, applicationLog);
} else {
FlinkAppHttpWatcher.setOptionState(appParam.getId(),
OptionStateEnum.STARTING);
- FlinkAppHttpWatcher.doWatching(application);
+ FlinkAppHttpWatcher.doWatching(flinkApplication);
}
// update app
- updateById(application);
+ updateById(flinkApplication);
// save log
applicationLogService.save(applicationLog);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkApplicationBackUpServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackUpServiceImpl.java
similarity index 96%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkApplicationBackUpServiceImpl.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackUpServiceImpl.java
index fdbc9a794..b3c0982d3 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkApplicationBackUpServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackUpServiceImpl.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.streampark.console.core.service.impl;
+package org.apache.streampark.console.core.service.application.impl;
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.console.base.domain.RestRequest;
@@ -29,10 +29,10 @@ import org.apache.streampark.console.core.entity.FlinkSql;
import org.apache.streampark.console.core.enums.EffectiveTypeEnum;
import org.apache.streampark.console.core.enums.ReleaseStateEnum;
import org.apache.streampark.console.core.mapper.FlinkApplicationBackUpMapper;
-import org.apache.streampark.console.core.service.EffectiveService;
-import
org.apache.streampark.console.core.service.FlinkApplicationBackUpService;
-import
org.apache.streampark.console.core.service.FlinkApplicationConfigService;
+import org.apache.streampark.console.core.service.FlinkEffectiveService;
import org.apache.streampark.console.core.service.FlinkSqlService;
+import
org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService;
+import
org.apache.streampark.console.core.service.application.FlinkApplicationConfigService;
import
org.apache.streampark.console.core.service.application.FlinkApplicationManageService;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@@ -62,7 +62,7 @@ public class FlinkApplicationBackUpServiceImpl
private FlinkApplicationConfigService configService;
@Autowired
- private EffectiveService effectiveService;
+ private FlinkEffectiveService effectiveService;
@Autowired
private FlinkSqlService flinkSqlService;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkApplicationConfigServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java
similarity index 97%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkApplicationConfigServiceImpl.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java
index e9abadf0a..2f8a02dbc 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkApplicationConfigServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.streampark.console.core.service.impl;
+package org.apache.streampark.console.core.service.application.impl;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.Utils;
@@ -27,8 +27,8 @@ import
org.apache.streampark.console.core.entity.FlinkApplicationConfig;
import org.apache.streampark.console.core.enums.ConfigFileTypeEnum;
import org.apache.streampark.console.core.enums.EffectiveTypeEnum;
import org.apache.streampark.console.core.mapper.FlinkApplicationConfigMapper;
-import org.apache.streampark.console.core.service.EffectiveService;
-import
org.apache.streampark.console.core.service.FlinkApplicationConfigService;
+import org.apache.streampark.console.core.service.FlinkEffectiveService;
+import
org.apache.streampark.console.core.service.application.FlinkApplicationConfigService;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
@@ -64,7 +64,7 @@ public class FlinkApplicationConfigServiceImpl
private ResourceLoader resourceLoader;
@Autowired
- private EffectiveService effectiveService;
+ private FlinkEffectiveService effectiveService;
@Override
public synchronized void create(FlinkApplication appParam, Boolean latest)
{
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java
index 974cfeb2c..684b04dd5 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java
@@ -29,6 +29,7 @@ import
org.apache.streampark.console.base.mybatis.pager.MybatisPager;
import org.apache.streampark.console.base.util.ObjectUtils;
import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.console.core.bean.AppControl;
+import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.FlinkApplication;
import org.apache.streampark.console.core.entity.FlinkApplicationConfig;
import org.apache.streampark.console.core.entity.FlinkCluster;
@@ -36,22 +37,24 @@ import org.apache.streampark.console.core.entity.FlinkSql;
import org.apache.streampark.console.core.entity.Resource;
import org.apache.streampark.console.core.enums.CandidateTypeEnum;
import org.apache.streampark.console.core.enums.ChangeTypeEnum;
+import org.apache.streampark.console.core.enums.EngineTypeEnum;
import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
import org.apache.streampark.console.core.enums.OptionStateEnum;
import org.apache.streampark.console.core.enums.ReleaseStateEnum;
import org.apache.streampark.console.core.mapper.FlinkApplicationMapper;
-import org.apache.streampark.console.core.service.AppBuildPipeService;
-import org.apache.streampark.console.core.service.ApplicationLogService;
-import org.apache.streampark.console.core.service.EffectiveService;
-import
org.apache.streampark.console.core.service.FlinkApplicationBackUpService;
-import
org.apache.streampark.console.core.service.FlinkApplicationConfigService;
import org.apache.streampark.console.core.service.FlinkClusterService;
+import org.apache.streampark.console.core.service.FlinkEffectiveService;
import org.apache.streampark.console.core.service.FlinkSqlService;
import org.apache.streampark.console.core.service.ProjectService;
import org.apache.streampark.console.core.service.ResourceService;
import org.apache.streampark.console.core.service.SavepointService;
import org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.service.YarnQueueService;
+import
org.apache.streampark.console.core.service.application.AppBuildPipeService;
+import
org.apache.streampark.console.core.service.application.ApplicationLogService;
+import
org.apache.streampark.console.core.service.application.ApplicationService;
+import
org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService;
+import
org.apache.streampark.console.core.service.application.FlinkApplicationConfigService;
import
org.apache.streampark.console.core.service.application.FlinkApplicationManageService;
import org.apache.streampark.console.core.util.ServiceHelper;
import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
@@ -104,6 +107,9 @@ public class FlinkApplicationManageServiceImpl extends
ServiceImpl<FlinkApplicat
@Autowired
private ProjectService projectService;
+ @Autowired
+ private ApplicationService applicationService;
+
@Autowired
private FlinkApplicationBackUpService backUpService;
@@ -120,7 +126,7 @@ public class FlinkApplicationManageServiceImpl extends
ServiceImpl<FlinkApplicat
private SavepointService savepointService;
@Autowired
- private EffectiveService effectiveService;
+ private FlinkEffectiveService effectiveService;
@Autowired
private SettingService settingService;
@@ -348,7 +354,12 @@ public class FlinkApplicationManageServiceImpl extends
ServiceImpl<FlinkApplicat
appParam.setJarCheckSum(org.apache.commons.io.FileUtils.checksumCRC32(new
File(jarPath)));
}
- if (save(appParam)) {
+ // 1) save application
+ Application application =
applicationService.create(EngineTypeEnum.FLINK);
+ appParam.setId(application.getId());
+
+ boolean saveSuccess = save(appParam);
+ if (saveSuccess) {
if (appParam.isFlinkSqlJobOrPyFlinkJob()) {
FlinkSql flinkSql = new FlinkSql(appParam);
flinkSqlService.create(flinkSql);
@@ -429,6 +440,9 @@ public class FlinkApplicationManageServiceImpl extends
ServiceImpl<FlinkApplicat
newApp.setTeamId(persist.getTeamId());
newApp.setDependency(persist.getDependency());
+ Application application =
applicationService.create(EngineTypeEnum.FLINK);
+ newApp.setId(application.getId());
+
boolean saved = save(newApp);
if (saved) {
if (newApp.isFlinkSqlJob()) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkAppBuildPipeServiceImpl.java
similarity index 98%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkAppBuildPipeServiceImpl.java
index 315886161..063a96c0c 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkAppBuildPipeServiceImpl.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.streampark.console.core.service.impl;
+package org.apache.streampark.console.core.service.application.impl;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.constants.Constants;
@@ -45,12 +45,12 @@ import
org.apache.streampark.console.core.enums.ResourceTypeEnum;
import
org.apache.streampark.console.core.mapper.ApplicationBuildPipelineMapper;
import org.apache.streampark.console.core.service.MessageService;
import org.apache.streampark.console.core.service.ResourceService;
-import org.apache.streampark.console.core.service.SparkAppBuildPipeService;
-import
org.apache.streampark.console.core.service.SparkApplicationConfigService;
-import org.apache.streampark.console.core.service.SparkApplicationLogService;
import org.apache.streampark.console.core.service.SparkEnvService;
import org.apache.streampark.console.core.service.SparkSqlService;
+import
org.apache.streampark.console.core.service.application.SparkAppBuildPipeService;
+import
org.apache.streampark.console.core.service.application.SparkApplicationConfigService;
import
org.apache.streampark.console.core.service.application.SparkApplicationInfoService;
+import
org.apache.streampark.console.core.service.application.SparkApplicationLogService;
import
org.apache.streampark.console.core.service.application.SparkApplicationManageService;
import org.apache.streampark.console.core.util.ServiceHelper;
import org.apache.streampark.console.core.watcher.SparkAppHttpWatcher;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
index a6dce8bbb..4812ebbc1 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
@@ -45,21 +45,21 @@ import
org.apache.streampark.console.core.enums.SparkOperationEnum;
import org.apache.streampark.console.core.enums.SparkOptionStateEnum;
import org.apache.streampark.console.core.mapper.SparkApplicationMapper;
import org.apache.streampark.console.core.service.ResourceService;
-import org.apache.streampark.console.core.service.SparkAppBuildPipeService;
-import
org.apache.streampark.console.core.service.SparkApplicationConfigService;
-import org.apache.streampark.console.core.service.SparkApplicationLogService;
import org.apache.streampark.console.core.service.SparkEnvService;
import org.apache.streampark.console.core.service.SparkSqlService;
import org.apache.streampark.console.core.service.VariableService;
+import
org.apache.streampark.console.core.service.application.SparkAppBuildPipeService;
import
org.apache.streampark.console.core.service.application.SparkApplicationActionService;
+import
org.apache.streampark.console.core.service.application.SparkApplicationConfigService;
import
org.apache.streampark.console.core.service.application.SparkApplicationInfoService;
+import
org.apache.streampark.console.core.service.application.SparkApplicationLogService;
import org.apache.streampark.console.core.util.ServiceHelper;
import org.apache.streampark.console.core.watcher.SparkAppHttpWatcher;
import org.apache.streampark.flink.packer.pipeline.BuildResult;
import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse;
import org.apache.streampark.spark.client.SparkClient;
-import org.apache.streampark.spark.client.bean.StopRequest;
-import org.apache.streampark.spark.client.bean.StopResponse;
+import org.apache.streampark.spark.client.bean.CancelRequest;
+import org.apache.streampark.spark.client.bean.CancelResponse;
import org.apache.streampark.spark.client.bean.SubmitRequest;
import org.apache.streampark.spark.client.bean.SubmitResponse;
@@ -129,9 +129,9 @@ public class SparkApplicationActionServiceImpl
@Autowired
private ResourceService resourceService;
- private final Map<Long, CompletableFuture<SubmitResponse>> startFutureMap
= new ConcurrentHashMap<>();
+ private final Map<Long, CompletableFuture<SubmitResponse>>
startJobFutureMap = new ConcurrentHashMap<>();
- private final Map<Long, CompletableFuture<StopResponse>> stopFutureMap =
new ConcurrentHashMap<>();
+ private final Map<Long, CompletableFuture<CancelResponse>>
cancelJobFutureMap = new ConcurrentHashMap<>();
@Override
public void revoke(Long appId) throws ApplicationException {
@@ -167,8 +167,8 @@ public class SparkApplicationActionServiceImpl
@Override
public void forcedStop(Long id) {
- CompletableFuture<SubmitResponse> startFuture =
startFutureMap.remove(id);
- CompletableFuture<StopResponse> stopFuture = stopFutureMap.remove(id);
+ CompletableFuture<SubmitResponse> startFuture =
startJobFutureMap.remove(id);
+ CompletableFuture<CancelResponse> stopFuture =
cancelJobFutureMap.remove(id);
SparkApplication application = this.baseMapper.selectApp(id);
if (startFuture != null) {
startFuture.cancel(true);
@@ -205,21 +205,21 @@ public class SparkApplicationActionServiceImpl
Map<String, String> stopProper = new HashMap<>();
- StopRequest stopRequest =
- new StopRequest(
+ CancelRequest stopRequest =
+ new CancelRequest(
application.getId(),
sparkEnv.getSparkVersion(),
SparkDeployMode.of(application.getDeployMode()),
stopProper,
application.getAppId());
- CompletableFuture<StopResponse> stopFuture =
- CompletableFuture.supplyAsync(() -> SparkClient.stop(stopRequest),
executorService);
+ CompletableFuture<CancelResponse> stopFuture =
+ CompletableFuture.supplyAsync(() ->
SparkClient.cancel(stopRequest), executorService);
- stopFutureMap.put(application.getId(), stopFuture);
+ cancelJobFutureMap.put(application.getId(), stopFuture);
stopFuture.whenComplete(
(cancelResponse, throwable) -> {
- stopFutureMap.remove(application.getId());
+ cancelJobFutureMap.remove(application.getId());
if (throwable != null) {
String exception =
ExceptionUtils.stringifyException(throwable);
applicationLog.setException(exception);
@@ -329,11 +329,11 @@ public class SparkApplicationActionServiceImpl
CompletableFuture<SubmitResponse> future = CompletableFuture
.supplyAsync(() -> SparkClient.submit(submitRequest),
executorService);
- startFutureMap.put(application.getId(), future);
+ startJobFutureMap.put(application.getId(), future);
future.whenComplete(
(response, throwable) -> {
// 1) remove Future
- startFutureMap.remove(application.getId());
+ startJobFutureMap.remove(application.getId());
// 2) exception
if (throwable != null) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationBackUpServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBackUpServiceImpl.java
similarity index 97%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationBackUpServiceImpl.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBackUpServiceImpl.java
index 0009738af..50d8923e9 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationBackUpServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBackUpServiceImpl.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.streampark.console.core.service.impl;
+package org.apache.streampark.console.core.service.application.impl;
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.console.base.domain.RestRequest;
@@ -29,10 +29,10 @@ import org.apache.streampark.console.core.entity.SparkSql;
import org.apache.streampark.console.core.enums.EffectiveTypeEnum;
import org.apache.streampark.console.core.enums.ReleaseStateEnum;
import org.apache.streampark.console.core.mapper.SparkApplicationBackUpMapper;
-import
org.apache.streampark.console.core.service.SparkApplicationBackUpService;
-import
org.apache.streampark.console.core.service.SparkApplicationConfigService;
import org.apache.streampark.console.core.service.SparkEffectiveService;
import org.apache.streampark.console.core.service.SparkSqlService;
+import
org.apache.streampark.console.core.service.application.SparkApplicationBackUpService;
+import
org.apache.streampark.console.core.service.application.SparkApplicationConfigService;
import
org.apache.streampark.console.core.service.application.SparkApplicationManageService;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@@ -143,7 +143,7 @@ public class SparkApplicationBackUpServiceImpl
if (!backUpPages.getRecords().isEmpty()) {
SparkApplicationBackUp backup = backUpPages.getRecords().get(0);
String path = backup.getPath();
- appParam.getFsOperator().move(path,
appParam.getWorkspace().SPARK_APP_WORKSPACE());
+ appParam.getFsOperator().move(path,
appParam.getWorkspace().APP_WORKSPACE());
super.removeById(backup.getId());
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationConfigServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationConfigServiceImpl.java
similarity index 98%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationConfigServiceImpl.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationConfigServiceImpl.java
index c8dd9bd8c..3d00aa68c 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationConfigServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationConfigServiceImpl.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.streampark.console.core.service.impl;
+package org.apache.streampark.console.core.service.application.impl;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.Utils;
@@ -27,8 +27,8 @@ import
org.apache.streampark.console.core.entity.SparkApplicationConfig;
import org.apache.streampark.console.core.enums.ConfigFileTypeEnum;
import org.apache.streampark.console.core.enums.EffectiveTypeEnum;
import org.apache.streampark.console.core.mapper.SparkApplicationConfigMapper;
-import
org.apache.streampark.console.core.service.SparkApplicationConfigService;
import org.apache.streampark.console.core.service.SparkEffectiveService;
+import
org.apache.streampark.console.core.service.application.SparkApplicationConfigService;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationLogServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationLogServiceImpl.java
similarity index 94%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationLogServiceImpl.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationLogServiceImpl.java
index cb6e338af..34ce31cd3 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationLogServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationLogServiceImpl.java
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.streampark.console.core.service.impl;
+package org.apache.streampark.console.core.service.application.impl;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
import org.apache.streampark.console.core.entity.SparkApplicationLog;
import org.apache.streampark.console.core.mapper.SparkApplicationLogMapper;
-import org.apache.streampark.console.core.service.SparkApplicationLogService;
+import
org.apache.streampark.console.core.service.application.SparkApplicationLogService;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
index 112ed1cf5..8f3554543 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
@@ -28,26 +28,28 @@ import
org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.console.core.bean.AppControl;
+import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.Resource;
import org.apache.streampark.console.core.entity.SparkApplication;
import org.apache.streampark.console.core.entity.SparkApplicationConfig;
import org.apache.streampark.console.core.entity.SparkSql;
import org.apache.streampark.console.core.enums.CandidateTypeEnum;
import org.apache.streampark.console.core.enums.ChangeTypeEnum;
+import org.apache.streampark.console.core.enums.EngineTypeEnum;
import org.apache.streampark.console.core.enums.OptionStateEnum;
import org.apache.streampark.console.core.enums.ReleaseStateEnum;
import org.apache.streampark.console.core.enums.SparkAppStateEnum;
import org.apache.streampark.console.core.mapper.SparkApplicationMapper;
-import org.apache.streampark.console.core.service.AppBuildPipeService;
import org.apache.streampark.console.core.service.ProjectService;
import org.apache.streampark.console.core.service.ResourceService;
-import org.apache.streampark.console.core.service.SettingService;
-import
org.apache.streampark.console.core.service.SparkApplicationBackUpService;
-import
org.apache.streampark.console.core.service.SparkApplicationConfigService;
-import org.apache.streampark.console.core.service.SparkApplicationLogService;
import org.apache.streampark.console.core.service.SparkEffectiveService;
import org.apache.streampark.console.core.service.SparkSqlService;
import org.apache.streampark.console.core.service.YarnQueueService;
+import
org.apache.streampark.console.core.service.application.AppBuildPipeService;
+import
org.apache.streampark.console.core.service.application.ApplicationService;
+import
org.apache.streampark.console.core.service.application.SparkApplicationBackUpService;
+import
org.apache.streampark.console.core.service.application.SparkApplicationConfigService;
+import
org.apache.streampark.console.core.service.application.SparkApplicationLogService;
import
org.apache.streampark.console.core.service.application.SparkApplicationManageService;
import org.apache.streampark.console.core.util.ServiceHelper;
import org.apache.streampark.flink.packer.pipeline.PipelineStatusEnum;
@@ -95,6 +97,9 @@ public class SparkApplicationManageServiceImpl
@Autowired
private ProjectService projectService;
+ @Autowired
+ private ApplicationService applicationService;
+
@Autowired
private SparkApplicationBackUpService backUpService;
@@ -110,9 +115,6 @@ public class SparkApplicationManageServiceImpl
@Autowired
private SparkEffectiveService effectiveService;
- @Autowired
- private SettingService settingService;
-
@Autowired
private AppBuildPipeService appBuildPipeService;
@@ -194,9 +196,9 @@ public class SparkApplicationManageServiceImpl
try {
application
.getFsOperator()
-
.delete(application.getWorkspace().SPARK_APP_WORKSPACE().concat("/").concat(appId.toString()));
+
.delete(application.getWorkspace().APP_WORKSPACE().concat("/").concat(appId.toString()));
// try to delete yarn-application, and leave no trouble.
- String path =
Workspace.of(StorageType.HDFS).SPARK_APP_WORKSPACE().concat("/").concat(appId.toString());
+ String path =
Workspace.of(StorageType.HDFS).APP_WORKSPACE().concat("/").concat(appId.toString());
if (HdfsOperator.exists(path)) {
HdfsOperator.delete(path);
}
@@ -286,7 +288,13 @@ public class SparkApplicationManageServiceImpl
appParam.setJarCheckSum(org.apache.commons.io.FileUtils.checksumCRC32(new
File(jarPath)));
}
- if (save(appParam)) {
+ // 1) save application
+ Application application =
applicationService.create(EngineTypeEnum.SPARK);
+ appParam.setId(application.getId());
+
+ boolean saveSuccess = save(appParam);
+
+ if (saveSuccess) {
if (appParam.isSparkSqlJob()) {
SparkSql sparkSql = new SparkSql(appParam);
sparkSqlService.create(sparkSql);
@@ -352,6 +360,9 @@ public class SparkApplicationManageServiceImpl
newApp.setModifyTime(newApp.getCreateTime());
newApp.setTags(oldApp.getTags());
+ Application application =
applicationService.create(EngineTypeEnum.SPARK);
+ newApp.setId(application.getId());
+
boolean saved = save(newApp);
if (saved) {
if (newApp.isSparkSqlJob()) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkCatalogServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkCatalogServiceImpl.java
index 9cc1e12c6..b173cb276 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkCatalogServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkCatalogServiceImpl.java
@@ -24,7 +24,7 @@ import
org.apache.streampark.console.base.mybatis.pager.MybatisPager;
import org.apache.streampark.console.core.bean.FlinkCatalogParams;
import org.apache.streampark.console.core.entity.FlinkCatalog;
import org.apache.streampark.console.core.mapper.FlinkCatalogMapper;
-import org.apache.streampark.console.core.service.CatalogService;
+import org.apache.streampark.console.core.service.FlinkCatalogService;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@@ -46,7 +46,7 @@ import java.util.regex.Pattern;
@Transactional(propagation = Propagation.SUPPORTS, rollbackFor =
Exception.class)
public class FlinkCatalogServiceImpl extends ServiceImpl<FlinkCatalogMapper,
FlinkCatalog>
implements
- CatalogService {
+ FlinkCatalogService {
private static final String CATALOG_REGEX =
"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$";
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/EffectiveServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEffectiveServiceImpl.java
similarity index 59%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/EffectiveServiceImpl.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEffectiveServiceImpl.java
index d8b5f4352..bdeb8c0e2 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/EffectiveServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEffectiveServiceImpl.java
@@ -17,10 +17,10 @@
package org.apache.streampark.console.core.service.impl;
-import org.apache.streampark.console.core.entity.Effective;
+import org.apache.streampark.console.core.entity.FlinkEffective;
import org.apache.streampark.console.core.enums.EffectiveTypeEnum;
-import org.apache.streampark.console.core.mapper.EffectiveMapper;
-import org.apache.streampark.console.core.service.EffectiveService;
+import org.apache.streampark.console.core.mapper.FlinkEffectiveMapper;
+import org.apache.streampark.console.core.service.FlinkEffectiveService;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
@@ -35,34 +35,34 @@ import java.util.Date;
@Slf4j
@Service
@Transactional(propagation = Propagation.SUPPORTS, readOnly = true,
rollbackFor = Exception.class)
-public class EffectiveServiceImpl extends ServiceImpl<EffectiveMapper,
Effective>
+public class FlinkEffectiveServiceImpl extends
ServiceImpl<FlinkEffectiveMapper, FlinkEffective>
implements
- EffectiveService {
+ FlinkEffectiveService {
@Override
public void remove(Long appId, EffectiveTypeEnum effectiveTypeEnum) {
- LambdaQueryWrapper<Effective> queryWrapper = new
LambdaQueryWrapper<Effective>()
- .eq(Effective::getAppId, appId)
- .eq(Effective::getTargetType, effectiveTypeEnum.getType());
+ LambdaQueryWrapper<FlinkEffective> queryWrapper = new
LambdaQueryWrapper<FlinkEffective>()
+ .eq(FlinkEffective::getAppId, appId)
+ .eq(FlinkEffective::getTargetType, effectiveTypeEnum.getType());
baseMapper.delete(queryWrapper);
}
@Override
- public Effective get(Long appId, EffectiveTypeEnum effectiveTypeEnum) {
- LambdaQueryWrapper<Effective> queryWrapper = new
LambdaQueryWrapper<Effective>()
- .eq(Effective::getAppId, appId)
- .eq(Effective::getTargetType, effectiveTypeEnum.getType());
+ public FlinkEffective get(Long appId, EffectiveTypeEnum effectiveTypeEnum)
{
+ LambdaQueryWrapper<FlinkEffective> queryWrapper = new
LambdaQueryWrapper<FlinkEffective>()
+ .eq(FlinkEffective::getAppId, appId)
+ .eq(FlinkEffective::getTargetType, effectiveTypeEnum.getType());
return this.getOne(queryWrapper);
}
@Override
public void saveOrUpdate(Long appId, EffectiveTypeEnum type, Long id) {
- LambdaQueryWrapper<Effective> queryWrapper = new
LambdaQueryWrapper<Effective>()
- .eq(Effective::getAppId, appId)
- .eq(Effective::getTargetType, type.getType());
+ LambdaQueryWrapper<FlinkEffective> queryWrapper = new
LambdaQueryWrapper<FlinkEffective>()
+ .eq(FlinkEffective::getAppId, appId)
+ .eq(FlinkEffective::getTargetType, type.getType());
long count = count(queryWrapper);
if (count == 0) {
- Effective effective = new Effective();
+ FlinkEffective effective = new FlinkEffective();
effective.setAppId(appId);
effective.setTargetType(type.getType());
effective.setTargetId(id);
@@ -70,16 +70,17 @@ public class EffectiveServiceImpl extends
ServiceImpl<EffectiveMapper, Effective
save(effective);
} else {
update(
- new LambdaUpdateWrapper<Effective>()
- .eq(Effective::getAppId, appId)
- .eq(Effective::getTargetType, type.getType())
- .set(Effective::getTargetId, id));
+ new LambdaUpdateWrapper<FlinkEffective>()
+ .eq(FlinkEffective::getAppId, appId)
+ .eq(FlinkEffective::getTargetType, type.getType())
+ .set(FlinkEffective::getTargetId, id));
}
}
@Override
public void removeByAppId(Long appId) {
- LambdaQueryWrapper<Effective> queryWrapper = new
LambdaQueryWrapper<Effective>().eq(Effective::getAppId, appId);
+ LambdaQueryWrapper<FlinkEffective> queryWrapper =
+ new
LambdaQueryWrapper<FlinkEffective>().eq(FlinkEffective::getAppId, appId);
this.remove(queryWrapper);
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java
index e075e5da4..4191d316d 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java
@@ -35,11 +35,11 @@ import
org.apache.streampark.console.core.enums.CheckPointTypeEnum;
import org.apache.streampark.console.core.enums.OperationEnum;
import org.apache.streampark.console.core.enums.OptionStateEnum;
import org.apache.streampark.console.core.mapper.FlinkSavepointMapper;
-import org.apache.streampark.console.core.service.ApplicationLogService;
-import
org.apache.streampark.console.core.service.FlinkApplicationConfigService;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
import org.apache.streampark.console.core.service.SavepointService;
+import
org.apache.streampark.console.core.service.application.ApplicationLogService;
+import
org.apache.streampark.console.core.service.application.FlinkApplicationConfigService;
import
org.apache.streampark.console.core.service.application.FlinkApplicationManageService;
import org.apache.streampark.console.core.util.ServiceHelper;
import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
@@ -191,9 +191,9 @@ public class FlinkSavepointServiceImpl extends
ServiceImpl<FlinkSavepointMapper,
ApplicationLog applicationLog = new ApplicationLog();
applicationLog.setOptionName(OperationEnum.SAVEPOINT.getValue());
applicationLog.setAppId(application.getId());
- applicationLog.setJobManagerUrl(application.getJobManagerUrl());
+ applicationLog.setTrackingUrl(application.getJobManagerUrl());
applicationLog.setOptionTime(new Date());
- applicationLog.setYarnAppId(application.getClusterId());
+ applicationLog.setClusterId(application.getClusterId());
applicationLog.setUserId(ServiceHelper.getUserId());
return applicationLog;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
index c95cb3b7c..45048e5e2 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
@@ -28,10 +28,10 @@ import org.apache.streampark.console.core.entity.FlinkSql;
import org.apache.streampark.console.core.enums.CandidateTypeEnum;
import org.apache.streampark.console.core.enums.EffectiveTypeEnum;
import org.apache.streampark.console.core.mapper.FlinkSqlMapper;
-import org.apache.streampark.console.core.service.EffectiveService;
-import
org.apache.streampark.console.core.service.FlinkApplicationBackUpService;
+import org.apache.streampark.console.core.service.FlinkEffectiveService;
import org.apache.streampark.console.core.service.FlinkEnvService;
import org.apache.streampark.console.core.service.FlinkSqlService;
+import
org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService;
import org.apache.streampark.flink.core.FlinkSqlValidationResult;
import org.apache.streampark.flink.proxy.FlinkShimsProxy;
@@ -58,7 +58,7 @@ public class FlinkSqlServiceImpl extends
ServiceImpl<FlinkSqlMapper, FlinkSql>
FlinkSqlService {
@Autowired
- private EffectiveService effectiveService;
+ private FlinkEffectiveService effectiveService;
@Autowired
private FlinkApplicationBackUpService backUpService;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
index 590b373c2..4b492fc08 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProxyServiceImpl.java
@@ -131,7 +131,7 @@ public class ProxyServiceImpl implements ProxyService {
@Override
public ResponseEntity<?> proxyYarn(HttpServletRequest request,
ApplicationLog log) throws Exception {
ResponseEntity.BodyBuilder builder =
ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
- String yarnId = log.getYarnAppId();
+ String yarnId = log.getClusterId();
if (StringUtils.isBlank(yarnId)) {
return builder.body("The yarn application id is null.");
}
@@ -158,7 +158,7 @@ public class ProxyServiceImpl implements ProxyService {
public ResponseEntity<?> proxyHistory(HttpServletRequest request,
ApplicationLog log) throws Exception {
ResponseEntity.BodyBuilder builder =
ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
- String url = log.getJobManagerUrl();
+ String url = log.getTrackingUrl();
if (StringUtils.isBlank(url)) {
return builder.body("The jobManager url is null.");
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkSqlServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkSqlServiceImpl.java
index 046826880..cf157007f 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkSqlServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkSqlServiceImpl.java
@@ -28,10 +28,10 @@ import org.apache.streampark.console.core.entity.SparkSql;
import org.apache.streampark.console.core.enums.CandidateTypeEnum;
import org.apache.streampark.console.core.enums.EffectiveTypeEnum;
import org.apache.streampark.console.core.mapper.SparkSqlMapper;
-import
org.apache.streampark.console.core.service.SparkApplicationBackUpService;
import org.apache.streampark.console.core.service.SparkEffectiveService;
import org.apache.streampark.console.core.service.SparkEnvService;
import org.apache.streampark.console.core.service.SparkSqlService;
+import
org.apache.streampark.console.core.service.application.SparkApplicationBackUpService;
import org.apache.streampark.spark.client.proxy.SparkShimsProxy;
import org.apache.streampark.spark.core.util.SparkSqlValidationResult;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ApplicationBackUpCleanTask.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ApplicationBackUpCleanTask.java
index 22fa38e0f..b75848b66 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ApplicationBackUpCleanTask.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ApplicationBackUpCleanTask.java
@@ -18,7 +18,7 @@
package org.apache.streampark.console.core.task;
import org.apache.streampark.console.core.entity.FlinkApplicationBackUp;
-import
org.apache.streampark.console.core.service.FlinkApplicationBackUpService;
+import
org.apache.streampark.console.core.service.application.FlinkApplicationBackUpService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
index 8e7c15476..b2c1ed083 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
@@ -28,11 +28,11 @@ import
org.apache.streampark.console.core.enums.StopFromEnum;
import org.apache.streampark.console.core.metrics.spark.Job;
import
org.apache.streampark.console.core.metrics.spark.SparkApplicationSummary;
import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
-import org.apache.streampark.console.core.service.SparkApplicationLogService;
import org.apache.streampark.console.core.service.SparkEnvService;
import org.apache.streampark.console.core.service.alert.AlertService;
import
org.apache.streampark.console.core.service.application.SparkApplicationActionService;
import
org.apache.streampark.console.core.service.application.SparkApplicationInfoService;
+import
org.apache.streampark.console.core.service.application.SparkApplicationLogService;
import
org.apache.streampark.console.core.service.application.SparkApplicationManageService;
import org.apache.streampark.console.core.utils.AlertTemplateUtils;
diff --git
a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
index f8459eee8..3c082209a 100644
---
a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
+++
b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
@@ -21,6 +21,12 @@
insert into `t_team` values (100000, 'default', 'The default team', now(),
now());
insert into `t_team` values (100001, 'test', 'The test team', now(), now());
+-- ----------------------------
+-- Records of flink-app
+-- ----------------------------
+INSERT INTO `t_app` (`id`, `job_type`, `create_time`, `modify_time`)
+VALUES (100000, 1, now(), now());
+
-- ----------------------------
-- Records of t_flink_app
-- ----------------------------
@@ -45,6 +51,13 @@ insert into `t_flink_project` values (100000, 100000,
'streampark-quickstart', '
-- ----------------------------
insert into `t_flink_sql` values (100000, 100000,
'eNqlUUtPhDAQvu+vmFs1AYIHT5s94AaVqGxSSPZIKgxrY2mxrdGfb4GS3c0+LnJo6Mz36syapkmZQpk8vKbQMMt2KOFmAe5rK4Nf3yhrhCwvA1/TTDaqO61UxmooSprlT1PDGkgKEKpmwvIOjWVdP3W2zpG+JfQFHjfU46xxrVvYZuWztye1khJrqzSBFRCfjUwSYQiqt1xJJvyPcbWJp9WPCXvUoUEn0ZAVufcs0nIUjYn2L4s++YiY75eBLr+2Dnl3GYKTWRyfQKYRRR2XZxXmNvu9yh9GHAmUO/sxyMRkGNly4c714RZ7zaWtLHsX+N9NjvVrWxm99jmyvEhpOUhujmIYFI5zkCOYzYIj11a7QH7Tyz+nE8bw',
null, null, 1, 1, now());
+
+-- ----------------------------
+-- Records of spark-app
+-- ----------------------------
+INSERT INTO `t_app` (`id`, `job_type`, `create_time`, `modify_time`)
+VALUES (100001, 2, now(), now());
+
-- ----------------------------
-- Records of t_spark_app
-- ----------------------------
@@ -52,17 +65,17 @@ insert into `t_spark_app` (
`id`, `team_id`, `job_type`, `app_type`, `app_name`, `deploy_mode`,
`resource_from`, `main_class`,
`yarn_queue`, `k8s_image_pull_policy`, `k8s_namespace`, `state`,
`option_state`, `user_id`,
`description`, `tracking`, `release`, `build`, `create_time`,
`modify_time`, `tags`)
-values (100000, 100000, 2, 4, 'Spark SQL Demo', 2, 2,
'org.apache.streampark.spark.cli.SqlClient', 'default', 0, 'default', 0, 0,
100000, 'Spark SQL Demo', 0, 1, 1, now(), now(), 'streampark,test');
+values (100001, 100000, 2, 4, 'Spark SQL Demo', 2, 2,
'org.apache.streampark.spark.cli.SqlClient', 'default', 0, 'default', 0, 0,
100000, 'Spark SQL Demo', 0, 1, 1, now(), now(), 'streampark,test');
-- ----------------------------
-- Records of t_spark_effective
-- ----------------------------
-insert into `t_spark_effective` values (100000, 100000, 4, 100000, now());
+insert into `t_spark_effective` values (100000, 100001, 4, 100000, now());
-- ----------------------------
-- Records of t_spark_sql
-- ----------------------------
-insert into `t_spark_sql` values (100000, 100000,
'eNq1jr0OgjAURnee4m4FY/oCTJVUg/KT9F7cK2kQiy2W+P6KMQ6yuDh9+YZzcjIlBUkgsSkkXCbv0N9Da0ifBgOx01cDSCqvdmsIpuu9e98kavA54EPH9ajbs+HTqIPl023gsyeN8gqlIsgrqhfmoygaiTEre2vYGliDgiW/IXvd2hdymIls0d87+5f6jxdlITOCFWxVXX5npg92MWtB',
null, null, 1, 1, now());
+insert into `t_spark_sql` values (100000, 100001,
'eNq1jr0OgjAURnee4m4FY/oCTJVUg/KT9F7cK2kQiy2W+P6KMQ6yuDh9+YZzcjIlBUkgsSkkXCbv0N9Da0ifBgOx01cDSCqvdmsIpuu9e98kavA54EPH9ajbs+HTqIPl023gsyeN8gqlIsgrqhfmoygaiTEre2vYGliDgiW/IXvd2hdymIls0d87+5f6jxdlITOCFWxVXX5npg92MWtB',
null, null, 1, 1, now());
-- ----------------------------
-- Records of t_menu
diff --git
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index b427e28d1..dd2de3986 100644
---
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -16,19 +16,14 @@
*/
-- ----------------------------
--- Table structure for t_flink_app_backup
+-- Table structure for t_app
-- ----------------------------
-create table if not exists `t_flink_app_backup` (
- `id` bigint generated by default as identity not null,
- `app_id` bigint default null,
- `sql_id` bigint default null,
- `config_id` bigint default null,
- `version` int default null,
- `path` varchar(128) default null,
- `description` varchar(255) default null,
- `create_time` datetime default null comment 'create time',
- `modify_time` datetime default null comment 'modify time',
- primary key(`id`)
+create table if not exists `t_app` (
+`id` bigint generated by default as identity not null,
+`job_type` tinyint default null,
+`create_time` datetime default null comment 'create time',
+`modify_time` datetime default null comment 'modify time',
+primary key(`id`)
);
-- ----------------------------
@@ -99,6 +94,24 @@ create table if not exists `t_flink_app` (
primary key(`id`)
);
+
+-- ----------------------------
+-- Table structure for t_flink_app_backup
+-- ----------------------------
+create table if not exists `t_flink_app_backup` (
+`id` bigint generated by default as identity not null,
+`app_id` bigint default null,
+`sql_id` bigint default null,
+`config_id` bigint default null,
+`version` int default null,
+`path` varchar(128) default null,
+`description` varchar(255) default null,
+`create_time` datetime default null comment 'create time',
+`modify_time` datetime default null comment 'modify time',
+primary key(`id`)
+);
+
+
-- ----------------------------
-- Table structure for t_flink_config
-- ----------------------------
@@ -145,13 +158,14 @@ create table if not exists `t_flink_env` (
-- ----------------------------
--- Table structure for t_flink_log
+-- Table structure for t_app_log
-- ----------------------------
-create table if not exists `t_flink_log` (
+create table if not exists `t_app_log` (
`id` bigint generated by default as identity not null,
`app_id` bigint default null,
- `yarn_app_id` varchar(64) default null,
- `job_manager_url` varchar(255) default null,
+ `job_type` tinyint default null,
+ `cluster_id` varchar(64) default null,
+ `tracking_url` varchar(255) default null,
`success` tinyint default null,
`exception` text ,
`option_time` datetime default null,
diff --git
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/EffectiveMapper.xml
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
similarity index 92%
copy from
streampark-console/streampark-console-service/src/main/resources/mapper/core/EffectiveMapper.xml
copy to
streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index 59a6ac14a..85569bf7f 100644
---
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/EffectiveMapper.xml
+++
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -16,6 +16,6 @@
~ limitations under the License.
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-<mapper namespace="org.apache.streampark.console.core.mapper.EffectiveMapper">
+<mapper
namespace="org.apache.streampark.console.core.mapper.ApplicationMapper">
</mapper>
diff --git
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/EffectiveMapper.xml
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/EffectiveMapper.xml
index 59a6ac14a..2dae562b2 100644
---
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/EffectiveMapper.xml
+++
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/EffectiveMapper.xml
@@ -16,6 +16,6 @@
~ limitations under the License.
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-<mapper namespace="org.apache.streampark.console.core.mapper.EffectiveMapper">
+<mapper
namespace="org.apache.streampark.console.core.mapper.FlinkEffectiveMapper">
</mapper>
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkApplicationManageServiceTest.java
similarity index 98%
rename from
streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceTest.java
rename to
streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkApplicationManageServiceTest.java
index d199f4de9..5724ecee9 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationManageServiceTest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkApplicationManageServiceTest.java
@@ -37,7 +37,7 @@ import java.util.Date;
import static org.assertj.core.api.Assertions.assertThat;
/** org.apache.streampark.console.core.service.ApplicationServiceUnitTest. */
-class ApplicationManageServiceTest extends SpringUnitTestBase {
+class FlinkApplicationManageServiceTest extends SpringUnitTestBase {
@Autowired
private FlinkApplicationManageService applicationManageService;
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/CatalogServiceTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkCatalogServiceTest.java
similarity index 97%
rename from
streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/CatalogServiceTest.java
rename to
streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkCatalogServiceTest.java
index 23decd3bd..b6089adf9 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/CatalogServiceTest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkCatalogServiceTest.java
@@ -32,10 +32,10 @@ import
org.springframework.beans.factory.annotation.Autowired;
import static org.assertj.core.api.Assertions.assertThat;
/** CatalogService Tests */
-public class CatalogServiceTest extends SpringUnitTestBase {
+public class FlinkCatalogServiceTest extends SpringUnitTestBase {
@Autowired
- private CatalogService catalogService;
+ private FlinkCatalogService catalogService;
@AfterEach
void cleanTestRecordsInDatabase() {
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavepointServiceTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java
similarity index 96%
rename from
streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavepointServiceTest.java
rename to
streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java
index 833faa4ab..66f43f940 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavepointServiceTest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java
@@ -22,12 +22,13 @@ import org.apache.streampark.common.enums.FlinkDeployMode;
import org.apache.streampark.common.enums.FlinkJobType;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.console.SpringUnitTestBase;
-import org.apache.streampark.console.core.entity.Effective;
import org.apache.streampark.console.core.entity.FlinkApplication;
import org.apache.streampark.console.core.entity.FlinkApplicationConfig;
+import org.apache.streampark.console.core.entity.FlinkEffective;
import org.apache.streampark.console.core.entity.FlinkEnv;
import org.apache.streampark.console.core.enums.ConfigFileTypeEnum;
import org.apache.streampark.console.core.enums.EffectiveTypeEnum;
+import
org.apache.streampark.console.core.service.application.FlinkApplicationConfigService;
import
org.apache.streampark.console.core.service.application.FlinkApplicationManageService;
import
org.apache.streampark.console.core.service.impl.FlinkSavepointServiceImpl;
@@ -47,7 +48,7 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
* FlinkSavepointServiceImpl} of {@link
* SavepointService}.
*/
-class SavepointServiceTest extends SpringUnitTestBase {
+class FlinkSavepointServiceTest extends SpringUnitTestBase {
@Autowired
private SavepointService savepointService;
@@ -56,7 +57,7 @@ class SavepointServiceTest extends SpringUnitTestBase {
private FlinkApplicationConfigService configService;
@Autowired
- private EffectiveService effectiveService;
+ private FlinkEffectiveService effectiveService;
@Autowired
private FlinkEnvService flinkEnvService;
@@ -137,7 +138,7 @@ class SavepointServiceTest extends SpringUnitTestBase {
+ String.format("%s=%s", CHECKPOINTING_INTERVAL.key(),
"3min")));
configService.updateById(appCfg);
- Effective effective = new Effective();
+ FlinkEffective effective = new FlinkEffective();
effective.setTargetId(appCfg.getId());
effective.setAppId(appId);
effective.setTargetType(EffectiveTypeEnum.CONFIG.getType());
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala
index 0abc5f4c9..32d09b856 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/SparkClient.scala
@@ -32,15 +32,15 @@ object SparkClient extends Logger {
private[this] val SUBMIT_REQUEST =
"org.apache.streampark.spark.client.bean.SubmitRequest" -> "submit"
- private[this] val STOP_REQUEST =
- "org.apache.streampark.spark.client.bean.StopRequest" -> "stop"
+ private[this] val CANCEL_REQUEST =
+ "org.apache.streampark.spark.client.bean.CancelRequest" -> "cancel"
def submit(submitRequest: SubmitRequest): SubmitResponse = {
proxy[SubmitResponse](submitRequest, submitRequest.sparkVersion,
SUBMIT_REQUEST)
}
- def stop(stopRequest: StopRequest): StopResponse = {
- proxy[StopResponse](stopRequest, stopRequest.sparkVersion, STOP_REQUEST)
+ def cancel(stopRequest: CancelRequest): CancelResponse = {
+ proxy[CancelResponse](stopRequest, stopRequest.sparkVersion,
CANCEL_REQUEST)
}
private[this] def proxy[T: ClassTag](
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopRequest.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelRequest.scala
similarity index 97%
rename from
streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopRequest.scala
rename to
streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelRequest.scala
index 6440d9f43..22a2b4e08 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopRequest.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelRequest.scala
@@ -23,7 +23,7 @@ import org.apache.streampark.common.util.Implicits.JavaMap
import javax.annotation.Nullable
-case class StopRequest(
+case class CancelRequest(
id: Long,
sparkVersion: SparkVersion,
deployMode: SparkDeployMode,
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopResponse.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala
similarity index 94%
rename from
streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopResponse.scala
rename to
streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala
index 42b480534..7b22a362f 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopResponse.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/CancelResponse.scala
@@ -17,4 +17,4 @@
package org.apache.streampark.spark.client.bean
-case class StopResponse(savePoint: String)
+case class CancelResponse(savePoint: String)
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala
index 104dd9ff6..d68f93596 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala
@@ -39,9 +39,9 @@ object SparkClientEndpoint {
}
}
- def stop(stopRequest: StopRequest): StopResponse = {
+ def cancel(stopRequest: CancelRequest): CancelResponse = {
clients.get(stopRequest.deployMode) match {
- case Some(client) => client.stop(stopRequest)
+ case Some(client) => client.cancel(stopRequest)
case _ =>
throw new UnsupportedOperationException(
s"Unsupported ${stopRequest.deployMode} spark stop.")
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
index d26ea8098..007b80d06 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.spark.client.impl
-import
org.apache.streampark.common.conf.ConfigKeys.{KEY_SPARK_YARN_AM_NODE_LABEL,
KEY_SPARK_YARN_EXECUTOR_NODE_LABEL, KEY_SPARK_YARN_QUEUE,
KEY_SPARK_YARN_QUEUE_LABEL, KEY_SPARK_YARN_QUEUE_NAME}
+import org.apache.streampark.common.conf.ConfigKeys._
import org.apache.streampark.common.enums.SparkDeployMode
import org.apache.streampark.common.util.{HadoopUtils, YarnUtils}
import org.apache.streampark.common.util.Implicits._
@@ -37,22 +37,22 @@ object YarnClient extends SparkClientTrait {
private lazy val sparkHandles = new ConcurrentHashMap[String,
SparkAppHandle]()
- override def doStop(stopRequest: StopRequest): StopResponse = {
- val sparkAppHandle = sparkHandles.remove(stopRequest.appId)
+ override def doCancel(cancelRequest: CancelRequest): CancelResponse = {
+ val sparkAppHandle = sparkHandles.remove(cancelRequest.appId)
if (sparkAppHandle != null) {
Try(sparkAppHandle.stop()) match {
case Success(_) =>
- logger.info(s"[StreamPark][Spark][YarnClient] spark job:
${stopRequest.appId} is stopped successfully.")
- StopResponse(null)
+ logger.info(s"[StreamPark][Spark][YarnClient] spark job:
${cancelRequest.appId} is stopped successfully.")
+ CancelResponse(null)
case Failure(e) =>
logger.error("[StreamPark][Spark][YarnClient] sparkAppHandle kill
failed. Try kill by yarn", e)
- yarnKill(stopRequest.appId)
- StopResponse(null)
+ yarnKill(cancelRequest.appId)
+ CancelResponse(null)
}
} else {
- logger.warn(s"[StreamPark][Spark][YarnClient] spark job:
${stopRequest.appId} is not existed. Try kill by yarn")
- yarnKill(stopRequest.appId)
- StopResponse(null)
+ logger.warn(s"[StreamPark][Spark][YarnClient] spark job:
${cancelRequest.appId} is not existed. Try kill by yarn")
+ yarnKill(cancelRequest.appId)
+ CancelResponse(null)
}
}
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
index b13bdef1d..746de6cd1 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
@@ -60,24 +60,24 @@ trait SparkClientTrait extends Logger {
def setConfig(submitRequest: SubmitRequest): Unit
@throws[Exception]
- def stop(stopRequest: StopRequest): StopResponse = {
+ def cancel(stopRequest: CancelRequest): CancelResponse = {
logInfo(
s"""
- |----------------------------------------- spark job stop
----------------------------------
+ |----------------------------------------- spark job cancel
----------------------------------
| userSparkHome : ${stopRequest.sparkVersion.sparkHome}
| sparkVersion : ${stopRequest.sparkVersion.version}
| appId : ${stopRequest.appId}
|-------------------------------------------------------------------------------------------
|""".stripMargin)
- doStop(stopRequest)
+ doCancel(stopRequest)
}
@throws[Exception]
def doSubmit(submitRequest: SubmitRequest): SubmitResponse
@throws[Exception]
- def doStop(stopRequest: StopRequest): StopResponse
+ def doCancel(cancelRequest: CancelRequest): CancelResponse
private def prepareConfig(submitRequest: SubmitRequest): Unit = {
// 1) filter illegal configuration key