This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new c3c6f3dd4 [Feat][Flink-K8s-V2] Adaptation of snapshot tracking for Flink on Kubernetes (#3048)
c3c6f3dd4 is described below

commit c3c6f3dd4063a505ab926c3c209f390266695bf1
Author: Linying Assad <[email protected]>
AuthorDate: Wed Sep 13 01:12:20 2023 -0500

    [Feat][Flink-K8s-V2] Adaptation of snapshot tracking for Flink on Kubernetes (#3048)
    
    * [Feat][flink-k8s-v2] Optimize the Flink state subscription code. #2885
    
    * [Feat][flink-k8s-v2] Parallel group processing of Flink K8s change status to alleviate backpressure. #2885
    
    * [Feat][flink-k8s-v2] Optimize the persistent record update logic of Flink job status. #2885
    
    * [Feat][flink-k8s-v2] Interrupt all subscription fibers when the bean is destroyed. #2885
    
    * [Feat][flink-k8s-v2] Adapt to obtain aggregated Flink metrics by teamId. #2885
    
    * [Feat][flink-k8s-v2] Format code #2885
    
    * [Feat][flink-k8s-v2] Subscribe FlinkCluster status #2885
    
    * [Feat][flink-k8s-v2] Solve the issues of cross-compiling Java and Scala code in the streampark-console-service module. #2885
    
    * [Feat][flink-k8s-v2] Provide tracking adaptation for FlinkCluster on K8s. (#2885)
    
    * [Feat][flink-k8s-v2] Add field to Application to identify K8s CR name. (#2885)
    
    * [Feat][flink-k8s-v2] Restore track keys from persistent storage when FlinkK8sObserverBroker is initializing. (#2885)
    
    * [Feat][flink-k8s-v2] Watch the mapping flink job. (#2885)
    
    * [Improve] EnvUtils minor improvement (#3040)
    
    * Flink yarn application supports pyflink maven dependency (#3042)
    
    * [Feature] [Flink-K8s-V2] Refactor the lifecycle control of Flink session-mode jobs on Kubernetes (#3041)
    
    * [Feature] [Flink-K8s-V2] Refactor the lifecycle control of Flink session-mode jobs on Kubernetes
    
    * add Apache license header
    
    * [Improvement] add support for SinkRequest that has multiple statements (#3004)
    
    * add support for SinkRequest that has multiple statements
    
    * Update imports of ClickHouseWriterTask.scala
    
    * add unit test for SinkRequest
    
    * add Apache license
    
    ---------
    
    Co-authored-by: lenon <[email protected]>
    
    * [Feat][flink-k8s-v2] Add field to Application to identify K8s CR name. (#2885)
    
    * [Feat][flink-k8s-v2] Correct the code issues in PR #3041
    
    * Remove zio-cache dependency.
    
    * resolve some compile issues
    
    ---------
    
    Co-authored-by: benjobs <[email protected]>
    Co-authored-by: ChengJie1053 <[email protected]>
    Co-authored-by: caicancai <[email protected]>
    Co-authored-by: lenonhere <[email protected]>
    Co-authored-by: lenon <[email protected]>
---
 pom.xml                                            |  12 +-
 .../org/apache/streampark/common/zio/ZIOExt.scala  |  12 +-
 .../org/apache/streampark/common/zio/package.scala |   9 +-
 .../streampark-console-service/pom.xml             |  31 ++
 .../main/assembly/script/schema/mysql-schema.sql   |   1 +
 .../main/assembly/script/schema/pgsql-schema.sql   |   1 +
 .../main/assembly/script/upgrade/mysql/2.2.2.sql   |  23 +-
 .../main/assembly/script/upgrade/pgsql/2.2.2.sql   |  19 +
 .../console/core/bean/AlertTemplate.java           |   2 +-
 .../console/core/entity/Application.java           |  31 +-
 .../console/core/enums/FlinkAppState.java          |   2 +-
 .../impl/ApplicationActionServiceImpl.java         |   1 +
 .../impl/ApplicationInfoServiceImpl.java           |  15 +-
 .../impl/ApplicationManageServiceImpl.java         |  40 ++-
 .../core/service/impl/FlinkClusterServiceImpl.java |  37 +-
 .../console/core/task/FlinkClusterWatcher.java     |   4 +-
 .../core/task/FlinkK8sChangeEventListener.java     |   9 +-
 .../console/core/task/FlinkK8sObserverStub.java    |  61 ++++
 .../console/core/task/FlinkK8sWatcherWrapper.java  |   2 -
 .../core/utils/FlinkK8sDataTypeConverterStub.java} |  22 +-
 .../src/main/resources/db/schema-h2.sql            |   1 +
 .../resources/mapper/core/ApplicationMapper.xml    |   1 +
 .../console/core/task/FlinkK8sChangeListener.scala | 231 ------------
 .../console/core/task/FlinkK8sObserverBroker.scala | 389 +++++++++++++++++++++
 .../core/task/FlinkK8sObserverBrokerSidecar.scala  |  99 ++++++
 .../core/utils/FlinkK8sDataTypeConverter.scala     | 123 +++++++
 .../console/core/utils/MybatisScalaExt.scala       |  59 ++++
 .../flink/client/bean/KubernetesSubmitParam.scala  |   7 +-
 .../flink/client/FlinkClientHandler.scala          |   5 +-
 .../impl/KubernetesApplicationClientV2.scala       |   7 +-
 .../client/impl/KubernetesSessionClientV2.scala    |  93 ++---
 .../streampark-flink-kubernetes-v2/pom.xml         |   2 +-
 .../flink/kubernetes/v2/model/ClusterMetrics.scala |  22 +-
 .../flink/kubernetes/v2/model/JobState.scala       |  11 +-
 .../flink/kubernetes/v2/model/JobStatus.scala      |   8 +-
 .../kubernetes/v2/observer/FlinkK8sObserver.scala  |  55 ++-
 .../v2/observer/RawClusterObserver.scala           |  19 +-
 .../flink/kubernetes/v2/operator/OprError.scala    |   3 -
 .../impl/FlinkK8sApplicationBuildPipelineV2.scala  |   3 +-
 39 files changed, 1080 insertions(+), 392 deletions(-)

diff --git a/pom.xml b/pom.xml
index fef4ec416..d23981124 100644
--- a/pom.xml
+++ b/pom.xml
@@ -33,7 +33,8 @@
     <url>https://streampark.apache.org/</url>
 
     <description>
-        StreamPark, Make stream processing easier! easy-to-use streaming application development framework and operation platform
+        StreamPark, Make stream processing easier! easy-to-use streaming application development framework and operation
+        platform
     </description>
 
     <organization>
@@ -597,7 +598,9 @@
                                 <style>GOOGLE</style>
                             </googleJavaFormat>
                             <importOrder>
-                                <order>org.apache.streampark,org.apache.streampark.shaded,org.apache,,javax,java,scala,\#</order>
+                                <order>
+                                    org.apache.streampark,org.apache.streampark.shaded,org.apache,,javax,java,scala,\#
+                                </order>
                             </importOrder>
                             <replaceRegex>
                                 <name>Remove wildcard imports</name>
@@ -614,7 +617,7 @@
                                 <searchRegex>import\s+org\.junit\.[^jupiter][^\*\s]*(|\*);(\r\n|\r|\n)</searchRegex>
                                 <replacement>$1</replacement>
                             </replaceRegex>
-                            <removeUnusedImports />
+                            <removeUnusedImports/>
                         </java>
                         <scala>
                             <scalafmt>
@@ -690,7 +693,8 @@
                     <configuration>
                         <shadedArtifactId>true</shadedArtifactId>
                         <createDependencyReducedPom>true</createDependencyReducedPom>
-                        <dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
+                        <dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml
+                        </dependencyReducedPomLocation>
                         <artifactSet>
                             <includes>
                                 <!--
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOExt.scala b/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOExt.scala
index 7baa195d3..1aa2a7f40 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOExt.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOExt.scala
@@ -18,7 +18,7 @@
 package org.apache.streampark.common.zio
 
 import zio.{FiberFailure, IO, Runtime, Unsafe, ZIO}
-import zio.stream.ZStream
+import zio.stream.{UStream, ZStream}
 
 import scala.util.Try
 
@@ -110,4 +110,14 @@ object ZIOExt {
       .map { case (_, cur) => cur }
   }
 
+  implicit class ZStreamOptionEffectOps[R, E, A](zstream: ZStream[R, E, Option[A]]) {
+
+    /** Filter Some value and flatten the value */
+    @inline def filterSome: ZStream[R, E, A] = zstream.filter(_.isDefined).map(_.get)
+  }
+
+  implicit class IterableZStreamConverter[A](iter: Iterable[A]) {
+    @inline def asZStream: UStream[A] = ZStream.fromIterable(iter)
+  }
+
 }
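
For reference, a minimal usage sketch of the two implicit ops added above (hypothetical values; assumes the ZIO 2.x APIs this module already depends on):

    import org.apache.streampark.common.zio.ZIOExt.{IterableZStreamConverter, ZStreamOptionEffectOps}
    import zio.stream.{UStream, ZStream}

    // Iterable[A] => UStream[A]
    val ids: UStream[Long] = Seq(1L, 2L, 3L).asZStream

    // ZStream[R, E, Option[A]] => ZStream[R, E, A]: keep Some values, unwrap them
    val snaps: ZStream[Any, Nothing, Option[String]] = ZStream(Some("a"), None, Some("b"))
    val onlyDefined: ZStream[Any, Nothing, String] = snaps.filterSome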
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/zio/package.scala b/streampark-common/src/main/scala/org/apache/streampark/common/zio/package.scala
index 673773b9e..82ff0f360 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/zio/package.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/zio/package.scala
@@ -18,14 +18,15 @@
 package org.apache.streampark.common
 
 import scala.language.implicitConversions
+import scala.util.Try
 
 package object zio {
 
   /** Similar to python's pprint, format print any type of instance. */
-  @inline def toPrettyString(value: Any): String = value match {
+  @inline def toPrettyString(value: Any): String = Try(value match {
     case v: String => v
     case v => pprint.apply(v, height = 2000, showFieldNames = true).render
-  }
+  }).getOrElse(value.toString)
 
   implicit class PrettyStringOps(value: Any) {
     @inline def prettyStr: String = toPrettyString(value)
@@ -34,4 +35,8 @@ package object zio {
   /** Automatically converts value to Some value. */
   implicit def liftValueAsSome[A](value: A): Option[A] = Some(value)
 
+  implicit class OptionTraversableOps[A](iter: Traversable[Option[A]]) {
+    def filterSome: Traversable[A] = iter.filter(_.isDefined).map(_.get)
+  }
+
 }
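
A short usage sketch of the hardened toPrettyString and the new filterSome helper (the JobInfo class is invented for illustration):

    import org.apache.streampark.common.zio._

    // hypothetical case class, purely illustrative
    case class JobInfo(id: Long, name: String)

    // pprint rendering; now falls back to toString if pprint throws
    println(JobInfo(1L, "wordcount").prettyStr)

    // Traversable[Option[A]] => Traversable[A]
    val maybeIds: Traversable[Option[Long]] = Seq(Some(1L), None, Some(3L))
    println(maybeIds.filterSome) // List(1, 3)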
diff --git a/streampark-console/streampark-console-service/pom.xml b/streampark-console/streampark-console-service/pom.xml
index 2e6156603..090b8ea76 100644
--- a/streampark-console/streampark-console-service/pom.xml
+++ b/streampark-console/streampark-console-service/pom.xml
@@ -588,6 +588,37 @@
                 </executions>
             </plugin>
 
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <configuration>
+                    <compileOrder>JavaThenScala</compileOrder>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>com.diffplug.spotless</groupId>
+                <artifactId>spotless-maven-plugin</artifactId>
+                <version>${maven-spotless-plugin.version}</version>
+                <configuration>
+                    <scala>
+                        <scalafmt>
+                            <version>${spotless.scalafmt.version}</version>
+                            <file>${maven.multiModuleProjectDirectory}/.scalafmt-fp.conf</file>
+                        </scalafmt>
+                    </scala>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>spotless-check</id>
+                        <phase>validate</phase>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
         </plugins>
     </build>
 
diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
index 52b885c7d..08a1693a3 100644
--- a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
+++ b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
@@ -64,6 +64,7 @@ create table `t_flink_app` (
   `job_manager_url` varchar(255) collate utf8mb4_general_ci default null,
   `version_id` bigint default null,
   `cluster_id` varchar(45) collate utf8mb4_general_ci default null,
+  `k8s_name` varchar(63) collate utf8mb4_general_ci default null,
   `k8s_namespace` varchar(63) collate utf8mb4_general_ci default null,
   `flink_image` varchar(128) collate utf8mb4_general_ci default null,
   `state` int default null,
diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
index a9e5ffa24..eef89390c 100644
--- a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
+++ b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
@@ -212,6 +212,7 @@ create table "public"."t_flink_app" (
   "job_manager_url" varchar(255) collate "pg_catalog"."default",
   "version_id" int8,
   "cluster_id" varchar(45) collate "pg_catalog"."default",
+  "k8s_name" varchar(63) collate "pg_catalog"."default",
   "k8s_namespace" varchar(63) collate "pg_catalog"."default",
   "flink_image" varchar(128) collate "pg_catalog"."default",
   "state" int2,
diff --git a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/ClusterMetrics.scala b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.2.sql
similarity index 63%
copy from streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/ClusterMetrics.scala
copy to streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.2.sql
index 7ef4bb7da..9fe81443d 100644
--- a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/ClusterMetrics.scala
+++ b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.2.sql
@@ -15,20 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.flink.kubernetes.v2.model
+use streampark;
+
+set names utf8mb4;
+set foreign_key_checks = 0;
+
+alter table `t_flink_app`
+    add column `k8s_name` varchar(63) collate utf8mb4_general_ci default null;
 
-/**
- * Flink custer metrics.
- *
- * see: [[org.apache.streampark.flink.kubernetes.model.FlinkMetricCV]]
- */
-case class ClusterMetrics(
-    totalJmMemory: Integer = 0,
-    totalTmMemory: Integer = 0,
-    totalTm: Integer = 0,
-    totalSlot: Integer = 0,
-    availableSlot: Integer = 0,
-    runningJob: Integer = 0,
-    finishedJob: Integer = 0,
-    cancelledJob: Integer = 0,
-    failedJob: Integer = 0)
diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.2.sql b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.2.sql
new file mode 100644
index 000000000..b535a7832
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.2.sql
@@ -0,0 +1,19 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+alter table "public"."t_flink_app"
+    add column "k8s_name" varchar(63) collate "pg_catalog"."default";
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
index 1607ccb1b..733fa020c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
@@ -89,7 +89,7 @@ public class AlertTemplate implements Serializable {
     return new AlertTemplateBuilder()
         .setDuration(cluster.getStartTime(), cluster.getEndTime())
         .setJobName(cluster.getClusterName())
-        .setLink(ExecutionMode.YARN_SESSION, cluster.getClusterId())
+        .setLink(cluster.getExecutionModeEnum(), cluster.getClusterId())
         .setStartTime(cluster.getStartTime())
         .setEndTime(cluster.getEndTime())
         .setType(3)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 8eb0e39ed..049af6f13 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -100,9 +100,26 @@ public class Application implements Serializable {
   /** flink docker base image */
   private String flinkImage;
 
+  /** The resource name of the flink job on k8s, equivalent to clusterId in application mode. */
+  private String k8sName;
+
   /** k8s namespace */
   private String k8sNamespace = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE();
 
+  /** The exposed type of the rest service of K8s(kubernetes.rest-service.exposed.type) */
+  private Integer k8sRestExposedType;
+  /** flink kubernetes pod template */
+  private String k8sPodTemplate;
+
+  private String k8sJmPodTemplate;
+  private String k8sTmPodTemplate;
+
+  private String ingressTemplate;
+  private String defaultModeIngress;
+
+  /** flink-hadoop integration on flink-k8s mode */
+  private Boolean k8sHadoopIntegration;
+
   private Integer state;
   /** task release status */
   @TableField("`release`")
@@ -189,23 +206,9 @@ public class Application implements Serializable {
 
   private Date modifyTime;
 
-  /** The exposed type of the rest service of K8s(kubernetes.rest-service.exposed.type) */
-  private Integer k8sRestExposedType;
-  /** flink kubernetes pod template */
-  private String k8sPodTemplate;
-
-  private String k8sJmPodTemplate;
-  private String k8sTmPodTemplate;
-
-  private String ingressTemplate;
-  private String defaultModeIngress;
-
   /** 1: cicd (build from csv) 2: upload (upload local jar job) */
   private Integer resourceFrom;
 
-  /** flink-hadoop integration on flink-k8s mode */
-  private Boolean k8sHadoopIntegration;
-
   private String tags;
 
   /** running job */
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
index 87c7493f3..84bf447f3 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
@@ -139,7 +139,7 @@ public enum FlinkAppState {
 
   /**
    * Type conversion bridging Deprecated, see {@link
-   * org.apache.streampark.console.core.utils.FlinkAppStateConverter}
+   * org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverter}
    */
   @Deprecated
   public static class Bridge {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 4c836469f..e9d5d6906 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -419,6 +419,7 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
     KubernetesSubmitParam kubernetesSubmitParam =
         KubernetesSubmitParam.apply(
             application.getClusterId(),
+            application.getK8sName(),
             application.getK8sNamespace(),
             application.getFlinkImage(),
             application.getK8sRestExposedTypeEnum());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
index b26de031f..41930d903 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.console.core.service.application.impl;
 
+import org.apache.streampark.common.conf.K8sFlinkConfig;
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.fs.LfsOperator;
@@ -41,6 +42,7 @@ import org.apache.streampark.console.core.service.SavePointService;
 import org.apache.streampark.console.core.service.application.ApplicationInfoService;
 import org.apache.streampark.console.core.task.FlinkClusterWatcher;
 import org.apache.streampark.console.core.task.FlinkHttpWatcher;
+import org.apache.streampark.console.core.task.FlinkK8sObserverStub;
 import org.apache.streampark.flink.core.conf.ParameterCli;
 import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
 import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper;
@@ -96,6 +98,8 @@ public class ApplicationInfoServiceImpl extends ServiceImpl<ApplicationMapper, A
 
   @Autowired private FlinkK8sWatcher k8SFlinkTrackMonitor;
 
+  @Autowired private FlinkK8sObserverStub flinkK8sObserver;
+
   @Autowired private FlinkClusterService flinkClusterService;
 
   @Autowired private FlinkClusterWatcher flinkClusterWatcher;
@@ -149,7 +153,12 @@ public class ApplicationInfoServiceImpl extends ServiceImpl<ApplicationMapper, A
     }
 
     // merge metrics from flink kubernetes cluster
-    FlinkMetricCV k8sMetric = k8SFlinkTrackMonitor.getAccGroupMetrics(teamId.toString());
+    FlinkMetricCV k8sMetric;
+    if (K8sFlinkConfig.isV2Enabled()) {
+      k8sMetric = flinkK8sObserver.getAggClusterMetricCV(teamId);
+    } else {
+      k8sMetric = k8SFlinkTrackMonitor.getAccGroupMetrics(teamId.toString());
+    }
     if (k8sMetric != null) {
       totalJmMemory += k8sMetric.totalJmMemory();
       totalTmMemory += k8sMetric.totalTmMemory();
@@ -452,7 +461,11 @@ public class ApplicationInfoServiceImpl extends ServiceImpl<ApplicationMapper, A
     boolean mapping = this.baseMapper.mapping(appParam);
     Application application = getById(appParam.getId());
     if (isKubernetesApp(application)) {
+      // todo mark
       k8SFlinkTrackMonitor.doWatching(toTrackId(application));
+      if (K8sFlinkConfig.isV2Enabled()) {
+        flinkK8sObserver.watchApplication(application);
+      }
     } else {
       FlinkHttpWatcher.doWatching(application);
     }
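
The per-team aggregation behind getAggClusterMetricCV is implemented in the Scala-side broker further below; a rough sketch of what summing ClusterMetrics per team could look like (helper name invented; the actual logic may differ):

    // hypothetical: fold all tracked ClusterMetrics of one team into a single value
    def sumMetrics(all: Iterable[ClusterMetrics]): ClusterMetrics =
      all.foldLeft(ClusterMetrics()) { (acc, m) =>
        acc.copy(
          totalJmMemory = acc.totalJmMemory + m.totalJmMemory,
          totalTmMemory = acc.totalTmMemory + m.totalTmMemory,
          totalTm = acc.totalTm + m.totalTm,
          totalSlot = acc.totalSlot + m.totalSlot,
          availableSlot = acc.availableSlot + m.availableSlot,
          runningJob = acc.runningJob + m.runningJob,
          finishedJob = acc.finishedJob + m.finishedJob,
          cancelledJob = acc.cancelledJob + m.cancelledJob,
          failedJob = acc.failedJob + m.failedJob)
      }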
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
index 11b0f59a1..8f8ada3ac 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
@@ -23,7 +23,6 @@ import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.enums.StorageType;
 import org.apache.streampark.common.fs.HdfsOperator;
 import org.apache.streampark.common.util.DeflaterUtils;
-import org.apache.streampark.common.zio.ZIOJavaUtil;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
@@ -55,8 +54,9 @@ import org.apache.streampark.console.core.service.SettingService;
 import org.apache.streampark.console.core.service.YarnQueueService;
 import org.apache.streampark.console.core.service.application.ApplicationManageService;
 import org.apache.streampark.console.core.task.FlinkHttpWatcher;
+import org.apache.streampark.console.core.task.FlinkK8sObserverStub;
+import org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverterStub;
 import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
-import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver;
 import org.apache.streampark.flink.packer.pipeline.PipelineStatus;
 
 import org.apache.commons.lang3.StringUtils;
@@ -127,6 +127,10 @@ public class ApplicationManageServiceImpl extends ServiceImpl<ApplicationMapper,
 
   @Autowired private ResourceService resourceService;
 
+  @Autowired private FlinkK8sObserverStub flinkK8sObserver;
+
+  @Autowired private FlinkK8sDataTypeConverterStub flinkK8sDataTypeConverter;
+
   @PostConstruct
   public void resetOptionState() {
     this.baseMapper.resetOptionState();
@@ -182,7 +186,7 @@ public class ApplicationManageServiceImpl extends ServiceImpl<ApplicationMapper,
     if (isKubernetesApp(application)) {
       k8SFlinkTrackMonitor.unWatching(toTrackId(application));
       if (K8sFlinkConfig.isV2Enabled()) {
-        ZIOJavaUtil.runUIO(FlinkK8sObserver.untrackById(application.getId()));
+        flinkK8sObserver.unWatchById(application.getId());
       }
     } else {
       FlinkHttpWatcher.unWatching(appParam.getId());
@@ -314,6 +318,18 @@ public class ApplicationManageServiceImpl extends ServiceImpl<ApplicationMapper,
       appParam.setJarCheckSum(org.apache.commons.io.FileUtils.checksumCRC32(new File(jarPath)));
     }
 
+    if (shouldHandleK8sName(appParam)) {
+      switch (appParam.getExecutionModeEnum()) {
+        case KUBERNETES_NATIVE_APPLICATION:
+          appParam.setK8sName(appParam.getClusterId());
+          break;
+        case KUBERNETES_NATIVE_SESSION:
+          appParam.setK8sName(
+              flinkK8sDataTypeConverter.genSessionJobK8sCRName(appParam.getClusterId()));
+          break;
+      }
+    }
+
     if (save(appParam)) {
       if (appParam.isFlinkSqlJobOrPyFlinkJob()) {
         FlinkSql flinkSql = new FlinkSql(appParam);
@@ -328,6 +344,10 @@ public class ApplicationManageServiceImpl extends ServiceImpl<ApplicationMapper,
     }
   }
 
+  private boolean shouldHandleK8sName(Application app) {
+    return K8sFlinkConfig.isV2Enabled() && ExecutionMode.isKubernetesMode(app.getExecutionMode());
+  }
+
   private boolean existsByJobName(String jobName) {
     return baseMapper.exists(
         new LambdaQueryWrapper<Application>().eq(Application::getJobName, jobName));
@@ -528,6 +548,20 @@ public class ApplicationManageServiceImpl extends ServiceImpl<ApplicationMapper,
         break;
     }
 
+    if (shouldHandleK8sName(appParam)) {
+      switch (appParam.getExecutionModeEnum()) {
+        case KUBERNETES_NATIVE_APPLICATION:
+          application.setK8sName(appParam.getClusterId());
+          break;
+        case KUBERNETES_NATIVE_SESSION:
+          if (!Objects.equals(application.getClusterId(), appParam.getClusterId())) {
+            application.setK8sName(
+                flinkK8sDataTypeConverter.genSessionJobK8sCRName(appParam.getClusterId()));
+          }
+          break;
+      }
+    }
+
     // Flink Sql job...
     if (application.isFlinkSqlJob()) {
       updateFlinkSqlJob(application, appParam);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 3bbe93d94..cf6221cb7 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.conf.K8sFlinkConfig;
 import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.util.ThreadUtils;
@@ -32,6 +33,7 @@ import org.apache.streampark.console.core.service.FlinkEnvService;
 import org.apache.streampark.console.core.service.YarnQueueService;
 import org.apache.streampark.console.core.service.application.ApplicationInfoService;
 import org.apache.streampark.console.core.task.FlinkClusterWatcher;
+import org.apache.streampark.console.core.task.FlinkK8sObserverStub;
 import org.apache.streampark.flink.client.FlinkClient;
 import org.apache.streampark.flink.client.bean.DeployRequest;
 import org.apache.streampark.flink.client.bean.DeployResponse;
@@ -96,6 +98,8 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
 
   @Autowired private FlinkClusterWatcher flinkClusterWatcher;
 
+  @Autowired private FlinkK8sObserverStub flinkK8sObserver;
+
   @Override
   public ResponseResult check(FlinkCluster cluster) {
     ResponseResult result = new ResponseResult();
@@ -159,6 +163,9 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     if (ret && ExecutionMode.isRemoteMode(flinkCluster.getExecutionMode())) {
       FlinkClusterWatcher.addWatching(flinkCluster);
     }
+    if (shouldWatchForK8s(flinkCluster)) {
+      flinkK8sObserver.watchFlinkCluster(flinkCluster);
+    }
     return ret;
   }
 
@@ -186,6 +193,9 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
       flinkCluster.setEndTime(null);
       updateById(flinkCluster);
       FlinkClusterWatcher.addWatching(flinkCluster);
+      if (shouldWatchForK8s(flinkCluster)) {
+        flinkK8sObserver.watchFlinkCluster(flinkCluster);
+      }
     } catch (Exception e) {
       log.error(e.getMessage(), e);
       flinkCluster.setClusterState(ClusterState.FAILED.getValue());
@@ -225,6 +235,9 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
       flinkCluster.setFlinkImage(paramOfCluster.getFlinkImage());
       flinkCluster.setYarnQueue(paramOfCluster.getYarnQueue());
     }
+    if (shouldWatchForK8s(flinkCluster)) {
+      flinkK8sObserver.watchFlinkCluster(flinkCluster);
+    }
     updateById(flinkCluster);
   }
 
@@ -260,11 +273,17 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     checkActiveIfNeeded(flinkCluster);
 
     // 3) check job if running on cluster
-    boolean existsRunningJob =
-        applicationInfoService.existsRunningByClusterId(flinkCluster.getId());
-    ApiAlertException.throwIfTrue(
-        existsRunningJob, "Some app is running on this cluster, the cluster cannot be shutdown");
-
+    if (shouldWatchForK8s(cluster)) {
+      boolean existActiveJobs = flinkK8sObserver.existActiveJobsOnFlinkCluster(flinkCluster);
+      ApiAlertException.throwIfTrue(
+          existActiveJobs,
+          "Due to the presence of active jobs on the cluster, the cluster should not be shutdown");
+    } else {
+      boolean existsRunningJob =
+          applicationInfoService.existsRunningByClusterId(flinkCluster.getId());
+      ApiAlertException.throwIfTrue(
+          existsRunningJob, "Some app is running on this cluster, the cluster cannot be shutdown");
+    }
     return true;
   }
 
@@ -334,6 +353,9 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
           ClusterState.isRunning(flinkCluster.getClusterStateEnum()),
           "Flink cluster is running, cannot be delete, please check.");
     }
+    if (shouldWatchForK8s(flinkCluster)) {
+      flinkK8sObserver.unwatchFlinkCluster(flinkCluster);
+    }
 
     ApiAlertException.throwIfTrue(
         applicationInfoService.existsByClusterId(id),
@@ -454,4 +476,9 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     }
     return null;
   }
+
+  private boolean shouldWatchForK8s(FlinkCluster flinkCluster) {
+    return K8sFlinkConfig.isV2Enabled()
+        && ExecutionMode.isKubernetesSessionMode(flinkCluster.getExecutionMode());
+  }
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
index 5924aa815..200a548a8 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
@@ -100,6 +100,7 @@ public class FlinkClusterWatcher {
         flinkClusterService.list(
             new LambdaQueryWrapper<FlinkCluster>()
                 .eq(FlinkCluster::getClusterState, ClusterState.RUNNING.getValue())
+                // excluding flink clusters on kubernetes
                 .notIn(FlinkCluster::getExecutionMode, ExecutionMode.getKubernetesMode()));
     flinkClusters.forEach(cluster -> WATCHER_CLUSTERS.put(cluster.getId(), cluster));
   }
@@ -264,7 +265,8 @@ public class FlinkClusterWatcher {
    * @param flinkCluster
    */
   public static void addWatching(FlinkCluster flinkCluster) {
-    if (!WATCHER_CLUSTERS.containsKey(flinkCluster.getId())) {
+    if (!ExecutionMode.isKubernetesMode(flinkCluster.getExecutionModeEnum())
+        && !WATCHER_CLUSTERS.containsKey(flinkCluster.getId())) {
      log.info("add the cluster with id:{} to watcher cluster cache", flinkCluster.getId());
       WATCHER_CLUSTERS.put(flinkCluster.getId(), flinkCluster);
     }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index 5da80337b..550a9a41e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -54,7 +54,14 @@ import scala.Enumeration;
 import static org.apache.streampark.console.core.enums.FlinkAppState.Bridge.fromK8sFlinkJobState;
 import static org.apache.streampark.console.core.enums.FlinkAppState.Bridge.toK8sFlinkJobState;
 
-/** Event Listener for K8sFlinkTrackMonitor */
+/**
+ * Event Listener for K8sFlinkTrackMonitor.
+ *
+ * <p>Deprecated: use FlinkK8sChangeListenerV2 instead.
+ *
+ * @see org.apache.streampark.console.core.task.FlinkK8sChangeListenerV2
+ */
+@Deprecated
 @Component
 public class FlinkK8sChangeEventListener {
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sObserverStub.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sObserverStub.java
new file mode 100644
index 000000000..ffc282026
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sObserverStub.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.task;
+
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV;
+import org.apache.streampark.flink.kubernetes.v2.model.ClusterMetrics;
+
+/**
+ * Stub for exposing methods of FlinkK8sObserver used to adapt the Java code invocation on
+ * streampark-console-service.
+ */
+public interface FlinkK8sObserverStub {
+
+  /** Stub method: Get aggregated metrics of all flink jobs on k8s cluster by team-id */
+  ClusterMetrics getAggClusterMetric(Long teamId);
+
+  /** Stub method: Compatible with old code of flink-k8s-v1. */
+  FlinkMetricCV getAggClusterMetricCV(Long teamId);
+
+  /** Stub method: Whether there are active jobs on the Flink cluster. */
+  boolean existActiveJobsOnFlinkCluster(FlinkCluster flinkCluster);
+
+  /**
+   * Stub method: Add FlinkCluster to the watchlist. Normally, after a 
successful deployment of a
+   * Flink Cluster, the relevant resources would be self-tracked
+   */
+  void watchFlinkCluster(FlinkCluster flinkCluster);
+
+  /**
+   * Stub method: Add Application to the watchlist. Normally, after a 
successful deployment of a
+   * Flink App, the relevant resources would be self-tracked
+   */
+  void watchApplication(Application app);
+
+  /**
+   * Stub method: Notify FlinkK8sObserver to remove FlinkCluster from the watchlist. When there are
+   * associated SessionJobs with FlinkCluster, the tracking of FlinkCluster will not be removed in
+   * reality.
+   */
+  void unwatchFlinkCluster(FlinkCluster flinkCluster);
+
+  /** Stub method: Notify FlinkK8sObserver to remove tracking resources by TrackKey.id. */
+  void unWatchById(Long id);
+}
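
To illustrate the intent of the stub (sketch only; the facade below is hypothetical): Java-side console services inject this interface, and the Scala-side FlinkK8sObserverBroker registers itself as the implementing Spring bean:

    import org.springframework.beans.factory.annotation.Autowired
    import org.springframework.stereotype.Component
    import org.apache.streampark.flink.kubernetes.v2.model.ClusterMetrics

    // hypothetical facade, assuming Spring constructor injection as used elsewhere in this patch
    @Component
    class TeamMetricsFacade @Autowired() (observer: FlinkK8sObserverStub) {
      def metricsOfTeam(teamId: java.lang.Long): ClusterMetrics =
        observer.getAggClusterMetric(teamId)
    }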
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index 32f7a2391..00901367e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -89,8 +89,6 @@ public class FlinkK8sWatcherWrapper {
       // recovery tracking list
       List<TrackId> k8sApp = getK8sWatchingApps();
       k8sApp.forEach(trackMonitor::doWatching);
-    } else {
-      // TODO [flink-k8s-v2] Recovery tracking list and invoke FlinkK8sObserver.track()
     }
   }
 
diff --git a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/FlinkAppStateConverter.scala b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/FlinkK8sDataTypeConverterStub.java
similarity index 50%
rename from streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/FlinkAppStateConverter.scala
rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/FlinkK8sDataTypeConverterStub.java
index fc9449a97..c9d9bf8a8 100644
--- a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/FlinkAppStateConverter.scala
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/FlinkK8sDataTypeConverterStub.java
@@ -15,24 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.console.core.utils
+package org.apache.streampark.console.core.utils;
 
-import org.apache.streampark.console.core.enums.FlinkAppState
-import org.apache.streampark.flink.kubernetes.v2.model.EvalJobState
-import org.apache.streampark.flink.kubernetes.v2.model.EvalJobState.EvalJobState
-
-import scala.util.Try
-
-object FlinkAppStateConverter {
-
-  /** Convert [[EvalJobState]] to [[FlinkAppState]]. */
-  def k8sEvalJobStateToFlinkAppState(jobState: EvalJobState): FlinkAppState = {
-    Try(FlinkAppState.valueOf(jobState.toString)).getOrElse(FlinkAppState.OTHER)
-  }
-
-  /** Convert [[FlinkAppState]] to [[EvalJobState]]. */
-  def flinkAppStateToK8sEvalJobState(jobState: FlinkAppState): EvalJobState = {
-    EvalJobState.values.find(e => e.toString == jobState.toString).getOrElse(EvalJobState.OTHER)
-  }
+public interface FlinkK8sDataTypeConverterStub {
 
+  /** Create default name for Flink SessionJob CR for k8s-native compatibility. */
+  String genSessionJobK8sCRName(String clusterId);
 }
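
The concrete implementation of genSessionJobK8sCRName is provided by the Scala-side FlinkK8sDataTypeConverter (listed in the diffstat but not shown in this excerpt); a purely hypothetical sketch of the contract, with an invented suffix scheme:

    import java.util.UUID

    // hypothetical sketch only: the real naming scheme lives in FlinkK8sDataTypeConverter.scala
    class FlinkK8sDataTypeConverterSketch extends FlinkK8sDataTypeConverterStub {
      override def genSessionJobK8sCRName(clusterId: String): String =
        s"$clusterId-${UUID.randomUUID().toString.take(8)}"
    }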
diff --git a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index 16b6503f1..f6b88d4b4 100644
--- a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++ b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -57,6 +57,7 @@ create table if not exists `t_flink_app` (
   `job_manager_url` varchar(255)  default null,
   `version_id` bigint default null,
   `cluster_id` varchar(45)  default null,
+  `k8s_name` varchar(63)  default null,
   `k8s_namespace` varchar(63)  default null,
   `flink_image` varchar(128)  default null,
   `state` int default null,
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index 1e4cdb585..045a8f850 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -32,6 +32,7 @@
         <result column="cluster_id" jdbcType="VARCHAR" property="clusterId"/>
         <result column="flink_cluster_id" jdbcType="BIGINT" 
property="flinkClusterId"/>
         <result column="flink_image" jdbcType="VARCHAR" property="flinkImage"/>
+        <result column="k8s_name" jdbcType="VARCHAR" property="k8sName"/>
         <result column="k8s_namespace" jdbcType="VARCHAR" 
property="k8sNamespace"/>
         <result column="app_type" jdbcType="INTEGER" property="appType"/>
         <result column="job_type" jdbcType="INTEGER" property="jobType"/>
diff --git a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sChangeListener.scala b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sChangeListener.scala
deleted file mode 100644
index f4e00effe..000000000
--- a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sChangeListener.scala
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.console.core.task
-
-import org.apache.streampark.common.zio.ZIOContainerSubscription.{ConcurrentMapExtension, RefMapExtension}
-import org.apache.streampark.common.zio.ZIOExt.IOOps
-import org.apache.streampark.console.core.bean.AlertTemplate
-import org.apache.streampark.console.core.entity.{Application, FlinkCluster}
-import org.apache.streampark.console.core.enums.{FlinkAppState, OptionState}
-import org.apache.streampark.console.core.service.FlinkClusterService
-import org.apache.streampark.console.core.service.alert.AlertService
-import org.apache.streampark.console.core.service.application.{ApplicationInfoService, ApplicationManageService}
-import org.apache.streampark.console.core.utils.FlinkAppStateConverter
-import org.apache.streampark.flink.kubernetes.v2.model._
-import org.apache.streampark.flink.kubernetes.v2.observer.{FlinkK8sObserver, Name, Namespace}
-import org.apache.streampark.flink.kubernetes.v2.operator.OprError.TrackKeyNotFound
-
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.context.annotation.Lazy
-import org.springframework.stereotype.Component
-import zio.{UIO, ZIO}
-import zio.ZIO.logError
-import zio.ZIOAspect.annotated
-
-import java.util.Date
-
-@Component
-class FlinkK8sChangeListener {
-  @Lazy @Autowired
-  private var applicationManageService: ApplicationManageService = _
-
-  @Lazy
-  @Autowired
-  private var applicationInfoService: ApplicationInfoService = _
-
-  @Lazy @Autowired
-  private var flinkClusterService: FlinkClusterService = _
-  @Lazy @Autowired
-  private var alertService: AlertService = _
-
-  subscribeJobStatusChange.forkDaemon.runIO
-  subscribeMetricsChange.forkDaemon.runIO
-  subscribeRestSvcEndpointChange.forkDaemon.runIO
-
-  private val alterStateList =
-    Array(
-      FlinkAppState.FAILED,
-      FlinkAppState.LOST,
-      FlinkAppState.RESTARTING,
-      FlinkAppState.FINISHED)
-
-  def subscribeJobStatusChange: UIO[Unit] = {
-    FlinkK8sObserver.evaluatedJobSnaps
-      .flatSubscribeValues()
-      // Get Application records and convert JobSnapshot to Application
-      .mapZIO {
-        jobSnap =>
-          ZIO
-            .attemptBlocking {
-              Option(applicationManageService.getById(jobSnap.appId))
-                .map(app => setByJobStatusCV(app, jobSnap))
-            }
-            .catchAll {
-              err =>
-                logError(s"Fail to get Application records: ${err.getMessage}")
-                  .as(None) @@ annotated("appId" -> jobSnap.appId.toString)
-            }
-      }
-      .filter(_.nonEmpty)
-      .map(_.get)
-      // Save Application records
-      .tap {
-        app =>
-          ZIO
-            .attemptBlocking(applicationInfoService.persistMetrics(app))
-            .retryN(3)
-            .tapError(err => logError(s"Fail to persist Application status: ${err.getMessage}"))
-            .ignore @@ annotated("appId" -> app.getAppId)
-      }
-      // Alert for unhealthy states in parallel
-      .mapZIOPar(10) {
-        app =>
-          val state = FlinkAppState.of(app.getState)
-          ZIO
-            .attemptBlocking(alertService.alert(app.getAlertId(), AlertTemplate.of(app, state)))
-            .when(alterStateList.contains(state))
-            .retryN(3)
-            .tapError(
-              err => logError(s"Fail to alter unhealthy application state: ${err.getMessage}"))
-            .ignore @@ annotated("appId" -> app.getAppId, "state" -> state.toString)
-      }
-      .runDrain
-  }
-
-  def subscribeMetricsChange: UIO[Unit] = {
-    FlinkK8sObserver.clusterMetricsSnaps
-      .flatSubscribe()
-      .mapZIO {
-        metricsSnap =>
-          ZIO
-            .attemptBlocking {
-              val namespaceAndName: (Namespace, Name) = metricsSnap._1
-              val trackKey: ZIO[Any, TrackKeyNotFound, TrackKey] = FlinkK8sObserver.trackedKeys
-                .find(
-                  trackedKey =>
-                    trackedKey.clusterNamespace == namespaceAndName._1 && trackedKey.clusterName == namespaceAndName._2)
-                .someOrFail(TrackKeyNotFound(namespaceAndName._1, namespaceAndName._2))
-
-              Option(applicationManageService.getById(trackKey.map(_.id)), metricsSnap._2)
-            }
-            .catchAll {
-              err =>
-                logError(s"Fail to get Application records: ${err.getMessage}")
-                  .as(None) @@ annotated("name" -> metricsSnap._1._2)
-            }
-      }
-      .filter(_.nonEmpty)
-      .map(_.get)
-      .tap {
-        appAndMetrics =>
-          ZIO
-            .attemptBlocking {
-              val app: Application = appAndMetrics._1
-              val clusterMetrics: ClusterMetrics = appAndMetrics._2
-              app.setJmMemory(clusterMetrics.totalJmMemory)
-              app.setTmMemory(clusterMetrics.totalTmMemory)
-              app.setTotalTM(clusterMetrics.totalTm)
-              app.setTotalSlot(clusterMetrics.totalSlot)
-              app.setAvailableSlot(clusterMetrics.availableSlot)
-              applicationInfoService.persistMetrics(app)
-            }
-            .retryN(3)
-            .tapError(err => logError(s"Fail to persist Application Metrics: ${err.getMessage}"))
-            .ignore @@ annotated("appId" -> appAndMetrics._1.getAppId)
-      }
-      .runDrain
-  }
-
-  def subscribeRestSvcEndpointChange: UIO[Unit] = {
-    FlinkK8sObserver.restSvcEndpointSnaps
-      .flatSubscribe()
-      .foreach {
-        restSvcEndpointSnap =>
-          ZIO
-            .attempt {
-
-              val namespaceAndName: (Namespace, Name) = restSvcEndpointSnap._1
-              val trackKey: ZIO[Any, TrackKeyNotFound, TrackKey] = FlinkK8sObserver.trackedKeys
-                .find(
-                  trackedKey =>
-                    trackedKey.clusterNamespace == namespaceAndName._1 && trackedKey.clusterName == namespaceAndName._2)
-                .someOrFail(TrackKeyNotFound(namespaceAndName._1, namespaceAndName._2))
-              val restSvcEndpoint: RestSvcEndpoint = restSvcEndpointSnap._2
-
-              val app: Application = applicationManageService.getById(trackKey.map(_.id))
-
-              val flinkCluster: FlinkCluster = flinkClusterService.getById(app.getFlinkClusterId)
-
-              if (restSvcEndpoint == null || restSvcEndpoint.ipRest == null) return ZIO.unit
-              val url = restSvcEndpoint.ipRest
-              app.setFlinkRestUrl(url)
-              applicationInfoService.persistMetrics(app)
-
-              flinkCluster.setAddress(url)
-              flinkClusterService.update(flinkCluster)
-
-            }
-            .retryN(3)
-            .ignore
-      }
-  }
-
-  private def setByJobStatusCV(app: Application, jobSnapshot: JobSnapshot): Application = { // infer the final flink job state
-    val state: FlinkAppState =
-      FlinkAppStateConverter.k8sEvalJobStateToFlinkAppState(jobSnapshot.evalState)
-    val jobStatusOption: Option[JobStatus] = jobSnapshot.jobStatus
-
-    if (jobStatusOption.nonEmpty) {
-      val jobStatus: JobStatus = jobStatusOption.get
-      // corrective start-time / end-time / duration
-      val preStartTime: Long =
-        if (app.getStartTime != null) app.getStartTime.getTime
-        else 0
-
-      val startTime: Long = Math.max(jobStatus.startTs, preStartTime)
-      val preEndTime: Long =
-        if (app.getEndTime != null) app.getEndTime.getTime
-        else 0
-      var endTime: Long = Math.max(jobStatus.endTs.getOrElse(-1L), preEndTime)
-      var duration: Long = if (app.getDuration != null) app.getDuration else 0
-      if (FlinkAppState.isEndState(state.getValue)) {
-        if (endTime < startTime) endTime = System.currentTimeMillis
-        if (duration <= 0) duration = endTime - startTime
-      }
-      app.setJobId(jobStatus.jobId)
-      val totalTask = if (jobStatus.tasks.nonEmpty) jobStatus.tasks.get.total else 0
-      app.setTotalTask(totalTask)
-      app.setStartTime(
-        new Date(
-          if (startTime > 0) startTime
-          else 0))
-      app.setEndTime(
-        if (endTime > 0 && endTime >= startTime) new Date(endTime)
-        else null)
-      app.setDuration(
-        if (duration > 0) duration
-        else 0)
-    }
-
-    app.setState(state.getValue)
-    // when a flink job status change event can be received, it means
-    // that the operation command sent by streampark has been completed.
-    app.setOptionState(OptionState.NONE.getValue)
-    app
-  }
-}
diff --git a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBroker.scala b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBroker.scala
new file mode 100644
index 000000000..66251914c
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBroker.scala
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.task
+
+import org.apache.streampark.common.conf.K8sFlinkConfig
+import org.apache.streampark.common.enums.{ClusterState, ExecutionMode}
+import org.apache.streampark.common.zio.{OptionTraversableOps, PrettyStringOps}
+import org.apache.streampark.common.zio.ZIOContainerSubscription.{ConcurrentMapExtension, RefMapExtension}
+import org.apache.streampark.common.zio.ZIOExt.{IterableZStreamConverter, OptionZIOOps, UIOOps, ZStreamOptionEffectOps}
+import org.apache.streampark.console.core.bean.AlertTemplate
+import org.apache.streampark.console.core.entity.{Application, FlinkCluster}
+import org.apache.streampark.console.core.enums.{FlinkAppState, OptionState}
+import org.apache.streampark.console.core.service.FlinkClusterService
+import org.apache.streampark.console.core.service.alert.AlertService
+import org.apache.streampark.console.core.service.application.ApplicationInfoService
+import org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverter
+import org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverter.{applicationToTrackKey, clusterMetricsToFlinkMetricCV, flinkClusterToClusterKey, k8sDeployStateToClusterState}
+import org.apache.streampark.console.core.utils.MybatisScalaExt.{lambdaQuery, lambdaUpdate, LambdaQueryOps, LambdaUpdateOps}
+import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV
+import org.apache.streampark.flink.kubernetes.v2.model._
+import org.apache.streampark.flink.kubernetes.v2.model.TrackKey.{ApplicationJobKey, ClusterKey}
+import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver
+import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserverSnapSubscriptionHelper.{ClusterMetricsSnapsSubscriptionOps, DeployCRSnapsSubscriptionOps, RestSvcEndpointSnapsSubscriptionOps}
+
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.stereotype.Component
+import zio.{Fiber, Ref, UIO, ZIO}
+import zio.ZIO.{interruptible, logInfo}
+import zio.ZIOAspect.annotated
+import zio.stream.UStream
+
+import javax.annotation.{PostConstruct, PreDestroy}
+
+import java.lang
+import java.util.Date
+
+import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter
+
+/** Broker of FlinkK8sObserver, the observer for Flink resources on Kubernetes. */
+@Component
+class FlinkK8sObserverBroker @Autowired() (
+    var applicationInfoService: ApplicationInfoService,
+    var flinkClusterService: FlinkClusterService,
+    var alertService: AlertService)
+  extends FlinkK8sObserverStub
+  with FlinkK8sObserverBrokerSidecar { it =>
+
+  private val observer = FlinkK8sObserver
+
+  private val alertJobStateList: Array[FlinkAppState] = Array(
+    FlinkAppState.FAILED,
+    FlinkAppState.LOST,
+    FlinkAppState.RESTARTING,
+    FlinkAppState.FINISHED
+  )
+
+  private val alertClusterStateList = Array(
+    ClusterState.FAILED,
+    ClusterState.UNKNOWN,
+    ClusterState.LOST,
+    ClusterState.KILLED
+  )
+
+  private lazy val allDaemonEffects: Array[UIO[Unit]] = Array(
+    subscribeJobStatusChange,
+    subscribeApplicationJobMetricsChange,
+    subscribeApplicationJobRestSvcEndpointChange,
+    subscribeGlobalClusterMetricChange,
+    subscribeClusterRestSvcEndpointChange,
+    subscribeClusterStateChange
+  )
+
+  // All fibers running as daemons for this broker.
+  private val daemonFibers   = Ref.make(Array.empty[Fiber.Runtime[Nothing, Unit]]).runUIO
+  // Aggregated flink cluster metrics by teamId
+  private val aggFlinkMetric = Ref.make(Map.empty[Long, ClusterMetrics]).runUIO
+
+  @PostConstruct
+  def init(): Unit = {
+    val effect: UIO[Unit] = for {
+      // launch all subscription fibers
+      fibers       <- ZIO.foreach(allDaemonEffects)(interruptible(_).forkDaemon)
+      // async restore track keys from persistent storage
+      restoreFiber <- restoreTrackKeyRecords.forkDaemon
+      _            <- daemonFibers.set(fibers :+ restoreFiber)
+      _            <- logInfo("Launch FlinkK8sObserverBroker.")
+    } yield ()
+    effect.when(K8sFlinkConfig.isV2Enabled).runUIO
+  }
+
+  @PreDestroy
+  def destroy(): Unit = {
+    daemonFibers.get.flatMap(fibers => ZIO.foreach(fibers)(_.interrupt)).runUIO
+  }
+
+  /** Restore track list from persistent storage into FlinkK8sObserver. */
+  private def restoreTrackKeyRecords: UIO[Unit] = {
+
+    val fromApplicationRecords: UIO[Unit] = it
+      .safeFindApplication(
+        lambdaQuery[Application].typedIn(_.getExecutionMode, ExecutionMode.getKubernetesMode.asScala)
+      )(10)
+      .map(apps => apps.map(app => applicationToTrackKey(app)).filterSome.toVector)
+      .tap(keys => logInfo(s"Restore Flink K8s track-keys from Application records:\n${keys.prettyStr}"))
+      .flatMap(keys => ZIO.foreachDiscard(keys)(observer.track))
+
+    val fromFlinkClusterRecords: UIO[Unit] = it
+      .safeFindFlinkClusterRecord(
+        lambdaQuery[FlinkCluster].typedEq(_.getExecutionMode, ExecutionMode.KUBERNETES_NATIVE_SESSION.getMode)
+      )(10)
+      .map(clusters => clusters.map(e => TrackKey.cluster(e.getId, e.getK8sNamespace, e.getClusterId)))
+      .tap(keys => logInfo(s"Restore Flink K8s track-keys from FlinkCluster records:\n${keys.prettyStr}"))
+      .flatMap(keys => ZIO.foreachDiscard(keys)(observer.track))
+
+    fromApplicationRecords <&> fromFlinkClusterRecords
+  }
+
+  /** Subscribe Flink job status change from FlinkK8sObserver */
+  private def subscribeJobStatusChange: UIO[Unit] = {
+
+    def process(subStream: UStream[JobSnapshot]): UStream[Unit] = subStream
+      // Convert EvalJobState to FlinkAppState
+      .map(snap => snap -> FlinkK8sDataTypeConverter.k8sEvalJobStateToFlinkAppState(snap.evalState))
+      // Update the corresponding columns of Application record
+      .tap { case (snap: JobSnapshot, convertedState: FlinkAppState) =>
+        safeUpdateApplicationRecord(snap.appId) {
+
+          var update = lambdaUpdate[Application]
+            .typedSet(_.getState, convertedState.getValue)
+            .typedSet(_.getOptions, OptionState.NONE.getValue)
+          // update JobStatus related columns
+          snap.jobStatus.foreach { status =>
+            update = update
+              .typedSet(_.getJobId, status.jobId)
+              .typedSet(_.getStartTime, new Date(status.startTs))
+              .typedSet(_.getEndTime, status.endTs.map(new Date(_)).orNull)
+              .typedSet(_.getDuration, status.duration)
+              .typedSet(_.getTotalTask, status.tasks.map(_.total).getOrElse(0))
+          }
+          // Copy the logic from resources/mapper/core/ApplicationMapper.xml:persistMetrics
+          if (FlinkAppState.isEndState(convertedState.getValue)) {
+            update = update
+              .typedSet(_.getTotalTM, null)
+              .typedSet(_.getTotalSlot, null)
+              .typedSet(_.getAvailableSlot, null)
+              .typedSet(_.getJmMemory, null)
+              .typedSet(_.getTmMemory, null)
+          }
+          update
+        }
+      }
+      // Alert for unhealthy job states in parallel
+      .filter { case (_, state) => alertJobStateList.contains(state) }
+      .mapZIOPar(5) { case (snap, state) =>
+        safeGetApplicationRecord(snap.appId).someOrUnitZIO { app =>
+          ZIO
+            .attemptBlocking(alertService.alert(app.getAlertId, AlertTemplate.of(app, state)))
+            .retryN(3)
+            .tapError(err => logInfo(s"Fail to alert unhealthy job state: ${err.getMessage}"))
+            .ignore @@
+          annotated("appId" -> app.getId.toString, "state" -> state.toString)
+        }
+      }
+
+    observer.evaluatedJobSnaps
+      .flatSubscribeValues()
+      // Handle events grouped by appId in parallel; events within each group are processed serially.
+      .groupByKey(_.appId) { case (_, substream) => process(substream) }
+      .runDrain
+  }
+
+  /** Subscribe Flink application job metrics change from FlinkK8sObserver */
+  private def subscribeApplicationJobMetricsChange: UIO[Unit] = {
+    observer.clusterMetricsSnaps
+      .flatSubscribe()
+      // Combine with the corresponding ApplicationJobKey
+      .combineWithTypedTrackKey[ApplicationJobKey]
+      .filterSome
+      .groupByKey(_._1.id) { case (_, substream) =>
+        // Update metrics info of the corresponding Application record
+        substream.mapZIO { case (trackKey: ApplicationJobKey, metrics: ClusterMetrics) =>
+          safeUpdateApplicationRecord(trackKey.id)(
+            lambdaUpdate[Application]
+              .typedSet(_.getJmMemory, metrics.totalJmMemory)
+              .typedSet(_.getTmMemory, metrics.totalTmMemory)
+              .typedSet(_.getTotalTM, metrics.totalTm)
+              .typedSet(_.getTotalSlot, metrics.totalSlot)
+              .typedSet(_.getAvailableSlot, metrics.availableSlot))
+        }
+      }
+      .runDrain
+  }
+
+  /** Subscribe Flink cluster status change from FlinkK8sObserver */
+  private def subscribeClusterStateChange: UIO[Unit] = {
+
+    def process(substream: UStream[(ClusterKey, (DeployCRStatus, Option[JobStatus]))]): UStream[Unit] = {
+      substream
+        // Convert K8s CR status to ClusterState
+        .map { case (key, (crStatus, _)) =>
+          (key.id, k8sDeployStateToClusterState(crStatus), crStatus.error)
+        }
+        // Update the corresponding FlinkCluster record
+        .tap { case (id, state, error) =>
+          safeUpdateFlinkClusterRecord(id)(
+            lambdaUpdate[FlinkCluster]
+              .typedSet(_.getClusterState, state.getValue)
+              .typedSet(error.isDefined, _.getException, error.orNull))
+        }
+        // Alert for unhealthy states in parallel
+        .filter { case (_, state, _) => alertClusterStateList.contains(state) }
+        .mapZIOPar(5) { case (id, state, _) =>
+          safeGetFlinkClusterRecord(id).someOrUnitZIO { flinkCluster =>
+            ZIO
+              .attemptBlocking(alertService.alert(flinkCluster.getAlertId, AlertTemplate.of(flinkCluster, state)))
+              .retryN(5)
+              .tapError(err => logInfo(s"Fail to alert unhealthy cluster state: ${err.getMessage}"))
+              .ignore @@
+            annotated("FlinkCluster.id" -> id.toString, "state" -> state.toString)
+          }
+        }
+    }
+
+    observer.deployCRSnaps
+      .flatSubscribe()
+      // Combine with the corresponding ClusterKey
+      .combineWithTypedTrackKey[ClusterKey]
+      .filterSome
+      // Handle events grouped by id in parallel; events within each group are processed serially.
+      .groupByKey(_._1) { case (_, substream) => process(substream) }
+      .runDrain
+  }
+
+  /** Subscribe K8s rest endpoint of Flink application-mode jobs from FlinkK8sObserver. */
+  private def subscribeApplicationJobRestSvcEndpointChange: UIO[Unit] = {
+    observer.restSvcEndpointSnaps
+      .flatSubscribe()
+      // Combine with the corresponding ApplicationJobKey
+      .combineWithTypedTrackKey[ApplicationJobKey]
+      .filterSome
+      .groupByKey(_._1.id) { case (_, substream) =>
+        // Update jobManagerUrl of the corresponding Application record
+        substream.mapZIO { case (key: ApplicationJobKey, endpoint: RestSvcEndpoint) =>
+          safeUpdateApplicationRecord(key.id)(
+            lambdaUpdate[Application].typedSet(_.getJobManagerUrl, endpoint.ipRest))
+        }
+      }
+      .runDrain
+  }
+
+  /** Subscribe K8s rest endpoint of Flink cluster from FlinkK8sObserver. */
+  private def subscribeClusterRestSvcEndpointChange: UIO[Unit] = {
+    observer.restSvcEndpointSnaps
+      .flatSubscribe()
+      // Combine with the corresponding ClusterKey
+      .combineWithTypedTrackKey[ClusterKey]
+      .filterSome
+      .groupByKey(_._1) { case (_, substream) =>
+        substream.mapZIO { case (key: ClusterKey, endpoint: RestSvcEndpoint) =>
+          safeUpdateFlinkClusterRecord(key.id)(
+            lambdaUpdate[FlinkCluster]
+              .typedSet(_.getAddress, endpoint.ipRest)
+              .typedSet(_.getJobManagerUrl, endpoint.ipRest))
+        }
+      }
+      .runDrain
+  }
+
+  /** Subscribe Flink cluster metrics change from FlinkK8sObserver and aggregate them by teamId. */
+  private def subscribeGlobalClusterMetricChange: UIO[Unit] = {
+    observer.clusterMetricsSnaps
+      .subscribe()
+      .mapZIO { metricItems =>
+        for {
+          // Combine with appIds
+          trackKeys         <- FlinkK8sObserver.trackedKeys.toSet
+          metricWithAppIds   = metricItems.map { case ((ns, name), metric) =>
+                                 metric -> trackKeys.filter(k => k.clusterNamespace == ns && k.clusterName == name).map(_.id)
+                               }
+          // Combine with teamId from persistent application records in parallel
+          teamIdWithMetrics <- metricWithAppIds.asZStream
+                                 .flatMapPar(10) { case (metric, appIds) =>
+                                   appIds.asZStream
+                                     .mapZIOParUnordered(10)(appId => safeGetApplicationRecord(appId))
+                                     .map(_.flatMap(app => Option(app.getTeamId)))
+                                     .filterSome
+                                     .map(teamId => teamId.toLong -> metric)
+                                 }
+                                 .runCollect
+          // Group ClusterMetrics by teamId and aggregate each group
+          aggMetricsMap      = teamIdWithMetrics
+                                 .groupBy { case (teamId, _) => teamId }
+                                 .map { case (teamId, metrics) =>
+                                   teamId -> metrics.map(_._2).foldLeft(ClusterMetrics.empty)((acc, e) => acc + e)
+                                 }
+          // Update aggFlinkMetric cache
+          _                 <- aggFlinkMetric.set(aggMetricsMap)
+        } yield ()
+      }
+      .runDrain
+  }
+
+  /** Stub method: Get aggregated metrics of all Flink jobs on K8s clusters by teamId. */
+  override def getAggClusterMetric(teamId: lang.Long): ClusterMetrics = {
+    for {
+      metrics <- aggFlinkMetric.get
+      result   = metrics.get(teamId)
+    } yield result.getOrElse(ClusterMetrics.empty)
+  }.runUIO
+
+  override def getAggClusterMetricCV(teamId: lang.Long): FlinkMetricCV = {
+    clusterMetricsToFlinkMetricCV(getAggClusterMetric(teamId))
+  }
+
+  /** Stub method: Add Application to the watchlist. */
+  override def watchApplication(app: Application): Unit = {
+    ZIO
+      .succeed(applicationToTrackKey(app))
+      .someOrUnitZIO { key =>
+        observer.track(key) *>
+        logInfo("Add Application into k8s observer tracking list") @@
+        annotated("jobId" -> app.getId.toString)
+      }
+      .runUIO
+  }
+
+  /** Stub method: Add FlinkCluster to the watchlist. */
+  override def watchFlinkCluster(flinkCluster: FlinkCluster): Unit = {
+    ZIO
+      .succeed(flinkClusterToClusterKey(flinkCluster))
+      .someOrUnitZIO { trackKey =>
+        observer.track(trackKey) *>
+        logInfo("Add FlinkCluster into k8s observer tracking list") @@
+        annotated("id" -> flinkCluster.getId.toString)
+      }
+      .runUIO
+  }
+
+  /**
+   * Stub method: Remove FlinkCluster from the watchlist. If the FlinkCluster still has associated
+   * SessionJobs, its tracking will not actually be removed.
+   */
+  override def unwatchFlinkCluster(flinkCluster: FlinkCluster): Unit = {
+    ZIO
+      .succeed(flinkClusterToClusterKey(flinkCluster))
+      .someOrUnitZIO(trackKey => FlinkK8sObserver.untrack(trackKey))
+      .runUIO
+  }
+
+  /** Stub method: Notify FlinkK8sObserver to remove tracked resources by TrackKey.id. */
+  override def unWatchById(id: lang.Long): Unit = {
+    observer.trackedKeys
+      .find(_.id == id)
+      .someOrUnitZIO(key => observer.untrack(key))
+      .runUIO
+  }
+
+  /** Stub method: Whether there are active jobs on the Flink cluster. */
+  override def existActiveJobsOnFlinkCluster(flinkCluster: FlinkCluster): Boolean = {
+    if (flinkCluster.getClusterId == null) false
+    else {
+      observer.clusterJobStatusSnaps
+        .get(flinkCluster.getClusterId, Option(flinkCluster.getK8sNamespace).getOrElse("default"))
+        .map {
+          case None             => false
+          case Some(statusList) => statusList.exists(status => JobState.activeStates.contains(status.state))
+        }
+        .runUIO
+    }
+  }
+
+}
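
For context, the per-key serialization that subscribeJobStatusChange relies on can be
reproduced in isolation. A minimal sketch, assuming only a plain ZIO 2 dependency (the
Snapshot type and sample events are illustrative, not part of this commit):

    import zio._
    import zio.stream._

    object GroupedProcessingSketch extends ZIOAppDefault {
      final case class Snapshot(appId: Long, state: String)

      // Groups run in parallel, but events inside one appId group stay strictly
      // ordered, which lets each Application record be updated without races.
      def run =
        ZStream
          .fromIterable(Seq(Snapshot(1, "RUNNING"), Snapshot(2, "FAILED"), Snapshot(1, "FINISHED")))
          .groupByKey(_.appId) { case (appId, substream) =>
            substream.tap(snap => ZIO.logInfo(s"app=$appId -> ${snap.state}"))
          }
          .runDrain
    }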
diff --git a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBrokerSidecar.scala b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBrokerSidecar.scala
new file mode 100644
index 000000000..69125acd8
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBrokerSidecar.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.task
+
+import org.apache.streampark.console.core.entity.{Application, FlinkCluster}
+import org.apache.streampark.console.core.service.FlinkClusterService
+import org.apache.streampark.console.core.service.application.ApplicationInfoService
+import org.apache.streampark.console.core.utils.MybatisScalaExt.LambdaUpdateOps
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper
+import zio.{UIO, ZIO}
+import zio.ZIO.logError
+import zio.ZIOAspect.annotated
+
+import scala.jdk.CollectionConverters._
+
+trait FlinkK8sObserverBrokerSidecar {
+
+  def applicationInfoService: ApplicationInfoService
+  def flinkClusterService: FlinkClusterService
+
+  // Get Application record by appId from persistent storage.
+  protected def safeGetApplicationRecord(appId: Long): UIO[Option[Application]] = {
+    ZIO
+      .attemptBlocking(Option(applicationInfoService.getById(appId)))
+      .retryN(2)
+      .catchAll(err => logError(s"Fail to get Application record: ${err.getMessage}").as(None))
+  } @@ annotated("appId" -> appId.toString)
+
+  // Update Application record by appId into persistent storage.
+  protected def safeUpdateApplicationRecord(appId: Long)(update: LambdaUpdateWrapper[Application]): UIO[Unit] = {
+    ZIO
+      .attemptBlocking(applicationInfoService.update(null, update.typedEq(_.getId, appId)))
+      .retryN(2)
+      .tapError(err => logError(s"Fail to update Application record: ${err.getMessage}"))
+      .ignore
+  } @@ annotated("appId" -> appId.toString)
+
+  // Get FlinkCluster record by id from persistent storage.
+  protected def safeGetFlinkClusterRecord(id: Long): UIO[Option[FlinkCluster]] = {
+    ZIO
+      .attemptBlocking(Option(flinkClusterService.getById(id)))
+      .retryN(3)
+      .catchAll(err => logError(s"Fail to get FlinkCluster record: ${err.getMessage}").as(None))
+  } @@ annotated("id" -> id.toString)
+
+  // Update FlinkCluster record by id into persistent storage.
+  protected def safeUpdateFlinkClusterRecord(id: Long)(update: LambdaUpdateWrapper[FlinkCluster]): UIO[Unit] = {
+    ZIO
+      .attemptBlocking(flinkClusterService.update(null, update.typedEq(_.getId, id)))
+      .retryN(3)
+      .tapError(err => logError(s"Fail to update FlinkCluster record: ${err.getMessage}"))
+      .ignore
+  } @@ annotated("id" -> id.toString)
+
+  // Find Application records.
+  protected def safeFindApplication(query: LambdaQueryWrapper[Application])(retryN: Int): UIO[Vector[Application]] = {
+    ZIO
+      .attemptBlocking {
+        val result = applicationInfoService.getBaseMapper.selectList(query)
+        if (result == null) Vector.empty[Application] else result.asScala.toVector
+      }
+      .retryN(retryN)
+      .catchAll { err =>
+        logError(s"Fail to list Application records: ${err.getMessage}").as(Vector.empty[Application])
+      }
+  }
+
+  // Find FlinkCluster records.
+  protected def safeFindFlinkClusterRecord(query: LambdaQueryWrapper[FlinkCluster])(
+      retryN: Int): UIO[Vector[FlinkCluster]] = {
+    ZIO
+      .attemptBlocking {
+        val result = flinkClusterService.getBaseMapper.selectList(query)
+        if (result == null) Vector.empty[FlinkCluster] else result.asScala.toVector
+      }
+      .retryN(retryN)
+      .catchAll { err =>
+        logError(s"Fail to list FlinkCluster records: ${err.getMessage}").as(Vector.empty[FlinkCluster])
+      }
+  }
+
+}
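
The retry-and-annotate shape shared by these helpers can be tried standalone. A minimal
sketch, assuming only ZIO 2; findById is a hypothetical stand-in for any blocking MyBatis
call:

    import zio._
    import zio.ZIOAspect.annotated

    object SafeDaoSketch {
      // Hypothetical blocking DAO lookup that may throw.
      def findById(id: Long): String =
        if (id > 0) s"record-$id" else throw new RuntimeException("boom")

      def safeFind(id: Long): UIO[Option[String]] = {
        ZIO
          .attemptBlocking(Option(findById(id))) // run the blocking call on the blocking pool
          .retryN(2)                             // absorb transient failures
          .catchAll(err => ZIO.logError(s"lookup failed: ${err.getMessage}").as(None))
      } @@ annotated("id" -> id.toString)        // every log line of this effect carries the id
    }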
diff --git a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/FlinkK8sDataTypeConverter.scala b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/FlinkK8sDataTypeConverter.scala
new file mode 100644
index 000000000..0813b5373
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/FlinkK8sDataTypeConverter.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.utils
+
+import org.apache.streampark.common.enums.{ClusterState, ExecutionMode}
+import org.apache.streampark.console.core.entity.{Application, FlinkCluster}
+import org.apache.streampark.console.core.enums.FlinkAppState
+import org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverter.genSessionJobCRName
+import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV
+import org.apache.streampark.flink.kubernetes.v2.model._
+import org.apache.streampark.flink.kubernetes.v2.model.EvalJobState.EvalJobState
+import org.apache.streampark.flink.kubernetes.v2.model.TrackKey.ClusterKey
+
+import org.apache.commons.lang3.StringUtils
+import org.springframework.stereotype.Component
+
+import java.util.UUID
+
+import scala.util.Try
+
+@Component
+class FlinkK8sDataTypeConverter() extends FlinkK8sDataTypeConverterStub {
+  override def genSessionJobK8sCRName(clusterId: String): String = genSessionJobCRName(clusterId)
+}
+
+object FlinkK8sDataTypeConverter {
+
+  /** Create a default name for the Flink SessionJob CR, for k8s-native compatibility. */
+  def genSessionJobCRName(clusterId: String): String = {
+    s"$clusterId-${UUID.randomUUID().toString.replace("-", "").take(8)}"
+  }
+
+  /** Convert [[EvalJobState]] to [[FlinkAppState]]. */
+  def k8sEvalJobStateToFlinkAppState(jobState: EvalJobState): FlinkAppState = {
+    Try(FlinkAppState.valueOf(jobState.toString)).getOrElse(FlinkAppState.OTHER)
+  }
+
+  /** Convert [[DeployCRStatus]] to [[ClusterState]]. */
+  def k8sDeployStateToClusterState(crState: DeployCRStatus): ClusterState = {
+    crState.evalState match {
+      case EvalState.DEPLOYING => ClusterState.STARTING
+      case EvalState.READY     => ClusterState.RUNNING
+      case EvalState.SUSPENDED => ClusterState.CANCELED
+      case EvalState.FAILED    => ClusterState.FAILED
+      case EvalState.DELETED   => ClusterState.KILLED
+      case _                   => ClusterState.UNKNOWN
+    }
+  }
+
+  /** Convert [[ClusterMetrics]] to [[FlinkMetricCV]]. */
+  def clusterMetricsToFlinkMetricCV(metrics: ClusterMetrics): FlinkMetricCV = {
+    FlinkMetricCV(
+      totalJmMemory = metrics.totalJmMemory,
+      totalTmMemory = metrics.totalTmMemory,
+      totalTm = metrics.totalTm,
+      totalSlot = metrics.totalSlot,
+      availableSlot = metrics.availableSlot,
+      runningJob = metrics.runningJob,
+      finishedJob = metrics.finishedJob,
+      cancelledJob = metrics.cancelledJob,
+      failedJob = metrics.failedJob,
+      pollAckTime = 0L
+    )
+  }
+
+  /** Convert [[FlinkCluster]] to [[ClusterKey]]. */
+  def flinkClusterToClusterKey(flinkCluster: FlinkCluster): Option[ClusterKey] = {
+    val isLegal = {
+      flinkCluster != null &&
+      flinkCluster.getId != null &&
+      ExecutionMode.isKubernetesSessionMode(flinkCluster.getExecutionMode) &&
+      StringUtils.isNoneBlank(flinkCluster.getClusterId) &&
+      StringUtils.isNoneBlank(flinkCluster.getK8sNamespace)
+    }
+    if (isLegal) Some(ClusterKey(flinkCluster.getId, flinkCluster.getK8sNamespace, flinkCluster.getClusterId))
+    else None
+  }
+
+  /** Convert [[Application]] to [[TrackKey]]. */
+  def applicationToTrackKey(app: Application): Option[TrackKey] = {
+    import ExecutionMode._
+
+    val isLegal = {
+      app != null &&
+      app.getId != null &&
+      StringUtils.isNoneBlank(app.getClusterId) &&
+      StringUtils.isNoneBlank(app.getK8sNamespace)
+    }
+
+    // Only legal records are converted; the concrete key type depends on the execution mode.
+    if (!isLegal) None
+    else
+      app.getExecutionModeEnum match {
+        case KUBERNETES_NATIVE_APPLICATION => Some(TrackKey.appJob(app.getId, app.getK8sNamespace, app.getClusterId))
+        case KUBERNETES_NATIVE_SESSION     =>
+          Option(app.getK8sName) match {
+            case Some(name) => Some(TrackKey.sessionJob(app.getId, app.getK8sNamespace, name, app.getClusterId))
+            case None       =>
+              Option(app.getJobId) match {
+                case Some(jid) => Some(TrackKey.unmanagedSessionJob(app.getId, app.getK8sNamespace, app.getClusterId, jid))
+                case None      => None
+              }
+          }
+        case _ => None
+      }
+  }
+
+}
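
Usage sketch of the CR-name generator (output value illustrative): the clusterId is kept
as the prefix plus the first 8 characters of a random UUID, so operators can still
correlate the CR with its cluster:

    scala> FlinkK8sDataTypeConverter.genSessionJobCRName("my-session-cluster")
    res0: String = my-session-cluster-1f9e0a2b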
diff --git a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/MybatisScalaExt.scala b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/MybatisScalaExt.scala
new file mode 100644
index 000000000..08a103b32
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/MybatisScalaExt.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.utils
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper
+import com.baomidou.mybatisplus.core.toolkit.support.SFunction
+
+import scala.jdk.CollectionConverters._
+import scala.language.implicitConversions
+
+/** MyBatis scala extension. */
+object MybatisScalaExt {
+
+  def lambdaQuery[E]: LambdaQueryWrapper[E]   = new LambdaQueryWrapper[E]()
+  def lambdaUpdate[E]: LambdaUpdateWrapper[E] = new LambdaUpdateWrapper[E]()
+
+  // noinspection DuplicatedCode
+  implicit class LambdaQueryOps[E](wrapper: LambdaQueryWrapper[E]) {
+    def typedIn[V](column: E => V, values: Iterable[V]): LambdaQueryWrapper[E] = {
+      wrapper.in(new SFunction[E, V] { override def apply(t: E): V = column(t) }, values.asJavaCollection)
+    }
+
+    def typedEq[V](column: E => V, value: V): LambdaQueryWrapper[E] = {
+      wrapper.eq(new SFunction[E, V] { override def apply(t: E): V = column(t) }, value)
+    }
+  }
+
+  // noinspection DuplicatedCode
+  implicit class LambdaUpdateOps[E](wrapper: LambdaUpdateWrapper[E]) {
+    def typedSet[V](column: E => V, value: V): LambdaUpdateWrapper[E] = {
+      wrapper.set(new SFunction[E, V] { override def apply(t: E): V = column(t) }, value)
+    }
+
+    def typedSet[V](cond: Boolean, column: E => V, value: V): LambdaUpdateWrapper[E] = {
+      wrapper.set(cond, new SFunction[E, V] { override def apply(t: E): V = column(t) }, value)
+    }
+
+    def typedEq[V](column: E => V, value: V): LambdaUpdateWrapper[E] = {
+      wrapper.eq(new SFunction[E, V] { override def apply(t: E): V = column(t) }, value)
+    }
+  }
+
+}
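
A usage sketch of these wrappers as they appear in the broker code above; the getters come
from the Application entity touched by this commit, and the rendered SQL is only
indicative:

    import org.apache.streampark.console.core.entity.Application
    import org.apache.streampark.console.core.utils.MybatisScalaExt._

    // Roughly: UPDATE ... SET job_manager_url = ? WHERE id = 100
    val update = lambdaUpdate[Application]
      .typedSet(_.getJobManagerUrl, "http://10.0.0.1:8081")
      .typedEq(_.getId, 100L)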
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/KubernetesSubmitParam.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/KubernetesSubmitParam.scala
index 07832841c..f959b0c32 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/KubernetesSubmitParam.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/KubernetesSubmitParam.scala
@@ -33,10 +33,10 @@ import java.util.{Map => JMap}
  * The logic of conversion is located at:
  * [[org.apache.streampark.flink.client.impl.KubernetesApplicationClientV2#genFlinkDeployDef]]
  */
-// todo split into Application mode and SessionJob mode
 case class KubernetesSubmitParam(
     clusterId: String,
     kubernetesNamespace: String,
+    kubernetesName: Option[String],
     baseImage: Option[String] = None,
     imagePullPolicy: Option[String] = None,
     serviceAccount: Option[String] = None,
@@ -67,12 +67,15 @@ object KubernetesSubmitParam {
    */
   def apply(
       clusterId: String,
+      kubernetesName: String,
       kubernetesNamespace: String,
       baseImage: String,
       @Nullable flinkRestExposedType: FlinkK8sRestExposedType): KubernetesSubmitParam =
     KubernetesSubmitParam(
       clusterId = clusterId,
       kubernetesNamespace = kubernetesNamespace,
+      kubernetesName = Option(kubernetesName),
       baseImage = Some(baseImage),
-      flinkRestExposedType = Option(flinkRestExposedType))
+      flinkRestExposedType = Option(flinkRestExposedType)
+    )
 }
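
A construction sketch under the new signature (argument values illustrative): the
SessionJob CR name is now carried separately from the clusterId of the target session
cluster:

    val param = KubernetesSubmitParam(
      clusterId = "flink-session",               // target session cluster / FlinkDeployment
      kubernetesName = "flink-session-9f2c01ab", // name of the FlinkSessionJob CR
      kubernetesNamespace = "default",
      baseImage = "flink:1.17",
      flinkRestExposedType = null)               // @Nullable, wrapped via Option(...)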
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
index 3da974d7b..bdd1f1f2a 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
@@ -80,7 +80,10 @@ object FlinkClientHandler {
   def shutdown(request: ShutDownRequest): ShutDownResponse = {
     request.executionMode match {
       case YARN_SESSION => YarnSessionClient.shutdown(request)
-      case KUBERNETES_NATIVE_SESSION => KubernetesNativeSessionClient.shutdown(request)
+      case KUBERNETES_NATIVE_SESSION => {
+        if (K8sFlinkConfig.isV2Enabled) KubernetesSessionClientV2.shutdown(request)
+        else KubernetesNativeSessionClient.shutdown(request)
+      }
       case _ =>
         throw new UnsupportedOperationException(
           s"Unsupported ${request.executionMode} shutdown cluster ")
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
index 130dcd17e..20f8d6aa5 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
@@ -39,7 +39,7 @@ import scala.util.{Failure, Success, Try}
 /** Flink K8s application mode task operation client via Flink K8s Operator */
 object KubernetesApplicationClientV2 extends KubernetesClientV2Trait with Logger {
 
-  @throws[Exception]
+  @throws[Throwable]
   override def doSubmit(
       submitRequest: SubmitRequest,
       flinkConfig: Configuration): SubmitResponse = {
@@ -88,9 +88,10 @@ object KubernetesApplicationClientV2 extends KubernetesClientV2Trait with Logger
     val namespace = Option(submitReq.k8sSubmitParam.kubernetesNamespace)
       .getOrElse("default")
 
-    val name = Option(submitReq.k8sSubmitParam.clusterId)
+    val name = submitReq.k8sSubmitParam.kubernetesName
+      .orElse(Option(submitReq.k8sSubmitParam.clusterId))
       .filter(str => StringUtils.isNotBlank(str))
-      .getOrElse(return Left("cluster-id should not be empty"))
+      .getOrElse(return Left("Kubernetes CR name should not be empty"))
 
     val image = submitReq.k8sSubmitParam.baseImage
       .orElse(Option(buildResult.flinkBaseImage))
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
index d5a9f51db..0b0807a55 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
@@ -20,22 +20,21 @@ import org.apache.streampark.common.util.Logger
 import org.apache.streampark.common.zio.ZIOExt.IOOps
 import org.apache.streampark.flink.client.`trait`.KubernetesClientV2Trait
 import org.apache.streampark.flink.client.bean._
-import org.apache.streampark.flink.kubernetes.v2.model.{FlinkSessionJobDef, JobManagerDef, TaskManagerDef}
+import org.apache.streampark.flink.kubernetes.v2.model.FlinkSessionJobDef
 import org.apache.streampark.flink.kubernetes.v2.operator.FlinkK8sOperator
-import org.apache.streampark.flink.packer.pipeline.K8sAppModeBuildResponse
+import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.flink.client.deployment.application.ApplicationConfiguration
 import org.apache.flink.configuration._
-import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
 
-import scala.collection.mutable
 import scala.jdk.CollectionConverters.mapAsScalaMapConverter
-import scala.util.{Failure, Success, Try}
+import scala.util.{Failure, Success}
 
+/** Flink K8s session mode app operation client via Flink K8s Operator */
 object KubernetesSessionClientV2 extends KubernetesClientV2Trait with Logger {
-  @throws[Exception]
+  @throws[Throwable]
   override def doSubmit(
       submitRequest: SubmitRequest,
       flinkConfig: Configuration): SubmitResponse = {
@@ -43,7 +42,7 @@ object KubernetesSessionClientV2 extends KubernetesClientV2Trait with Logger {
     val richMsg: String => String = s"[flink-submit][appId=${submitRequest.id}] " + _
 
     submitRequest.checkBuildResult()
-    val buildResult = submitRequest.buildResult.asInstanceOf[K8sAppModeBuildResponse]
+    val buildResult = submitRequest.buildResult.asInstanceOf[ShadedBuildResponse]
 
     // Convert to FlinkSessionJobDef CR definition
     val flinkSessionJobDef = genFlinkSessionJobDef(submitRequest, flinkConfig, buildResult) match {
@@ -76,82 +75,62 @@ object KubernetesSessionClientV2 extends KubernetesClientV2Trait with Logger {
   private def genFlinkSessionJobDef(
       submitReq: SubmitRequest,
       originFlinkConfig: Configuration,
-      buildResult: K8sAppModeBuildResponse): Either[FailureMessage, FlinkSessionJobDef] = {
+      buildResult: ShadedBuildResponse): Either[FailureMessage, FlinkSessionJobDef] = {
 
     val flinkConfObj = originFlinkConfig.clone()
 
     val namespace = Option(submitReq.k8sSubmitParam.kubernetesNamespace)
       .getOrElse("default")
 
-    val name = Option(submitReq.k8sSubmitParam.clusterId)
+    val name = submitReq.k8sSubmitParam.kubernetesName
       .filter(str => StringUtils.isNotBlank(str))
-      .getOrElse(return Left("cluster-id should not be empty"))
+      .getOrElse(return Left("Kubernetes CR name should not be empty"))
 
-    val deploymentName = Option(submitReq.developmentMode.name())
+    val deploymentName = Option(submitReq.k8sSubmitParam.clusterId)
       .filter(str => StringUtils.isNotBlank(str))
-      .getOrElse(return Left("deploymentName should not be empty"))
+      .getOrElse(return Left("Target FlinkDeployment CR name should not be 
empty"))
 
-    val jobDef = genJobDef(flinkConfObj, jarUriHint = Some(buildResult.mainJarPath))
+    val jobDef = genJobDef(flinkConfObj, jarUriHint = Some(buildResult.shadedJarPath))
       .getOrElse(return Left("Invalid job definition"))
 
-    val extraFlinkConfiguration = {
-      // Remove conflicting configuration items
-      val result: mutable.Map[String, String] = flinkConfObj
-        .remove(DeploymentOptions.TARGET)
-        .remove(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY)
-        .remove(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT)
-        .remove(JobManagerOptions.TOTAL_PROCESS_MEMORY)
-        .remove(TaskManagerOptions.TOTAL_PROCESS_MEMORY)
-        .remove(PipelineOptions.JARS)
-        .remove(CoreOptions.DEFAULT_PARALLELISM)
-        .remove(ApplicationConfiguration.APPLICATION_ARGS)
-        .remove(ApplicationConfiguration.APPLICATION_MAIN_CLASS)
-        .remove(SavepointConfigOptions.SAVEPOINT_PATH)
-        .remove(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)
-        .toMap
-        .asScala
-        .removeKey(KUBERNETES_JM_CPU_AMOUNT_KEY)
-        .removeKey(KUBERNETES_TM_CPU_KEY)
-        .removeKey(KUBERNETES_JM_CPU_AMOUNT_KEY)
-        .removeKey(KUBERNETES_JM_CPU_KEY)
-      // Set kubernetes.rest-service.exposed.type configuration for compatibility with native-k8s
-      submitReq.k8sSubmitParam.flinkRestExposedType.foreach {
-        exposedType => result += KUBERNETES_REST_SERVICE_EXPORTED_TYPE_KEY -> exposedType.getName
-      }
-      result.toMap
-    }
+    // Remove conflicting configuration items
+    val extraFlinkConfiguration = flinkConfObj
+      .remove(DeploymentOptions.TARGET)
+      .remove(PipelineOptions.JARS)
+      .remove(CoreOptions.DEFAULT_PARALLELISM)
+      .remove(ApplicationConfiguration.APPLICATION_ARGS)
+      .remove(ApplicationConfiguration.APPLICATION_MAIN_CLASS)
+      .remove(SavepointConfigOptions.SAVEPOINT_PATH)
+      .remove(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)
+      .toMap
+      .asScala
+      .toMap
 
-    // TODO Migrate the construction logic of ingress to here and set it into FlinkDeploymentDef.ingress
-    //  See: org.apache.streampark.flink.packer.pipeline.impl.FlinkK8sApplicationBuildPipeline Step-8
     Right(
       FlinkSessionJobDef(
         namespace = namespace,
         name = name,
         deploymentName = deploymentName,
         flinkConfiguration = extraFlinkConfiguration,
-        job = jobDef,
-        restartNonce = None
+        job = jobDef
       ))
   }
 
+  /** Shutdown Flink cluster. */
+  @throws[Throwable]
   def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = {
-    def richMsg: String => String = s"[flink-delete-][appId=${shutDownRequest.clusterId}] " + _
-
-    FlinkK8sOperator.k8sCrOpr
-      .deleteSessionJob(
-        shutDownRequest.kubernetesDeployParam.kubernetesNamespace,
-        shutDownRequest.clusterId)
-      .runIOAsTry match {
-      case Success(rsp) =>
-        logInfo(richMsg("Delete flink job successfully."))
-        rsp
+    val name = shutDownRequest.clusterId
+    val namespace = shutDownRequest.kubernetesDeployParam.kubernetesNamespace
+    def richMsg: String => String = s"[flink-shutdown][clusterId=$name][namespace=$namespace] " + _
+
+    FlinkK8sOperator.k8sCrOpr.deleteSessionJob(namespace, name).runIOAsTry match {
+      case Success(_) =>
+        logInfo(richMsg("Shutdown Flink cluster successfully."))
+        ShutDownResponse()
       case Failure(err) =>
-        logError(
-          richMsg(s"delete flink job fail in 
${shutDownRequest.executionMode.getName}_V2 mode!"),
-          err)
+        logError(richMsg(s"Fail to shutdown Flink cluster"), err)
         throw err
     }
-    ShutDownResponse()
   }
 
 }
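
The getOrElse(return Left(...)) early exit used throughout these builders can look
surprising in Scala: the return inside the by-name argument is a non-local return that
short-circuits the enclosing method. The same shape in isolation:

    object EarlyExitSketch {
      def requireName(raw: Option[String]): Either[String, String] = {
        // Falls through to Right only when a non-blank name is present.
        val name = raw.filter(_.trim.nonEmpty).getOrElse(return Left("name should not be empty"))
        Right(name)
      }
    }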
diff --git a/streampark-flink/streampark-flink-kubernetes-v2/pom.xml b/streampark-flink/streampark-flink-kubernetes-v2/pom.xml
index cb9db9502..50a822401 100644
--- a/streampark-flink/streampark-flink-kubernetes-v2/pom.xml
+++ b/streampark-flink/streampark-flink-kubernetes-v2/pom.xml
@@ -47,7 +47,7 @@
                     <scala>
                         <scalafmt>
                             <version>${spotless.scalafmt.version}</version>
-                            <file>.scalafmt-fp.conf</file>
+                            <file>${maven.multiModuleProjectDirectory}/.scalafmt-fp.conf</file>
                         </scalafmt>
                     </scala>
                 </configuration>
diff --git a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/ClusterMetrics.scala b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/ClusterMetrics.scala
index 7ef4bb7da..326504f4f 100644
--- a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/ClusterMetrics.scala
+++ b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/ClusterMetrics.scala
@@ -31,4 +31,24 @@ case class ClusterMetrics(
     runningJob: Integer = 0,
     finishedJob: Integer = 0,
     cancelledJob: Integer = 0,
-    failedJob: Integer = 0)
+    failedJob: Integer = 0) {
+
+  lazy val totalJob: Integer = runningJob + finishedJob + cancelledJob + failedJob
+
+  def +(another: ClusterMetrics): ClusterMetrics =
+    ClusterMetrics(
+      totalJmMemory = another.totalJmMemory + totalJmMemory,
+      totalTmMemory = another.totalTmMemory + totalTmMemory,
+      totalTm = another.totalTm + totalTm,
+      totalSlot = another.totalSlot + totalSlot,
+      availableSlot = another.availableSlot + availableSlot,
+      runningJob = another.runningJob + runningJob,
+      finishedJob = another.finishedJob + finishedJob,
+      cancelledJob = another.cancelledJob + cancelledJob,
+      failedJob = another.failedJob + failedJob
+    )
+}
+
+object ClusterMetrics {
+  lazy val empty: ClusterMetrics = ClusterMetrics()
+}
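
With + and empty, per-cluster metrics fold like a monoid, which is how
FlinkK8sObserverBroker aggregates them by teamId. A sketch with illustrative values:

    import org.apache.streampark.flink.kubernetes.v2.model.ClusterMetrics

    object MetricsFoldSketch {
      val perCluster = Seq(
        ClusterMetrics(totalTm = 2, totalSlot = 8, availableSlot = 3, runningJob = 5),
        ClusterMetrics(totalTm = 1, totalSlot = 4, availableSlot = 1, runningJob = 3))

      val teamTotal = perCluster.foldLeft(ClusterMetrics.empty)(_ + _)
      // teamTotal.totalSlot == 12, teamTotal.runningJob == 8, teamTotal.totalJob == 8
    }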
diff --git a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobState.scala b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobState.scala
index 300be154d..8fc355a8e 100644
--- a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobState.scala
+++ b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobState.scala
@@ -32,7 +32,11 @@ object JobState extends Enumeration {
   val UNKNOWN = Value
 
   def valueOf(raw: String): JobState = values.find(_.toString == raw).getOrElse(UNKNOWN)
-  val maybeDeploying                 = Set(INITIALIZING, CREATED, RESTARTING, RECONCILING)
+
+  val maybeDeploying = Set(INITIALIZING, CREATED, RESTARTING, RECONCILING)
+
+  lazy val activeStates =
+    Set(INITIALIZING, CREATED, RUNNING, FAILING, CANCELLING, RESTARTING, RECONCILING)
 }
 
 /**
@@ -50,11 +54,6 @@ object EvalJobState extends Enumeration {
   // copy from [[org.apache.streampark.console.core.enums.FlinkAppState]]
   val LOST, TERMINATED, OTHER = Value
 
-  // ending flink states, the tracking monitor will stop tracking these states of flink job.
-  val endingStates = Seq(FAILED, CANCELED, FINISHED, TERMINATED, LOST)
-
-  val effectEndStates = endingStates.filter(_ != LOST)
-
   def of(state: JobState): EvalJobState = values.find(e => e.toString == state.toString).getOrElse(OTHER)
 
 }
diff --git a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobStatus.scala b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobStatus.scala
index 54e5b04e3..5dce775df 100644
--- a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobStatus.scala
+++ b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobStatus.scala
@@ -43,7 +43,13 @@ case class JobStatus(
     startTs: Long,
     endTs: Option[Long] = None,
     tasks: Option[TaskStats] = None,
-    updatedTs: Long)
+    updatedTs: Long) {
+
+  lazy val duration: Long = {
+    if (startTs <= 0L) 0L
+    else endTs.getOrElse(System.currentTimeMillis()) - startTs
+  }
+}
 
 object JobStatus {
 
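The new duration rule in isolation: a missing endTs means the job is still running, so the
duration is measured against the current clock; a non-positive startTs yields 0:

    def duration(startTs: Long, endTs: Option[Long]): Long =
      if (startTs <= 0L) 0L
      else endTs.getOrElse(System.currentTimeMillis()) - startTs
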
diff --git a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala
index 7f8496839..299062cee 100644
--- a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala
+++ b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala
@@ -26,7 +26,7 @@ import org.apache.flink.v1beta1.{FlinkDeployment, FlinkDeploymentSpec, FlinkSess
 import zio.{IO, Ref, Schedule, UIO, ZIO}
 import zio.ZIO.logInfo
 import zio.concurrent.{ConcurrentMap, ConcurrentSet}
-import zio.stream.ZStream
+import zio.stream.{UStream, ZStream}
 
 /** Flink Kubernetes resource observer. */
 sealed trait FlinkK8sObserverTrait {
@@ -37,14 +37,6 @@ sealed trait FlinkK8sObserverTrait {
   /** Stop tracking resources. */
   def untrack(key: TrackKey): UIO[Unit]
 
-  /** Stop tracking resources by TrackKey.id. */
-  def untrackById(appId: Long): UIO[Unit] = {
-    trackedKeys.find(_.id == appId).flatMap {
-      case Some(key) => untrack(key)
-      case None      => ZIO.unit
-    }
-  }
-
   /** All tracked key in observer. */
   def trackedKeys: ConcurrentSet[TrackKey]
 
@@ -289,3 +281,48 @@ object FlinkK8sObserver extends FlinkK8sObserverTrait {
   }
 
 }
+
+object FlinkK8sObserverSnapSubscriptionHelper {
+
+  import scala.reflect.ClassTag
+
+  implicit class ClusterMetricsSnapsSubscriptionOps(stream: UStream[((Namespace, Name), ClusterMetrics)]) {
+    def combineWithTrackKey: UStream[Option[(TrackKey, ClusterMetrics)]] =
+      combineValueWithTrackKey[ClusterMetrics](stream)
+
+    def combineWithTypedTrackKey[Key <: TrackKey: ClassTag]: UStream[Option[(Key, ClusterMetrics)]] =
+      combineValueWithTypedTrackKey[Key, ClusterMetrics](stream)
+  }
+
+  implicit class RestSvcEndpointSnapsSubscriptionOps(stream: UStream[((Namespace, Name), RestSvcEndpoint)]) {
+    def combineWithTrackKey: UStream[Option[(TrackKey, RestSvcEndpoint)]] =
+      combineValueWithTrackKey[RestSvcEndpoint](stream)
+
+    def combineWithTypedTrackKey[Key <: TrackKey: ClassTag]: UStream[Option[(Key, RestSvcEndpoint)]] =
+      combineValueWithTypedTrackKey[Key, RestSvcEndpoint](stream)
+  }
+
+  implicit class DeployCRSnapsSubscriptionOps(
+      stream: UStream[((Namespace, Name), (DeployCRStatus, Option[JobStatus]))]) {
+    def combineWithTrackKey: UStream[Option[(TrackKey, (DeployCRStatus, Option[JobStatus]))]] =
+      combineValueWithTrackKey[(DeployCRStatus, Option[JobStatus])](stream)
+
+    def combineWithTypedTrackKey[Key <: TrackKey: ClassTag]: UStream[Option[(Key, (DeployCRStatus, Option[JobStatus]))]] =
+      combineValueWithTypedTrackKey[Key, (DeployCRStatus, Option[JobStatus])](stream)
+  }
+
+  private[this] def combineValueWithTrackKey[Value](
+      stream: UStream[((Namespace, Name), Value)]): UStream[Option[(TrackKey, Value)]] = stream.mapZIO {
+    case ((namespace, name), value) =>
+      FlinkK8sObserver.trackedKeys
+        .find(key => key.clusterNamespace == namespace && key.clusterName == name)
+        .map(key => key.map(_ -> value))
+  }
+
+  // The ClassTag-based instance check is required: a bare isInstanceOf[Key] on the
+  // generic parameter would be erased to isInstanceOf[TrackKey] at runtime.
+  private[this] def combineValueWithTypedTrackKey[Key <: TrackKey: ClassTag, Value](
+      stream: UStream[((Namespace, Name), Value)]): UStream[Option[(Key, Value)]] = stream.mapZIO {
+    case ((namespace, name), value) =>
+      FlinkK8sObserver.trackedKeys
+        .find(key => implicitly[ClassTag[Key]].runtimeClass.isInstance(key) && key.clusterNamespace == namespace && key.clusterName == name)
+        .map(key => key.map(_.asInstanceOf[Key] -> value))
+  }
+
+}
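
Note on the ClassTag bound above: with plain generics, key.isInstanceOf[Key] is erased at
runtime to a check against the bound TrackKey, so every tracked key would match. The
erasure-safe check, demonstrated standalone:

    import scala.reflect.ClassTag

    def onlyOfType[T: ClassTag](xs: List[Any]): List[T] =
      xs.collect { case x if implicitly[ClassTag[T]].runtimeClass.isInstance(x) => x.asInstanceOf[T] }

    // onlyOfType[String](List(1, "a", 2.0, "b"))  ==  List("a", "b")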
diff --git a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/RawClusterObserver.scala b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/RawClusterObserver.scala
index d4a7e12b7..7045d3bed 100644
--- a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/RawClusterObserver.scala
+++ b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/RawClusterObserver.scala
@@ -108,15 +108,22 @@ case class RawClusterObserver(
           .map(e => Try(e.toInt).getOrElse(0))
           .getOrElse(0)
 
+        val totalTm       = Option(clusterOv.taskManagers).getOrElse(0)
+        val totalSlot     = Option(clusterOv.slotsTotal).getOrElse(0)
+        val availableSlot = Option(clusterOv.slotsAvailable).getOrElse(0)
+        val runningJob    = Option(clusterOv.jobsRunning).getOrElse(0)
+        val cancelledJob  = Option(clusterOv.jobsFinished).getOrElse(0)
+        val failedJob     = Option(clusterOv.jobsFailed).getOrElse(0)
+
         ClusterMetrics(
           totalJmMemory = totalJmMemory,
           totalTmMemory = totalTmMemory,
-          totalTm = clusterOv.taskManagers,
-          totalSlot = clusterOv.slotsTotal,
-          availableSlot = clusterOv.slotsAvailable,
-          runningJob = clusterOv.jobsRunning,
-          cancelledJob = clusterOv.jobsFinished,
-          failedJob = clusterOv.jobsFailed
+          totalTm = totalTm,
+          totalSlot = totalSlot,
+          availableSlot = availableSlot,
+          runningJob = runningJob,
+          cancelledJob = cancelledJob,
+          failedJob = failedJob
         )
       }
       .tap(metrics => clusterMetricsSnaps.put((namespace, name), metrics))
diff --git a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprError.scala b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprError.scala
index 9d832e676..5225905f4 100644
--- a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprError.scala
+++ b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprError.scala
@@ -28,9 +28,6 @@ object OprError {
   case class FlinkRestEndpointNotFound(namespace: String, name: String)
     extends Exception(s"Flink cluster rest endpoint not found: 
namespace=$namespace, name=$name")
 
-  case class TrackKeyNotFound(namespace: String, name: String)
-    extends Exception(s"TrackKey not found: namespace=$namespace, name=$name")
-
   case class FlinkDeploymentCRDNotFound()
     extends Exception("The FlinkDeployment CRD is not currently deployed in 
the kubernetes cluster")
 }
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipelineV2.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipelineV2.scala
index 6f59f6f02..ba9b9c6bc 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipelineV2.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipelineV2.scala
@@ -52,7 +52,8 @@ class FlinkK8sApplicationBuildPipelineV2(request: FlinkK8sApplicationBuildReques
         val shadedJarOutputPath = request.getShadedJarPath(buildWorkspace)
         val extJarLibs = request.developmentMode match {
           case DevelopmentMode.FLINK_SQL => request.dependencyInfo.extJarLibs
-          case DevelopmentMode.CUSTOM_CODE => Set[String]()
+          case DevelopmentMode.CUSTOM_CODE => Set.empty[String]
+          case _ => Set.empty[String]
         }
         val shadedJar =
           MavenTool.buildFatJar(request.mainClass, request.providedLibs, shadedJarOutputPath)
