This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.4
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.4 by this push:
new ff8b93b04 [Improve] remapping flink job state bug fixed.
ff8b93b04 is described below
commit ff8b93b040e588c5109e610af33cab566814a083
Author: benjobs <[email protected]>
AuthorDate: Sun Apr 14 19:49:41 2024 +0800
[Improve] remapping flink job state bug fixed.
---
.../src/main/assembly/script/data/mysql-data.sql | 2 +-
.../src/main/assembly/script/data/pgsql-data.sql | 2 +-
.../main/assembly/script/schema/mysql-schema.sql | 1 -
.../main/assembly/script/schema/pgsql-schema.sql | 1 -
.../main/assembly/script/upgrade/mysql/2.1.4.sql | 20 +++--
.../main/assembly/script/upgrade/pgsql/2.1.4.sql | 10 ++-
.../console/core/bean/AlertTemplate.java | 3 +-
.../controller/ApplicationHistoryController.java | 18 +----
.../console/core/entity/Application.java | 5 +-
.../console/core/mapper/ApplicationMapper.java | 3 +-
.../console/core/service/ApplicationService.java | 2 +-
.../core/service/impl/ApplicationServiceImpl.java | 89 ++++++++++++----------
.../core/service/impl/ExternalLinkServiceImpl.java | 2 +-
.../core/service/impl/FlinkClusterServiceImpl.java | 4 +-
.../core/service/impl/SavePointServiceImpl.java | 25 +++---
.../console/core/task/FlinkAppHttpWatcher.java | 8 +-
.../core/task/FlinkK8sChangeEventListener.java | 23 +++---
.../src/main/resources/db/data-h2.sql | 2 +-
.../src/main/resources/db/schema-h2.sql | 1 -
.../resources/mapper/core/ApplicationMapper.xml | 21 ++---
.../core/service/alert/AlertServiceTest.java | 4 +-
.../src/api/flink/app/flinkHistory.ts | 6 +-
.../src/views/flink/app/EditFlink.vue | 2 +-
.../src/views/flink/app/hooks/useApp.tsx | 6 +-
.../flink/app/hooks/useCreateAndEditSchema.ts | 12 +--
.../src/views/flink/app/utils/index.ts | 4 +-
.../setting/FlinkCluster/useClusterSetting.ts | 6 +-
.../flink/kubernetes/enums/FlinkJobState.scala | 2 +-
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 32 ++++----
29 files changed, 152 insertions(+), 164 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
index 79a4766bb..bc9565b5f 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
@@ -30,7 +30,7 @@ insert into `t_team` values (100000, 'default', null, now(),
now());
-- ----------------------------
-- Records of t_flink_app
-- ----------------------------
-insert into `t_flink_app` values (100000, 100000, 2, 4, null, null, 'Flink SQL
Demo', null, null, null, null, null, null, null , null, 100000, null, 1, null,
null, null, null, null, null, null, '0', 0, null, null, null, null, null, null,
'Flink SQL Demo', 0, null, null, null, null, null, null, null, 0, 0, now(),
now(), null, 1, 1, null, null, null, null, null, null, 0, null, null, null,
'streampark,test');
+insert into `t_flink_app` values (100000, 100000, 2, 4, null, null, 'Flink SQL
Demo', null, null, null, null, null, null, null , null, 100000, 1, null, null,
null, null, null, null, null, '0', 0, null, null, null, null, null, null,
'Flink SQL Demo', 0, null, null, null, null, null, null, null, 0, 0, now(),
now(), null, 1, 1, null, null, null, null, null, null, 0, null, null, null,
'streampark,test');
-- ----------------------------
-- Records of t_flink_effective
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/data/pgsql-data.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/data/pgsql-data.sql
index f38496ff4..bcd706880 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/data/pgsql-data.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/data/pgsql-data.sql
@@ -25,7 +25,7 @@ insert into "public"."t_team" values (100000, 'default',
null, now(), now());
-- ----------------------------
-- Records of t_flink_app
-- ----------------------------
-insert into "public"."t_flink_app" values (100000, 100000, 2, 4, null, null,
'Flink SQL Demo', null, null, null, null, null, null, null , null, 100000,
null, 1, null, null, null, null, null, null, null, 0, 0, null, null, null,
null, null, null, 'Flink SQL Demo', 0, null, null, null, null, null, null,
null, 0, 0, now(), now(), null, 1, true, null, null, null, null, null, null,
false, null, null, null, 'streampark,test');
+insert into "public"."t_flink_app" values (100000, 100000, 2, 4, null, null,
'Flink SQL Demo', null, null, null, null, null, null, null , null, 100000, 1,
null, null, null, null, null, null, null, 0, 0, null, null, null, null, null,
null, 'Flink SQL Demo', 0, null, null, null, null, null, null, null, 0, 0,
now(), now(), null, 1, true, null, null, null, null, null, null, false, null,
null, null, 'streampark,test');
-- ----------------------------
-- Records of t_flink_effective
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
index 7d21dc70f..95024ed79 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
@@ -58,7 +58,6 @@ create table `t_flink_app` (
`options` longtext collate utf8mb4_general_ci,
`hot_params` text collate utf8mb4_general_ci,
`user_id` bigint default null,
- `app_id` varchar(64) collate utf8mb4_general_ci default null,
`app_type` tinyint default null,
`duration` bigint default null,
`job_id` varchar(64) collate utf8mb4_general_ci default null,
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
index 5f92121be..a523c04a4 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
@@ -206,7 +206,6 @@ create table "public"."t_flink_app" (
"options" text collate "pg_catalog"."default",
"hot_params" text collate "pg_catalog"."default",
"user_id" int8,
- "app_id" varchar(64) collate "pg_catalog"."default",
"app_type" int2,
"duration" int8,
"job_id" varchar(64) collate "pg_catalog"."default",
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.4.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.4.sql
index 8d25f822f..04b7f6ea4 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.4.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.4.sql
@@ -17,12 +17,18 @@
use streampark;
-set names utf8mb4;
-set foreign_key_checks = 0;
+SET NAMES utf8mb4;
+SET foreign_key_checks = 0;
-update `t_flink_app` a inner join `t_flink_cluster` c
-on a.`cluster_id` = c.`cluster_id`
- and a.`execution_mode` = 5
- set a.`flink_cluster_id` = c.`id`;
+UPDATE `t_flink_app` a INNER JOIN `t_flink_cluster` c
+ON a.`cluster_id` = c.`cluster_id`
+AND a.`execution_mode` = 5
+SET a.`flink_cluster_id` = c.`id`;
-set foreign_key_checks = 1;
+UPDATE `t_flink_app`
+SET `cluster_id` = `app_id`
+WHERE `execution_mode` IN (2,3,5);
+
+ALTER TABLE `t_flink_app` DROP COLUMN `app_id`;
+
+SET foreign_key_checks = 1;
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.4.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.4.sql
index 9472e343d..6b216e6ec 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.4.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.4.sql
@@ -18,5 +18,11 @@
UPDATE t_flink_app
SET flink_cluster_id = t_flink_cluster.id
FROM t_flink_cluster
-where t_flink_app.cluster_id = t_flink_cluster.cluster_id
- and t_flink_app.execution_mode = 5;
+WHERE t_flink_app.cluster_id = t_flink_cluster.cluster_id
+ AND t_flink_app.execution_mode = 5;
+
+UPDATE t_flink_app
+SET cluster_id = app_id
+WHERE execution_mode IN (2,3,5);
+
+ALTER TABLE t_flink_app DROP COLUMN app_id;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
index 8dd5b4d7f..bf448fd15 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
@@ -56,7 +56,8 @@ public class AlertTemplate implements Serializable {
if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
String format = "%s/proxy/%s/";
- String url = String.format(format, YarnUtils.getRMWebAppURL(false),
application.getAppId());
+ String url =
+ String.format(format, YarnUtils.getRMWebAppURL(false),
application.getClusterId());
template.setLink(url);
} else {
template.setLink(null);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationHistoryController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationHistoryController.java
index 2ba542336..c40c2f8f0 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationHistoryController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationHistoryController.java
@@ -17,7 +17,6 @@
package org.apache.streampark.console.core.controller;
-import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.core.service.ApplicationService;
@@ -32,7 +31,6 @@ import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
-import java.util.ArrayList;
import java.util.List;
@Tag(name = "FLINK_APPLICATION_HISTORY_TAG")
@@ -61,20 +59,10 @@ public class ApplicationHistoryController {
}
@Operation(summary = "List the session cluster history records")
- @PostMapping("sessionClusterIds")
+ @PostMapping("k8sClusterId")
@RequiresPermissions("app:create")
- public RestResponse listSessionClusterId(int executionMode) {
- List<String> clusterIds;
- switch (ExecutionMode.of(executionMode)) {
- case KUBERNETES_NATIVE_SESSION:
- case YARN_SESSION:
- case REMOTE:
- clusterIds = applicationService.getRecentK8sClusterId(executionMode);
- break;
- default:
- clusterIds = new ArrayList<>(0);
- break;
- }
+ public RestResponse recentK8sClusterId() {
+ List<String> clusterIds = applicationService.getRecentK8sClusterId();
return RestResponse.success(clusterIds);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index cb08078ef..88db35fa1 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -83,9 +83,6 @@ public class Application implements Serializable {
/** The name of the frontend and program displayed in yarn */
private String jobName;
- @TableField(updateStrategy = FieldStrategy.IGNORED)
- private String appId;
-
@TableField(updateStrategy = FieldStrategy.IGNORED)
private String jobId;
@@ -96,7 +93,7 @@ public class Application implements Serializable {
/** flink version */
private Long versionId;
- /** k8s cluster id */
+ /** 1. yarn application id(on yarn) 2. k8s application id (on k8s
application) */
private String clusterId;
/** flink docker base image */
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
index a1f05be74..cf7b8e9cc 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
@@ -41,8 +41,7 @@ public interface ApplicationMapper extends
BaseMapper<Application> {
List<String> getRecentK8sNamespace(@Param("limitSize") Integer limit);
- List<String> getRecentK8sClusterId(
- @Param("executionMode") Integer executionMode, @Param("limitSize")
Integer limit);
+ List<String> getRecentK8sClusterId(@Param("limitSize") Integer limit);
List<String> getRecentFlinkBaseImage(@Param("limitSize") Integer limit);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index db347edf9..b9ce6d768 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -111,7 +111,7 @@ public interface ApplicationService extends
IService<Application> {
List<String> getRecentK8sNamespace();
- List<String> getRecentK8sClusterId(Integer executionMode);
+ List<String> getRecentK8sClusterId();
List<String> getRecentFlinkBaseImage();
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 21e646d92..f0a2261f9 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -603,8 +603,8 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
@Override
- public List<String> getRecentK8sClusterId(Integer executionMode) {
- return baseMapper.getRecentK8sClusterId(executionMode,
DEFAULT_HISTORY_RECORD_LIMIT);
+ public List<String> getRecentK8sClusterId() {
+ return baseMapper.getRecentK8sClusterId(DEFAULT_HISTORY_RECORD_LIMIT);
}
@Override
@@ -834,56 +834,58 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
!existsByJobName,
"[StreamPark] Application names can't be repeated, copy application
failed.");
- Application oldApp = getById(appParam.getId());
+ Application persist = getById(appParam.getId());
Application newApp = new Application();
String jobName = appParam.getJobName();
newApp.setJobName(jobName);
newApp.setClusterId(
- ExecutionMode.isSessionMode(oldApp.getExecutionModeEnum()) ?
oldApp.getClusterId() : null);
- newApp.setArgs(appParam.getArgs() != null ? appParam.getArgs() :
oldApp.getArgs());
- newApp.setVersionId(oldApp.getVersionId());
-
- newApp.setFlinkClusterId(oldApp.getFlinkClusterId());
- newApp.setRestartSize(oldApp.getRestartSize());
- newApp.setJobType(oldApp.getJobType());
- newApp.setOptions(oldApp.getOptions());
- newApp.setDynamicProperties(oldApp.getDynamicProperties());
- newApp.setResolveOrder(oldApp.getResolveOrder());
- newApp.setExecutionMode(oldApp.getExecutionMode());
- newApp.setFlinkImage(oldApp.getFlinkImage());
- newApp.setK8sNamespace(oldApp.getK8sNamespace());
- newApp.setK8sRestExposedType(oldApp.getK8sRestExposedType());
- newApp.setK8sPodTemplate(oldApp.getK8sPodTemplate());
- newApp.setK8sJmPodTemplate(oldApp.getK8sJmPodTemplate());
- newApp.setK8sTmPodTemplate(oldApp.getK8sTmPodTemplate());
- newApp.setK8sHadoopIntegration(oldApp.getK8sHadoopIntegration());
- newApp.setDescription(oldApp.getDescription());
- newApp.setAlertId(oldApp.getAlertId());
- newApp.setCpFailureAction(oldApp.getCpFailureAction());
- newApp.setCpFailureRateInterval(oldApp.getCpFailureRateInterval());
- newApp.setCpMaxFailureInterval(oldApp.getCpMaxFailureInterval());
- newApp.setMainClass(oldApp.getMainClass());
- newApp.setAppType(oldApp.getAppType());
- newApp.setResourceFrom(oldApp.getResourceFrom());
- newApp.setProjectId(oldApp.getProjectId());
- newApp.setModule(oldApp.getModule());
+ ExecutionMode.isSessionMode(persist.getExecutionModeEnum())
+ ? persist.getClusterId()
+ : null);
+ newApp.setArgs(appParam.getArgs() != null ? appParam.getArgs() :
persist.getArgs());
+ newApp.setVersionId(persist.getVersionId());
+
+ newApp.setFlinkClusterId(persist.getFlinkClusterId());
+ newApp.setRestartSize(persist.getRestartSize());
+ newApp.setJobType(persist.getJobType());
+ newApp.setOptions(persist.getOptions());
+ newApp.setDynamicProperties(persist.getDynamicProperties());
+ newApp.setResolveOrder(persist.getResolveOrder());
+ newApp.setExecutionMode(persist.getExecutionMode());
+ newApp.setFlinkImage(persist.getFlinkImage());
+ newApp.setK8sNamespace(persist.getK8sNamespace());
+ newApp.setK8sRestExposedType(persist.getK8sRestExposedType());
+ newApp.setK8sPodTemplate(persist.getK8sPodTemplate());
+ newApp.setK8sJmPodTemplate(persist.getK8sJmPodTemplate());
+ newApp.setK8sTmPodTemplate(persist.getK8sTmPodTemplate());
+ newApp.setK8sHadoopIntegration(persist.getK8sHadoopIntegration());
+ newApp.setDescription(persist.getDescription());
+ newApp.setAlertId(persist.getAlertId());
+ newApp.setCpFailureAction(persist.getCpFailureAction());
+ newApp.setCpFailureRateInterval(persist.getCpFailureRateInterval());
+ newApp.setCpMaxFailureInterval(persist.getCpMaxFailureInterval());
+ newApp.setMainClass(persist.getMainClass());
+ newApp.setAppType(persist.getAppType());
+ newApp.setResourceFrom(persist.getResourceFrom());
+ newApp.setProjectId(persist.getProjectId());
+ newApp.setModule(persist.getModule());
newApp.setUserId(serviceHelper.getUserId());
newApp.setState(FlinkAppState.ADDED.getValue());
newApp.setRelease(ReleaseState.NEED_RELEASE.get());
newApp.setOptionState(OptionState.NONE.getValue());
- newApp.setHotParams(oldApp.getHotParams());
+ newApp.setHotParams(persist.getHotParams());
// createTime & modifyTime
Date date = new Date();
newApp.setCreateTime(date);
newApp.setModifyTime(date);
- newApp.setJar(oldApp.getJar());
- newApp.setJarCheckSum(oldApp.getJarCheckSum());
- newApp.setTags(oldApp.getTags());
- newApp.setTeamId(oldApp.getTeamId());
- newApp.setDependency(oldApp.getDependency());
+ newApp.setJar(persist.getJar());
+ newApp.setJarCheckSum(persist.getJarCheckSum());
+ newApp.setTags(persist.getTags());
+ newApp.setTeamId(persist.getTeamId());
+ newApp.setDependency(persist.getDependency());
boolean saved = save(newApp);
if (saved) {
@@ -908,7 +910,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
return newApp.getId();
} else {
throw new ApiAlertException(
- "create application from copy failed, copy source app: " +
oldApp.getJobName());
+ "create application from copy failed, copy source app: " +
persist.getJobName());
}
}
@@ -990,7 +992,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
application.setDynamicProperties(appParam.getDynamicProperties());
application.setResolveOrder(appParam.getResolveOrder());
application.setExecutionMode(appParam.getExecutionMode());
- application.setClusterId(appParam.getClusterId());
+ application.setFlinkClusterId(appParam.getFlinkClusterId());
application.setFlinkImage(appParam.getFlinkImage());
application.updateHotParams(appParam);
application.setK8sRestExposedType(appParam.getK8sRestExposedType());
@@ -1285,6 +1287,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
@Override
public void cancel(Application appParam) throws Exception {
FlinkAppHttpWatcher.setOptionState(appParam.getId(),
OptionState.CANCELLING);
+
Application application = getById(appParam.getId());
application.setState(FlinkAppState.CANCELLING.getValue());
@@ -1293,7 +1296,9 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
applicationLog.setAppId(application.getId());
applicationLog.setJobManagerUrl(application.getJobManagerUrl());
applicationLog.setOptionTime(new Date());
- applicationLog.setYarnAppId(application.getClusterId());
+ if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
+ applicationLog.setYarnAppId(application.getClusterId());
+ }
if (appParam.getSavePointed()) {
if (!application.isKubernetesModeJob()) {
@@ -1692,7 +1697,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
- application.setAppId(response.clusterId());
+ application.setClusterId(response.clusterId());
applicationLog.setYarnAppId(response.clusterId());
}
@@ -1946,7 +1951,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
case YARN_APPLICATION:
case YARN_PER_JOB:
case YARN_SESSION:
- clusterId = application.getAppId();
+ clusterId = application.getClusterId();
break;
case KUBERNETES_NATIVE_APPLICATION:
clusterId = application.getJobName();
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java
index 694afed63..b01aed743 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java
@@ -88,7 +88,7 @@ public class ExternalLinkServiceImpl extends
ServiceImpl<ExternalLinkMapper, Ext
HashMap<String, String> map = new HashMap();
map.put(PlaceholderType.JOB_ID.get(), app.getJobId());
map.put(PlaceholderType.JOB_NAME.get(), app.getJobName());
- map.put(PlaceholderType.YARN_ID.get(), app.getAppId());
+ map.put(PlaceholderType.YARN_ID.get(), app.getClusterId());
PropertyPlaceholderHelper propertyPlaceholderHelper = new
PropertyPlaceholderHelper("{", "}");
link.setRenderedLinkUrl(
propertyPlaceholderHelper.replacePlaceholders(link.getLinkUrl().trim(),
map::get));
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 1240313d9..efc2d295a 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -174,7 +174,7 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
log.info("deploy cluster request: {}", deployRequest);
Future<DeployResponse> future =
bootstrapExecutor.submit(() -> FlinkClient.deploy(deployRequest));
- DeployResponse deployResponse = future.get(5, TimeUnit.SECONDS);
+ DeployResponse deployResponse = future.get();
if (deployResponse.error() != null) {
throw new ApiDetailException(
"deploy cluster "
@@ -313,7 +313,7 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
try {
Future<ShutDownResponse> future =
bootstrapExecutor.submit(() -> FlinkClient.shutdown(deployRequest));
- ShutDownResponse shutDownResponse = future.get(60, TimeUnit.SECONDS);
+ ShutDownResponse shutDownResponse = future.get();
if (shutDownResponse.error() != null) {
throw new ApiDetailException(
"shutdown cluster failed, error: \n"
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index b380483cc..5b59c9ff4 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -289,16 +289,15 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
applicationLog.setAppId(application.getId());
applicationLog.setJobManagerUrl(application.getJobManagerUrl());
applicationLog.setOptionTime(new Date());
- applicationLog.setYarnAppId(application.getClusterId());
-
+ if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
+ applicationLog.setYarnAppId(application.getClusterId());
+ }
if (!application.isKubernetesModeJob()) {
FlinkAppHttpWatcher.addSavepoint(application.getId());
application.setOptionState(OptionState.SAVEPOINTING.getValue());
application.setOptionTime(new Date());
this.applicationService.updateById(application);
flinkAppHttpWatcher.initialize();
- } else {
- this.applicationService.updateById(application);
}
FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
@@ -354,11 +353,13 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
.whenComplete(
(t, e) -> {
applicationLogService.save(applicationLog);
- application.setOptionState(OptionState.NONE.getValue());
- application.setOptionTime(new Date());
- applicationService.update(application);
- flinkAppHttpWatcher.cleanSavepoint(application);
- flinkAppHttpWatcher.initialize();
+ if (!application.isKubernetesModeJob()) {
+ application.setOptionState(OptionState.NONE.getValue());
+ application.setOptionTime(new Date());
+ applicationService.update(application);
+ flinkAppHttpWatcher.cleanSavepoint(application);
+ flinkAppHttpWatcher.initialize();
+ }
});
}
@@ -396,7 +397,9 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
private String getClusterId(Application application, FlinkCluster cluster) {
if (ExecutionMode.isKubernetesMode(application.getExecutionMode())) {
- return application.getClusterId();
+ return
ExecutionMode.isKubernetesSessionMode(application.getExecutionMode())
+ ? cluster.getClusterId()
+ : application.getClusterId();
} else if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
if
(ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
Utils.notNull(
@@ -406,7 +409,7 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
application.getFlinkClusterId()));
return cluster.getClusterId();
} else {
- return application.getAppId();
+ return application.getClusterId();
}
}
return null;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
index 33fbcd310..36e115b8f 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
@@ -701,12 +701,12 @@ public class FlinkAppHttpWatcher {
}
private YarnAppInfo httpYarnAppInfo(Application application) throws
Exception {
- String reqURL = "ws/v1/cluster/apps/".concat(application.getAppId());
+ String reqURL = "ws/v1/cluster/apps/".concat(application.getClusterId());
return yarnRestRequest(reqURL, YarnAppInfo.class);
}
private Overview httpOverview(Application application) throws IOException {
- String appId = application.getAppId();
+ String appId = application.getClusterId();
if (appId != null) {
if
(application.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)
||
application.getExecutionModeEnum().equals(ExecutionMode.YARN_PER_JOB)) {
@@ -731,7 +731,7 @@ public class FlinkAppHttpWatcher {
String reqURL;
if (StringUtils.isEmpty(application.getJobManagerUrl())) {
String format = "proxy/%s/" + flinkUrl;
- reqURL = String.format(format, application.getAppId());
+ reqURL = String.format(format, application.getClusterId());
} else {
String format = "%s/" + flinkUrl;
reqURL = String.format(format, application.getJobManagerUrl());
@@ -765,7 +765,7 @@ public class FlinkAppHttpWatcher {
String reqURL;
if (StringUtils.isEmpty(application.getJobManagerUrl())) {
String format = "proxy/%s/" + flinkUrl;
- reqURL = String.format(format, application.getAppId(),
application.getJobId());
+ reqURL = String.format(format, application.getClusterId(),
application.getJobId());
} else {
String format = "%s/" + flinkUrl;
reqURL = String.format(format, application.getJobManagerUrl(),
application.getJobId());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index 47129b165..a006aa085 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -81,17 +81,8 @@ public class FlinkK8sChangeEventListener {
return;
}
- Enumeration.Value inferState =
- FlinkJobStatusWatcher.inferFlinkJobStateFromPersist(
- jobStatus.jobState(),
toK8sFlinkJobState(app.getFlinkAppStateEnum()));
-
- FlinkAppState appState = fromK8sFlinkJobState(inferState);
- if (app.getFlinkAppStateEnum() == appState) {
- return;
- }
-
// update application record
- setByJobStatusCV(app, jobStatus, inferState);
+ setByJobStatusCV(app, jobStatus);
applicationService.persistMetrics(app);
@@ -155,8 +146,12 @@ public class FlinkK8sChangeEventListener {
checkpointProcessor.process(applicationService.getById(event.trackId().appId()),
checkPoint);
}
- private void setByJobStatusCV(
- Application app, JobStatusCV jobStatus, Enumeration.Value inferState) {
+ private void setByJobStatusCV(Application app, JobStatusCV jobStatus) {
+ // infer the final flink job state
+ Enumeration.Value state =
+ FlinkJobStatusWatcher.inferFlinkJobStateFromPersist(
+ jobStatus.jobState(),
toK8sFlinkJobState(app.getFlinkAppStateEnum()));
+
// corrective start-time / end-time / duration
long preStartTime = app.getStartTime() != null ?
app.getStartTime().getTime() : 0;
long startTime = Math.max(jobStatus.jobStartTime(), preStartTime);
@@ -164,7 +159,7 @@ public class FlinkK8sChangeEventListener {
long endTime = Math.max(jobStatus.jobEndTime(), preEndTime);
long duration = jobStatus.duration();
- if (FlinkJobState.isEndState(inferState)) {
+ if (FlinkJobState.isEndState(state)) {
if (endTime < startTime) {
endTime = System.currentTimeMillis();
}
@@ -173,7 +168,7 @@ public class FlinkK8sChangeEventListener {
}
}
- app.setState(fromK8sFlinkJobState(inferState).getValue());
+ app.setState(fromK8sFlinkJobState(state).getValue());
app.setJobId(jobStatus.jobId());
app.setTotalTask(jobStatus.taskTotal());
diff --git
a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
index 3433433a8..a247d61e6 100644
---
a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
+++
b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
@@ -25,7 +25,7 @@ insert into `t_team` values (100001, 'test', 'The test team',
now(), now());
-- ----------------------------
-- Records of t_flink_app
-- ----------------------------
-insert into `t_flink_app` values (100000, 100000, 2, 4, null, null, 'Flink SQL
Demo', null, null, null, null, null, null, null , null, 100000, null, 1, null,
null, null, null, null, null, null, '0', 0, null, null, null, null, null, null,
'Flink SQL Demo', 0, null, null, null, null, null, null, null, 0, 0, now(),
now(), null, 1, 1, null, null, null, null, null, null, 0, null, null, null,
'streampark,test');
+insert into `t_flink_app` values (100000, 100000, 2, 4, null, null, 'Flink SQL
Demo', null, null, null, null, null, null, null , null, 100000, 1, null, null,
null, null, null, null, null, '0', 0, null, null, null, null, null, null,
'Flink SQL Demo', 0, null, null, null, null, null, null, null, 0, 0, now(),
now(), null, 1, 1, null, null, null, null, null, null, 0, null, null, null,
'streampark,test');
-- ----------------------------
-- Records of t_flink_effective
diff --git
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index f81e9f1dd..0816a78f1 100644
---
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -51,7 +51,6 @@ create table if not exists `t_flink_app` (
`options` longtext,
`hot_params` text ,
`user_id` bigint default null,
- `app_id` varchar(64) default null,
`app_type` tinyint default null,
`duration` bigint default null,
`job_id` varchar(64) default null,
diff --git
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index b6f71849c..679473f63 100644
---
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -28,7 +28,6 @@
<result column="dynamic_properties" jdbcType="LONGVARCHAR"
property="dynamicProperties"/>
<result column="hot_params" jdbcType="VARCHAR" property="hotParams"/>
<result column="job_name" jdbcType="VARCHAR" property="jobName"/>
- <result column="app_id" jdbcType="VARCHAR" property="appId"/>
<result column="version_id" jdbcType="BIGINT" property="versionId"/>
<result column="cluster_id" jdbcType="VARCHAR" property="clusterId"/>
<result column="flink_cluster_id" jdbcType="BIGINT"
property="flinkClusterId"/>
@@ -178,13 +177,9 @@
and p.name like '%' || #{application.projectName} || '%'
</if>
</if>
- <if test="application.appId != null and application.appId != ''">
- and t.app_id = #{application.appId}
- </if>
<if test="application.state != null and application.state != ''">
and t.state = #{application.state}
</if>
-
<if test="application.userId != null and application.userId != ''">
and t.user_id = #{application.userId}
</if>
@@ -286,12 +281,12 @@
<update id="mapping"
parameterType="org.apache.streampark.console.core.entity.Application">
update t_flink_app
<set>
+ <if test="application.clusterId != null">
+ cluster_id=#{application.clusterId},
+ </if>
<if test="application.jobId != null">
job_id=#{application.jobId},
</if>
- <if test="application.appId != null">
- app_id=#{application.appId},
- </if>
end_time=null,
state=14,
tracking=1
@@ -314,11 +309,11 @@
<select id="getRecentK8sClusterId" resultType="java.lang.String"
parameterType="java.util.Map">
select cluster_id
from (
- select cluster_id, max(create_time) as ct
- from t_flink_app
- where cluster_id is not null
- and execution_mode = #{executionMode}
- group by cluster_id
+ select c.cluster_id, max(a.create_time) as ct
+ from t_flink_app a inner join t_flink_cluster c
+ on a.flink_cluster_id = c.id
+ where a.execution_mode = 5
+ group by c.cluster_id
order by ct desc
) as ci
limit #{limitSize}
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
index 348a2f604..955f9104f 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
@@ -162,7 +162,7 @@ class AlertServiceTest {
Application application = new Application();
application.setStartTime(new Date());
application.setJobName("Test My Job");
- application.setAppId("1234567890");
+ application.setClusterId("1234567890");
application.setAlertId(1);
application.setRestartCount(5);
@@ -205,7 +205,7 @@ class AlertServiceTest {
duration = application.getEndTime().getTime() -
application.getStartTime().getTime();
}
String format = "%s/proxy/%s/";
- String url = String.format(format, YarnUtils.getRMWebAppURL(false),
application.getAppId());
+ String url = String.format(format, YarnUtils.getRMWebAppURL(false),
application.getClusterId());
AlertTemplate template = new AlertTemplate();
template.setJobName(application.getJobName());
diff --git
a/streampark-console/streampark-console-webapp/src/api/flink/app/flinkHistory.ts
b/streampark-console/streampark-console-webapp/src/api/flink/app/flinkHistory.ts
index 3352777ce..11cacb7dd 100644
---
a/streampark-console/streampark-console-webapp/src/api/flink/app/flinkHistory.ts
+++
b/streampark-console/streampark-console-webapp/src/api/flink/app/flinkHistory.ts
@@ -19,7 +19,7 @@ import { defHttp } from '/@/utils/http/axios';
enum HISTORY_API {
UPLOAD_JARS = '/flink/history/uploadJars',
K8S_NAMESPACES = '/flink/history/k8sNamespaces',
- SESSION_CLUSTER_IDS = '/flink/history/sessionClusterIds',
+ K8S_SESSION_CLUSTER_ID = '/flink/history/k8sClusterId',
FLINK_BASE_IMAGES = '/flink/history/flinkBaseImages',
FLINK_POD_TEMPLATES = '/flink/history/flinkPodTemplates',
FLINK_JM_POD_TEMPLATES = '/flink/history/flinkJmPodTemplates',
@@ -42,8 +42,8 @@ export function fetchUploadJars(): Promise<string[]> {
return defHttp.post({ url: HISTORY_API.UPLOAD_JARS });
}
-export function fetchSessionClusterIds(data) {
- return defHttp.post({ url: HISTORY_API.SESSION_CLUSTER_IDS, data });
+export function fetchK8sSessionClusterId() {
+ return defHttp.post({ url: HISTORY_API.K8S_SESSION_CLUSTER_ID });
}
export function fetchFlinkBaseImages() {
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
index ed8f04c93..8f40c8f2a 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
@@ -204,7 +204,7 @@
return;
}
const value = await handleGetApplication();
- setFieldsValue(value);
+ await setFieldsValue(value);
if (app.resourceFrom == ResourceFromEnum.CICD) {
jars.value = await fetchListJars({
id: app.projectId,
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
index 190aa6cd3..c22f3a46a 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useApp.tsx
@@ -333,10 +333,10 @@ export const useFlinkApplication = (openStartModal: Fn)
=> {
].includes(app.executionMode) && (
<Form.Item
label="YARN Application Id"
- name="appId"
+ name="clusterId"
rules={[{ required: true, message: 'YARN ApplicationId is
required' }]}
>
- <Input type="text" placeholder="ApplicationId"
v-model:value={formValue.appId} />
+ <Input type="text" placeholder="ApplicationId"
v-model:value={formValue.clusterId} />
</Form.Item>
)}
<Form.Item
@@ -356,7 +356,7 @@ export const useFlinkApplication = (openStartModal: Fn) => {
await mappingRef.value.validate();
await fetchMapping({
id: app.id,
- appId: formValue.appId,
+ clusterId: formValue.clusterId || null,
jobId: formValue.jobId,
});
Swal.fire({
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
index fd8af4ef6..09e58f493 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
@@ -42,7 +42,7 @@ import { fetchVariableAll } from '/@/api/flink/variable';
import {
fetchFlinkBaseImages,
fetchK8sNamespaces,
- fetchSessionClusterIds,
+ fetchK8sSessionClusterId,
} from '/@/api/flink/app/flinkHistory';
import { fetchSelect } from '/@/api/flink/project';
import { fetchAlertSetting } from '/@/api/flink/setting/alert';
@@ -584,13 +584,13 @@ export const useCreateAndEditSchema = (
});
//get flinkCluster
- fetchFlinkCluster().then((res) => {
- flinkClusters.value = res;
+ fetchFlinkCluster().then((resp) => {
+ flinkClusters.value = resp;
});
- fetchK8sNamespaces().then((res) => {
- historyRecord.k8sNamespace = res;
+ fetchK8sNamespaces().then((resp) => {
+ historyRecord.k8sNamespace = resp;
});
- fetchSessionClusterIds({ executionMode: ExecModeEnum.KUBERNETES_SESSION
}).then((res) => {
+ fetchK8sSessionClusterId().then((res) => {
historyRecord.k8sSessionClusterId = res;
});
fetchFlinkBaseImages().then((res) => {
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
index 87274d832..9bcc01284 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
@@ -115,9 +115,9 @@ export async function handleView(app: AppListRecord, yarn:
Nullable<string>) {
) {
if (!yarn) {
const res = await fetchYarn();
- window.open(res + '/proxy/' + app['appId'] + '/');
+ window.open(res + '/proxy/' + app['clusterId'] + '/');
} else {
- window.open(yarn + '/proxy/' + app['appId'] + '/');
+ window.open(yarn + '/proxy/' + app['clusterId'] + '/');
}
} else {
if (app.flinkRestUrl) {
diff --git
a/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/useClusterSetting.ts
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/useClusterSetting.ts
index dcbf76d9a..14451b34e 100644
---
a/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/useClusterSetting.ts
+++
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkCluster/useClusterSetting.ts
@@ -33,7 +33,7 @@ import optionData from '../../flink/app/data/option';
import {
fetchFlinkBaseImages,
fetchK8sNamespaces,
- fetchSessionClusterIds,
+ fetchK8sSessionClusterId,
} from '/@/api/flink/app/flinkHistory';
import { handleFormValue } from '../../flink/app/utils';
import { useMessage } from '/@/hooks/web/useMessage';
@@ -371,9 +371,7 @@ export const useClusterSetting = () => {
fetchK8sNamespaces().then((res) => {
historyRecord.k8sNamespace = res;
});
- fetchSessionClusterIds({
- executionMode: ExecModeEnum.KUBERNETES_SESSION,
- }).then((res) => {
+ fetchK8sSessionClusterId().then((res) => {
historyRecord.k8sSessionClusterId = res;
});
fetchFlinkBaseImages().then((res) => {
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/enums/FlinkJobState.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/enums/FlinkJobState.scala
index f56065e9d..fcd4e88e0 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/enums/FlinkJobState.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/enums/FlinkJobState.scala
@@ -41,7 +41,7 @@ object FlinkJobState extends Enumeration {
SUSPENDED, RECONCILING = Value
// ending flink states, the tracking monitor will stop tracking these states
of flink job.
- val endingStates = Seq(FAILED, CANCELED, FINISHED, POS_TERMINATED,
TERMINATED, LOST)
+ private val endingStates = Seq(FAILED, CANCELED, FINISHED, POS_TERMINATED,
TERMINATED, LOST)
def of(value: String): FlinkJobState.Value = {
this.values.find(_.toString == value).getOrElse(OTHER)
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 632dc8b58..9c5590f0a 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -116,27 +116,24 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig
= JobStatusWatcherConfi
val sessionIds = trackIds.filter(_.executeMode ==
FlinkK8sExecuteMode.SESSION)
val sessionCluster =
sessionIds.groupBy(_.toClusterKey.toString).flatMap(_._2).toSet
val sessionFuture = sessionCluster.map {
- id =>
- val future = Future(touchSessionAllJob(id))
+ trackId =>
+ val future = Future(touchSessionAllJob(trackId))
future.onComplete(_.toOption match {
case Some(map) =>
- sessionIds.foreach(
- id => {
- map.find(_._1.jobId == id.jobId) match {
- case Some(job) =>
- updateState(job._1.copy(appId = id.appId), job._2)
+ map.find(_._1.jobId == trackId.jobId) match {
+ case Some(job) =>
+ updateState(job._1.copy(appId = trackId.appId), job._2)
+ case _ =>
+ touchSessionJob(trackId) match {
+ case Some(state) =>
+ if (state.jobState == FlinkJobState.LOST) {
+ // can't find that job in the k8s cluster.
+ watchController.unWatching(trackId)
+ }
+ eventBus.postSync(FlinkJobStatusChangeEvent(trackId,
state))
case _ =>
- // can't find that job in the k8s cluster.
- watchController.unWatching(id)
- val lostState = JobStatusCV(
- jobState = FlinkJobState.LOST,
- jobId = id.jobId,
- pollEmitTime = System.currentTimeMillis,
- pollAckTime = System.currentTimeMillis
- )
- eventBus.postSync(FlinkJobStatusChangeEvent(id,
lostState))
}
- })
+ }
case _ =>
})
future
@@ -221,6 +218,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig =
JobStatusWatcherConfi
watchController.jobStatuses.put(trackId, jobState)
// set jobId to trackIds
watchController.trackIds.update(trackId)
+
eventBus.postSync(FlinkJobStatusChangeEvent(trackId, jobState))
}