This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
     new 6fa369b36 [Bug] deploy job on k8s session mode get job state bug fixed 
(#3489)
6fa369b36 is described below

commit 6fa369b36165543b2879387b20cc74775e338e38
Author: benjobs <[email protected]>
AuthorDate: Wed Jan 17 08:51:12 2024 +0800

    [Bug] deploy job on k8s session mode get job state bug fixed (#3489)
    
    * [Improve] k8s deploy-cluster | start-app error info improvement
    
    * [Improve] deploy yarn session cluster improvement
    
    * [Improve] FlinkClient instance error issue fixed
    
    * [Improve] job on k8s session mode state bug fixed
    
    * [Improve] FE i18n improvement
    
    * [Bug] k8s application mode job cancel bug fixed.
    
    ---------
    
    Co-authored-by: benjobs <[email protected]>
---
 .../streampark/common/conf/ConfigConst.scala       |   2 +
 .../console/core/entity/Application.java           |  46 +++-
 .../console/core/service/ApplicationService.java   |   4 +
 .../core/service/impl/AppBuildPipeServiceImpl.java |  17 +-
 .../core/service/impl/ApplicationServiceImpl.java  | 176 ++++++++++-----
 .../core/service/impl/FlinkClusterServiceImpl.java |  36 ++-
 .../core/service/impl/YarnQueueServiceImpl.java    |   2 +-
 .../console/core/task/FlinkK8sWatcherWrapper.java  |  69 ++----
 .../console/core/task/FlinkRESTAPIWatcher.java     |  11 +-
 .../src/api/flink/app/app.type.ts                  |   1 +
 .../Application/src/AppDarkModeToggle.vue          |   4 +-
 .../src/components/ContextMenu/src/ContextMenu.vue |   4 +-
 .../src/components/Form/src/BasicForm.vue          |   2 +-
 .../src/components/Page/src/PageFooter.vue         |   4 +-
 .../components/Table/src/components/HeaderCell.vue |   2 +-
 .../src/hooks/web/useLockPage.ts                   |   9 +-
 .../src/locales/lang/en/flink/app.ts               |  15 +-
 .../src/locales/lang/zh-CN/flink/app.ts            |  14 +-
 .../src/locales/lang/zh-CN/setting/flinkCluster.ts |   3 +-
 .../streampark-console-webapp/src/utils/props.ts   |   2 +-
 .../src/views/base/login/Login.vue                 |   5 +-
 .../src/views/flink/app/EditFlink.vue              |  11 +-
 .../src/views/flink/app/EditStreamPark.vue         |  19 +-
 .../flink/app/hooks/useCreateAndEditSchema.ts      | 108 ++++-----
 .../src/views/flink/app/hooks/useFlinkRender.tsx   |  48 +++-
 .../src/views/flink/app/utils/index.ts             |  19 +-
 .../flink/client/bean/CancelRequest.scala          |  12 +-
 .../flink/client/bean/DeployRequest.scala          |   8 +-
 .../flink/client/bean/DeployResponse.scala         |   2 +-
 .../flink/client/bean/ShutDownResponse.scala       |   2 +-
 .../client/bean/TriggerSavepointRequest.scala      |  12 +-
 .../impl/KubernetesNativeSessionClient.scala       |  34 ++-
 .../flink/client/impl/YarnSessionClient.scala      |  33 ++-
 .../flink/client/trait/FlinkClientTrait.scala      |   7 +
 .../flink/kubernetes/FlinkK8sWatchController.scala |  19 +-
 .../flink/kubernetes/model/ClusterKey.scala        |  19 +-
 .../flink/kubernetes/model/JobStatusCV.scala       |  10 +-
 .../flink/kubernetes/model/K8sEventKey.scala       |  16 +-
 .../flink/kubernetes/model/TrackId.scala           |   4 +-
 .../kubernetes/watcher/FlinkJobStatusWatcher.scala | 247 +++++++++++----------
 .../impl/FlinkK8sApplicationBuildPipeline.scala    |  17 +-
 41 files changed, 664 insertions(+), 411 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
index e033a6f85..5517cc302 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
@@ -54,6 +54,8 @@ object ConfigConst {
   /** kerberos */
   val KEY_KERBEROS = "kerberos"
 
+  val KEY_KERBEROS_SERVICE_ACCOUNT = "kubernetes.service-account"
+
   val KEY_HADOOP_USER_NAME = "HADOOP_USER_NAME"
 
   /** hadoop.security.authentication */
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 4b03ee57b..8560a28c9 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -239,6 +239,7 @@ public class Application implements Serializable {
   private transient String createTimeTo;
   private transient String backUpDescription;
   private transient String yarnQueue;
+  private transient String serviceAccount;
 
   /** Flink Web UI Url */
   private transient String flinkRestUrl;
@@ -281,20 +282,34 @@ public class Application implements Serializable {
     this.tracking = shouldTracking(appState);
   }
 
-  public void setYarnQueueByHotParams() {
-    if (!(ExecutionMode.YARN_APPLICATION == this.getExecutionModeEnum()
-        || ExecutionMode.YARN_PER_JOB == this.getExecutionModeEnum())) {
+  public void setByHotParams() {
+    Map<String, Object> hotParamsMap = this.getHotParamsMap();
+    if (hotParamsMap.isEmpty()) {
       return;
     }
 
-    Map<String, Object> hotParamsMap = this.getHotParamsMap();
-    if (!hotParamsMap.isEmpty() && 
hotParamsMap.containsKey(ConfigConst.KEY_YARN_APP_QUEUE())) {
-      String yarnQueue = 
hotParamsMap.get(ConfigConst.KEY_YARN_APP_QUEUE()).toString();
-      String labelExpr =
-          
Optional.ofNullable(hotParamsMap.get(ConfigConst.KEY_YARN_APP_NODE_LABEL()))
-              .map(Object::toString)
-              .orElse(null);
-      this.setYarnQueue(YarnQueueLabelExpression.of(yarnQueue, 
labelExpr).toString());
+    switch (getExecutionModeEnum()) {
+      case YARN_APPLICATION:
+      case YARN_PER_JOB:
+        // 1) set yarnQueue from hostParam
+        if (hotParamsMap.containsKey(ConfigConst.KEY_YARN_APP_QUEUE())) {
+          String yarnQueue = 
hotParamsMap.get(ConfigConst.KEY_YARN_APP_QUEUE()).toString();
+          String labelExpr =
+              
Optional.ofNullable(hotParamsMap.get(ConfigConst.KEY_YARN_APP_NODE_LABEL()))
+                  .map(Object::toString)
+                  .orElse(null);
+          this.setYarnQueue(YarnQueueLabelExpression.of(yarnQueue, 
labelExpr).toString());
+        }
+        break;
+      case KUBERNETES_NATIVE_APPLICATION:
+        // 2) service-account.
+        Object serviceAccount = 
hotParamsMap.get(ConfigConst.KEY_KERBEROS_SERVICE_ACCOUNT());
+        if (serviceAccount != null) {
+          this.setServiceAccount(serviceAccount.toString());
+        }
+        break;
+      default:
+        break;
     }
   }
 
@@ -483,6 +498,10 @@ public class Application implements Serializable {
     return this.getAppType() == ApplicationType.STREAMPARK_FLINK.getType();
   }
 
+  public boolean isKubernetesModeJob() {
+    return ExecutionMode.isKubernetesMode(this.getExecutionModeEnum());
+  }
+
   @JsonIgnore
   @SneakyThrows
   public MavenDependency getMavenDependency() {
@@ -570,6 +589,11 @@ public class Application implements Serializable {
     if (needFillYarnQueueLabel(executionModeEnum)) {
       
hotParams.putAll(YarnQueueLabelExpression.getQueueLabelMap(appParam.getYarnQueue()));
     }
+    if (executionModeEnum == ExecutionMode.KUBERNETES_NATIVE_APPLICATION) {
+      if (StringUtils.isNotBlank(appParam.getServiceAccount())) {
+        hotParams.put(ConfigConst.KEY_KERBEROS_SERVICE_ACCOUNT(), 
appParam.getServiceAccount());
+      }
+    }
     if (!hotParams.isEmpty()) {
       this.setHotParams(JacksonUtils.write(hotParams));
     }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index 5ffa7a02e..42d30fec5 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -23,6 +23,8 @@ import 
org.apache.streampark.console.base.exception.ApplicationException;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.enums.AppExistsState;
 
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.service.IService;
 import org.springframework.web.multipart.MultipartFile;
@@ -124,4 +126,6 @@ public interface ApplicationService extends 
IService<Application> {
   String k8sStartLog(Long id, Integer offset, Integer limit) throws Exception;
 
   AppExistsState checkStart(Application app);
+
+  List<ApplicationReport> getYARNApplication(String appName);
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 6834d940d..1e6042b89 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -30,6 +30,7 @@ import 
org.apache.streampark.console.core.entity.AppBuildPipeline;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.ApplicationConfig;
 import org.apache.streampark.console.core.entity.ApplicationLog;
+import org.apache.streampark.console.core.entity.FlinkCluster;
 import org.apache.streampark.console.core.entity.FlinkEnv;
 import org.apache.streampark.console.core.entity.FlinkSql;
 import org.apache.streampark.console.core.entity.Message;
@@ -45,6 +46,7 @@ import 
org.apache.streampark.console.core.service.ApplicationConfigService;
 import org.apache.streampark.console.core.service.ApplicationLogService;
 import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.CommonService;
+import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.FlinkEnvService;
 import org.apache.streampark.console.core.service.FlinkSqlService;
 import org.apache.streampark.console.core.service.MessageService;
@@ -125,6 +127,8 @@ public class AppBuildPipeServiceImpl
 
   @Autowired private ApplicationService applicationService;
 
+  @Autowired private FlinkClusterService flinkClusterService;
+
   @Autowired private ApplicationLogService applicationLogService;
 
   @Autowired private FlinkRESTAPIWatcher flinkRESTAPIWatcher;
@@ -327,6 +331,13 @@ public class AppBuildPipeServiceImpl
         log.info("Submit params to building pipeline : {}", buildRequest);
         return FlinkRemoteBuildPipeline.of(buildRequest);
       case KUBERNETES_NATIVE_SESSION:
+        String k8sNamespace = app.getK8sNamespace();
+        String clusterId = app.getClusterId();
+        if (app.getFlinkClusterId() != null) {
+          FlinkCluster flinkCluster = 
flinkClusterService.getById(app.getFlinkClusterId());
+          k8sNamespace = flinkCluster.getK8sNamespace();
+          clusterId = flinkCluster.getClusterId();
+        }
         FlinkK8sSessionBuildRequest k8sSessionBuildRequest =
             new FlinkK8sSessionBuildRequest(
                 app.getJobName(),
@@ -337,8 +348,8 @@ public class AppBuildPipeServiceImpl
                 app.getDevelopmentMode(),
                 flinkEnv.getFlinkVersion(),
                 app.getMavenArtifact(),
-                app.getClusterId(),
-                app.getK8sNamespace());
+                clusterId,
+                k8sNamespace);
         log.info("Submit params to building pipeline : {}", 
k8sSessionBuildRequest);
         return FlinkK8sSessionBuildPipeline.of(k8sSessionBuildRequest);
       case KUBERNETES_NATIVE_APPLICATION:
@@ -352,7 +363,7 @@ public class AppBuildPipeServiceImpl
                 app.getDevelopmentMode(),
                 flinkEnv.getFlinkVersion(),
                 app.getMavenArtifact(),
-                app.getClusterId(),
+                app.getJobName(),
                 app.getK8sNamespace(),
                 app.getFlinkImage(),
                 app.getK8sPodTemplates(),
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index c27e5772e..9e827b5f8 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -22,6 +22,7 @@ import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.enums.ApplicationType;
 import org.apache.streampark.common.enums.DevelopmentMode;
 import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.enums.FlinkK8sRestExposedType;
 import org.apache.streampark.common.enums.ResolveOrder;
 import org.apache.streampark.common.enums.StorageType;
 import org.apache.streampark.common.fs.HdfsOperator;
@@ -78,6 +79,7 @@ import 
org.apache.streampark.console.core.service.SavePointService;
 import org.apache.streampark.console.core.service.SettingService;
 import org.apache.streampark.console.core.service.VariableService;
 import org.apache.streampark.console.core.service.YarnQueueService;
+import org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper;
 import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
 import org.apache.streampark.flink.client.FlinkClient;
 import org.apache.streampark.flink.client.bean.CancelRequest;
@@ -155,9 +157,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import scala.Tuple2;
+
 import static org.apache.streampark.common.enums.StorageType.LFS;
-import static 
org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper.Bridge.toTrackId;
-import static 
org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper.isKubernetesApp;
 
 @Slf4j
 @Service
@@ -221,6 +223,8 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
 
   @Autowired private YarnQueueService yarnQueueService;
 
+  @Autowired private FlinkK8sWatcherWrapper k8sWatcherWrapper;
+
   @PostConstruct
   public void resetOptionState() {
     this.baseMapper.resetOptionState();
@@ -515,9 +519,10 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
                 record -> {
                   // status of flink job on kubernetes mode had been 
automatically persisted to db
                   // in time.
-                  if (isKubernetesApp(record)) {
+                  if (record.isKubernetesModeJob()) {
                     // set duration
-                    String restUrl = 
flinkK8sWatcher.getRemoteRestUrl(toTrackId(record));
+                    String restUrl =
+                        
flinkK8sWatcher.getRemoteRestUrl(k8sWatcherWrapper.toTrackId(record));
                     record.setFlinkRestUrl(restUrl);
                     if (record.getTracking() == 1
                         && record.getStartTime() != null
@@ -656,7 +661,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
       return AppExistsState.INVALID;
     }
     if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
-      boolean exists = 
!getApplicationReports(application.getJobName()).isEmpty();
+      boolean exists = !getYARNApplication(application.getJobName()).isEmpty();
       return exists ? AppExistsState.IN_YARN : AppExistsState.NO;
     }
     // todo on k8s check...
@@ -706,7 +711,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
         }
         // check whether clusterId, namespace, jobId on kubernetes
         else if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode())
-            && flinkK8sWatcher.checkIsInRemoteCluster(toTrackId(app))) {
+            && 
flinkK8sWatcher.checkIsInRemoteCluster(k8sWatcherWrapper.toTrackId(app))) {
           return AppExistsState.IN_KUBERNETES;
         }
       }
@@ -722,7 +727,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
       }
       // check whether clusterId, namespace, jobId on kubernetes
       else if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode())
-          && flinkK8sWatcher.checkIsInRemoteCluster(toTrackId(appParam))) {
+          && 
flinkK8sWatcher.checkIsInRemoteCluster(k8sWatcherWrapper.toTrackId(appParam))) {
         return AppExistsState.IN_KUBERNETES;
       }
     }
@@ -951,7 +956,6 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     application.setExecutionMode(appParam.getExecutionMode());
     application.setClusterId(appParam.getClusterId());
     application.setFlinkImage(appParam.getFlinkImage());
-    application.setK8sNamespace(appParam.getK8sNamespace());
     application.updateHotParams(appParam);
     application.setK8sRestExposedType(appParam.getK8sRestExposedType());
     application.setK8sPodTemplate(appParam.getK8sPodTemplate());
@@ -974,6 +978,11 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
       case YARN_PER_JOB:
       case KUBERNETES_NATIVE_APPLICATION:
         application.setFlinkClusterId(null);
+        if (appParam.getExecutionModeEnum() == 
ExecutionMode.KUBERNETES_NATIVE_APPLICATION) {
+          application.setK8sNamespace(appParam.getK8sNamespace());
+          application.setServiceAccount(appParam.getServiceAccount());
+          application.doSetHotParams();
+        }
         break;
       case REMOTE:
       case YARN_SESSION:
@@ -1143,7 +1152,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     CompletableFuture<SubmitResponse> startFuture = 
startFutureMap.remove(app.getId());
     CompletableFuture<CancelResponse> cancelFuture = 
cancelFutureMap.remove(app.getId());
     Application application = this.baseMapper.getApp(app);
-    if (isKubernetesApp(application)) {
+    if (application.isKubernetesModeJob()) {
       KubernetesDeploymentHelper.watchPodTerminatedLog(
           application.getK8sNamespace(), application.getJobName(), 
application.getJobId());
     }
@@ -1194,8 +1203,8 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
       }
     }
     // add flink web url info for k8s-mode
-    if (isKubernetesApp(application)) {
-      String restUrl = 
flinkK8sWatcher.getRemoteRestUrl(toTrackId(application));
+    if (application.isKubernetesModeJob()) {
+      String restUrl = 
flinkK8sWatcher.getRemoteRestUrl(k8sWatcherWrapper.toTrackId(application));
       application.setFlinkRestUrl(restUrl);
 
       // set duration
@@ -1206,9 +1215,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
         application.setDuration(now - application.getStartTime().getTime());
       }
     }
-
-    application.setYarnQueueByHotParams();
-
+    application.setByHotParams();
     return application;
   }
 
@@ -1231,8 +1238,8 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
   public boolean mapping(Application appParam) {
     boolean mapping = this.baseMapper.mapping(appParam);
     Application application = getById(appParam.getId());
-    if (isKubernetesApp(application)) {
-      flinkK8sWatcher.doWatching(toTrackId(application));
+    if (application.isKubernetesModeJob()) {
+      flinkK8sWatcher.doWatching(k8sWatcherWrapper.toTrackId(application));
     } else {
       FlinkRESTAPIWatcher.doWatching(application);
     }
@@ -1278,23 +1285,6 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
       }
     }
 
-    String clusterId = null;
-    if (ExecutionMode.isKubernetesMode(application.getExecutionMode())) {
-      clusterId = application.getClusterId();
-    } else if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
-      if 
(ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
-        FlinkCluster cluster = 
flinkClusterService.getById(application.getFlinkClusterId());
-        ApiAlertException.throwIfNull(
-            cluster,
-            String.format(
-                "The yarn session clusterId=%s can't found, maybe the 
clusterId is wrong or the cluster has been deleted. Please contact the Admin.",
-                application.getFlinkClusterId()));
-        clusterId = cluster.getClusterId();
-      } else {
-        clusterId = application.getAppId();
-      }
-    }
-
     Map<String, Object> properties = new HashMap<>();
 
     if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
@@ -1310,6 +1300,10 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
       properties.put(RestOptions.PORT.key(), activeAddress.getPort());
     }
 
