This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 11c7fa837 [Feat][Flink-K8s-V2] Refactor the lifecycle control of Flink 
K8s application-mode jobs (#3037)
11c7fa837 is described below

commit 11c7fa8377398b09e052c046c16d209974e2fce0
Author: Linying Assad <[email protected]>
AuthorDate: Sat Sep 9 22:22:01 2023 -0500

    [Feat][Flink-K8s-V2] Refactor the lifecycle control of Flink K8s 
application-mode jobs (#3037)
    
    * [Feat] Refactor Flink resource build pipeline for flink-k8s-v2 module. 
#2882
    
    * [Feat] Adaptation of the submission client for flink-k8s-v2. #2882
    
    * [Feat][flink-k8s-v2] Migrate ENABLE_V2 to streampark-common module. #2882
    
    * [Feat][flink-k8s-v2] Disable Flink job tracking watcher at flink-k8s-v1. 
#2882
    
    * [Feat][flink-k8s-v2] Adaptation of Flink job canceling and triggering 
savepoint operations. #2882
    
    * [Feat][flink-k8s-v2] Untrack flink when it is deleted. #2882
---
 .../streampark/common/conf/K8sFlinkConfig.scala    |  20 +-
 .../org/apache/streampark/common/zio/ZIOExt.scala  |  22 +-
 .../apache/streampark/common/zio/ZIOJavaUtil.scala |  25 +--
 .../impl/ApplicationActionServiceImpl.java         |   6 +-
 .../impl/ApplicationManageServiceImpl.java         |   7 +-
 .../core/service/impl/AppBuildPipeServiceImpl.java |   8 +-
 .../core/service/impl/SavePointServiceImpl.java    |   1 +
 .../console/core/task/FlinkK8sWatcherWrapper.java  |  15 +-
 .../flink/client/bean/CancelRequest.scala          |   1 +
 .../flink/client/bean/KubernetesSubmitParam.scala  |  78 +++++++
 .../flink/client/bean/SubmitRequest.scala          |  13 --
 .../client/bean/TriggerSavepointRequest.scala      |   1 +
 .../flink/client/FlinkClientHandler.scala          |   6 +-
 .../impl/KubernetesApplicationClientV2.scala       | 238 +++++++++++++++++++++
 .../impl/KubernetesNativeApplicationClientV2.scala | 121 -----------
 .../flink/client/trait/FlinkClientTrait.scala      |   7 +
 .../client/trait/KubernetesClientV2Trait.scala     | 178 +++++++++++++++
 .../client/trait/KubernetesNativeClientTrait.scala |   2 +-
 .../kubernetes/v2/model/FlinkDeploymentDef.scala   |   7 +-
 .../kubernetes/v2/observer/FlinkK8sObserver.scala  |  14 +-
 .../kubernetes/v2/operator/FlinkK8sOperator.scala  |   4 +-
 .../flink/kubernetes/DefaultFlinkK8sWatcher.scala  |   7 +-
 .../kubernetes/FlinkK8sWatcherLazyStartAop.scala   |  19 +-
 streampark-flink/streampark-flink-packer/pom.xml   |   6 +
 .../flink/packer/pipeline/PipelineType.java        |  13 +-
 .../flink/packer/pipeline/BuildResponse.scala      |  16 ++
 .../impl/FlinkK8sApplicationBuildPipelineV2.scala  |  77 +++++++
 27 files changed, 728 insertions(+), 184 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
index 7313bf201..f5a20923a 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
@@ -18,9 +18,22 @@
 package org.apache.streampark.common.conf
 
 /** Flink kubernetes Configuration for v1 version */
-@deprecated("see: org.apache.streampark.flink.kubernetes.v2.Config")
+
 object K8sFlinkConfig {
 
+  lazy val isV2Enabled: Boolean = InternalConfigHolder.get(ENABLE_V2)
+
+  val ENABLE_V2: InternalOption = InternalOption(
+    key = "streampark.flink-k8s.enable-v2",
+    defaultValue = false,
+    classType = classOf[Boolean],
+    description =
+      "Whether to enable the v2 version(base on flink-kubernetes-operator) of 
flink kubernetes operation"
+  )
+
+  //  ======= deprecated =======
+
+  @deprecated
   val jobStatusTrackTaskTimeoutSec: InternalOption = InternalOption(
     key = "streampark.flink-k8s.tracking.polling-task-timeout-sec.job-status",
     defaultValue = 120L,
@@ -28,6 +41,7 @@ object K8sFlinkConfig {
     description = "run timeout seconds of single flink-k8s metrics tracking 
task"
   )
 
+  @deprecated
   val metricTrackTaskTimeoutSec: InternalOption = InternalOption(
     key = 
"streampark.flink-k8s.tracking.polling-task-timeout-sec.cluster-metric",
     defaultValue = 120L,
@@ -35,6 +49,7 @@ object K8sFlinkConfig {
     description = "run timeout seconds of single flink-k8s job status tracking 
task"
   )
 
+  @deprecated
   val jobStatueTrackTaskIntervalSec: InternalOption = InternalOption(
     key = "streampark.flink-k8s.tracking.polling-interval-sec.job-status",
     defaultValue = 5L,
@@ -42,6 +57,7 @@ object K8sFlinkConfig {
     description = "interval seconds between two single flink-k8s metrics 
tracking task"
   )
 
+  @deprecated
   val metricTrackTaskIntervalSec: InternalOption = InternalOption(
     key = "streampark.flink-k8s.tracking.polling-interval-sec.cluster-metric",
     defaultValue = 5L,
@@ -49,6 +65,7 @@ object K8sFlinkConfig {
     description = "interval seconds between two single flink-k8s metrics 
tracking task"
   )
 
+  @deprecated
   val silentStateJobKeepTrackingSec: InternalOption = InternalOption(
     key = "streampark.flink-k8s.tracking.silent-state-keep-sec",
     defaultValue = 60,
@@ -69,6 +86,7 @@ object K8sFlinkConfig {
   )
 
   /** kubernetes default namespace */
+  @deprecated
   val DEFAULT_KUBERNETES_NAMESPACE = "default"
 
 }
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOExt.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOExt.scala
index 6fb546c65..7baa195d3 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOExt.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOExt.scala
@@ -17,14 +17,16 @@
 
 package org.apache.streampark.common.zio
 
-import zio.{IO, Runtime, Unsafe, ZIO}
+import zio.{FiberFailure, IO, Runtime, Unsafe, ZIO}
 import zio.stream.ZStream
 
+import scala.util.Try
+
 /** ZIO extension */
 object ZIOExt {
 
   /* Unsafe run zio effect. */
-  @throws[Exception]
+  @throws[FiberFailure]
   @inline def unsafeRun[E, A](zio: IO[E, A]): A = Unsafe.unsafe {
     implicit u =>
       Runtime.default.unsafe
@@ -32,11 +34,25 @@ object ZIOExt {
         .getOrThrowFiberFailure()
   }
 
+  /** unsafe run IO to Either. */
+  @inline def unsafeRunToEither[E, A](zio: IO[E, A]): Either[Throwable, A] = 
Unsafe.unsafe {
+    implicit u =>
+      Runtime.default.unsafe
+        .run(zio.provideLayer(Runtime.removeDefaultLoggers >>> 
ZIOLogger.default))
+        .toEither
+  }
+
   implicit class IOOps[E, A](io: ZIO[Any, E, A]) {
 
     /** unsafe run IO */
-    @throws[Throwable]
+    @throws[FiberFailure]
     def runIO: A = ZIOExt.unsafeRun(io)
+
+    /** unsafe run IO to Try. */
+    def runIOAsTry: Try[A] = unsafeRunToEither(io).toTry
+
+    /** unsafe run IO to Either. */
+    def runIOAsEither: Either[Throwable, A] = unsafeRunToEither(io)
   }
 
   implicit class UIOOps[A](uio: ZIO[Any, Nothing, A]) {
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOJavaUtil.scala
similarity index 55%
copy from 
streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
copy to 
streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOJavaUtil.scala
index 035c5348d..d7260b5d5 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOJavaUtil.scala
@@ -15,23 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.flink.client.bean
+package org.apache.streampark.common.zio
 
-import org.apache.streampark.common.conf.{FlinkVersion, K8sFlinkConfig}
-import org.apache.streampark.common.enums.ExecutionMode
+import zio.{FiberFailure, IO, UIO}
 
-import javax.annotation.Nullable
+/** Util for running ZIO effects in Java. */
+object ZIOJavaUtil {
 
-import java.util.{Map => JavaMap}
+  @throws[FiberFailure]
+  def runIO[E, A](zio: IO[E, A]): A = ZIOExt.unsafeRun(zio)
 
-/** Trigger savepoint request. */
-case class TriggerSavepointRequest(
-    flinkVersion: FlinkVersion,
-    executionMode: ExecutionMode,
-    @Nullable properties: JavaMap[String, Any],
-    clusterId: String,
-    jobId: String,
-    savepointPath: String,
-    nativeFormat: Boolean,
-    override val kubernetesNamespace: String = 
K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE)
-  extends SavepointRequestTrait
+  def runUIO[A](uio: UIO[A]): A = ZIOExt.unsafeRun(uio)
+
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index ff69ed84b..4c836469f 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -294,6 +294,7 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
 
     CancelRequest cancelRequest =
         new CancelRequest(
+            application.getId(),
             flinkEnv.getFlinkVersion(),
             ExecutionMode.of(application.getExecutionMode()),
             properties,
@@ -413,10 +414,13 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
       extraParameter.put(ConfigConst.KEY_FLINK_SQL(null), flinkSql.getSql());
     }
 
+    // TODO Need to display more K8s submission parameters in the front-end UI.
+    //      See: org.apache.streampark.flink.client.bean.KubernetesSubmitParam
     KubernetesSubmitParam kubernetesSubmitParam =
-        new KubernetesSubmitParam(
+        KubernetesSubmitParam.apply(
             application.getClusterId(),
             application.getK8sNamespace(),
+            application.getFlinkImage(),
             application.getK8sRestExposedTypeEnum());
 
     Tuple2<String, String> userJarAndAppConf = getUserJarAndAppConf(flinkEnv, 
application);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
index eeabbaad8..ea8fc61c1 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
@@ -17,11 +17,13 @@
 
 package org.apache.streampark.console.core.service.application.impl;
 
+import org.apache.streampark.common.conf.K8sFlinkConfig;
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.enums.StorageType;
 import org.apache.streampark.common.fs.HdfsOperator;
 import org.apache.streampark.common.util.DeflaterUtils;
+import org.apache.streampark.common.zio.ZIOJavaUtil;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
@@ -54,6 +56,7 @@ import 
org.apache.streampark.console.core.service.YarnQueueService;
 import 
org.apache.streampark.console.core.service.application.ApplicationManageService;
 import org.apache.streampark.console.core.task.FlinkHttpWatcher;
 import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
+import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver;
 import org.apache.streampark.flink.packer.pipeline.PipelineStatus;
 
 import org.apache.commons.lang3.StringUtils;
@@ -176,9 +179,11 @@ public class ApplicationManageServiceImpl extends 
ServiceImpl<ApplicationMapper,
 
     // 8) remove app
     removeApp(application);
-
     if (isKubernetesApp(application)) {
       k8SFlinkTrackMonitor.unWatching(toTrackId(application));
+      if (K8sFlinkConfig.isV2Enabled()) {
+        ZIOJavaUtil.runUIO(FlinkK8sObserver.untrackById(application.getId()));
+      }
     } else {
       FlinkHttpWatcher.unWatching(appParam.getId());
     }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 7bbbb5afe..7260fc72f 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -18,6 +18,7 @@
 package org.apache.streampark.console.core.service.impl;
 
 import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.conf.K8sFlinkConfig;
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.enums.ApplicationType;
 import org.apache.streampark.common.enums.DevelopmentMode;
@@ -78,6 +79,7 @@ import 
org.apache.streampark.flink.packer.pipeline.PipeWatcher;
 import org.apache.streampark.flink.packer.pipeline.PipelineStatus;
 import org.apache.streampark.flink.packer.pipeline.PipelineType;
 import 
org.apache.streampark.flink.packer.pipeline.impl.FlinkK8sApplicationBuildPipeline;
+import 
org.apache.streampark.flink.packer.pipeline.impl.FlinkK8sApplicationBuildPipelineV2;
 import 
org.apache.streampark.flink.packer.pipeline.impl.FlinkK8sSessionBuildPipeline;
 import 
org.apache.streampark.flink.packer.pipeline.impl.FlinkRemoteBuildPipeline;
 import 
org.apache.streampark.flink.packer.pipeline.impl.FlinkYarnApplicationBuildPipeline;
@@ -511,7 +513,11 @@ public class AppBuildPipeServiceImpl
                     dockerConfig.getPassword()),
                 app.getIngressTemplate());
         log.info("Submit params to building pipeline : {}", 
k8sApplicationBuildRequest);
-        return FlinkK8sApplicationBuildPipeline.of(k8sApplicationBuildRequest);
+        if (K8sFlinkConfig.isV2Enabled()) {
+          return 
FlinkK8sApplicationBuildPipelineV2.of(k8sApplicationBuildRequest);
+        } else {
+          return 
FlinkK8sApplicationBuildPipeline.of(k8sApplicationBuildRequest);
+        }
       default:
         throw new UnsupportedOperationException(
             "Unsupported Building Application for ExecutionMode: " + 
app.getExecutionModeEnum());
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 20f7c31f4..c85d0f1d4 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -493,6 +493,7 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
     Map<String, Object> properties = this.tryGetRestProps(application, 
cluster);
 
     return new TriggerSavepointRequest(
+        application.getId(),
         flinkEnv.getFlinkVersion(),
         application.getExecutionModeEnum(),
         properties,
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index c385764ef..32f7a2391 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.console.core.task;
 
+import org.apache.streampark.common.conf.K8sFlinkConfig;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.console.core.entity.Application;
 import 
org.apache.streampark.console.core.service.application.ApplicationManageService;
@@ -82,11 +83,15 @@ public class FlinkK8sWatcherWrapper {
   }
 
   private void initFlinkK8sWatcher(@Nonnull FlinkK8sWatcher trackMonitor) {
-    // register change event listener
-    trackMonitor.registerListener(flinkK8sChangeEventListener);
-    // recovery tracking list
-    List<TrackId> k8sApp = getK8sWatchingApps();
-    k8sApp.forEach(trackMonitor::doWatching);
+    if (!K8sFlinkConfig.isV2Enabled()) {
+      // register change event listener
+      trackMonitor.registerListener(flinkK8sChangeEventListener);
+      // recovery tracking list
+      List<TrackId> k8sApp = getK8sWatchingApps();
+      k8sApp.forEach(trackMonitor::doWatching);
+    } else {
+      // TODO [flink-k8s-v2] Recovery tracking list and invoke 
FlinkK8sObserver.track()
+    }
   }
 
   /** get flink-k8s job tracking application from db. */
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
index dd5481dc4..f06db3d9d 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
@@ -25,6 +25,7 @@ import javax.annotation.Nullable
 import java.util.{Map => JavaMap}
 
 case class CancelRequest(
+    id: Long,
     flinkVersion: FlinkVersion,
     executionMode: ExecutionMode,
     @Nullable properties: JavaMap[String, Any],
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/KubernetesSubmitParam.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/KubernetesSubmitParam.scala
new file mode 100644
index 000000000..07832841c
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/KubernetesSubmitParam.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.client.bean
+
+import org.apache.streampark.common.enums.FlinkK8sRestExposedType
+
+import javax.annotation.Nullable
+
+import java.util
+import java.util.{Map => JMap}
+
+/**
+ * TODO Need to display more K8s submission parameters in the front-end UI.
+ *
+ * It will eventually be converted to
+ * [[org.apache.streampark.flink.kubernetes.v2.model.FlinkDeploymentDef]]
+ *
+ * The logic of conversion is located at:
+ * 
[[org.apache.streampark.flink.client.impl.KubernetesApplicationClientV2#genFlinkDeployDef]]
+ */
+// todo split into Application mode and SessionJob mode
+case class KubernetesSubmitParam(
+    clusterId: String,
+    kubernetesNamespace: String,
+    baseImage: Option[String] = None,
+    imagePullPolicy: Option[String] = None,
+    serviceAccount: Option[String] = None,
+    podTemplate: Option[String] = None,
+    jobManagerCpu: Option[Double] = None,
+    jobManagerMemory: Option[String] = None,
+    jobManagerEphemeralStorage: Option[String] = None,
+    jobManagerPodTemplate: Option[String] = None,
+    taskManagerCpu: Option[Double] = None,
+    taskManagerMemory: Option[String] = None,
+    taskManagerEphemeralStorage: Option[String] = None,
+    taskManagerPodTemplate: Option[String] = None,
+    logConfiguration: JMap[String, String] = new util.HashMap[String, 
String](),
+    flinkRestExposedType: Option[FlinkK8sRestExposedType] = None
+)
+
+object KubernetesSubmitParam {
+
+  /**
+   * Compatible with streampark old native k8s submission parameters.
+   *
+   * @param clusterId
+   *   flink cluster id in k8s cluster.
+   * @param kubernetesNamespace
+   *   k8s namespace.
+   * @param flinkRestExposedType
+   *   flink rest-service exposed type on k8s cluster.
+   */
+  def apply(
+      clusterId: String,
+      kubernetesNamespace: String,
+      baseImage: String,
+      @Nullable flinkRestExposedType: FlinkK8sRestExposedType): 
KubernetesSubmitParam =
+    KubernetesSubmitParam(
+      clusterId = clusterId,
+      kubernetesNamespace = kubernetesNamespace,
+      baseImage = Some(baseImage),
+      flinkRestExposedType = Option(flinkRestExposedType))
+}
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index d06c036c1..4989abaf8 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -36,19 +36,6 @@ import java.util.{Map => JavaMap}
 import scala.collection.convert.ImplicitConversions._
 import scala.util.Try
 
-/**
- * @param clusterId
- *   flink cluster id in k8s cluster.
- * @param kubernetesNamespace
- *   k8s namespace.
- * @param flinkRestExposedType
- *   flink rest-service exposed type on k8s cluster.
- */
-case class KubernetesSubmitParam(
-    clusterId: String,
-    kubernetesNamespace: String,
-    @Nullable flinkRestExposedType: FlinkK8sRestExposedType)
-
 case class SubmitRequest(
     flinkVersion: FlinkVersion,
     executionMode: ExecutionMode,
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
index 035c5348d..85c788da7 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
@@ -26,6 +26,7 @@ import java.util.{Map => JavaMap}
 
 /** Trigger savepoint request. */
 case class TriggerSavepointRequest(
+    id: Long,
     flinkVersion: FlinkVersion,
     executionMode: ExecutionMode,
     @Nullable properties: JavaMap[String, Any],
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
index e85ceb6ee..810df34e3 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.flink.client
 
+import org.apache.streampark.common.conf.K8sFlinkConfig
 import org.apache.streampark.common.enums.ExecutionMode
 import org.apache.streampark.common.enums.ExecutionMode._
 import org.apache.streampark.flink.client.`trait`.FlinkClientTrait
@@ -32,7 +33,10 @@ object FlinkClientHandler {
     YARN_SESSION -> YarnSessionClient,
     YARN_PER_JOB -> YarnPerJobClient,
     KUBERNETES_NATIVE_SESSION -> KubernetesNativeSessionClient,
-    KUBERNETES_NATIVE_APPLICATION -> KubernetesNativeApplicationClient
+    KUBERNETES_NATIVE_APPLICATION -> {
+      if (K8sFlinkConfig.isV2Enabled) KubernetesApplicationClientV2
+      else KubernetesNativeApplicationClient
+    }
   )
 
   def submit(request: SubmitRequest): SubmitResponse = {
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
new file mode 100644
index 000000000..665159c03
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.client.impl
+
+import org.apache.streampark.common.util.Logger
+import org.apache.streampark.common.zio.ZIOExt.IOOps
+import org.apache.streampark.flink.client.`trait`.KubernetesClientV2Trait
+import org.apache.streampark.flink.client.bean._
+import org.apache.streampark.flink.kubernetes.v2.model.{FlinkDeploymentDef, 
JobManagerDef, TaskManagerDef}
+import org.apache.streampark.flink.kubernetes.v2.operator.FlinkK8sOperator
+import org.apache.streampark.flink.packer.pipeline.K8sAppModeBuildResponse
+
+import org.apache.flink.client.deployment.application.ApplicationConfiguration
+import org.apache.flink.configuration._
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
+import org.apache.flink.v1beta1.FlinkDeploymentSpec.FlinkVersion
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters.mapAsScalaMapConverter
+import scala.util.{Failure, Success, Try}
+
+/** Flink K8s application mode task operation client via Flink K8s Operator */
+object KubernetesApplicationClientV2 extends KubernetesClientV2Trait with 
Logger {
+
+  @throws[Exception]
+  override def doSubmit(
+      submitRequest: SubmitRequest,
+      flinkConfig: Configuration): SubmitResponse = {
+
+    val richMsg: String => String = 
s"[flink-submit][appId=${submitRequest.id}] " + _
+
+    submitRequest.checkBuildResult()
+    val buildResult = 
submitRequest.buildResult.asInstanceOf[K8sAppModeBuildResponse]
+
+    // Convert to FlinkDeployment CR definition
+    val flinkDeployDef = genFlinkDeployDef(submitRequest, flinkConfig, 
buildResult) match {
+      case Right(result) => result
+      case Left(errMsg) =>
+        throw new IllegalArgumentException(
+          richMsg(s"Error occurred while parsing parameters: $errMsg"))
+    }
+
+    // Submit FlinkDeployment CR to Kubernetes
+    FlinkK8sOperator.deployApplicationJob(submitRequest.id, 
flinkDeployDef).runIOAsTry match {
+      case Success(_) =>
+        logInfo(richMsg("Flink job has been submitted successfully."))
+      case Failure(err) =>
+        logError(
+          richMsg(s"Submit Flink job fail in 
${submitRequest.executionMode.getName}_V2 mode!"),
+          err)
+        throw err
+    }
+
+    SubmitResponse(
+      clusterId = submitRequest.k8sSubmitParam.clusterId,
+      flinkConfig = flinkConfig.toMap,
+      jobId = submitRequest.jobId,
+      jobManagerUrl = null
+    )
+  }
+
+  // Generate FlinkDeployment CR definition, it is a pure effect function.
+  private def genFlinkDeployDef(
+      submitReq: SubmitRequest,
+      originFlinkConfig: Configuration,
+      buildResult: K8sAppModeBuildResponse): Either[FailureMessage, 
FlinkDeploymentDef] = {
+
+    val flinkConfObj = originFlinkConfig.clone()
+    val flinkConfMap = originFlinkConfig.toMap.asScala.toMap
+
+    val namespace = Option(submitReq.k8sSubmitParam.kubernetesNamespace)
+      .getOrElse("default")
+
+    val name = Option(submitReq.k8sSubmitParam.clusterId)
+      .filter(!_.isBlank)
+      .getOrElse(return Left("cluster-id should not be empty"))
+
+    val image = submitReq.k8sSubmitParam.baseImage
+      .orElse(Option(buildResult.flinkBaseImage))
+      .filter(!_.isBlank)
+      .getOrElse(return Left("Flink base image should not be empty"))
+
+    val imagePullPolicy = flinkConfObj
+      .getOption(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY)
+      .map(_.toString)
+      .orElse(submitReq.k8sSubmitParam.imagePullPolicy)
+
+    val serviceAccount = flinkConfObj
+      .getOption(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT)
+      .orElse(submitReq.k8sSubmitParam.serviceAccount)
+      .getOrElse(FlinkDeploymentDef.DEFAULT_SERVICE_ACCOUNT)
+
+    val flinkVersion = Option(submitReq.flinkVersion.majorVersion)
+      .map(majorVer => "V" + majorVer.replace(".", "_"))
+      .flatMap(v => FlinkVersion.values().find(_.name() == v))
+      .getOrElse(return Left(s"Unsupported Flink version: 
${submitReq.flinkVersion.majorVersion}"))
+
+    val jobDef = genJobDef(flinkConfObj, jarUriHint = 
Some(buildResult.mainJarPath))
+      .getOrElse(return Left("Invalid job definition"))
+
+    val podTemplate = submitReq.k8sSubmitParam.podTemplate.map(
+      yaml =>
+        unmarshalPodTemplate(yaml)
+          .getOrElse(return Left(s"Invalid pod template: \n$yaml")))
+
+    val jobManager = {
+      val cpu = flinkConfMap
+        .get(KUBERNETES_JM_CPU_AMOUNT_KEY)
+        .orElse(flinkConfMap.get(KUBERNETES_JM_CPU_KEY))
+        .flatMap(value => Try(value.toDouble).toOption)
+        .orElse(submitReq.k8sSubmitParam.jobManagerCpu)
+        .getOrElse(KUBERNETES_JM_CPU_DEFAULT)
+
+      val mem = flinkConfObj
+        .getOption(JobManagerOptions.TOTAL_PROCESS_MEMORY)
+        .map(_.toString)
+        .orElse(submitReq.k8sSubmitParam.jobManagerMemory)
+        .getOrElse(KUBERNETES_JM_MEMORY_DEFAULT)
+
+      val podTemplate = submitReq.k8sSubmitParam.jobManagerPodTemplate.map(
+        yaml =>
+          unmarshalPodTemplate(yaml)
+            .getOrElse(return Left(s"Invalid job manager pod template: 
\n$yaml")))
+      JobManagerDef(
+        cpu = cpu,
+        memory = mem,
+        ephemeralStorage = submitReq.k8sSubmitParam.jobManagerEphemeralStorage,
+        podTemplate = podTemplate)
+    }
+
+    val taskManager = {
+      val cpu = flinkConfMap
+        .get(KUBERNETES_TM_CPU_AMOUNT_KEY)
+        .orElse(flinkConfMap.get(KUBERNETES_TM_CPU_KEY))
+        .flatMap(value => Try(value.toDouble).toOption)
+        .orElse(submitReq.k8sSubmitParam.taskManagerCpu)
+        .getOrElse(KUBERNETES_TM_CPU_DEFAULT)
+
+      val mem = flinkConfObj
+        .getOption(TaskManagerOptions.TOTAL_PROCESS_MEMORY)
+        .map(_.toString)
+        .orElse(submitReq.k8sSubmitParam.taskManagerMemory)
+        .getOrElse(KUBERNETES_TM_MEMORY_DEFAULT)
+
+      val podTemplate = submitReq.k8sSubmitParam.taskManagerPodTemplate.map(
+        yaml =>
+          unmarshalPodTemplate(yaml)
+            .getOrElse(return Left(s"Invalid task manager pod template: 
\n$yaml")))
+      TaskManagerDef(
+        cpu = cpu,
+        memory = mem,
+        ephemeralStorage = 
submitReq.k8sSubmitParam.taskManagerEphemeralStorage,
+        podTemplate = podTemplate)
+    }
+
+    val logConfiguration = {
+      val items = submitReq.k8sSubmitParam.logConfiguration.asScala
+      if (items.isEmpty) {
+        // Get default log config from local target flink home
+        val logConfigs = Array(
+          "log4j.properties" -> 
s"${submitReq.flinkVersion.flinkHome}/conf/log4j-console.properties",
+          "logback.xml" -> 
s"${submitReq.flinkVersion.flinkHome}/conf/logback-console.xml"
+        )
+        logConfigs
+          .map { case (name, path) => name -> os.Path(path) }
+          .filter { case (_, path) => Try(os.exists(path) && 
os.isFile(path)).getOrElse(false) }
+          .map { case (name, path) => name -> 
Try(os.read(path)).toOption.filter(!_.isBlank) }
+          .filter { case (_, content) => content.isDefined }
+          .map { case (name, content) => name -> content.get }
+          .foreach { case (name, content) => items += name -> content }
+      }
+      items.toMap
+    }
+
+    val extraFlinkConfiguration = {
+      // Remove conflicting configuration items (incl. all four JM/TM CPU keys)
+      val result: mutable.Map[String, String] = flinkConfObj
+        .remove(DeploymentOptions.TARGET)
+        .remove(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY)
+        .remove(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT)
+        .remove(JobManagerOptions.TOTAL_PROCESS_MEMORY)
+        .remove(TaskManagerOptions.TOTAL_PROCESS_MEMORY)
+        .remove(PipelineOptions.JARS)
+        .remove(CoreOptions.DEFAULT_PARALLELISM)
+        .remove(ApplicationConfiguration.APPLICATION_ARGS)
+        .remove(ApplicationConfiguration.APPLICATION_MAIN_CLASS)
+        .remove(SavepointConfigOptions.SAVEPOINT_PATH)
+        .remove(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)
+        .toMap
+        .asScala
+        .removeKey(KUBERNETES_JM_CPU_AMOUNT_KEY)
+        .removeKey(KUBERNETES_JM_CPU_KEY)
+        .removeKey(KUBERNETES_TM_CPU_AMOUNT_KEY)
+        .removeKey(KUBERNETES_TM_CPU_KEY)
+      // Set kubernetes.rest-service.exposed.type configuration for 
compatibility with native-k8s
+      submitReq.k8sSubmitParam.flinkRestExposedType.foreach {
+        exposedType => result += KUBERNETES_REST_SERVICE_EXPORTED_TYPE_KEY -> 
exposedType.getName
+      }
+      result.toMap
+    }
+
+    // TODO Migrate the construction logic of ingress to here and set it into 
FlinkDeploymentDef.ingress
+    //  See: 
org.apache.streampark.flink.packer.pipeline.impl.FlinkK8sApplicationBuildPipeline
 Step-8
+    Right(
+      FlinkDeploymentDef(
+        namespace = namespace,
+        name = name,
+        image = image,
+        imagePullPolicy = imagePullPolicy,
+        serviceAccount = serviceAccount,
+        flinkVersion = flinkVersion,
+        jobManager = jobManager,
+        taskManager = taskManager,
+        flinkConfiguration = extraFlinkConfiguration,
+        logConfiguration = logConfiguration,
+        podTemplate = podTemplate,
+        job = Some(jobDef),
+        extJarPaths = Array.empty
+      ))
+  }
+
+}
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClientV2.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClientV2.scala
deleted file mode 100644
index f43e88f0b..000000000
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClientV2.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.flink.client.impl
-
-import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, 
ExecutionMode}
-import org.apache.streampark.common.zio.ZIOExt.IOOps
-import org.apache.streampark.flink.client.`trait`.KubernetesNativeClientTrait
-import org.apache.streampark.flink.client.bean._
-import org.apache.streampark.flink.kubernetes.v2.model.{FlinkDeploymentDef, 
JobDef, JobManagerDef, TaskManagerDef}
-import org.apache.streampark.flink.kubernetes.v2.operator.FlinkK8sOperator
-import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse
-
-import org.apache.commons.lang3.StringUtils
-import org.apache.flink.configuration.{Configuration, DeploymentOptions, 
ExecutionOptions, JobManagerOptions, PipelineOptions, TaskManagerOptions}
-import org.apache.flink.v1beta1.FlinkDeploymentSpec.FlinkVersion
-
-import scala.collection.JavaConverters._
-import scala.language.postfixOps
-
-object KubernetesNativeApplicationClientV2 extends KubernetesNativeClientTrait 
{
-  @throws[Exception]
-  override def doSubmit(
-      submitRequest: SubmitRequest,
-      flinkConfig: Configuration): SubmitResponse = {
-
-    // require parameters
-    require(
-      StringUtils.isNotBlank(submitRequest.k8sSubmitParam.clusterId),
-      s"[flink-submit] submit flink job failed, clusterId is null, 
mode=${flinkConfig.get(DeploymentOptions.TARGET)}"
-    )
-
-    // check the last building result
-    submitRequest.checkBuildResult()
-
-    try {
-      val spec: FlinkDeploymentDef = convertFlinkDeploymentDef(submitRequest, 
flinkConfig)
-      FlinkK8sOperator.deployApplicationJob(submitRequest.id, spec).runIO
-      val result = SubmitResponse(null, flinkConfig.toMap, 
submitRequest.jobId, null)
-      logInfo(
-        s"[flink-submit] flink job has been submitted. 
${flinkConfIdentifierInfo(flinkConfig)}")
-      result
-    } catch {
-      case e: Exception =>
-        logError(s"submit flink job fail in ${submitRequest.executionMode} 
mode")
-        throw e
-    } finally {}
-  }
-
-  override def doCancel(
-      cancelRequest: CancelRequest,
-      flinkConfig: Configuration): CancelResponse = {
-    flinkConfig.safeSet(
-      DeploymentOptions.TARGET,
-      ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
-    super.doCancel(cancelRequest, flinkConfig)
-  }
-
-  override def doTriggerSavepoint(
-      request: TriggerSavepointRequest,
-      flinkConf: Configuration): SavepointResponse = {
-    flinkConf.safeSet(DeploymentOptions.TARGET, 
ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
-    super.doTriggerSavepoint(request, flinkConf)
-  }
-
-  private[this] def convertFlinkDeploymentDef(
-      submitRequest: SubmitRequest,
-      flinkConfig: Configuration): FlinkDeploymentDef = {
-    val spec = FlinkDeploymentDef(
-      name = submitRequest.appName,
-      namespace = submitRequest.k8sSubmitParam.kubernetesNamespace,
-      image = 
submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse].flinkImageTag,
-      flinkVersion = Option(submitRequest.flinkVersion.majorVersion)
-        .map(_.replace(".", "_"))
-        .map("V" + _)
-        .flatMap(v => FlinkVersion.values().find(_.name() == v)) match {
-        case Some(version) => version
-        case None => throw new IllegalArgumentException("Flink version not 
found")
-      },
-      jobManager = JobManagerDef(
-        cpu = 1,
-        memory = 
flinkConfig.get(JobManagerOptions.TOTAL_PROCESS_MEMORY).toString),
-      taskManager = TaskManagerDef(
-        cpu = 1,
-        memory = 
flinkConfig.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY).toString),
-      job = Option(
-        JobDef(
-          jarURI =
-            
submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse].dockerInnerMainJarPath,
-          parallelism = 1,
-          args = 
Array(flinkConfig.toMap.get("$internal.application.program-args")),
-          entryClass = Some(submitRequest.appMain),
-          initialSavepointPath = Some(submitRequest.savePoint),
-          allowNonRestoredState = Some(submitRequest.allowNonRestoredState)
-        )),
-      extJarPaths = submitRequest.userJarFile match {
-        case null => Array.empty[String]
-        case file => Array(file.getAbsolutePath)
-      },
-      flinkConfiguration = submitRequest.extraParameter match {
-        case null => Map.empty
-        case e => e.asScala.map { case (key, value) => key -> value.toString 
}.toMap
-      }
-    )
-    spec
-  }
-}
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 59aa46cc3..e6b16e377 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -543,6 +543,13 @@ trait FlinkClientTrait extends Logger {
         case x => x
       }
     }
+    def getOption[T](key: ConfigOption[T]): Option[T] = {
+      Option(flinkConfig.getOptional(key).orElse(null.asInstanceOf[T]))
+    }
+    def remove[T](key: ConfigOption[T]): Configuration = {
+      flinkConfig.removeConfig(key)
+      flinkConfig
+    }
   }
 
   private[client] def cancelJob(
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala
new file mode 100644
index 000000000..75b979246
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.client.`trait`
+
+import org.apache.streampark.common.zio.ZIOExt.IOOps
+import 
org.apache.streampark.flink.client.`trait`.KubernetesClientV2.{StopJobFail, 
TriggerJobSavepointFail}
+import org.apache.streampark.flink.client.bean._
+import org.apache.streampark.flink.kubernetes.v2.model.{JobDef, 
JobSavepointDef}
+import org.apache.streampark.flink.kubernetes.v2.operator.FlinkK8sOperator
+import org.apache.streampark.flink.kubernetes.v2.yamlMapper
+
+import io.fabric8.kubernetes.api.model.Pod
+import org.apache.flink.client.deployment.application.ApplicationConfiguration
+import org.apache.flink.configuration.{Configuration, CoreOptions, 
PipelineOptions}
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
+import zio.ZIO
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters.asScalaBufferConverter
+import scala.util.{Failure, Success, Try}
+
+trait KubernetesClientV2Trait extends FlinkClientTrait {
+
+  protected type FailureMessage = String
+
+  protected val KUBERNETES_JM_CPU_KEY = "kubernetes.jobmanager.cpu"
+  protected val KUBERNETES_JM_CPU_AMOUNT_KEY = 
"kubernetes.jobmanager.cpu.amount"
+  protected val KUBERNETES_JM_CPU_DEFAULT = 1.0
+  protected val KUBERNETES_JM_MEMORY_DEFAULT = "1600m"
+
+  protected val KUBERNETES_TM_CPU_KEY = "kubernetes.taskmanager.cpu"
+  protected val KUBERNETES_TM_CPU_AMOUNT_KEY = 
"kubernetes.taskmanager.cpu.amount"
+  protected val KUBERNETES_TM_CPU_DEFAULT = -1.0
+  protected val KUBERNETES_TM_MEMORY_DEFAULT = "1728m"
+
+  protected val KUBERNETES_REST_SERVICE_EXPORTED_TYPE_KEY = 
"kubernetes.rest-service.exposed.type"
+
+  override def setConfig(submitRequest: SubmitRequest, flinkConf: 
Configuration): Unit = {}
+
+  implicit protected class FlinkConfMapOps(map: mutable.Map[String, String]) {
+    def removeKey(key: String): mutable.Map[String, String] = { map -= key; 
map }
+  }
+
+  protected def unmarshalPodTemplate(yaml: String): Try[Pod] = {
+    Try(yamlMapper.readValue(yaml, classOf[Pod]))
+  }
+
+  protected def genJobDef(
+      flinkConfObj: Configuration,
+      jarUriHint: Option[String]): Either[FailureMessage, JobDef] = {
+
+    val jarUri = jarUriHint
+      
.orElse(flinkConfObj.getOption(PipelineOptions.JARS).flatMap(_.asScala.headOption))
+      .getOrElse(return Left("Flink job uri should not be empty"))
+
+    val parallel = flinkConfObj
+      .getOption(CoreOptions.DEFAULT_PARALLELISM)
+      .getOrElse(CoreOptions.DEFAULT_PARALLELISM.defaultValue())
+
+    val args = flinkConfObj
+      .getOption(ApplicationConfiguration.APPLICATION_ARGS)
+      .map(_.asScala.toArray)
+      .getOrElse(Array.empty[String])
+
+    val entryClass = flinkConfObj
+      .getOption(ApplicationConfiguration.APPLICATION_MAIN_CLASS)
+
+    val savePointPath = flinkConfObj
+      .getOption(SavepointConfigOptions.SAVEPOINT_PATH)
+
+    val allowNonRestoredState = flinkConfObj
+      .getOption(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)
+      .map(_.booleanValue())
+
+    Right(
+      JobDef(
+        jarURI = jarUri,
+        parallelism = parallel,
+        entryClass = entryClass,
+        args = args,
+        initialSavepointPath = savePointPath,
+        allowNonRestoredState = allowNonRestoredState
+      ))
+  }
+
+  @throws[Exception]
+  override def doCancel(request: CancelRequest, flinkConf: Configuration): 
CancelResponse = {
+    val effect =
+      if (!request.withSavepoint) {
+        // cancel job
+        FlinkK8sOperator
+          .cancelJob(request.id)
+          .as(CancelResponse(null))
+      } else {
+        // stop job with savepoint
+        val savepointDef = JobSavepointDef(
+          drain = Option(request.withDrain).getOrElse(false),
+          savepointPath = Option(request.savepointPath),
+          formatType = Option(request.nativeFormat)
+            .map(if (_) JobSavepointDef.NATIVE_FORMAT else 
JobSavepointDef.CANONICAL_FORMAT)
+        )
+        FlinkK8sOperator
+          .stopJob(request.id, savepointDef)
+          .flatMap {
+            result =>
+              if (result.isFailed) 
ZIO.fail(StopJobFail(result.failureCause.get))
+              else ZIO.succeed(CancelResponse(result.location.orNull))
+          }
+      }
+
+    def richMsg: String => String = s"[flink-cancel][appId=${request.id}] " + _
+
+    effect.runIOAsTry match {
+      case Success(rsp) =>
+        logInfo(richMsg("Cancel flink job successfully."))
+        rsp
+      case Failure(err) =>
+        logError(
+          richMsg(s"Cancel flink job fail in 
${request.executionMode.getName}_V2 mode!"),
+          err)
+        throw err
+    }
+  }
+
+  @throws[Exception]
+  override def doTriggerSavepoint(
+      request: TriggerSavepointRequest,
+      flinkConf: Configuration): SavepointResponse = {
+
+    val savepointDef = JobSavepointDef(
+      savepointPath = Option(request.savepointPath),
+      formatType = Option(request.nativeFormat)
+        .map(if (_) JobSavepointDef.NATIVE_FORMAT else 
JobSavepointDef.CANONICAL_FORMAT)
+    )
+
+    def richMsg: String => String = 
s"[flink-trigger-savepoint][appId=${request.id}] " + _
+
+    FlinkK8sOperator
+      .triggerJobSavepoint(request.id, savepointDef)
+      .flatMap {
+        result =>
+          if (result.isFailed) 
ZIO.fail(TriggerJobSavepointFail(result.failureCause.get))
+          else ZIO.succeed(SavepointResponse(result.location.orNull))
+      }
+      .runIOAsTry match {
+      case Success(rsp) =>
+        logInfo(richMsg("Trigger flink job savepoint successfully."))
+        rsp
+      case Failure(err) =>
+        logError(
+          richMsg(s"Trigger flink job savepoint fail in 
${request.executionMode.getName}_V2 mode!"),
+          err)
+        throw err
+    }
+  }
+
+}
+
+object KubernetesClientV2 {
+
+  case class StopJobFail(msg: String) extends Exception(msg)
+  case class TriggerJobSavepointFail(msg: String) extends Exception(msg)
+}
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
index 2dd756b8f..8ca6c518f 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
@@ -44,7 +44,7 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait {
       .safeSet(KubernetesConfigOptions.NAMESPACE, 
submitRequest.k8sSubmitParam.kubernetesNamespace)
       .safeSet(
         KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
-        
covertToServiceExposedType(submitRequest.k8sSubmitParam.flinkRestExposedType))
+        
covertToServiceExposedType(submitRequest.k8sSubmitParam.flinkRestExposedType.get))
 
     if (submitRequest.buildResult != null) {
       if (submitRequest.executionMode == 
ExecutionMode.KUBERNETES_NATIVE_APPLICATION) {
diff --git 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/FlinkDeploymentDef.scala
 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/FlinkDeploymentDef.scala
index b3c039bf2..1637eb8da 100644
--- 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/FlinkDeploymentDef.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/FlinkDeploymentDef.scala
@@ -18,7 +18,7 @@
 package org.apache.streampark.flink.kubernetes.v2.model
 
 import org.apache.streampark.flink.kubernetes.v2.jacksonMapper
-import 
org.apache.streampark.flink.kubernetes.v2.model.FlinkDeploymentDef.mapPodToPodTemplate
+import 
org.apache.streampark.flink.kubernetes.v2.model.FlinkDeploymentDef.{mapPodToPodTemplate,
 DEFAULT_SERVICE_ACCOUNT}
 
 import io.fabric8.kubernetes.api.model.{ObjectMeta, Pod}
 import org.apache.flink.v1beta1.{flinkdeploymentspec, FlinkDeployment, 
FlinkDeploymentSpec}
@@ -58,7 +58,7 @@ case class FlinkDeploymentDef(
     name: String,
     image: String,
     imagePullPolicy: Option[String] = None,
-    serviceAccount: String = "flink",
+    serviceAccount: String = DEFAULT_SERVICE_ACCOUNT,
     flinkVersion: FlinkVersion,
     jobManager: JobManagerDef,
     taskManager: TaskManagerDef,
@@ -108,6 +108,9 @@ case class FlinkDeploymentDef(
 }
 
 object FlinkDeploymentDef {
+
+  lazy val DEFAULT_SERVICE_ACCOUNT = "flink"
+
   def mapPodToPodTemplate[A: ClassTag](pod: Pod, clz: Class[A]): Try[A] = Try {
     val json = jacksonMapper.writeValueAsString(pod)
     jacksonMapper.readValue(json, clz)
diff --git 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala
 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala
index ee23bd49d..7f8496839 100644
--- 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala
@@ -23,13 +23,13 @@ import org.apache.streampark.flink.kubernetes.v2.model._
 import org.apache.streampark.flink.kubernetes.v2.model.TrackKey._
 
 import org.apache.flink.v1beta1.{FlinkDeployment, FlinkDeploymentSpec, 
FlinkSessionJob, FlinkSessionJobSpec}
-import zio.{IO, Ref, Schedule, UIO}
+import zio.{IO, Ref, Schedule, UIO, ZIO}
 import zio.ZIO.logInfo
 import zio.concurrent.{ConcurrentMap, ConcurrentSet}
 import zio.stream.ZStream
 
 /** Flink Kubernetes resource observer. */
-sealed trait FlinkK8sObserver {
+sealed trait FlinkK8sObserverTrait {
 
   /** Start tracking resources. */
   def track(key: TrackKey): UIO[Unit]
@@ -37,6 +37,14 @@ sealed trait FlinkK8sObserver {
   /** Stop tracking resources. */
   def untrack(key: TrackKey): UIO[Unit]
 
+  /** Stop tracking resources by TrackKey.id. */
+  def untrackById(appId: Long): UIO[Unit] = {
+    trackedKeys.find(_.id == appId).flatMap {
+      case Some(key) => untrack(key)
+      case None      => ZIO.unit
+    }
+  }
+
   /** All tracked key in observer. */
   def trackedKeys: ConcurrentSet[TrackKey]
 
@@ -88,7 +96,7 @@ sealed trait FlinkK8sObserver {
 
 }
 
-object FlinkK8sObserver extends FlinkK8sObserver {
+object FlinkK8sObserver extends FlinkK8sObserverTrait {
 
   // The following is a visible external snapshot.
   val trackedKeys          = ConcurrentSet.empty[TrackKey].runUIO
diff --git 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/FlinkK8sOperator.scala
 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/FlinkK8sOperator.scala
index d07b60540..ac5d5b4e4 100644
--- 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/FlinkK8sOperator.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/FlinkK8sOperator.scala
@@ -33,7 +33,7 @@ import zio.stream.ZStream
  * When deploying or deleting flink resources, the FlinkK8sOperator will 
automatically
  * handle the related tracing.
  */
-sealed trait FlinkK8sOperator {
+sealed trait FlinkK8sOperatorTrait {
 
   /** Directly operate Flink Kubernetes CR. */
   val k8sCrOpr: CROperator.type = CROperator
@@ -95,7 +95,7 @@ sealed trait FlinkK8sOperator {
 
 }
 
-object FlinkK8sOperator extends FlinkK8sOperator {
+object FlinkK8sOperator extends FlinkK8sOperatorTrait {
 
   private val obr       = FlinkK8sObserver
   private val flinkRest = FlinkRestRequest
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
index e9a3b06e1..21975d248 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.flink.kubernetes
 
+import org.apache.streampark.common.conf.K8sFlinkConfig
 import org.apache.streampark.flink.kubernetes.enums.FlinkJobState
 import 
org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, 
SESSION}
 import org.apache.streampark.flink.kubernetes.event.{BuildInEvent, 
FlinkJobStateEvent, FlinkJobStatusChangeEvent}
@@ -64,13 +65,15 @@ class DefaultFlinkK8sWatcher(conf: FlinkTrackConfig = 
FlinkTrackConfig.defaultCo
   }
 
   def doWatching(trackId: TrackId): Unit = {
-    if (trackId.isLegal) {
+    if (!K8sFlinkConfig.isV2Enabled && trackId.isLegal) {
       watchController.trackIds.set(trackId)
     }
   }
 
   def unWatching(trackId: TrackId): Unit = {
-    watchController.canceling.set(trackId)
+    if (!K8sFlinkConfig.isV2Enabled) {
+      watchController.canceling.set(trackId)
+    }
   }
 
   override def isInWatching(trackId: TrackId): Boolean = 
watchController.isInWatching(trackId)
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatcherLazyStartAop.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatcherLazyStartAop.scala
index 1d8a1f554..f544ae4ff 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatcherLazyStartAop.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatcherLazyStartAop.scala
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.flink.kubernetes
 
+import org.apache.streampark.common.conf.K8sFlinkConfig
 import org.apache.streampark.flink.kubernetes.event.BuildInEvent
 import org.apache.streampark.flink.kubernetes.model.{ClusterKey, 
FlinkMetricCV, JobStatusCV, TrackId}
 
@@ -27,37 +28,37 @@ import 
org.apache.streampark.flink.kubernetes.model.{ClusterKey, FlinkMetricCV,
 trait FlinkK8sWatcherLazyStartAop extends FlinkK8sWatcher {
 
   abstract override def doWatching(trackId: TrackId): Unit = {
-    start()
+    if (!K8sFlinkConfig.isV2Enabled) start()
     super.doWatching(trackId)
   }
 
   abstract override def unWatching(trackId: TrackId): Unit = {
-    start()
+    if (!K8sFlinkConfig.isV2Enabled) start()
     super.unWatching(trackId)
   }
 
   abstract override def isInWatching(trackId: TrackId): Boolean = {
-    start()
+    if (!K8sFlinkConfig.isV2Enabled) start()
     super.isInWatching(trackId)
   }
 
   abstract override def getAllWatchingIds: Set[TrackId] = {
-    start()
+    if (!K8sFlinkConfig.isV2Enabled) start()
     super.getAllWatchingIds
   }
 
   abstract override def getJobStatus(trackId: TrackId): Option[JobStatusCV] = {
-    start()
+    if (!K8sFlinkConfig.isV2Enabled) start()
     super.getJobStatus(trackId)
   }
 
   abstract override def getJobStatus(trackIds: Set[TrackId]): Map[CacheKey, 
JobStatusCV] = {
-    start()
+    if (!K8sFlinkConfig.isV2Enabled) start()
     super.getJobStatus(trackIds)
   }
 
   abstract override def getAllJobStatus: Map[CacheKey, JobStatusCV] = {
-    start()
+    if (!K8sFlinkConfig.isV2Enabled) start()
     super.getAllJobStatus
   }
 
@@ -72,12 +73,12 @@ trait FlinkK8sWatcherLazyStartAop extends FlinkK8sWatcher {
   }
 
   abstract override def checkIsInRemoteCluster(trackId: TrackId): Boolean = {
-    start()
+    if (!K8sFlinkConfig.isV2Enabled) start()
     super.checkIsInRemoteCluster(trackId)
   }
 
   abstract override def postEvent(event: BuildInEvent, sync: Boolean): Unit = {
-    start()
+    if (!K8sFlinkConfig.isV2Enabled) start()
     super.postEvent(event, sync)
   }
 
diff --git a/streampark-flink/streampark-flink-packer/pom.xml 
b/streampark-flink/streampark-flink-packer/pom.xml
index a6e2a0716..d5377465a 100644
--- a/streampark-flink/streampark-flink-packer/pom.xml
+++ b/streampark-flink/streampark-flink-packer/pom.xml
@@ -48,6 +48,12 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            
<artifactId>streampark-flink-kubernetes-core_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <!-- maven build tools -->
         <dependency>
             <groupId>org.apache.maven.plugins</groupId>
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/java/org/apache/streampark/flink/packer/pipeline/PipelineType.java
 
b/streampark-flink/streampark-flink-packer/src/main/java/org/apache/streampark/flink/packer/pipeline/PipelineType.java
index 50c235e2d..0c0e8e4b1 100644
--- 
a/streampark-flink/streampark-flink-packer/src/main/java/org/apache/streampark/flink/packer/pipeline/PipelineType.java
+++ 
b/streampark-flink/streampark-flink-packer/src/main/java/org/apache/streampark/flink/packer/pipeline/PipelineType.java
@@ -42,7 +42,7 @@ public enum PipelineType {
   /** flink native kubernetes application mode */
   FLINK_NATIVE_K8S_APPLICATION(
       2,
-      "flink native kubernetes session mode task building pipeline",
+      "flink native kubernetes application mode task building pipeline",
       ImmutableMap.<Integer, String>builder()
           .put(1, "Create building workspace")
           .put(2, "Export kubernetes pod template")
@@ -70,7 +70,16 @@ public enum PipelineType {
           .put(2, "Resolve maven dependencies")
           .put(3, "upload jar to yarn.provided.lib.dirs")
           .build(),
-      SimpleBuildResponse.class);
+      SimpleBuildResponse.class),
+
+  FLINK_K8S_APPLICATION_V2(
+      5,
+      "flink kubernetes application mode task building pipeline v2",
+      ImmutableMap.<Integer, String>builder()
+          .put(1, "Create building workspace")
+          .put(2, "Build shaded flink app jar")
+          .build(),
+      K8sAppModeBuildResponse.class);
 
   private final Integer code;
   /** short description of pipeline type. */
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildResponse.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildResponse.scala
index e376d561f..7efc2d95b 100644
--- 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildResponse.scala
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildResponse.scala
@@ -66,3 +66,19 @@ case class DockerImageBuildResponse(
       s"dockerInnerMainJarPath: $dockerInnerMainJarPath, " +
       s"pass: $pass }"
 }
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+case class K8sAppModeBuildResponse(
+    workspacePath: String,
+    flinkBaseImage: String,
+    mainJarPath: String,
+    extraLibJarPaths: Set[String],
+    pass: Boolean = false
+) extends FlinkBuildResult {
+  override def toString: String =
+    s"{ workspacePath: $workspacePath, " +
+      s"flinkBaseImage: $flinkBaseImage, " +
+      s"mainJarPath: $mainJarPath, " +
+      s"extraLibJarPaths: ${extraLibJarPaths.mkString(",")}, " +
+      s"pass: $pass }"
+}
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipelineV2.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipelineV2.scala
new file mode 100644
index 000000000..6f59f6f02
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipelineV2.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.packer.pipeline.impl
+
+import org.apache.streampark.common.enums.DevelopmentMode
+import org.apache.streampark.common.fs.LfsOperator
+import org.apache.streampark.flink.packer.maven.MavenTool
+import org.apache.streampark.flink.packer.pipeline._
+
+import scala.language.postfixOps
+
+/**
+ * Building pipeline V2 (based on the Kubernetes operator) for Flink 
Kubernetes-native application mode
+ */
+class FlinkK8sApplicationBuildPipelineV2(request: 
FlinkK8sApplicationBuildRequest)
+  extends BuildPipeline {
+
+  override def pipeType: PipelineType = PipelineType.FLINK_K8S_APPLICATION_V2
+
+  @throws[Throwable]
+  override protected def buildProcess(): K8sAppModeBuildResponse = {
+
+    // Step-1: init build workspace of flink job
+    // the sub workspace dir looks like: APP_WORKSPACE/k8s-clusterId@k8s-namespace/
+    val buildWorkspace =
+      execStep(1) {
+        val buildWorkspace = 
s"${request.workspace}/${request.clusterId}@${request.k8sNamespace}"
+        LfsOperator.mkCleanDirs(buildWorkspace)
+        logInfo(s"recreate building workspace: $buildWorkspace")
+        buildWorkspace
+      }.getOrElse(throw getError.exception)
+
+    // Step-2: build shaded flink job jar and handle extra jars
+    // the output shaded jar file name looks like: streampark-flinkjob_myjob-test.jar
+    val (shadedJar, extJarLibs) =
+      execStep(2) {
+        val shadedJarOutputPath = request.getShadedJarPath(buildWorkspace)
+        val extJarLibs = request.developmentMode match {
+          case DevelopmentMode.FLINK_SQL => request.dependencyInfo.extJarLibs
+          case DevelopmentMode.CUSTOM_CODE => Set[String]()
+        }
+        val shadedJar =
+          MavenTool.buildFatJar(request.mainClass, request.providedLibs, 
shadedJarOutputPath)
+        logInfo(s"output shaded flink job jar: ${shadedJar.getAbsolutePath}")
+        shadedJar -> extJarLibs
+      }.getOrElse(throw getError.exception)
+
+    K8sAppModeBuildResponse(
+      workspacePath = buildWorkspace,
+      flinkBaseImage = request.flinkBaseImage,
+      mainJarPath = shadedJar.getAbsolutePath,
+      extraLibJarPaths = extJarLibs)
+  }
+
+  override protected def offerBuildParam: FlinkK8sApplicationBuildRequest = 
request
+}
+
+object FlinkK8sApplicationBuildPipelineV2 {
+  def of(request: FlinkK8sApplicationBuildRequest): 
FlinkK8sApplicationBuildPipelineV2 =
+    new FlinkK8sApplicationBuildPipelineV2(request)
+
+}

Reply via email to