+    Tuple2<String, String> clusterIdNamespace = 
getNamespaceClusterId(application);
+    String namespace = clusterIdNamespace._1;
+    String clusterId = clusterIdNamespace._2;
+
     CancelRequest cancelRequest =
         new CancelRequest(
             flinkEnv.getFlinkVersion(),
@@ -1320,7 +1314,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
             appParam.getSavePointed(),
             appParam.getDrain(),
             customSavepoint,
-            application.getK8sNamespace());
+            namespace);
 
     final Date triggerTime = new Date();
     CompletableFuture<CancelResponse> cancelFuture =
@@ -1328,7 +1322,8 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
 
     cancelFutureMap.put(application.getId(), cancelFuture);
 
-    TrackId trackId = isKubernetesApp(application) ? toTrackId(application) : 
null;
+    TrackId trackId =
+        application.isKubernetesModeJob() ? 
k8sWatcherWrapper.toTrackId(application) : null;
 
     cancelFuture.whenComplete(
         (cancelResponse, throwable) -> {
@@ -1352,7 +1347,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
                 savePointService.expire(application.getId());
               }
               // re-tracking flink job on kubernetes and logging exception
-              if (isKubernetesApp(application)) {
+              if (application.isKubernetesModeJob()) {
                 flinkK8sWatcher.unWatching(trackId);
               } else {
                 FlinkRESTAPIWatcher.unWatching(application.getId());
@@ -1378,7 +1373,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
             savePointService.save(savePoint);
           }
 
-          if (isKubernetesApp(application)) {
+          if (application.isKubernetesModeJob()) {
             flinkK8sWatcher.unWatching(trackId);
           }
         });
@@ -1450,7 +1445,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     // check job on yarn is already running
     if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
       ApiAlertException.throwIfTrue(
-          !getApplicationReports(application.getJobName()).isEmpty(),
+          !getYARNApplication(application.getJobName()).isEmpty(),
           "The same job name is already running in the yarn queue");
     }
 
@@ -1569,6 +1564,27 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     String applicationArgs =
         variableService.replaceVariable(application.getTeamId(), 
application.getArgs());
 
+    String k8sNamespace = null;
+    String k8sClusterId = null;
+    FlinkK8sRestExposedType exposedType = null;
+    if (application.getExecutionModeEnum() == 
ExecutionMode.KUBERNETES_NATIVE_SESSION) {
+      // For compatibility with historical versions
+      if (application.getFlinkClusterId() == null) {
+        k8sClusterId = application.getClusterId();
+        k8sNamespace = application.getK8sNamespace();
+        exposedType = application.getK8sRestExposedTypeEnum();
+      } else {
+        FlinkCluster cluster = 
flinkClusterService.getById(application.getFlinkClusterId());
+        k8sClusterId = cluster.getClusterId();
+        k8sNamespace = cluster.getK8sNamespace();
+        exposedType = cluster.getK8sRestExposedTypeEnum();
+      }
+    } else if (application.getExecutionModeEnum() == 
ExecutionMode.KUBERNETES_NATIVE_APPLICATION) {
+      k8sClusterId = application.getJobName();
+      k8sNamespace = application.getK8sNamespace();
+      exposedType = application.getK8sRestExposedTypeEnum();
+    }
+
     SubmitRequest submitRequest =
         SubmitRequest.apply(
             flinkEnv.getFlinkVersion(),
@@ -1585,11 +1601,10 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
             applicationArgs,
             buildResult,
             extraParameter,
-            application.getClusterId(),
-            application.getK8sNamespace(),
-            application.getK8sRestExposedTypeEnum());
+            k8sClusterId,
+            k8sNamespace,
+            exposedType);
 
-    TrackId trackId = isKubernetesApp(application) ? toTrackId(application) : 
null;
     CompletableFuture<SubmitResponse> future =
         CompletableFuture.supplyAsync(() -> FlinkClient.submit(submitRequest), 
executorService);
 
@@ -1614,7 +1629,8 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
               app.setState(FlinkAppState.FAILED.getValue());
               app.setOptionState(OptionState.NONE.getValue());
               updateById(app);
-              if (isKubernetesApp(app)) {
+              if (app.isKubernetesModeJob()) {
+                TrackId trackId = k8sWatcherWrapper.toTrackId(application);
                 flinkK8sWatcher.unWatching(trackId);
               } else {
                 FlinkRESTAPIWatcher.unWatching(appParam.getId());
@@ -1635,7 +1651,12 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
               
application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes());
             }
           }
-          application.setAppId(response.clusterId());
+
+          if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
+            application.setAppId(response.clusterId());
+            applicationLog.setYarnAppId(response.clusterId());
+          }
+
           if (StringUtils.isNoneEmpty(response.jobId())) {
             application.setJobId(response.jobId());
           }
@@ -1644,18 +1665,21 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
             application.setJobManagerUrl(response.jobManagerUrl());
             applicationLog.setJobManagerUrl(response.jobManagerUrl());
           }
-          applicationLog.setYarnAppId(response.clusterId());
+
           application.setStartTime(new Date());
           application.setEndTime(null);
 
           // if start completed, will be added task to tracking queue
-          if (isKubernetesApp(application)) {
+          if (application.isKubernetesModeJob()) {
             log.info(
                 "start job {} on {} success, doWatching...",
                 application.getJobName(),
                 application.getExecutionModeEnum().getName());
             application.setRelease(ReleaseState.DONE.get());
+
+            TrackId trackId = k8sWatcherWrapper.toTrackId(application);
             flinkK8sWatcher.doWatching(trackId);
+
             if 
(ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
               String domainName = settingService.getIngressModeDefault();
               if (StringUtils.isNotBlank(domainName)) {
@@ -1735,7 +1759,18 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
 
     Map<String, String> dynamicProperties =
         
PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties());
+
     properties.putAll(dynamicProperties);
+
+    String kerberosKeySvcAccount = ConfigConst.KEY_KERBEROS_SERVICE_ACCOUNT();
+    String svcAcc1 = (String) 
application.getHotParamsMap().get(kerberosKeySvcAccount);
+    String svcAcc2 = dynamicProperties.get(kerberosKeySvcAccount);
+    if (svcAcc1 != null) {
+      properties.put(kerberosKeySvcAccount, svcAcc1);
+    } else if (svcAcc2 != null) {
+      properties.put(kerberosKeySvcAccount, svcAcc2);
+    }
+
     ResolveOrder resolveOrder = ResolveOrder.of(application.getResolveOrder());
     if (resolveOrder != null) {
       properties.put(CoreOptions.CLASSLOADER_RESOLVE_ORDER.key(), 
resolveOrder.getName());
@@ -1752,8 +1787,8 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     updateById(application);
     savePointService.expire(application.getId());
     // re-tracking flink job on kubernetes and logging exception
-    if (isKubernetesApp(application)) {
-      TrackId id = toTrackId(application);
+    if (application.isKubernetesModeJob()) {
+      TrackId id = k8sWatcherWrapper.toTrackId(application);
       flinkK8sWatcher.doWatching(id);
     } else {
       FlinkRESTAPIWatcher.unWatching(application.getId());
@@ -1761,7 +1796,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     // kill application
     if (ExecutionMode.isYarnMode(application.getExecutionModeEnum())) {
       try {
-        List<ApplicationReport> applications = 
getApplicationReports(application.getJobName());
+        List<ApplicationReport> applications = 
getYARNApplication(application.getJobName());
         if (!applications.isEmpty()) {
           YarnClient yarnClient = HadoopUtils.yarnClient();
           yarnClient.killApplication(applications.get(0).getApplicationId());
@@ -1821,8 +1856,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     if (isYarnNotDefaultQueue(newApp)) {
       return true;
     }
-
-    oldApp.setYarnQueueByHotParams();
+    oldApp.setByHotParams();
     if (ExecutionMode.isYarnPerJobOrAppMode(newApp.getExecutionModeEnum())
         && StringUtils.equals(oldApp.getYarnQueue(), newApp.getYarnQueue())) {
       return true;
@@ -1843,7 +1877,8 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
         || yarnQueueService.isDefaultQueue(application.getYarnQueue());
   }
 
-  private List<ApplicationReport> getApplicationReports(String jobName) {
+  @Override
+  public List<ApplicationReport> getYARNApplication(String appName) {
     try {
       YarnClient yarnClient = HadoopUtils.yarnClient();
       Set<String> types =
@@ -1859,10 +1894,45 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
       Set<String> yarnTag = Sets.newHashSet("streampark");
       List<ApplicationReport> applications = yarnClient.getApplications(types, 
states, yarnTag);
       return applications.stream()
-          .filter(report -> report.getName().equals(jobName))
+          .filter(report -> report.getName().equals(appName))
           .collect(Collectors.toList());
     } catch (Exception e) {
       throw new RuntimeException("The yarn api is abnormal. Ensure that yarn 
is running properly.");
     }
   }
+
+  private Tuple2<String, String> getNamespaceClusterId(Application 
application) {
+    String clusterId = null;
+    String k8sNamespace = null;
+    FlinkCluster cluster;
+    switch (application.getExecutionModeEnum()) {
+      case YARN_APPLICATION:
+      case YARN_PER_JOB:
+      case YARN_SESSION:
+        clusterId = application.getAppId();
+        break;
+      case KUBERNETES_NATIVE_APPLICATION:
+        clusterId = application.getJobName();
+        k8sNamespace = application.getK8sNamespace();
+        break;
+      case KUBERNETES_NATIVE_SESSION:
+        if (application.getFlinkClusterId() == null) {
+          clusterId = application.getClusterId();
+          k8sNamespace = application.getK8sNamespace();
+        } else {
+          cluster = 
flinkClusterService.getById(application.getFlinkClusterId());
+          ApiAlertException.throwIfNull(
+              cluster,
+              String.format(
+                  "The Kubernetes session clusterId=%s can't found, maybe the 
clusterId is wrong or the cluster has been deleted. Please contact the Admin.",
+                  application.getFlinkClusterId()));
+          clusterId = cluster.getClusterId();
+          k8sNamespace = cluster.getK8sNamespace();
+        }
+        break;
+      default:
+        break;
+    }
+    return Tuple2.apply(k8sNamespace, clusterId);
+  }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index ef1f4a283..d309eb5d3 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.streampark.console.core.service.impl;
 import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.util.ThreadUtils;
+import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.common.util.YarnUtils;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.exception.ApiDetailException;
@@ -150,14 +151,30 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
   @Transactional(rollbackFor = {Exception.class})
   public void start(Long id) {
     FlinkCluster flinkCluster = getById(id);
+    ApiAlertException.throwIfTrue(flinkCluster == null, "Invalid id, no 
related cluster found.");
+    ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
+    if (executionModeEnum == ExecutionMode.YARN_SESSION) {
+      ApiAlertException.throwIfTrue(
+          
!applicationService.getYARNApplication(flinkCluster.getClusterName()).isEmpty(),
+          "The application name: "
+              + flinkCluster.getClusterName()
+              + " is already running in the yarn queue, please check!");
+    }
+
     try {
-      ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
       DeployRequest deployRequest = getDeployRequest(flinkCluster);
       log.info("deploy cluster request: " + deployRequest);
+
       Future<DeployResponse> future =
           executorService.submit(() -> FlinkClient.deploy(deployRequest));
       DeployResponse deployResponse = future.get(60, TimeUnit.SECONDS);
-      if (deployResponse != null) {
+      if (deployResponse.error() != null) {
+        throw new ApiDetailException(
+            "deploy cluster "
+                + flinkCluster.getClusterName()
+                + "failed, exception:\n"
+                + Utils.stringifyException(deployResponse.error()));
+      } else {
         if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
           String address =
               YarnUtils.getRMWebAppURL(true) + "/proxy/" + 
deployResponse.clusterId() + "/";
@@ -169,9 +186,6 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
         flinkCluster.setClusterState(ClusterState.STARTED.getValue());
         flinkCluster.setException(null);
         updateById(flinkCluster);
-      } else {
-        throw new ApiAlertException(
-            "deploy cluster failed, unknown reason,please check you params or 
StreamPark error log");
       }
     } catch (Exception e) {
       log.error(e.getMessage(), e);
@@ -191,13 +205,15 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
             flinkEnv.getFlinkVersion(),
             executionModeEnum,
             flinkCluster.getProperties(),
-            flinkCluster.getClusterId());
+            flinkCluster.getClusterId(),
+            flinkCluster.getClusterName());
       case KUBERNETES_NATIVE_SESSION:
         return KubernetesDeployRequest.apply(
             flinkEnv.getFlinkVersion(),
             executionModeEnum,
             flinkCluster.getProperties(),
             flinkCluster.getClusterId(),
+            flinkCluster.getClusterName(),
             flinkCluster.getK8sNamespace(),
             flinkCluster.getK8sConf(),
             flinkCluster.getServiceAccount(),
@@ -279,12 +295,14 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
       Future<ShutDownResponse> future =
           executorService.submit(() -> FlinkClient.shutdown(deployRequest));
       ShutDownResponse shutDownResponse = future.get(60, TimeUnit.SECONDS);
-      if (shutDownResponse != null) {
+      if (shutDownResponse.error() != null) {
+        throw new ApiDetailException(
+            "shutdown cluster failed, error: \n"
+                + Utils.stringifyException(shutDownResponse.error()));
+      } else {
         flinkCluster.setAddress(null);
         flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
         updateById(flinkCluster);
-      } else {
-        throw new ApiAlertException("get shutdown response failed");
       }
     } catch (Exception e) {
       log.error(e.getMessage(), e);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java
index cabde3de2..5cafab958 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java
@@ -235,7 +235,7 @@ public class YarnQueueServiceImpl extends 
ServiceImpl<YarnQueueMapper, YarnQueue
             .stream()
             .filter(
                 application -> {
-                  application.setYarnQueueByHotParams();
+                  application.setByHotParams();
                   return StringUtils.equals(application.getYarnQueue(), 
queueLabel);
                 })
             .collect(Collectors.toList());
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index 6a113a56e..71724e41b 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -19,13 +19,14 @@ package org.apache.streampark.console.core.task;
 
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkCluster;
 import org.apache.streampark.console.core.service.ApplicationService;
+import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
 import org.apache.streampark.flink.kubernetes.FlinkK8sWatcherFactory;
 import org.apache.streampark.flink.kubernetes.FlinkTrackConfig;
 import org.apache.streampark.flink.kubernetes.KubernetesRetriever;
 import org.apache.streampark.flink.kubernetes.enums.FlinkJobState;
-import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode;
 import org.apache.streampark.flink.kubernetes.model.TrackId;
 
 import org.apache.commons.collections.CollectionUtils;
@@ -42,8 +43,6 @@ import javax.annotation.Nonnull;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import scala.Enumeration;
-
 import static 
org.apache.streampark.console.core.enums.FlinkAppState.Bridge.toK8sFlinkJobState;
 
 /**
@@ -63,6 +62,8 @@ public class FlinkK8sWatcherWrapper {
 
   @Lazy @Autowired private ApplicationService applicationService;
 
+  @Lazy @Autowired private FlinkClusterService flinkClusterService;
+
   /** Register FlinkTrackMonitor bean for tracking flink job on kubernetes. */
   @Bean(destroyMethod = "close")
   public FlinkK8sWatcher registerFlinkK8sWatcher() {
@@ -102,15 +103,6 @@ public class FlinkK8sWatcherWrapper {
     if (CollectionUtils.isEmpty(k8sApplication)) {
       return Lists.newArrayList();
     }
-    // correct corrupted data
-    List<Application> correctApps =
-        k8sApplication.stream()
-            .filter(app -> !Bridge.toTrackId(app).isLegal())
-            .collect(Collectors.toList());
-    if (CollectionUtils.isNotEmpty(correctApps)) {
-      applicationService.saveOrUpdateBatch(correctApps);
-    }
-
     // filter out the application that should be tracking
     return k8sApplication.stream()
         .filter(
@@ -121,43 +113,30 @@ public class FlinkK8sWatcherWrapper {
                   
KubernetesRetriever.isDeploymentExists(app.getK8sNamespace(), 
app.getClusterId());
               return !isEndState || deploymentExists;
             })
-        .map(Bridge::toTrackId)
+        .map(this::toTrackId)
         .collect(Collectors.toList());
   }
 
-  /** Type converter bridge */
-  public static class Bridge {
-
-    // covert Application to TrackId
-    public static TrackId toTrackId(@Nonnull Application app) {
-
-      Enumeration.Value mode = 
FlinkK8sExecuteMode.of(app.getExecutionModeEnum());
-      if (FlinkK8sExecuteMode.APPLICATION().equals(mode)) {
-        return TrackId.onApplication(
-            app.getK8sNamespace(),
-            app.getClusterId(),
-            app.getId(),
-            app.getJobId(),
-            app.getTeamId().toString());
-      } else if (FlinkK8sExecuteMode.SESSION().equals(mode)) {
-        return TrackId.onSession(
-            app.getK8sNamespace(),
-            app.getClusterId(),
-            app.getId(),
-            app.getJobId(),
-            app.getTeamId().toString());
-      } else {
-        throw new IllegalArgumentException(
-            "Illegal K8sExecuteMode, mode=" + app.getExecutionMode());
+  public TrackId toTrackId(Application app) {
+    if (app.getExecutionModeEnum() == 
ExecutionMode.KUBERNETES_NATIVE_APPLICATION) {
+      return TrackId.onApplication(
+          app.getK8sNamespace(),
+          app.getJobName(),
+          app.getId(),
+          app.getJobId(),
+          app.getTeamId().toString());
+    } else if (app.getExecutionModeEnum() == 
ExecutionMode.KUBERNETES_NATIVE_SESSION) {
+      String namespace = app.getK8sNamespace();
+      String clusterId = app.getClusterId();
+      if (app.getFlinkClusterId() != null) {
+        FlinkCluster flinkCluster = 
flinkClusterService.getById(app.getFlinkClusterId());
+        namespace = flinkCluster.getK8sNamespace();
+        clusterId = flinkCluster.getClusterId();
       }
+      return TrackId.onSession(
+          namespace, clusterId, app.getId(), app.getJobId(), 
app.getTeamId().toString());
+    } else {
+      throw new IllegalArgumentException("Illegal K8sExecuteMode, mode=" + 
app.getExecutionMode());
     }
   }
-
-  /** Determine if application it is flink-on-kubernetes mode. */
-  public static boolean isKubernetesApp(Application application) {
-    if (application == null) {
-      return false;
-    }
-    return ExecutionMode.isKubernetesMode(application.getExecutionMode());
-  }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
index fab29a74e..f1b1b5a61 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
@@ -597,7 +597,7 @@ public class FlinkRESTAPIWatcher {
   }
 
   public static void doWatching(Application application) {
-    if (isKubernetesApp(application)) {
+    if (application.isKubernetesModeJob()) {
       return;
     }
     log.info("FlinkRESTAPIWatcher add app to tracking,appId:{}", 
application.getId());
@@ -642,13 +642,12 @@ public class FlinkRESTAPIWatcher {
     return WATCHING_APPS.values();
   }
 
-  private static boolean isKubernetesApp(Application application) {
-    return FlinkK8sWatcherWrapper.isKubernetesApp(application);
-  }
-
   private static boolean isKubernetesApp(Long appId) {
     Application app = WATCHING_APPS.get(appId);
-    return FlinkK8sWatcherWrapper.isKubernetesApp(app);
+    if (app == null) {
+      return false;
+    }
+    return app.isKubernetesModeJob();
   }
 
   private YarnAppInfo httpYarnAppInfo(Application application) throws 
Exception {
diff --git 
a/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts 
b/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
index a56cb4dad..e79a1cedc 100644
--- a/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
+++ b/streampark-console/streampark-console-webapp/src/api/flink/app/app.type.ts
@@ -56,6 +56,7 @@ export interface AppListRecord {
   clusterId?: string;
   flinkImage?: string;
   k8sNamespace: string;
+  serviceAccount?: string;
   state: number;
   release: number;
   build: boolean;
diff --git 
a/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue
 
b/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue
index d4e0ce164..19ba3b151 100644
--- 
a/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue
+++ 
b/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue
@@ -63,9 +63,7 @@
       height: 18px;
       background-color: #fff;
       border-radius: 50%;
-      transition:
-        transform 0.5s,
-        background-color 0.5s;
+      transition: transform 0.5s, background-color 0.5s;
       will-change: transform;
     }
 
diff --git 
a/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue
 
b/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue
index 78cac5c5b..e08c25f36 100644
--- 
a/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue
+++ 
b/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue
@@ -179,9 +179,7 @@
     background-color: @component-background;
     border: 1px solid rgb(0 0 0 / 8%);
     border-radius: 0.25rem;
-    box-shadow:
-      0 2px 2px 0 rgb(0 0 0 / 14%),
-      0 3px 1px -2px rgb(0 0 0 / 10%),
+    box-shadow: 0 2px 2px 0 rgb(0 0 0 / 14%), 0 3px 1px -2px rgb(0 0 0 / 10%),
       0 1px 5px 0 rgb(0 0 0 / 6%);
     background-clip: padding-box;
     user-select: none;
diff --git 
a/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue
 
b/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue
index e5a9dacf6..1cd7e3809 100644
--- 
a/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue
+++ 
b/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue
@@ -113,7 +113,7 @@
       });
 
       const getBindValue = computed(
-        () => ({ ...attrs, ...props, ...unref(getProps) }) as Recordable,
+        () => ({ ...attrs, ...props, ...unref(getProps) } as Recordable),
       );
 
       const getSchema = computed((): FormSchema[] => {
diff --git 
a/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue
 
b/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue
index 8fdbc8f41..e89a6ce97 100644
--- 
a/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue
+++ 
b/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue
@@ -39,9 +39,7 @@
     line-height: 44px;
     background-color: @component-background;
     border-top: 1px solid @border-color-base;
-    box-shadow:
-      0 -6px 16px -8px rgb(0 0 0 / 8%),
-      0 -9px 28px 0 rgb(0 0 0 / 5%),
+    box-shadow: 0 -6px 16px -8px rgb(0 0 0 / 8%), 0 -9px 28px 0 rgb(0 0 0 / 
5%),
       0 -12px 48px 16px rgb(0 0 0 / 3%);
     transition: width 0.2s;
 
diff --git 
a/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue
 
b/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue
index 36ab854c5..35c080269 100644
--- 
a/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue
+++ 
b/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue
@@ -22,7 +22,7 @@
     props: {
       column: {
         type: Object as PropType<BasicColumn>,
-        default: () => ({}) as BasicColumn,
+        default: () => ({} as BasicColumn),
       },
     },
     setup(props) {
diff --git 
a/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts 
b/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts
index 9a6607421..c543be954 100644
--- a/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts
+++ b/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts
@@ -32,12 +32,9 @@ export function useLockPage() {
     }
     clear();
 
-    timeId = setTimeout(
-      () => {
-        lockPage();
-      },
-      lockTime * 60 * 1000,
-    );
+    timeId = setTimeout(() => {
+      lockPage();
+    }, lockTime * 60 * 1000);
   }
 
   function lockPage(): void {
diff --git 
a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts 
b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
index 7e1a6abd2..cde802766 100644
--- 
a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
+++ 
b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
@@ -46,6 +46,7 @@ export default {
   uploadJar: 'Upload Jar',
   kubernetesNamespace: 'Kubernetes Namespace',
   kubernetesClusterId: 'Kubernetes ClusterId',
+  kubernetesCluster: 'Kubernetes Cluster',
   flinkBaseDockerImage: 'Flink Base Docker Image',
   restServiceExposedType: 'Rest-Service Exposed Type',
   resourceFrom: 'Resource From',
@@ -221,8 +222,6 @@ export default {
       'The application name is already exists in YARN, cannot be repeated. 
Please check',
     appNameExistsInK8sMessage:
       'The application name is already exists in Kubernetes,cannot be 
repeated. Please check',
-    appNameNotValid:
-      'The application name is invalid, must be (Chinese or English or "-" or 
"_"), two consecutive spaces cannot appear.Please check',
     flinkClusterIsRequiredMessage: 'Flink Cluster is required',
     flinkSqlIsRequiredMessage: 'Flink SQL is required',
     tagsPlaceholder: 'Please enter tags,if more than one, separate them with 
commas(,)',
@@ -234,10 +233,18 @@ export default {
     tmPlaceholder: 'Please select the resource parameters to set',
     yarnQueuePlaceholder: 'Please enter yarn queue label',
     descriptionPlaceholder: 'Please enter description for this application',
+    serviceAccountPlaceholder: 'Please enter kubernetes service-account, e.g: 
default',
     kubernetesNamespacePlaceholder: 'Please enter kubernetes Namespace, e.g: 
default',
     kubernetesClusterIdPlaceholder: 'Please enter Kubernetes clusterId',
-    kubernetesClusterIdRequire:
-      "lower case alphanumeric characters, '-', and must start and end with an 
alphanumeric character,and no more than 45 characters",
+    appNameValid: 'The application name is invalid',
+    appNameRole: 'The application name is invalid',
+    appNameK8sClusterIdRole:
+      'The current deployment mode is K8s Application mode, and the job name 
will be used as the clusterId in K8s. Therefore, the job name must follow the 
following rules:',
+    appNameK8sClusterIdRoleLength: 'must be no more than 45 characters',
+    appNameK8sClusterIdRoleRegexp:
+      'must only contain lowercase alphanumeric characters and "-",The 
required format is [a-z]([-a-z0-9]*[a-z0-9])',
+    appNameRoleContent:
+      'must be (Chinese or English or "-" or "_"), two consecutive spaces 
cannot appear.Please check',
     kubernetesClusterIdIsRequiredMessage: 'Kubernetes clusterId is required',
     flinkImagePlaceholder:
       'Please enter the tag of Flink base docker image, such as: 
flink:1.13.0-scala_2.11-java8',
diff --git 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
 
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
index 7ac9ef7e0..9cbb9c7ab 100644
--- 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
+++ 
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
@@ -46,6 +46,7 @@ export default {
   uploadJar: '上传依赖Jar文件',
   kubernetesNamespace: 'K8S命名空间',
   kubernetesClusterId: 'K8S ClusterId',
+  kubernetesCluster: 'Kubernetes Session Cluster',
   flinkBaseDockerImage: 'Flink基础docker镜像',
   restServiceExposedType: 'K8S服务对外类型',
   resourceFrom: '资源来源',
@@ -216,8 +217,14 @@ export default {
     appNameNotUniqueMessage: '作业名称必须唯一, 输入的作业名称已经存在',
     appNameExistsInYarnMessage: '应用程序名称已经在YARN集群中存在,不能重复。请检查',
     appNameExistsInK8sMessage: '该应用程序名称已经在K8S集群中存在,不能重复。请检查',
-    appNameNotValid:
-      '应用程序名称无效。字符必须是(中文 或 英文 或 "-" 或 "_"),不能出现两个连续的空格,请检查',
+    appNameValid: '应用程序名称不合法',
+    appNameRole: '应用名称必须遵循以下规则:',
+    appNameK8sClusterIdRole:
+      '当前部署模式是 K8s Application模式,会将作业名称作为k8s的 clusterId,因此作业名称要遵循以下规则:',
+    appNameK8sClusterIdRoleLength: '不应超过 45 个字符',
+    appNameK8sClusterIdRoleRegexp:
+      '只能由小写字母、数字、字符、和"-" 组成,必须满足正则格式 [a-z]([-a-z0-9]*[a-z0-9])',
+    appNameRoleContent: '字符必须是(中文 或 英文 或 "-" 或 "_"),不能出现两个连续的空格',
     flinkClusterIsRequiredMessage: 'Flink集群必填',
     flinkSqlIsRequiredMessage: 'Flink SQL必填',
     tagsPlaceholder: '请输入标签,如果超过一个,用逗号(,)分隔',
@@ -229,10 +236,9 @@ export default {
     tmPlaceholder: '请选择要设置的资源参数',
     yarnQueuePlaceholder: '请输入yarn队列标签名称',
     descriptionPlaceholder: '请输入此应用程序的描述',
+    serviceAccountPlaceholder: '请输入K8S服务账号(service-account)',
     kubernetesNamespacePlaceholder: '请输入K8S命名空间, 如: default',
     kubernetesClusterIdPlaceholder: '请选择K8S ClusterId',
-    kubernetesClusterIdRequire:
-      '小写字母、数字、“-”,并且必须以字母数字字符开头和结尾,并且不超过45个字符',
     kubernetesClusterIdIsRequiredMessage: 'K8S ClusterId必填',
     flinkImagePlaceholder: 
'请输入Flink基础docker镜像的标签,如:flink:1.13.0-scala_2.11-java8',
     flinkImageIsRequiredMessage: 'Flink基础docker镜像是必填的',
diff --git 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/flinkCluster.ts
 
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/flinkCluster.ts
index 4d9974736..6d9df6bec 100644
--- 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/flinkCluster.ts
+++ 
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/flinkCluster.ts
@@ -33,7 +33,8 @@ export default {
     yarnSessionClusterId: 'Yarn Session模式集群ID',
     k8sNamespace: 'k8s命名空间',
     k8sClusterId: 'k8s集群ID',
-    serviceAccount: 'k8s命名空间绑定的服务账号',
+    k8sSessionCluster: 'k8s Session集群',
+    serviceAccount: 'k8s服务账号',
     k8sConf: 'k8s环境Kube配置文件',
     flinkImage: 'Flink基础docker镜像',
     k8sRestExposedType: 'K8S服务对外类型',
diff --git a/streampark-console/streampark-console-webapp/src/utils/props.ts 
b/streampark-console/streampark-console-webapp/src/utils/props.ts
index 5d1d35150..ebbe33a1a 100644
--- a/streampark-console/streampark-console-webapp/src/utils/props.ts
+++ b/streampark-console/streampark-console-webapp/src/utils/props.ts
@@ -175,7 +175,7 @@ export const buildProps = <
       : never;
   };
 
-export const definePropType = <T>(val: any) => ({ [wrapperKey]: val }) as 
PropWrapper<T>;
+export const definePropType = <T>(val: any) => ({ [wrapperKey]: val } as 
PropWrapper<T>);
 
 export const keyOf = <T extends Object>(arr: T) => Object.keys(arr) as 
Array<keyof T>;
 export const mutable = <T extends readonly any[] | Record<string, 
unknown>>(val: T) =>
diff --git 
a/streampark-console/streampark-console-webapp/src/views/base/login/Login.vue 
b/streampark-console/streampark-console-webapp/src/views/base/login/Login.vue
index 2e0598e68..d82491015 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/base/login/Login.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/base/login/Login.vue
@@ -59,8 +59,9 @@
         />
       </a>
       <p class="text-light-100 pt-10px" style="border-top: 1px solid #dad7d7">
-        Copyright © 2022-{{ `${new Date().getFullYear()}` }} The Apache 
Software Foundation. Apache StreamPark, StreamPark, and its
-        feather logo are trademarks of The Apache Software Foundation.
+        Copyright © 2022-{{ `${new Date().getFullYear()}` }} The Apache 
Software Foundation. Apache
+        StreamPark, StreamPark, and its feather logo are trademarks of The 
Apache Software
+        Foundation.
       </p>
     </footer>
   </div>
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
index 31a6e3208..ed8f04c93 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
@@ -106,12 +106,21 @@
           ? 'yarnSessionClusterId'
           : 'flinkClusterId']: app.flinkClusterId,
         flinkImage: app.flinkImage,
-        k8sNamespace: app.k8sNamespace,
+        k8sNamespace: app.k8sNamespace || null,
+        serviceAccount: app.serviceAccount || null,
         alertId: selectAlertId,
         projectName: app.projectName,
         module: app.module,
         ...resetParams,
       };
+
+      if (app.executionMode == ExecModeEnum.KUBERNETES_SESSION) {
+        Object.assign(defaultParams, { flinkClusterId: app.flinkClusterId });
+      } else if (app.executionMode == ExecModeEnum.YARN_SESSION) {
+        Object.assign(defaultParams, { flinkClusterId: app.flinkClusterId });
+      } else if (app.executionMode == ExecModeEnum.REMOTE) {
+      }
+
       if (!executionMode) {
         Object.assign(defaultParams, { executionMode: app.executionMode });
       }
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
index c6f9197b0..0ff995bec 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
@@ -120,13 +120,26 @@
           cpFailureAction: app.cpFailureAction,
         },
         clusterId: app.clusterId,
-        [app.executionMode == ExecModeEnum.YARN_SESSION
-          ? 'yarnSessionClusterId'
-          : 'flinkClusterId']: app.flinkClusterId,
         flinkImage: app.flinkImage,
         k8sNamespace: app.k8sNamespace,
+        serviceAccount: app.serviceAccount || null,
         ...resetParams,
       };
+
+      switch (app.executionMode) {
+        case ExecModeEnum.REMOTE:
+          defaultParams['remoteClusterId'] = app.flinkClusterId;
+          break;
+        case ExecModeEnum.YARN_SESSION:
+          defaultParams['yarnSessionClusterId'] = app.flinkClusterId;
+          break;
+        case ExecModeEnum.KUBERNETES_SESSION:
+          defaultParams['k8sSessionClusterId'] = app.flinkClusterId;
+          break;
+        default:
+          break;
+      }
+
       if (!executionMode) {
         Object.assign(defaultParams, { executionMode: app.executionMode });
       }
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
index 8a6942398..fba0fc31b 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
@@ -20,6 +20,7 @@ import { executionModes, k8sRestExposedType, resolveOrder } 
from '../data';
 import optionData from '../data/option';
 import {
   getAlertSvgIcon,
+  renderJobName,
   renderDynamicProperties,
   renderInputDropdown,
   renderInputGroup,
@@ -199,7 +200,7 @@ export const useCreateAndEditSchema = (
         ],
       },
       {
-        field: 'flinkClusterId',
+        field: 'remoteClusterId',
         label: t('flink.app.flinkCluster'),
         component: 'Select',
         componentProps: {
@@ -224,11 +225,24 @@ export const useCreateAndEditSchema = (
           { required: true, message: 
t('flink.app.addAppTips.flinkClusterIsRequiredMessage') },
         ],
       },
+      {
+        field: 'k8sSessionClusterId',
+        label: t('flink.app.flinkCluster'),
+        component: 'Select',
+        componentProps: {
+          placeholder: t('flink.app.flinkCluster'),
+          options: getExecutionCluster(ExecModeEnum.KUBERNETES_SESSION, 'id'),
+        },
+        ifShow: ({ values }) => values.executionMode == 
ExecModeEnum.KUBERNETES_SESSION,
+        rules: [
+          { required: true, message: 
t('flink.app.addAppTips.flinkClusterIsRequiredMessage') },
+        ],
+      },
       {
         field: 'k8sNamespace',
         label: t('flink.app.kubernetesNamespace'),
         component: 'Input',
-        ifShow: ({ values }) => isK8sExecMode(values.executionMode),
+        ifShow: ({ values }) => values.executionMode == 
ExecModeEnum.KUBERNETES_APPLICATION,
         render: ({ model, field }) =>
           renderInputDropdown(model, field, {
             placeholder: 
t('flink.app.addAppTips.kubernetesNamespacePlaceholder'),
@@ -236,39 +250,15 @@ export const useCreateAndEditSchema = (
           }),
       },
       {
-        field: 'clusterId',
-        label: t('flink.app.kubernetesClusterId'),
+        field: 'serviceAccount',
+        label: t('setting.flinkCluster.form.serviceAccount'),
         component: 'Input',
-        componentProps: ({ formModel }) => {
-          return {
-            placeholder: t('flink.app.addAppTips.kubernetesClusterIdRequire'),
-            onChange: (e: ChangeEvent) => (formModel.jobName = e.target.value),
-          };
-        },
         ifShow: ({ values }) => values.executionMode == 
ExecModeEnum.KUBERNETES_APPLICATION,
-        rules: [
-          {
-            required: true,
-            message: t('flink.app.addAppTips.kubernetesClusterIdRequire'),
-            pattern: /^(?=.{1,45}$)[a-z]([-a-z0-9]*[a-z0-9])$/,
-          },
-        ],
-      },
-      {
-        field: 'clusterId',
-        label: t('flink.app.kubernetesClusterId'),
-        component: 'Select',
-        ifShow: ({ values }) => values.executionMode == 
ExecModeEnum.KUBERNETES_SESSION,
-        componentProps: {
-          placeholder: 
t('flink.app.addAppTips.kubernetesClusterIdPlaceholder'),
-          options: getExecutionCluster(ExecModeEnum.KUBERNETES_SESSION, 
'clusterId'),
-        },
-        rules: [
-          {
-            required: true,
-            message: 
t('flink.app.addAppTips.kubernetesClusterIdIsRequiredMessage'),
-          },
-        ],
+        render: ({ model, field }) =>
+          renderInputDropdown(model, field, {
+            placeholder: t('flink.app.addAppTips.serviceAccountPlaceholder'),
+            options: unref(historyRecord)?.k8sNamespace || [],
+          }),
       },
       {
         field: 'flinkImage',
@@ -296,26 +286,33 @@ export const useCreateAndEditSchema = (
   });
 
   /* Detect job name field */
-  async function getJobNameCheck(_rule: RuleObject, value: StoreValue) {
+  async function getJobNameCheck(_rule: RuleObject, value: StoreValue, model: 
Recordable) {
     if (value === null || value === undefined || value === '') {
       return 
Promise.reject(t('flink.app.addAppTips.appNameIsRequiredMessage'));
-    } else {
-      const params = { jobName: value };
-      if (edit?.appId) Object.assign(params, { id: edit.appId });
-      const res = await fetchCheckName(params);
-      switch (parseInt(res)) {
-        case 0:
-          return Promise.resolve();
-        case 1:
-          return 
Promise.reject(t('flink.app.addAppTips.appNameNotUniqueMessage'));
-        case 2:
-          return 
Promise.reject(t('flink.app.addAppTips.appNameExistsInYarnMessage'));
-        case 3:
-          return 
Promise.reject(t('flink.app.addAppTips.appNameExistsInK8sMessage'));
-        default:
-          return Promise.reject(t('flink.app.addAppTips.appNameNotValid'));
+    }
+    if (model.executionMode == ExecModeEnum.KUBERNETES_APPLICATION) {
+      const regexp = /^(?=.{1,45}$)[a-z]([-a-z0-9]*[a-z0-9])$/;
+      if (!regexp.test(value)) {
+        return Promise.reject(t('flink.app.addAppTips.appNameValid'));
       }
     }
+    const params = { jobName: value };
+    if (edit?.appId) {
+      Object.assign(params, { id: edit.appId });
+    }
+    const res = await fetchCheckName(params);
+    switch (parseInt(res)) {
+      case 0:
+        return Promise.resolve();
+      case 1:
+        return 
Promise.reject(t('flink.app.addAppTips.appNameNotUniqueMessage'));
+      case 2:
+        return 
Promise.reject(t('flink.app.addAppTips.appNameExistsInYarnMessage'));
+      case 3:
+        return 
Promise.reject(t('flink.app.addAppTips.appNameExistsInK8sMessage'));
+      default:
+        return Promise.reject(t('flink.app.addAppTips.appNameValid'));
+    }
   }
 
   const getFlinkFormOtherSchemas = computed((): FormSchema[] => {
@@ -329,9 +326,16 @@ export const useCreateAndEditSchema = (
         field: 'jobName',
         label: t('flink.app.appName'),
         component: 'Input',
-        componentProps: { placeholder: 
t('flink.app.addAppTips.appNamePlaceholder') },
-        dynamicRules: () => {
-          return [{ required: true, trigger: 'blur', validator: 
getJobNameCheck }];
+        render: (renderCallbackParams) => renderJobName(renderCallbackParams),
+        dynamicRules: ({ model }) => {
+          return [
+            {
+              required: true,
+              trigger: 'blur',
+              validator: (rule: RuleObject, value: StoreValue) =>
+                getJobNameCheck(rule, value, model),
+            },
+          ];
         },
       },
       {
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
index 851778227..068911c83 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx
@@ -39,7 +39,12 @@ import { handleConfTemplate } from '/@/api/flink/config';
 import { decodeByBase64 } from '/@/utils/cipher';
 import { useMessage } from '/@/hooks/web/useMessage';
 import { SelectValue } from 'ant-design-vue/lib/select';
-import { AppTypeEnum, CandidateTypeEnum, FailoverStrategyEnum } from 
'/@/enums/flinkEnum';
+import {
+  AppTypeEnum,
+  CandidateTypeEnum,
+  ExecModeEnum,
+  FailoverStrategyEnum,
+} from '/@/enums/flinkEnum';
 import { useI18n } from '/@/hooks/web/useI18n';
 import { fetchYarnQueueList } from '/@/api/flink/setting/yarnQueue';
 import { ApiSelect } from '/@/components/Form';
@@ -261,6 +266,46 @@ export const renderYarnQueue = ({ model, field }: 
RenderCallbackParams) => {
   );
 };
 
+export const renderJobName = ({ model, field }: RenderCallbackParams) => {
+  return (
+    <div>
+      <Input
+        name="jobName"
+        placeholder={t('flink.app.addAppTips.appNamePlaceholder')}
+        value={model[field]}
+        onInput={(e: ChangeEvent) => (model[field] = e?.target?.value)}
+      />
+      <p class="conf-desc mt-10px">
+        <span class="note-info">
+          <Tag color="#2db7f5" class="tag-note">
+            {t('flink.app.noteInfo.note')}
+          </Tag>
+          {model.executionMode == ExecModeEnum.KUBERNETES_APPLICATION && (
+            <span>
+              {t('flink.app.addAppTips.appNameK8sClusterIdRole')}
+              <div>
+                <Tag color="orange"> 1.</Tag>
+                {t('flink.app.addAppTips.appNameK8sClusterIdRoleLength')}
+              </div>
+              <div>
+                <Tag color="orange"> 2.</Tag>
+                {t('flink.app.addAppTips.appNameK8sClusterIdRoleRegexp')}
+              </div>
+            </span>
+          )}
+
+          {model.executionMode != ExecModeEnum.KUBERNETES_APPLICATION && (
+            <span>
+              <span>{t('flink.app.addAppTips.appNameRole')}</span>
+              <span>{t('flink.app.addAppTips.appNameRoleContent')}</span>
+            </span>
+          )}
+        </span>
+      </p>
+    </div>
+  );
+};
+
 /* render memory option */
 export const renderDynamicProperties = ({ model, field }: 
RenderCallbackParams) => {
   return (
@@ -277,7 +322,6 @@ export const renderDynamicProperties = ({ model, field }: 
RenderCallbackParams)
           <Tag color="#2db7f5" class="tag-note">
             {t('flink.app.noteInfo.note')}
           </Tag>
-          {t('flink.app.noteInfo.dynamicProperties')}
           <a
             
href="https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html";
             target="_blank"
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
index 715dc1ebd..87274d832 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
@@ -265,6 +265,19 @@ export function handleDependencyJsonToPom(json, pomMap, 
jarMap) {
   }
 }
 
+function getFlinkClusterId(values: Recordable) {
+  if (values.executionMode == ExecModeEnum.YARN_SESSION) {
+    return values.yarnSessionClusterId;
+  }
+  if (values.executionMode == ExecModeEnum.REMOTE) {
+    return values.remoteClusterId;
+  }
+  if (values.executionMode == ExecModeEnum.KUBERNETES_SESSION) {
+    return values.k8sSessionClusterId;
+  }
+  return null;
+}
+
 export function handleSubmitParams(
   params: Recordable,
   values: Recordable,
@@ -290,14 +303,12 @@ export function handleSubmitParams(
     description: values.description,
     k8sNamespace: values.k8sNamespace || null,
     clusterId: values.clusterId || null,
-    flinkClusterId:
-      (values.executionMode == ExecModeEnum.YARN_SESSION
-        ? values.yarnSessionClusterId
-        : values.flinkClusterId) || null,
+    flinkClusterId: getFlinkClusterId(values),
     flinkImage: values.flinkImage || null,
   });
   if (params.executionMode == ExecModeEnum.KUBERNETES_APPLICATION) {
     Object.assign(params, {
+      serviceAccount: values.serviceAccount,
       k8sPodTemplate: k8sTemplate.podTemplate,
       k8sJmPodTemplate: k8sTemplate.jmPodTemplate,
       k8sTmPodTemplate: k8sTemplate.tmPodTemplate,
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
index f11f561c9..7587f750e 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
@@ -25,13 +25,13 @@ import javax.annotation.Nullable
 import java.util.{Map => JavaMap}
 
 case class CancelRequest(
-    flinkVersion: FlinkVersion,
-    executionMode: ExecutionMode,
-    @Nullable properties: JavaMap[String, Any],
-    clusterId: String,
-    jobId: String,
+    override val flinkVersion: FlinkVersion,
+    override val executionMode: ExecutionMode,
+    @Nullable override val properties: JavaMap[String, Any],
+    override val clusterId: String,
+    override val jobId: String,
     override val withSavepoint: Boolean,
     withDrain: Boolean,
-    savepointPath: String,
+    override val savepointPath: String,
     override val kubernetesNamespace: String = 
K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE)
   extends SavepointRequestTrait
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
index 81b36a17b..936af108e 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
@@ -31,7 +31,8 @@ case class DeployRequest(
     flinkVersion: FlinkVersion,
     executionMode: ExecutionMode,
     properties: JavaMap[String, Any],
-    clusterId: String) {
+    clusterId: String,
+    clusterName: String) {
 
   private[client] lazy val hdfsWorkspace = {
 
@@ -65,12 +66,13 @@ class KubernetesDeployRequest(
     override val executionMode: ExecutionMode,
     override val properties: JavaMap[String, Any],
     override val clusterId: String,
+    override val clusterName: String,
     val kubernetesNamespace: String = 
KubernetesConfigOptions.NAMESPACE.defaultValue(),
     val kubeConf: String = "~/.kube/config",
     val serviceAccount: String = 
KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT.defaultValue(),
     val flinkImage: String = 
KubernetesConfigOptions.CONTAINER_IMAGE.defaultValue(),
     val flinkRestExposedType: FlinkK8sRestExposedType = 
FlinkK8sRestExposedType.CLUSTER_IP)
-  extends DeployRequest(flinkVersion, executionMode, properties, clusterId)
+  extends DeployRequest(flinkVersion, executionMode, properties, clusterId, 
clusterName)
 
 object KubernetesDeployRequest {
   def apply(
@@ -78,6 +80,7 @@ object KubernetesDeployRequest {
       executionMode: ExecutionMode,
       properties: JavaMap[String, Any],
       clusterId: String,
+      clusterName: String,
       kubernetesNamespace: String,
       kubeConf: String,
       serviceAccount: String,
@@ -88,6 +91,7 @@ object KubernetesDeployRequest {
       executionMode,
       properties,
       clusterId,
+      clusterName,
       kubernetesNamespace,
       kubeConf,
       serviceAccount,
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala
index 91b6ecad0..6e1edbaff 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala
@@ -17,4 +17,4 @@
 
 package org.apache.streampark.flink.client.bean
 
-case class DeployResponse(address: String, clusterId: String)
+case class DeployResponse(address: String = null, clusterId: String = null, 
error: Throwable = null)
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/ShutDownResponse.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/ShutDownResponse.scala
index 5c9a14728..de2df366f 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/ShutDownResponse.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/ShutDownResponse.scala
@@ -17,4 +17,4 @@
 
 package org.apache.streampark.flink.client.bean
 
-case class ShutDownResponse()
+case class ShutDownResponse(error: Throwable = null)
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
index 8f6344834..7bb87b06e 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
@@ -26,11 +26,11 @@ import java.util.{Map => JavaMap}
 
 /** Trigger savepoint request. */
 case class TriggerSavepointRequest(
-    flinkVersion: FlinkVersion,
-    executionMode: ExecutionMode,
-    @Nullable properties: JavaMap[String, Any],
-    clusterId: String,
-    jobId: String,
-    savepointPath: String,
+    override val flinkVersion: FlinkVersion,
+    override val executionMode: ExecutionMode,
+    @Nullable override val properties: JavaMap[String, Any],
+    override val clusterId: String,
+    override val jobId: String,
+    override val savepointPath: String,
     override val kubernetesNamespace: String = 
K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE)
   extends SavepointRequestTrait
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 7638be720..3606c03a0 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -169,16 +169,9 @@ object KubernetesNativeSessionClient extends 
KubernetesNativeClientTrait with Lo
         client =
           
clusterDescriptor.deploySessionCluster(kubernetesClusterDescriptor._2).getClusterClient
       }
-
-      if (client.getWebInterfaceURL != null) {
-        DeployResponse(client.getWebInterfaceURL, client.getClusterId)
-      } else {
-        null
-      }
+      getDeployResponse(client)
     } catch {
-      case e: Exception =>
-        logError(s"start flink session fail in ${deployRequest.executionMode} 
mode")
-        throw e
+      case e: Exception => DeployResponse(error = e)
     } finally {
       Utils.close(client, clusterDescriptor, kubeClient)
     }
@@ -204,18 +197,17 @@ object KubernetesNativeSessionClient extends 
KubernetesNativeClientTrait with Lo
 
     val flinkConfig = this.getFlinkK8sConfig(shutDownRequest)
     val clusterDescriptor = getK8sClusterDescriptor(flinkConfig)
-    val client = clusterDescriptor
-      .retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID))
-      .getClusterClient
-    try {
-      client.shutDownCluster()
-      ShutDownResponse()
-    } catch {
-      case e: Exception =>
-        logError(s"shutdown flink session fail in 
${shutDownRequest.executionMode} mode")
-        throw e
-    } finally {
-      Utils.close(client)
+    Try(
+      clusterDescriptor
+        .retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID))
+        .getClusterClient
+    ) match {
+      case Failure(e) => ShutDownResponse(e)
+      case Success(c) =>
+        Try(c.shutDownCluster()) match {
+          case Success(_) => ShutDownResponse()
+          case Failure(e) => ShutDownResponse(e)
+        }
     }
   }
 
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index edcd6ed75..0e0b4b3c9 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -21,6 +21,7 @@ import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.client.`trait`.YarnClientTrait
 import org.apache.streampark.flink.client.bean._
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
 import org.apache.flink.client.program.{ClusterClient, PackagedProgram}
 import org.apache.flink.configuration._
@@ -58,7 +59,10 @@ object YarnSessionClient extends YarnClientTrait {
    * @param deployRequest
    * @param flinkConfig
    */
-  def deployClusterConfig(deployRequest: DeployRequest, flinkConfig: 
Configuration): Unit = {
+  private def deployClusterConfig(
+      deployRequest: DeployRequest,
+      flinkConfig: Configuration): Unit = {
+
     val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
       deployRequest.flinkVersion.flinkHome)
     val currentUser = UserGroupInformation.getCurrentUser
@@ -86,6 +90,10 @@ object YarnSessionClient extends YarnClientTrait {
       .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
       // conf dir
       .safeSet(DeploymentOptionsInternal.CONF_DIR, 
s"${deployRequest.flinkVersion.flinkHome}/conf")
+      // app tags
+      .safeSet(YarnConfigOptions.APPLICATION_TAGS, "streampark")
+      // app name
+      .safeSet(YarnConfigOptions.APPLICATION_NAME, deployRequest.clusterName)
 
     logInfo(s"""
                
|------------------------------------------------------------------
@@ -164,10 +172,12 @@ object YarnSessionClient extends YarnClientTrait {
     try {
       val flinkConfig =
         extractConfiguration(deployRequest.flinkVersion.flinkHome, 
deployRequest.properties)
+
       deployClusterConfig(deployRequest, flinkConfig)
+
       val yarnClusterDescriptor = 
getSessionClusterDeployDescriptor(flinkConfig)
       clusterDescriptor = yarnClusterDescriptor._2
-      if (null != deployRequest.clusterId && deployRequest.clusterId.nonEmpty) 
{
+      if (StringUtils.isNotBlank(deployRequest.clusterId)) {
         try {
           val applicationStatus =
             clusterDescriptor.getYarnClient
@@ -183,22 +193,14 @@ object YarnSessionClient extends YarnClientTrait {
             }
           }
         } catch {
-          case _: ApplicationNotFoundException =>
-            logInfo("this applicationId have not managed by yarn ,need deploy 
...")
+          case e: Exception => return DeployResponse(error = e)
         }
       }
       val clientProvider = 
clusterDescriptor.deploySessionCluster(yarnClusterDescriptor._1)
       client = clientProvider.getClusterClient
-      if (client.getWebInterfaceURL != null) {
-        DeployResponse(client.getWebInterfaceURL, client.getClusterId.toString)
-      } else {
-        null
-      }
+      getDeployResponse(client)
     } catch {
-      case e: Exception =>
-        logError(s"start flink session fail in ${deployRequest.executionMode} 
mode")
-        e.printStackTrace()
-        throw e
+      case e: Exception => DeployResponse(error = e)
     } finally {
       Utils.close(client, clusterDescriptor)
     }
@@ -234,10 +236,7 @@ object YarnSessionClient extends YarnClientTrait {
           .getFinalApplicationStatus}")
       ShutDownResponse()
     } catch {
-      case e: Exception =>
-        logError(s"shutdown flink session fail in 
${shutDownRequest.executionMode} mode")
-        e.printStackTrace()
-        throw e
+      case e: Exception => ShutDownResponse(e)
     } finally {
       Utils.close(client, clusterDescriptor)
     }
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index f1b171310..09b4ed8ad 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -524,4 +524,11 @@ trait FlinkClientTrait extends Logger {
     clientWrapper.triggerSavepoint(jobID, savepointPath).get()
   }
 
+  def getDeployResponse(client: ClusterClient[_]): DeployResponse = {
+    if (client.getWebInterfaceURL != null) {
+      DeployResponse(address = client.getWebInterfaceURL, clusterId = 
client.getClusterId.toString)
+    } else {
+      DeployResponse(error = new RuntimeException("get the cluster 
getWebInterfaceURL failed."))
+    }
+  }
 }
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
index 717208cf0..94020f170 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
@@ -17,11 +17,12 @@
 
 package org.apache.streampark.flink.kubernetes
 
-import org.apache.streampark.common.util.Logger
+import org.apache.streampark.common.util.{Logger, Utils}
 import org.apache.streampark.flink.kubernetes.model._
 
 import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
 
+import java.util.Objects
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConversions._
@@ -107,9 +108,19 @@ class FlinkK8sWatchController extends Logger with 
AutoCloseable {
 }
 
 //----cache----
-case class CacheKey(key: java.lang.Long) extends Serializable
+case class CacheKey(key: java.lang.Long) extends Serializable {
+  override def hashCode(): Int = Utils.hashCode(key)
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case that: CacheKey => Objects.equals(key, that.key)
+      case _ => false
+    }
+  }
+}
 
 class TrackIdCache {
+
   private[this] lazy val cache: Cache[CacheKey, TrackId] = 
Caffeine.newBuilder.build()
 
   def update(k: TrackId): Unit = {
@@ -153,7 +164,9 @@ class JobStatusCache {
   def getAsMap(trackIds: Set[TrackId]): Map[CacheKey, JobStatusCV] =
     cache.getAllPresent(trackIds.map(t => t.appId)).toMap
 
-  def get(k: TrackId): JobStatusCV = cache.getIfPresent(CacheKey(k.appId))
+  def get(k: TrackId): JobStatusCV = {
+    cache.getIfPresent(CacheKey(k.appId))
+  }
 
   def invalidate(k: TrackId): Unit = cache.invalidate(CacheKey(k.appId))
 
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/ClusterKey.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/ClusterKey.scala
index 991a9647d..73d97e275 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/ClusterKey.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/ClusterKey.scala
@@ -17,13 +17,30 @@
 
 package org.apache.streampark.flink.kubernetes.model
 
+import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
 
 /** flink cluster identifier on kubernetes */
 case class ClusterKey(
     executeMode: FlinkK8sExecuteMode.Value,
     namespace: String = "default",
-    clusterId: String)
+    clusterId: String) {
+
+  override def toString: String = executeMode.toString + namespace + clusterId
+
+  override def hashCode(): Int = Utils.hashCode(executeMode, namespace, 
clusterId)
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case that: ClusterKey =>
+        this.executeMode == that.executeMode &&
+        this.namespace == that.namespace &&
+        this.clusterId == that.clusterId
+      case _ => false
+    }
+  }
+
+}
 
 object ClusterKey {
   def of(trackId: TrackId): ClusterKey =
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/JobStatusCV.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/JobStatusCV.scala
index 7bd5bd52a..3c3781435 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/JobStatusCV.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/JobStatusCV.scala
@@ -42,4 +42,12 @@ case class JobStatusCV(
     duration: Long = 0,
     taskTotal: Int = 0,
     pollEmitTime: Long,
-    pollAckTime: Long)
+    pollAckTime: Long) {
+
+  def diff(that: JobStatusCV): Boolean = {
+    that == null ||
+    that.jobState != this.jobState ||
+    that.jobId != this.jobId
+  }
+
+}
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/K8sEventKey.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/K8sEventKey.scala
index 75abf5e94..7eb0a51e9 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/K8sEventKey.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/K8sEventKey.scala
@@ -17,4 +17,18 @@
 
 package org.apache.streampark.flink.kubernetes.model
 
-case class K8sEventKey(namespace: String, clusterId: String)
+import org.apache.streampark.common.util.Utils
+
+case class K8sEventKey(namespace: String, clusterId: String) {
+
+  override def hashCode(): Int = Utils.hashCode(namespace, clusterId)
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case that: K8sEventKey =>
+        this.namespace == that.namespace &&
+        this.clusterId == that.clusterId
+      case _ => false
+    }
+  }
+
+}
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
index e280122c8..a7147ce67 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.kubernetes.model
 import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
 
-import java.lang.{Boolean => JavaBool}
+import java.lang.{Boolean => JavaBool, Long => JavaLong}
 
 import scala.util.Try
 
@@ -29,7 +29,7 @@ case class TrackId(
     executeMode: FlinkK8sExecuteMode.Value,
     namespace: String = "default",
     clusterId: String,
-    appId: Long,
+    appId: JavaLong = null,
     jobId: String,
     groupId: String) {
 
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 36bac3d68..8bb89e623 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -20,8 +20,8 @@ package org.apache.streampark.flink.kubernetes.watcher
 import org.apache.streampark.common.conf.Workspace
 import org.apache.streampark.common.util.Logger
 import org.apache.streampark.flink.kubernetes.{ChangeEventBus, 
FlinkK8sWatchController, JobStatusWatcherConfig, KubernetesRetriever}
-import org.apache.streampark.flink.kubernetes.enums.FlinkJobState
-import 
org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, 
SESSION}
+import org.apache.streampark.flink.kubernetes.enums.{FlinkJobState, 
FlinkK8sExecuteMode}
+import 
org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.APPLICATION
 import org.apache.streampark.flink.kubernetes.event.FlinkJobStatusChangeEvent
 import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
 import org.apache.streampark.flink.kubernetes.model._
@@ -101,59 +101,66 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig 
= JobStatusWatcherConfi
         .getOrElse(return
         )
 
-      // retrieve flink job status in thread pool
-      val tracksFuture: Set[Future[Option[JobStatusCV]]] = trackIds.map {
-        id =>
-          val future = Future {
-            id.executeMode match {
-              case SESSION => touchSessionJob(id)
-              case APPLICATION => touchApplicationJob(id)
-            }
-          }
-
-          future.onComplete(_.getOrElse(None) match {
-            case Some(jobState) =>
-              val trackId = id.copy(jobId = jobState.jobId)
-              val latest: JobStatusCV = 
watchController.jobStatuses.get(trackId)
-
-              val eventChanged = latest == null ||
-                latest.jobState != jobState.jobState ||
-                latest.jobId != jobState.jobId
-
-              if (eventChanged) {
-                logInfo(s"eventChanged.....$trackId")
-                // put job status to cache
-                watchController.jobStatuses.put(trackId, jobState)
-                // set jobId to trackIds
-                watchController.trackIds.update(trackId)
-                eventBus.postSync(FlinkJobStatusChangeEvent(trackId, jobState))
-              }
+      // 1) k8s application mode
+      val appFuture: Set[Future[Option[JobStatusCV]]] =
+        trackIds.filter(_.executeMode == FlinkK8sExecuteMode.APPLICATION).map {
+          id =>
+            val future = Future(touchApplicationJob(id))
+            future.onComplete(_.getOrElse(None) match {
+              case Some(jobState) =>
+                updateState(id.copy(jobId = jobState.jobId), jobState)
+              case _ =>
+            })
+            future
+        }
 
-              val deployExists = KubernetesRetriever.isDeploymentExists(
-                trackId.namespace,
-                trackId.clusterId
-              )
-
-              if (FlinkJobState.isEndState(jobState.jobState) && 
!deployExists) {
-                // remove trackId from cache of job that needs to be untracked
-                watchController.unWatching(trackId)
-                if (trackId.executeMode == APPLICATION) {
-                  watchController.endpoints.invalidate(trackId.toClusterKey)
-                }
-              }
+      // 2) k8s session mode
+      val sessionIds = trackIds.filter(_.executeMode == 
FlinkK8sExecuteMode.SESSION)
+      val sessionCluster = 
sessionIds.groupBy(_.toClusterKey.toString).flatMap(_._2).toSet
+      val sessionFuture = sessionCluster.map {
+        id =>
+          val future = Future(touchSessionAllJob(id))
+          future.onComplete(_.toOption match {
+            case Some(map) =>
+              sessionIds.foreach(
+                id => {
+                  map.find(_._1.jobId == id.jobId) match {
+                    case Some(job) =>
+                      updateState(job._1.copy(appId = id.appId), job._2)
+                    case _ =>
+                      // can't find that job in the k8s cluster.
+                      watchController.unWatching(id)
+                      val lostState = JobStatusCV(
+                        jobState = FlinkJobState.LOST,
+                        jobId = id.jobId,
+                        pollEmitTime = System.currentTimeMillis,
+                        pollAckTime = System.currentTimeMillis
+                      )
+                      eventBus.postSync(FlinkJobStatusChangeEvent(id, 
lostState))
+                  }
+                })
             case _ =>
           })
           future
       }
 
       // blocking until all future are completed or timeout is reached
-      Try(Await.ready(Future.sequence(tracksFuture), conf.requestTimeoutSec 
seconds)).failed.map {
+      Try(Await.ready(Future.sequence(appFuture), conf.requestTimeoutSec 
seconds)).failed.map {
         _ =>
           logWarn(
-            s"[FlinkJobStatusWatcher] tracking flink job status on kubernetes 
mode timeout," +
+            s"[FlinkJobStatusWatcher] tracking flink job status on kubernetes 
native application mode timeout," +
               s" limitSeconds=${conf.requestTimeoutSec}," +
               s" trackIds=${trackIds.mkString(",")}")
       }
+
+      Try(Await.ready(Future.sequence(sessionFuture), conf.requestTimeoutSec 
seconds)).failed.map {
+        _ =>
+          logWarn(
+            s"[FlinkJobStatusWatcher] tracking flink job status on kubernetes 
native session mode timeout," +
+              s" limitSeconds=${conf.requestTimeoutSec}," +
+              s" trackIds=${trackIds.mkString(",")}")
+      }
+
       logDebug(
         "[FlinkJobStatusWatcher]: End of status monitoring process - " + Thread
           .currentThread()
@@ -169,43 +176,10 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig 
= JobStatusWatcherConfi
    * result.
    */
   def touchSessionJob(@Nonnull trackId: TrackId): Option[JobStatusCV] = {
-    val pollEmitTime = System.currentTimeMillis
-
-    val id = TrackId.onSession(
-      trackId.namespace,
-      trackId.clusterId,
-      trackId.appId,
-      trackId.jobId,
-      trackId.groupId
-    )
-
-    val rsMap = touchSessionAllJob(
-      trackId.namespace,
-      trackId.clusterId,
-      trackId.appId,
-      trackId.groupId
-    ).toMap
-
-    val jobState = rsMap.get(id).filter(_.jobState != 
FlinkJobState.SILENT).getOrElse {
-      val preCache = watchController.jobStatuses.get(id)
-      val state = inferSilentOrLostFromPreCache(preCache)
-      val nonFirstSilent =
-        state == FlinkJobState.SILENT && preCache != null && preCache.jobState 
== FlinkJobState.SILENT
-      if (nonFirstSilent) {
-        JobStatusCV(
-          jobState = state,
-          jobId = id.jobId,
-          pollEmitTime = preCache.pollEmitTime,
-          pollAckTime = preCache.pollAckTime)
-      } else {
-        JobStatusCV(
-          jobState = state,
-          jobId = id.jobId,
-          pollEmitTime = pollEmitTime,
-          pollAckTime = System.currentTimeMillis)
-      }
-    }
-    Some(jobState)
+    touchSessionAllJob(trackId)
+      .find(id => id._1.jobId == trackId.jobId && id._2.jobState != 
FlinkJobState.SILENT)
+      .map(_._2)
+      .orElse(inferState(trackId))
   }
 
   /**
@@ -215,28 +189,18 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig 
= JobStatusWatcherConfi
    * This method can be called directly from outside, without affecting the 
current cachePool
    * result.
    */
-  private def touchSessionAllJob(
-      @Nonnull namespace: String,
-      @Nonnull clusterId: String,
-      @Nonnull appId: Long,
-      @Nonnull groupId: String): Array[(TrackId, JobStatusCV)] = {
-
-    lazy val defaultResult = Array.empty[(TrackId, JobStatusCV)]
+  private def touchSessionAllJob(trackId: TrackId): Map[TrackId, JobStatusCV] 
= {
     val pollEmitTime = System.currentTimeMillis
-
-    val jobDetails = listJobsDetails(ClusterKey(SESSION, namespace, clusterId))
-      .getOrElse(return defaultResult)
-      .jobs
-
-    if (jobDetails.isEmpty) {
-      defaultResult
-    } else {
-      jobDetails.map {
-        d =>
-          val trackId = TrackId.onSession(namespace, clusterId, appId, d.jid, 
groupId)
-          val jobStatus = d.toJobStatusCV(pollEmitTime, 
System.currentTimeMillis)
-          trackId -> jobStatus
-      }
+    val jobDetails = listJobsDetails(ClusterKey.of(trackId))
+    jobDetails match {
+      case Some(details) if details.jobs.nonEmpty =>
+        details.jobs.map {
+          d =>
+            val jobStatus = d.toJobStatusCV(pollEmitTime, 
System.currentTimeMillis)
+            val trackItem = trackId.copy(jobId = d.jid, appId = null)
+            trackItem -> jobStatus
+        }.toMap
+      case None => Map.empty[TrackId, JobStatusCV]
     }
   }
 
@@ -249,16 +213,61 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig 
= JobStatusWatcherConfi
    */
   def touchApplicationJob(@Nonnull trackId: TrackId): Option[JobStatusCV] = {
     implicit val pollEmitTime: Long = System.currentTimeMillis
-    val clusterId = trackId.clusterId
-    val namespace = trackId.namespace
-    val jobDetails = listJobsDetails(ClusterKey(APPLICATION, namespace, 
clusterId))
+    val jobDetails = listJobsDetails(ClusterKey.of(trackId))
     if (jobDetails.isEmpty || jobDetails.get.jobs.isEmpty) {
-      inferJobStateFromK8sEvent(trackId)
+      inferStateFromK8sEvent(trackId)
     } else {
       Some(jobDetails.get.jobs.head.toJobStatusCV(pollEmitTime, 
System.currentTimeMillis))
     }
   }
 
+  private[this] def updateState(trackId: TrackId, jobState: JobStatusCV): Unit 
= {
+    val latest: JobStatusCV = watchController.jobStatuses.get(trackId)
+    if (jobState.diff(latest)) {
+      // put job status to cache
+      watchController.jobStatuses.put(trackId, jobState)
+      // set jobId to trackIds
+      watchController.trackIds.update(trackId)
+      eventBus.postSync(FlinkJobStatusChangeEvent(trackId, jobState))
+    }
+
+    lazy val deployExists = KubernetesRetriever.isDeploymentExists(
+      trackId.namespace,
+      trackId.clusterId
+    )
+
+    if (FlinkJobState.isEndState(jobState.jobState) && !deployExists) {
+      // remove trackId from cache of job that needs to be untracked
+      watchController.unWatching(trackId)
+      if (trackId.executeMode == APPLICATION) {
+        watchController.endpoints.invalidate(trackId.toClusterKey)
+      }
+    }
+  }
+
+  private[this] def inferState(id: TrackId): Option[JobStatusCV] = {
+    lazy val pollEmitTime = System.currentTimeMillis
+    val preCache = watchController.jobStatuses.get(id)
+    val state = inferFromPreCache(preCache)
+    val nonFirstSilent = state == FlinkJobState.SILENT &&
+      preCache != null &&
+      preCache.jobState == FlinkJobState.SILENT
+    val jobState = if (nonFirstSilent) {
+      JobStatusCV(
+        jobState = state,
+        jobId = id.jobId,
+        pollEmitTime = preCache.pollEmitTime,
+        pollAckTime = preCache.pollAckTime)
+    } else {
+      JobStatusCV(
+        jobState = state,
+        jobId = id.jobId,
+        pollEmitTime = pollEmitTime,
+        pollAckTime = System.currentTimeMillis)
+    }
+    Option(jobState)
+  }
+
   /** list flink jobs details */
   private def listJobsDetails(clusterKey: ClusterKey): Option[JobDetails] = {
     // get flink rest api
@@ -284,27 +293,26 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig 
= JobStatusWatcherConfi
 
   /** list flink jobs details from rest api */
   private def callJobsOverviewsApi(restUrl: String): Option[JobDetails] = {
-    val jobDetails = JobDetails.as(
+    JobDetails.as(
       Request
         .get(s"$restUrl/jobs/overview")
         .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
         .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
         .execute
         .returnContent()
-        .asString(StandardCharsets.UTF_8))
-    jobDetails
+        .asString(StandardCharsets.UTF_8)
+    )
   }
 
   /**
    * Infer the current flink state from the last relevant k8s events. This 
method is only used for
    * application-mode job inference in case of a failed JM rest request.
    */
-  private def inferJobStateFromK8sEvent(@Nonnull trackId: TrackId)(implicit
+  private def inferStateFromK8sEvent(@Nonnull trackId: TrackId)(implicit
       pollEmitTime: Long): Option[JobStatusCV] = {
 
     // infer from k8s deployment and event
     val latest: JobStatusCV = watchController.jobStatuses.get(trackId)
-
     val jobState = trackId match {
       case id if watchController.canceling.has(id) =>
         logger.info(s"trackId ${trackId.toString} is canceling")
@@ -334,13 +342,13 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig 
= JobStatusWatcherConfi
               trackId.jobId)
             FlinkJobState.FAILED
           } else {
-            inferSilentOrLostFromPreCache(latest)
+            inferFromPreCache(latest)
           }
         } else if (isConnection) {
           logger.info("The deployment is deleted and enters the task failure 
process.")
           
FlinkJobState.of(FlinkHistoryArchives.getJobStateFromArchiveFile(trackId.jobId))
         } else {
-          inferSilentOrLostFromPreCache(latest)
+          inferFromPreCache(latest)
         }
     }
 
@@ -359,7 +367,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = 
JobStatusWatcherConfi
     }
   }
 
-  private[this] def inferSilentOrLostFromPreCache(preCache: JobStatusCV) = 
preCache match {
+  private[this] def inferFromPreCache(preCache: JobStatusCV) = preCache match {
     case preCache if preCache == null => FlinkJobState.SILENT
     case preCache
         if preCache.jobState == FlinkJobState.SILENT &&
@@ -373,11 +381,8 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = 
JobStatusWatcherConfi
 
 object FlinkJobStatusWatcher {
 
-  private val effectEndStates: Seq[FlinkJobState.Value] =
-    FlinkJobState.endingStates.filter(_ != FlinkJobState.LOST)
-
   /**
-   * infer flink job state before persistence. so drama, so sad.
+   * infer flink job state before persistence.
    *
    * @param current
    *   current flink job state
@@ -388,15 +393,15 @@ object FlinkJobStatusWatcher {
       current: FlinkJobState.Value,
       previous: FlinkJobState.Value): FlinkJobState.Value = {
     current match {
-      case FlinkJobState.LOST =>
-        if (effectEndStates.contains(current)) previous else 
FlinkJobState.TERMINATED
       case FlinkJobState.POS_TERMINATED | FlinkJobState.TERMINATED =>
         previous match {
           case FlinkJobState.CANCELLING => FlinkJobState.CANCELED
           case FlinkJobState.FAILING => FlinkJobState.FAILED
           case _ =>
-            if (current == FlinkJobState.POS_TERMINATED) FlinkJobState.FINISHED
-            else FlinkJobState.TERMINATED
+            current match {
+              case FlinkJobState.POS_TERMINATED => FlinkJobState.FINISHED
+              case _ => FlinkJobState.TERMINATED
+            }
         }
       case _ => current
     }
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
index 060209ff9..f0183e76b 100644
--- 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
@@ -127,7 +127,7 @@ class FlinkK8sApplicationBuildPipeline(request: 
FlinkK8sApplicationBuildRequest)
     val dockerConf = request.dockerConfig
     val baseImageTag = request.flinkBaseImage.trim
     val pushImageTag = {
-      val expectedImageTag = 
s"streamparkflinkjob-${request.k8sNamespace}-${request.clusterId}"
+      val expectedImageTag = 
s"streampark-${request.k8sNamespace}-${request.clusterId}"
       compileTag(expectedImageTag, dockerConf.registerAddress, 
dockerConf.imageNamespace)
     }
 
@@ -203,16 +203,15 @@ class FlinkK8sApplicationBuildPipeline(request: 
FlinkK8sApplicationBuildRequest)
     }.getOrElse(throw getError.exception)
 
     // Step-8:  init build workspace of ingress
-    val ingressOutputPath = request.ingressTemplate match {
-      case ingress if StringUtils.isBlank(ingress) =>
-        skipStep(8)
-        ""
+    request.ingressTemplate match {
+      case ingress if StringUtils.isBlank(ingress) => skipStep(8)
       case _ =>
         execStep(8) {
-          val ingressOutputPath =
-            IngressController.prepareIngressTemplateFiles(buildWorkspace, 
request.ingressTemplate)
-          logInfo(s"export flink ingress: $ingressOutputPath")
-          ingressOutputPath
+          val path = IngressController.prepareIngressTemplateFiles(
+            buildWorkspace,
+            request.ingressTemplate
+          )
+          logInfo(s"export flink ingress: $path")
         }.getOrElse(throw getError.exception)
     }
 

Reply via email to