This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 11c7fa837 [Feat][Flink-K8s-V2] Refactor the lifecycle control of Flink K8s application-mode jobs (#3037)
11c7fa837 is described below
commit 11c7fa8377398b09e052c046c16d209974e2fce0
Author: Linying Assad <[email protected]>
AuthorDate: Sat Sep 9 22:22:01 2023 -0500
[Feat][Flink-K8s-V2] Refactor the lifecycle control of Flink K8s application-mode jobs (#3037)
* [Feat] Refactor Flink resource build pipeline for flink-k8s-v2 module. #2882
* [Feat] Adaptation of the submission client for flink-k8s-v2. #2882
* [Feat][flink-k8s-v2] Migrate ENABLE_V2 to streampark-common module. #2882
* [Feat][flink-k8s-v2] Disable Flink job tracking watcher at flink-k8s-v1. #2882
* [Feat][flink-k8s-v2] Adaptation of Flink job canceling and triggering savepoint operations. #2882
* [Feat][flink-k8s-v2] Untrack flink when it is deleted. #2882
---
.../streampark/common/conf/K8sFlinkConfig.scala | 20 +-
.../org/apache/streampark/common/zio/ZIOExt.scala | 22 +-
.../apache/streampark/common/zio/ZIOJavaUtil.scala | 25 +--
.../impl/ApplicationActionServiceImpl.java | 6 +-
.../impl/ApplicationManageServiceImpl.java | 7 +-
.../core/service/impl/AppBuildPipeServiceImpl.java | 8 +-
.../core/service/impl/SavePointServiceImpl.java | 1 +
.../console/core/task/FlinkK8sWatcherWrapper.java | 15 +-
.../flink/client/bean/CancelRequest.scala | 1 +
.../flink/client/bean/KubernetesSubmitParam.scala | 78 +++++++
.../flink/client/bean/SubmitRequest.scala | 13 --
.../client/bean/TriggerSavepointRequest.scala | 1 +
.../flink/client/FlinkClientHandler.scala | 6 +-
.../impl/KubernetesApplicationClientV2.scala | 238 +++++++++++++++++++++
.../impl/KubernetesNativeApplicationClientV2.scala | 121 -----------
.../flink/client/trait/FlinkClientTrait.scala | 7 +
.../client/trait/KubernetesClientV2Trait.scala | 178 +++++++++++++++
.../client/trait/KubernetesNativeClientTrait.scala | 2 +-
.../kubernetes/v2/model/FlinkDeploymentDef.scala | 7 +-
.../kubernetes/v2/observer/FlinkK8sObserver.scala | 14 +-
.../kubernetes/v2/operator/FlinkK8sOperator.scala | 4 +-
.../flink/kubernetes/DefaultFlinkK8sWatcher.scala | 7 +-
.../kubernetes/FlinkK8sWatcherLazyStartAop.scala | 19 +-
streampark-flink/streampark-flink-packer/pom.xml | 6 +
.../flink/packer/pipeline/PipelineType.java | 13 +-
.../flink/packer/pipeline/BuildResponse.scala | 16 ++
.../impl/FlinkK8sApplicationBuildPipelineV2.scala | 77 +++++++
27 files changed, 728 insertions(+), 184 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
index 7313bf201..f5a20923a 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
@@ -18,9 +18,22 @@
package org.apache.streampark.common.conf
/** Flink kubernetes Configuration for v1 version */
-@deprecated("see: org.apache.streampark.flink.kubernetes.v2.Config")
+
object K8sFlinkConfig {
+ lazy val isV2Enabled: Boolean = InternalConfigHolder.get(ENABLE_V2)
+
+ val ENABLE_V2: InternalOption = InternalOption(
+ key = "streampark.flink-k8s.enable-v2",
+ defaultValue = false,
+ classType = classOf[Boolean],
+ description =
+ "Whether to enable the v2 version(base on flink-kubernetes-operator) of
flink kubernetes operation"
+ )
+
+ // ======= deprecated =======
+
+ @deprecated
val jobStatusTrackTaskTimeoutSec: InternalOption = InternalOption(
key = "streampark.flink-k8s.tracking.polling-task-timeout-sec.job-status",
defaultValue = 120L,
@@ -28,6 +41,7 @@ object K8sFlinkConfig {
description = "run timeout seconds of single flink-k8s metrics tracking
task"
)
+ @deprecated
val metricTrackTaskTimeoutSec: InternalOption = InternalOption(
key = "streampark.flink-k8s.tracking.polling-task-timeout-sec.cluster-metric",
defaultValue = 120L,
@@ -35,6 +49,7 @@ object K8sFlinkConfig {
description = "run timeout seconds of single flink-k8s job status tracking
task"
)
+ @deprecated
val jobStatueTrackTaskIntervalSec: InternalOption = InternalOption(
key = "streampark.flink-k8s.tracking.polling-interval-sec.job-status",
defaultValue = 5L,
@@ -42,6 +57,7 @@ object K8sFlinkConfig {
description = "interval seconds between two single flink-k8s metrics
tracking task"
)
+ @deprecated
val metricTrackTaskIntervalSec: InternalOption = InternalOption(
key = "streampark.flink-k8s.tracking.polling-interval-sec.cluster-metric",
defaultValue = 5L,
@@ -49,6 +65,7 @@ object K8sFlinkConfig {
description = "interval seconds between two single flink-k8s metrics
tracking task"
)
+ @deprecated
val silentStateJobKeepTrackingSec: InternalOption = InternalOption(
key = "streampark.flink-k8s.tracking.silent-state-keep-sec",
defaultValue = 60,
@@ -69,6 +86,7 @@ object K8sFlinkConfig {
)
/** kubernetes default namespace */
+ @deprecated
val DEFAULT_KUBERNETES_NAMESPACE = "default"
}
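For orientation, a minimal sketch of how the new switch is consumed (the key and default come from the diff above; everything else here is illustrative):

  import org.apache.streampark.common.conf.K8sFlinkConfig

  // "streampark.flink-k8s.enable-v2" defaults to false; when set to true,
  // submission, building and tracking are routed to the
  // flink-kubernetes-operator based v2 stack instead of the deprecated v1 watcher.
  val useV2: Boolean = K8sFlinkConfig.isV2Enabled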
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOExt.scala b/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOExt.scala
index 6fb546c65..7baa195d3 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOExt.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOExt.scala
@@ -17,14 +17,16 @@
package org.apache.streampark.common.zio
-import zio.{IO, Runtime, Unsafe, ZIO}
+import zio.{FiberFailure, IO, Runtime, Unsafe, ZIO}
import zio.stream.ZStream
+import scala.util.Try
+
/** ZIO extension */
object ZIOExt {
/* Unsafe run zio effect. */
- @throws[Exception]
+ @throws[FiberFailure]
@inline def unsafeRun[E, A](zio: IO[E, A]): A = Unsafe.unsafe {
implicit u =>
Runtime.default.unsafe
@@ -32,11 +34,25 @@ object ZIOExt {
.getOrThrowFiberFailure()
}
+ /** unsafe run IO to Either. */
+ @inline def unsafeRunToEither[E, A](zio: IO[E, A]): Either[Throwable, A] = Unsafe.unsafe {
+ implicit u =>
+ Runtime.default.unsafe
+ .run(zio.provideLayer(Runtime.removeDefaultLoggers >>> ZIOLogger.default))
+ .toEither
+ }
+
implicit class IOOps[E, A](io: ZIO[Any, E, A]) {
/** unsafe run IO */
- @throws[Throwable]
+ @throws[FiberFailure]
def runIO: A = ZIOExt.unsafeRun(io)
+
+ /** unsafe run IO to Try. */
+ def runIOAsTry: Try[A] = unsafeRunToEither(io).toTry
+
+ /** unsafe run IO to Either. */
+ def runIOAsEither: Either[Throwable, A] = unsafeRunToEither(io)
}
implicit class UIOOps[A](uio: ZIO[Any, Nothing, A]) {
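A minimal usage sketch for the new extension methods (the effect value is hypothetical; runIO rethrows failures as FiberFailure, while the new variants return them as values):

  import org.apache.streampark.common.zio.ZIOExt.IOOps
  import zio.ZIO

  val effect: zio.IO[Throwable, Int] = ZIO.attempt(41 + 1)

  val a: Int = effect.runIO                            // throws FiberFailure on failure
  val b: scala.util.Try[Int] = effect.runIOAsTry       // Success(42) or Failure(err)
  val c: Either[Throwable, Int] = effect.runIOAsEither // Right(42) or Left(err)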
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala b/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOJavaUtil.scala
similarity index 55%
copy from streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
copy to streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOJavaUtil.scala
index 035c5348d..d7260b5d5 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOJavaUtil.scala
@@ -15,23 +15,16 @@
* limitations under the License.
*/
-package org.apache.streampark.flink.client.bean
+package org.apache.streampark.common.zio
-import org.apache.streampark.common.conf.{FlinkVersion, K8sFlinkConfig}
-import org.apache.streampark.common.enums.ExecutionMode
+import zio.{FiberFailure, IO, UIO}
-import javax.annotation.Nullable
+/** Util for running ZIO effects in Java. */
+object ZIOJavaUtil {
-import java.util.{Map => JavaMap}
+ @throws[FiberFailure]
+ def runIO[E, A](zio: IO[E, A]): A = ZIOExt.unsafeRun(zio)
-/** Trigger savepoint request. */
-case class TriggerSavepointRequest(
- flinkVersion: FlinkVersion,
- executionMode: ExecutionMode,
- @Nullable properties: JavaMap[String, Any],
- clusterId: String,
- jobId: String,
- savepointPath: String,
- nativeFormat: Boolean,
- override val kubernetesNamespace: String = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE)
- extends SavepointRequestTrait
+ def runUIO[A](uio: UIO[A]): A = ZIOExt.unsafeRun(uio)
+
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index ff69ed84b..4c836469f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -294,6 +294,7 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
CancelRequest cancelRequest =
new CancelRequest(
+ application.getId(),
flinkEnv.getFlinkVersion(),
ExecutionMode.of(application.getExecutionMode()),
properties,
@@ -413,10 +414,13 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
extraParameter.put(ConfigConst.KEY_FLINK_SQL(null), flinkSql.getSql());
}
+ // TODO Need to display more K8s submission parameters in the front-end UI.
+ // See: org.apache.streampark.flink.client.bean.KubernetesSubmitParam
KubernetesSubmitParam kubernetesSubmitParam =
- new KubernetesSubmitParam(
+ KubernetesSubmitParam.apply(
application.getClusterId(),
application.getK8sNamespace(),
+ application.getFlinkImage(),
application.getK8sRestExposedTypeEnum());
Tuple2<String, String> userJarAndAppConf = getUserJarAndAppConf(flinkEnv, application);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
index eeabbaad8..ea8fc61c1 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
@@ -17,11 +17,13 @@
package org.apache.streampark.console.core.service.application.impl;
+import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.enums.StorageType;
import org.apache.streampark.common.fs.HdfsOperator;
import org.apache.streampark.common.util.DeflaterUtils;
+import org.apache.streampark.common.zio.ZIOJavaUtil;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
@@ -54,6 +56,7 @@ import org.apache.streampark.console.core.service.YarnQueueService;
import org.apache.streampark.console.core.service.application.ApplicationManageService;
import org.apache.streampark.console.core.task.FlinkHttpWatcher;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
+import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver;
import org.apache.streampark.flink.packer.pipeline.PipelineStatus;
import org.apache.commons.lang3.StringUtils;
@@ -176,9 +179,11 @@ public class ApplicationManageServiceImpl extends ServiceImpl<ApplicationMapper,
// 8) remove app
removeApp(application);
-
if (isKubernetesApp(application)) {
k8SFlinkTrackMonitor.unWatching(toTrackId(application));
+ if (K8sFlinkConfig.isV2Enabled()) {
+ ZIOJavaUtil.runUIO(FlinkK8sObserver.untrackById(application.getId()));
+ }
} else {
FlinkHttpWatcher.unWatching(appParam.getId());
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 7bbbb5afe..7260fc72f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -18,6 +18,7 @@
package org.apache.streampark.console.core.service.impl;
import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.ApplicationType;
import org.apache.streampark.common.enums.DevelopmentMode;
@@ -78,6 +79,7 @@ import org.apache.streampark.flink.packer.pipeline.PipeWatcher;
import org.apache.streampark.flink.packer.pipeline.PipelineStatus;
import org.apache.streampark.flink.packer.pipeline.PipelineType;
import org.apache.streampark.flink.packer.pipeline.impl.FlinkK8sApplicationBuildPipeline;
+import org.apache.streampark.flink.packer.pipeline.impl.FlinkK8sApplicationBuildPipelineV2;
import org.apache.streampark.flink.packer.pipeline.impl.FlinkK8sSessionBuildPipeline;
import org.apache.streampark.flink.packer.pipeline.impl.FlinkRemoteBuildPipeline;
import org.apache.streampark.flink.packer.pipeline.impl.FlinkYarnApplicationBuildPipeline;
@@ -511,7 +513,11 @@ public class AppBuildPipeServiceImpl
dockerConfig.getPassword()),
app.getIngressTemplate());
log.info("Submit params to building pipeline : {}", k8sApplicationBuildRequest);
- return FlinkK8sApplicationBuildPipeline.of(k8sApplicationBuildRequest);
+ if (K8sFlinkConfig.isV2Enabled()) {
+ return FlinkK8sApplicationBuildPipelineV2.of(k8sApplicationBuildRequest);
+ } else {
+ return FlinkK8sApplicationBuildPipeline.of(k8sApplicationBuildRequest);
+ }
default:
throw new UnsupportedOperationException(
"Unsupported Building Application for ExecutionMode: " +
app.getExecutionModeEnum());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 20f7c31f4..c85d0f1d4 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -493,6 +493,7 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint
Map<String, Object> properties = this.tryGetRestProps(application, cluster);
return new TriggerSavepointRequest(
+ application.getId(),
flinkEnv.getFlinkVersion(),
application.getExecutionModeEnum(),
properties,
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index c385764ef..32f7a2391 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -17,6 +17,7 @@
package org.apache.streampark.console.core.task;
+import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.service.application.ApplicationManageService;
@@ -82,11 +83,15 @@ public class FlinkK8sWatcherWrapper {
}
private void initFlinkK8sWatcher(@Nonnull FlinkK8sWatcher trackMonitor) {
- // register change event listener
- trackMonitor.registerListener(flinkK8sChangeEventListener);
- // recovery tracking list
- List<TrackId> k8sApp = getK8sWatchingApps();
- k8sApp.forEach(trackMonitor::doWatching);
+ if (!K8sFlinkConfig.isV2Enabled()) {
+ // register change event listener
+ trackMonitor.registerListener(flinkK8sChangeEventListener);
+ // recovery tracking list
+ List<TrackId> k8sApp = getK8sWatchingApps();
+ k8sApp.forEach(trackMonitor::doWatching);
+ } else {
+ // TODO [flink-k8s-v2] Recovery tracking list and invoke FlinkK8sObserver.track()
+ }
}
/** get flink-k8s job tracking application from db. */
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
index dd5481dc4..f06db3d9d 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
@@ -25,6 +25,7 @@ import javax.annotation.Nullable
import java.util.{Map => JavaMap}
case class CancelRequest(
+ id: Long,
flinkVersion: FlinkVersion,
executionMode: ExecutionMode,
@Nullable properties: JavaMap[String, Any],
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/KubernetesSubmitParam.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/KubernetesSubmitParam.scala
new file mode 100644
index 000000000..07832841c
--- /dev/null
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/KubernetesSubmitParam.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.client.bean
+
+import org.apache.streampark.common.enums.FlinkK8sRestExposedType
+
+import javax.annotation.Nullable
+
+import java.util
+import java.util.{Map => JMap}
+
+/**
+ * TODO Need to display more K8s submission parameters in the front-end UI.
+ *
+ * It will eventually be converted to
+ * [[org.apache.streampark.flink.kubernetes.v2.model.FlinkDeploymentDef]]
+ *
+ * The logic of conversion is located at:
+ * [[org.apache.streampark.flink.client.impl.KubernetesApplicationClientV2#genFlinkDeployDef]]
+ */
+// todo split into Application mode and SessionJob mode
+case class KubernetesSubmitParam(
+ clusterId: String,
+ kubernetesNamespace: String,
+ baseImage: Option[String] = None,
+ imagePullPolicy: Option[String] = None,
+ serviceAccount: Option[String] = None,
+ podTemplate: Option[String] = None,
+ jobManagerCpu: Option[Double] = None,
+ jobManagerMemory: Option[String] = None,
+ jobManagerEphemeralStorage: Option[String] = None,
+ jobManagerPodTemplate: Option[String] = None,
+ taskManagerCpu: Option[Double] = None,
+ taskManagerMemory: Option[String] = None,
+ taskManagerEphemeralStorage: Option[String] = None,
+ taskManagerPodTemplate: Option[String] = None,
+ logConfiguration: JMap[String, String] = new util.HashMap[String, String](),
+ flinkRestExposedType: Option[FlinkK8sRestExposedType] = None
+)
+
+object KubernetesSubmitParam {
+
+ /**
+ * Compatible with streampark old native k8s submission parameters.
+ *
+ * @param clusterId
+ * flink cluster id in k8s cluster.
+ * @param kubernetesNamespace
+ * k8s namespace.
+ * @param flinkRestExposedType
+ * flink rest-service exposed type on k8s cluster.
+ */
+ def apply(
+ clusterId: String,
+ kubernetesNamespace: String,
+ baseImage: String,
+ @Nullable flinkRestExposedType: FlinkK8sRestExposedType): KubernetesSubmitParam =
+ KubernetesSubmitParam(
+ clusterId = clusterId,
+ kubernetesNamespace = kubernetesNamespace,
+ baseImage = Some(baseImage),
+ flinkRestExposedType = Option(flinkRestExposedType))
+}
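A usage sketch of the compatibility constructor above (values are illustrative; the @Nullable exposed type is wrapped via Option, so null becomes None):

  import org.apache.streampark.flink.client.bean.KubernetesSubmitParam

  // Old native-k8s call sites only carry these four parameters; all
  // operator-oriented fields (cpu, memory, pod templates, ...) keep their defaults.
  val param: KubernetesSubmitParam = KubernetesSubmitParam.apply(
    clusterId = "my-flink-app",
    kubernetesNamespace = "default",
    baseImage = "flink:1.17",
    flinkRestExposedType = null) // @Nullable, becomes None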
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index d06c036c1..4989abaf8 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -36,19 +36,6 @@ import java.util.{Map => JavaMap}
import scala.collection.convert.ImplicitConversions._
import scala.util.Try
-/**
- * @param clusterId
- * flink cluster id in k8s cluster.
- * @param kubernetesNamespace
- * k8s namespace.
- * @param flinkRestExposedType
- * flink rest-service exposed type on k8s cluster.
- */
-case class KubernetesSubmitParam(
- clusterId: String,
- kubernetesNamespace: String,
- @Nullable flinkRestExposedType: FlinkK8sRestExposedType)
-
case class SubmitRequest(
flinkVersion: FlinkVersion,
executionMode: ExecutionMode,
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
index 035c5348d..85c788da7 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
@@ -26,6 +26,7 @@ import java.util.{Map => JavaMap}
/** Trigger savepoint request. */
case class TriggerSavepointRequest(
+ id: Long,
flinkVersion: FlinkVersion,
executionMode: ExecutionMode,
@Nullable properties: JavaMap[String, Any],
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
index e85ceb6ee..810df34e3 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
@@ -17,6 +17,7 @@
package org.apache.streampark.flink.client
+import org.apache.streampark.common.conf.K8sFlinkConfig
import org.apache.streampark.common.enums.ExecutionMode
import org.apache.streampark.common.enums.ExecutionMode._
import org.apache.streampark.flink.client.`trait`.FlinkClientTrait
@@ -32,7 +33,10 @@ object FlinkClientHandler {
YARN_SESSION -> YarnSessionClient,
YARN_PER_JOB -> YarnPerJobClient,
KUBERNETES_NATIVE_SESSION -> KubernetesNativeSessionClient,
- KUBERNETES_NATIVE_APPLICATION -> KubernetesNativeApplicationClient
+ KUBERNETES_NATIVE_APPLICATION -> {
+ if (K8sFlinkConfig.isV2Enabled) KubernetesApplicationClientV2
+ else KubernetesNativeApplicationClient
+ }
)
def submit(request: SubmitRequest): SubmitResponse = {
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
new file mode 100644
index 000000000..665159c03
--- /dev/null
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.client.impl
+
+import org.apache.streampark.common.util.Logger
+import org.apache.streampark.common.zio.ZIOExt.IOOps
+import org.apache.streampark.flink.client.`trait`.KubernetesClientV2Trait
+import org.apache.streampark.flink.client.bean._
+import org.apache.streampark.flink.kubernetes.v2.model.{FlinkDeploymentDef, JobManagerDef, TaskManagerDef}
+import org.apache.streampark.flink.kubernetes.v2.operator.FlinkK8sOperator
+import org.apache.streampark.flink.packer.pipeline.K8sAppModeBuildResponse
+
+import org.apache.flink.client.deployment.application.ApplicationConfiguration
+import org.apache.flink.configuration._
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
+import org.apache.flink.v1beta1.FlinkDeploymentSpec.FlinkVersion
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters.mapAsScalaMapConverter
+import scala.util.{Failure, Success, Try}
+
+/** Flink K8s application mode task operation client via Flink K8s Operator */
+object KubernetesApplicationClientV2 extends KubernetesClientV2Trait with Logger {
+
+ @throws[Exception]
+ override def doSubmit(
+ submitRequest: SubmitRequest,
+ flinkConfig: Configuration): SubmitResponse = {
+
+ val richMsg: String => String = s"[flink-submit][appId=${submitRequest.id}] " + _
+
+ submitRequest.checkBuildResult()
+ val buildResult = submitRequest.buildResult.asInstanceOf[K8sAppModeBuildResponse]
+
+ // Convert to FlinkDeployment CR definition
+ val flinkDeployDef = genFlinkDeployDef(submitRequest, flinkConfig, buildResult) match {
+ case Right(result) => result
+ case Left(errMsg) =>
+ throw new IllegalArgumentException(
+ richMsg(s"Error occurred while parsing parameters: $errMsg"))
+ }
+
+ // Submit FlinkDeployment CR to Kubernetes
+ FlinkK8sOperator.deployApplicationJob(submitRequest.id, flinkDeployDef).runIOAsTry match {
+ case Success(_) =>
+ logInfo(richMsg("Flink job has been submitted successfully."))
+ case Failure(err) =>
+ logError(
+ richMsg(s"Submit Flink job fail in
${submitRequest.executionMode.getName}_V2 mode!"),
+ err)
+ throw err
+ }
+
+ SubmitResponse(
+ clusterId = submitRequest.k8sSubmitParam.clusterId,
+ flinkConfig = flinkConfig.toMap,
+ jobId = submitRequest.jobId,
+ jobManagerUrl = null
+ )
+ }
+
+ // Generate FlinkDeployment CR definition; it is a pure, side-effect-free function.
+ private def genFlinkDeployDef(
+ submitReq: SubmitRequest,
+ originFlinkConfig: Configuration,
+ buildResult: K8sAppModeBuildResponse): Either[FailureMessage, FlinkDeploymentDef] = {
+
+ val flinkConfObj = originFlinkConfig.clone()
+ val flinkConfMap = originFlinkConfig.toMap.asScala.toMap
+
+ val namespace = Option(submitReq.k8sSubmitParam.kubernetesNamespace)
+ .getOrElse("default")
+
+ val name = Option(submitReq.k8sSubmitParam.clusterId)
+ .filter(!_.isBlank)
+ .getOrElse(return Left("cluster-id should not be empty"))
+
+ val image = submitReq.k8sSubmitParam.baseImage
+ .orElse(Option(buildResult.flinkBaseImage))
+ .filter(!_.isBlank)
+ .getOrElse(return Left("Flink base image should not be empty"))
+
+ val imagePullPolicy = flinkConfObj
+ .getOption(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY)
+ .map(_.toString)
+ .orElse(submitReq.k8sSubmitParam.imagePullPolicy)
+
+ val serviceAccount = flinkConfObj
+ .getOption(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT)
+ .orElse(submitReq.k8sSubmitParam.serviceAccount)
+ .getOrElse(FlinkDeploymentDef.DEFAULT_SERVICE_ACCOUNT)
+
+ val flinkVersion = Option(submitReq.flinkVersion.majorVersion)
+ .map(majorVer => "V" + majorVer.replace(".", "_"))
+ .flatMap(v => FlinkVersion.values().find(_.name() == v))
+ .getOrElse(return Left(s"Unsupported Flink version:
${submitReq.flinkVersion.majorVersion}"))
+
+ val jobDef = genJobDef(flinkConfObj, jarUriHint = Some(buildResult.mainJarPath))
+ .getOrElse(return Left("Invalid job definition"))
+
+ val podTemplate = submitReq.k8sSubmitParam.podTemplate.map(
+ yaml =>
+ unmarshalPodTemplate(yaml)
+ .getOrElse(return Left(s"Invalid pod template: \n$yaml")))
+
+ val jobManager = {
+ val cpu = flinkConfMap
+ .get(KUBERNETES_JM_CPU_AMOUNT_KEY)
+ .orElse(flinkConfMap.get(KUBERNETES_JM_CPU_KEY))
+ .flatMap(value => Try(value.toDouble).toOption)
+ .orElse(submitReq.k8sSubmitParam.jobManagerCpu)
+ .getOrElse(KUBERNETES_JM_CPU_DEFAULT)
+
+ val mem = flinkConfObj
+ .getOption(JobManagerOptions.TOTAL_PROCESS_MEMORY)
+ .map(_.toString)
+ .orElse(submitReq.k8sSubmitParam.jobManagerMemory)
+ .getOrElse(KUBERNETES_JM_MEMORY_DEFAULT)
+
+ val podTemplate = submitReq.k8sSubmitParam.jobManagerPodTemplate.map(
+ yaml =>
+ unmarshalPodTemplate(yaml)
+ .getOrElse(return Left(s"Invalid job manager pod template:
\n$yaml")))
+ JobManagerDef(
+ cpu = cpu,
+ memory = mem,
+ ephemeralStorage = submitReq.k8sSubmitParam.jobManagerEphemeralStorage,
+ podTemplate = podTemplate)
+ }
+
+ val taskManager = {
+ val cpu = flinkConfMap
+ .get(KUBERNETES_TM_CPU_AMOUNT_KEY)
+ .orElse(flinkConfMap.get(KUBERNETES_TM_CPU_KEY))
+ .flatMap(value => Try(value.toDouble).toOption)
+ .orElse(submitReq.k8sSubmitParam.taskManagerCpu)
+ .getOrElse(KUBERNETES_TM_CPU_DEFAULT)
+
+ val mem = flinkConfObj
+ .getOption(TaskManagerOptions.TOTAL_PROCESS_MEMORY)
+ .map(_.toString)
+ .orElse(submitReq.k8sSubmitParam.taskManagerMemory)
+ .getOrElse(KUBERNETES_TM_MEMORY_DEFAULT)
+
+ val podTemplate = submitReq.k8sSubmitParam.taskManagerPodTemplate.map(
+ yaml =>
+ unmarshalPodTemplate(yaml)
+ .getOrElse(return Left(s"Invalid task manager pod template:
\n$yaml")))
+ TaskManagerDef(
+ cpu = cpu,
+ memory = mem,
+ ephemeralStorage = submitReq.k8sSubmitParam.taskManagerEphemeralStorage,
+ podTemplate = podTemplate)
+ }
+
+ val logConfiguration = {
+ val items = submitReq.k8sSubmitParam.logConfiguration.asScala
+ if (items.isEmpty) {
+ // Get default log config from local target flink home
+ val logConfigs = Array(
+ "log4j.properties" ->
s"${submitReq.flinkVersion.flinkHome}/conf/log4j-console.properties",
+ "logback.xml" ->
s"${submitReq.flinkVersion.flinkHome}/conf/logback-console.xml"
+ )
+ logConfigs
+ .map { case (name, path) => name -> os.Path(path) }
+ .filter { case (_, path) => Try(os.exists(path) && os.isFile(path)).getOrElse(false) }
+ .map { case (name, path) => name -> Try(os.read(path)).toOption.filter(!_.isBlank) }
+ .filter { case (_, content) => content.isDefined }
+ .map { case (name, content) => name -> content.get }
+ .foreach { case (name, content) => items += name -> content }
+ }
+ items.toMap
+ }
+
+ val extraFlinkConfiguration = {
+ // Remove conflicting configuration items
+ val result: mutable.Map[String, String] = flinkConfObj
+ .remove(DeploymentOptions.TARGET)
+ .remove(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY)
+ .remove(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT)
+ .remove(JobManagerOptions.TOTAL_PROCESS_MEMORY)
+ .remove(TaskManagerOptions.TOTAL_PROCESS_MEMORY)
+ .remove(PipelineOptions.JARS)
+ .remove(CoreOptions.DEFAULT_PARALLELISM)
+ .remove(ApplicationConfiguration.APPLICATION_ARGS)
+ .remove(ApplicationConfiguration.APPLICATION_MAIN_CLASS)
+ .remove(SavepointConfigOptions.SAVEPOINT_PATH)
+ .remove(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)
+ .toMap
+ .asScala
+ .removeKey(KUBERNETES_JM_CPU_AMOUNT_KEY)
+ .removeKey(KUBERNETES_JM_CPU_KEY)
+ .removeKey(KUBERNETES_TM_CPU_AMOUNT_KEY)
+ .removeKey(KUBERNETES_TM_CPU_KEY)
+ // Set kubernetes.rest-service.exposed.type configuration for compatibility with native-k8s
+ submitReq.k8sSubmitParam.flinkRestExposedType.foreach {
+ exposedType => result += KUBERNETES_REST_SERVICE_EXPORTED_TYPE_KEY -> exposedType.getName
+ }
+ result.toMap
+ }
+
+ // TODO Migrate the construction logic of ingress to here and set it into FlinkDeploymentDef.ingress
+ // See: org.apache.streampark.flink.packer.pipeline.impl.FlinkK8sApplicationBuildPipeline Step-8
+ Right(
+ FlinkDeploymentDef(
+ namespace = namespace,
+ name = name,
+ image = image,
+ imagePullPolicy = imagePullPolicy,
+ serviceAccount = serviceAccount,
+ flinkVersion = flinkVersion,
+ jobManager = jobManager,
+ taskManager = taskManager,
+ flinkConfiguration = extraFlinkConfiguration,
+ logConfiguration = logConfiguration,
+ podTemplate = podTemplate,
+ job = Some(jobDef),
+ extJarPaths = Array.empty
+ ))
+ }
+
+}
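genFlinkDeployDef validates each parameter and short-circuits with a non-local return Left(...); a condensed sketch of the same pattern (hypothetical fields, not part of the commit):

  def validate(clusterId: String, image: String): Either[String, (String, String)] = {
    // Each check either yields a value or exits the method with a Left.
    val name = Option(clusterId).filter(_.nonEmpty)
      .getOrElse(return Left("cluster-id should not be empty"))
    val img = Option(image).filter(_.nonEmpty)
      .getOrElse(return Left("Flink base image should not be empty"))
    Right(name -> img) // all checks passed
  }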
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClientV2.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClientV2.scala
deleted file mode 100644
index f43e88f0b..000000000
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClientV2.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.flink.client.impl
-
-import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, ExecutionMode}
-import org.apache.streampark.common.zio.ZIOExt.IOOps
-import org.apache.streampark.flink.client.`trait`.KubernetesNativeClientTrait
-import org.apache.streampark.flink.client.bean._
-import org.apache.streampark.flink.kubernetes.v2.model.{FlinkDeploymentDef, JobDef, JobManagerDef, TaskManagerDef}
-import org.apache.streampark.flink.kubernetes.v2.operator.FlinkK8sOperator
-import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse
-
-import org.apache.commons.lang3.StringUtils
-import org.apache.flink.configuration.{Configuration, DeploymentOptions, ExecutionOptions, JobManagerOptions, PipelineOptions, TaskManagerOptions}
-import org.apache.flink.v1beta1.FlinkDeploymentSpec.FlinkVersion
-
-import scala.collection.JavaConverters._
-import scala.language.postfixOps
-
-object KubernetesNativeApplicationClientV2 extends KubernetesNativeClientTrait {
- @throws[Exception]
- override def doSubmit(
- submitRequest: SubmitRequest,
- flinkConfig: Configuration): SubmitResponse = {
-
- // require parameters
- require(
- StringUtils.isNotBlank(submitRequest.k8sSubmitParam.clusterId),
- s"[flink-submit] submit flink job failed, clusterId is null,
mode=${flinkConfig.get(DeploymentOptions.TARGET)}"
- )
-
- // check the last building result
- submitRequest.checkBuildResult()
-
- try {
- val spec: FlinkDeploymentDef = convertFlinkDeploymentDef(submitRequest, flinkConfig)
- FlinkK8sOperator.deployApplicationJob(submitRequest.id, spec).runIO
- val result = SubmitResponse(null, flinkConfig.toMap, submitRequest.jobId, null)
- logInfo(
- s"[flink-submit] flink job has been submitted. ${flinkConfIdentifierInfo(flinkConfig)}")
- result
- } catch {
- case e: Exception =>
- logError(s"submit flink job fail in ${submitRequest.executionMode}
mode")
- throw e
- } finally {}
- }
-
- override def doCancel(
- cancelRequest: CancelRequest,
- flinkConfig: Configuration): CancelResponse = {
- flinkConfig.safeSet(
- DeploymentOptions.TARGET,
- ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
- super.doCancel(cancelRequest, flinkConfig)
- }
-
- override def doTriggerSavepoint(
- request: TriggerSavepointRequest,
- flinkConf: Configuration): SavepointResponse = {
- flinkConf.safeSet(DeploymentOptions.TARGET, ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
- super.doTriggerSavepoint(request, flinkConf)
- }
-
- private[this] def convertFlinkDeploymentDef(
- submitRequest: SubmitRequest,
- flinkConfig: Configuration): FlinkDeploymentDef = {
- val spec = FlinkDeploymentDef(
- name = submitRequest.appName,
- namespace = submitRequest.k8sSubmitParam.kubernetesNamespace,
- image = submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse].flinkImageTag,
- flinkVersion = Option(submitRequest.flinkVersion.majorVersion)
- .map(_.replace(".", "_"))
- .map("V" + _)
- .flatMap(v => FlinkVersion.values().find(_.name() == v)) match {
- case Some(version) => version
- case None => throw new IllegalArgumentException("Flink version not
found")
- },
- jobManager = JobManagerDef(
- cpu = 1,
- memory = flinkConfig.get(JobManagerOptions.TOTAL_PROCESS_MEMORY).toString),
- taskManager = TaskManagerDef(
- cpu = 1,
- memory = flinkConfig.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY).toString),
- job = Option(
- JobDef(
- jarURI = submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse].dockerInnerMainJarPath,
- parallelism = 1,
- args = Array(flinkConfig.toMap.get("$internal.application.program-args")),
- entryClass = Some(submitRequest.appMain),
- initialSavepointPath = Some(submitRequest.savePoint),
- allowNonRestoredState = Some(submitRequest.allowNonRestoredState)
- )),
- extJarPaths = submitRequest.userJarFile match {
- case null => Array.empty[String]
- case file => Array(file.getAbsolutePath)
- },
- flinkConfiguration = submitRequest.extraParameter match {
- case null => Map.empty
- case e => e.asScala.map { case (key, value) => key -> value.toString }.toMap
- }
- )
- spec
- }
-}
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 59aa46cc3..e6b16e377 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -543,6 +543,13 @@ trait FlinkClientTrait extends Logger {
case x => x
}
}
+ def getOption[T](key: ConfigOption[T]): Option[T] = {
+ Option(flinkConfig.get(key))
+ }
+ def remove[T](key: ConfigOption[T]): Configuration = {
+ flinkConfig.removeConfig(key)
+ flinkConfig
+ }
}
private[client] def cancelJob(
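A sketch of the two new helpers in use (assuming the enriched Configuration wrapper from this trait is in scope; the option name is from Flink's public API):

  import org.apache.flink.configuration.{Configuration, CoreOptions}

  val conf = new Configuration()
  conf.set(CoreOptions.DEFAULT_PARALLELISM, Int.box(4))

  val p: Option[Integer] = conf.getOption(CoreOptions.DEFAULT_PARALLELISM) // Some(4)
  conf.remove(CoreOptions.DEFAULT_PARALLELISM) // returns the Configuration for chaining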
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala
new file mode 100644
index 000000000..75b979246
--- /dev/null
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.client.`trait`
+
+import org.apache.streampark.common.zio.ZIOExt.IOOps
+import org.apache.streampark.flink.client.`trait`.KubernetesClientV2.{StopJobFail, TriggerJobSavepointFail}
+import org.apache.streampark.flink.client.bean._
+import org.apache.streampark.flink.kubernetes.v2.model.{JobDef, JobSavepointDef}
+import org.apache.streampark.flink.kubernetes.v2.operator.FlinkK8sOperator
+import org.apache.streampark.flink.kubernetes.v2.yamlMapper
+
+import io.fabric8.kubernetes.api.model.Pod
+import org.apache.flink.client.deployment.application.ApplicationConfiguration
+import org.apache.flink.configuration.{Configuration, CoreOptions, PipelineOptions}
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
+import zio.ZIO
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters.asScalaBufferConverter
+import scala.util.{Failure, Success, Try}
+
+trait KubernetesClientV2Trait extends FlinkClientTrait {
+
+ protected type FailureMessage = String
+
+ protected val KUBERNETES_JM_CPU_KEY = "kubernetes.jobmanager.cpu"
+ protected val KUBERNETES_JM_CPU_AMOUNT_KEY = "kubernetes.jobmanager.cpu.amount"
+ protected val KUBERNETES_JM_CPU_DEFAULT = 1.0
+ protected val KUBERNETES_JM_MEMORY_DEFAULT = "1600m"
+
+ protected val KUBERNETES_TM_CPU_KEY = "kubernetes.taskmanager.cpu"
+ protected val KUBERNETES_TM_CPU_AMOUNT_KEY = "kubernetes.taskmanager.cpu.amount"
+ protected val KUBERNETES_TM_CPU_DEFAULT = -1.0
+ protected val KUBERNETES_TM_MEMORY_DEFAULT = "1728m"
+
+ protected val KUBERNETES_REST_SERVICE_EXPORTED_TYPE_KEY = "kubernetes.rest-service.exposed.type"
+
+ override def setConfig(submitRequest: SubmitRequest, flinkConf: Configuration): Unit = {}
+
+ implicit protected class FlinkConfMapOps(map: mutable.Map[String, String]) {
+ def removeKey(key: String): mutable.Map[String, String] = { map -= key; map }
+ }
+
+ protected def unmarshalPodTemplate(yaml: String): Try[Pod] = {
+ Try(yamlMapper.readValue(yaml, classOf[Pod]))
+ }
+
+ protected def genJobDef(
+ flinkConfObj: Configuration,
+ jarUriHint: Option[String]): Either[FailureMessage, JobDef] = {
+
+ val jarUri = jarUriHint
+ .orElse(flinkConfObj.getOption(PipelineOptions.JARS).flatMap(_.asScala.headOption))
+ .getOrElse(return Left("Flink job uri should not be empty"))
+
+ val parallel = flinkConfObj
+ .getOption(CoreOptions.DEFAULT_PARALLELISM)
+ .getOrElse(CoreOptions.DEFAULT_PARALLELISM.defaultValue())
+
+ val args = flinkConfObj
+ .getOption(ApplicationConfiguration.APPLICATION_ARGS)
+ .map(_.asScala.toArray)
+ .getOrElse(Array.empty[String])
+
+ val entryClass = flinkConfObj
+ .getOption(ApplicationConfiguration.APPLICATION_MAIN_CLASS)
+
+ val savePointPath = flinkConfObj
+ .getOption(SavepointConfigOptions.SAVEPOINT_PATH)
+
+ val allowNonRestoredState = flinkConfObj
+ .getOption(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)
+ .map(_.booleanValue())
+
+ Right(
+ JobDef(
+ jarURI = jarUri,
+ parallelism = parallel,
+ entryClass = entryClass,
+ args = args,
+ initialSavepointPath = savePointPath,
+ allowNonRestoredState = allowNonRestoredState
+ ))
+ }
+
+ @throws[Exception]
+ override def doCancel(request: CancelRequest, flinkConf: Configuration): CancelResponse = {
+ val effect =
+ if (!request.withSavepoint) {
+ // cancel job
+ FlinkK8sOperator
+ .cancelJob(request.id)
+ .as(CancelResponse(null))
+ } else {
+ // stop job with savepoint
+ val savepointDef = JobSavepointDef(
+ drain = Option(request.withDrain).getOrElse(false),
+ savepointPath = Option(request.savepointPath),
+ formatType = Option(request.nativeFormat)
+ .map(if (_) JobSavepointDef.NATIVE_FORMAT else JobSavepointDef.CANONICAL_FORMAT)
+ )
+ FlinkK8sOperator
+ .stopJob(request.id, savepointDef)
+ .flatMap {
+ result =>
+ if (result.isFailed) ZIO.fail(StopJobFail(result.failureCause.get))
+ else ZIO.succeed(CancelResponse(result.location.orNull))
+ }
+ }
+
+ def richMsg: String => String = s"[flink-cancel][appId=${request.id}] " + _
+
+ effect.runIOAsTry match {
+ case Success(rsp) =>
+ logInfo(richMsg("Cancel flink job successfully."))
+ rsp
+ case Failure(err) =>
+ logError(
+ richMsg(s"Cancel flink job fail in
${request.executionMode.getName}_V2 mode!"),
+ err)
+ throw err
+ }
+ }
+
+ @throws[Exception]
+ override def doTriggerSavepoint(
+ request: TriggerSavepointRequest,
+ flinkConf: Configuration): SavepointResponse = {
+
+ val savepointDef = JobSavepointDef(
+ savepointPath = Option(request.savepointPath),
+ formatType = Option(request.nativeFormat)
+ .map(if (_) JobSavepointDef.NATIVE_FORMAT else JobSavepointDef.CANONICAL_FORMAT)
+ )
+
+ def richMsg: String => String = s"[flink-trigger-savepoint][appId=${request.id}] " + _
+
+ FlinkK8sOperator
+ .triggerJobSavepoint(request.id, savepointDef)
+ .flatMap {
+ result =>
+ if (result.isFailed) ZIO.fail(TriggerJobSavepointFail(result.failureCause.get))
+ else ZIO.succeed(SavepointResponse(result.location.orNull))
+ }
+ .runIOAsTry match {
+ case Success(rsp) =>
+ logInfo(richMsg("Cancel flink job successfully."))
+ rsp
+ case Failure(err) =>
+ logError(
+ richMsg(s"Cancel flink job fail in
${request.executionMode.getName}_V2 mode!"),
+ err)
+ throw err
+ }
+ }
+
+}
+
+object KubernetesClientV2 {
+
+ case class StopJobFail(msg: String) extends Exception(msg)
+ case class TriggerJobSavepointFail(msg: String) extends Exception(msg)
+}
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
index 2dd756b8f..8ca6c518f 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
@@ -44,7 +44,7 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait {
.safeSet(KubernetesConfigOptions.NAMESPACE, submitRequest.k8sSubmitParam.kubernetesNamespace)
.safeSet(
KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
- covertToServiceExposedType(submitRequest.k8sSubmitParam.flinkRestExposedType))
+ covertToServiceExposedType(submitRequest.k8sSubmitParam.flinkRestExposedType.get))
if (submitRequest.buildResult != null) {
if (submitRequest.executionMode == ExecutionMode.KUBERNETES_NATIVE_APPLICATION) {
diff --git a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/FlinkDeploymentDef.scala b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/FlinkDeploymentDef.scala
index b3c039bf2..1637eb8da 100644
--- a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/FlinkDeploymentDef.scala
+++ b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/FlinkDeploymentDef.scala
@@ -18,7 +18,7 @@
package org.apache.streampark.flink.kubernetes.v2.model
import org.apache.streampark.flink.kubernetes.v2.jacksonMapper
-import org.apache.streampark.flink.kubernetes.v2.model.FlinkDeploymentDef.mapPodToPodTemplate
+import org.apache.streampark.flink.kubernetes.v2.model.FlinkDeploymentDef.{mapPodToPodTemplate, DEFAULT_SERVICE_ACCOUNT}
import io.fabric8.kubernetes.api.model.{ObjectMeta, Pod}
import org.apache.flink.v1beta1.{flinkdeploymentspec, FlinkDeployment, FlinkDeploymentSpec}
@@ -58,7 +58,7 @@ case class FlinkDeploymentDef(
name: String,
image: String,
imagePullPolicy: Option[String] = None,
- serviceAccount: String = "flink",
+ serviceAccount: String = DEFAULT_SERVICE_ACCOUNT,
flinkVersion: FlinkVersion,
jobManager: JobManagerDef,
taskManager: TaskManagerDef,
@@ -108,6 +108,9 @@ case class FlinkDeploymentDef(
}
object FlinkDeploymentDef {
+
+ lazy val DEFAULT_SERVICE_ACCOUNT = "flink"
+
def mapPodToPodTemplate[A: ClassTag](pod: Pod, clz: Class[A]): Try[A] = Try {
val json = jacksonMapper.writeValueAsString(pod)
jacksonMapper.readValue(json, clz)
diff --git a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala
index ee23bd49d..7f8496839 100644
--- a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala
+++ b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala
@@ -23,13 +23,13 @@ import org.apache.streampark.flink.kubernetes.v2.model._
import org.apache.streampark.flink.kubernetes.v2.model.TrackKey._
import org.apache.flink.v1beta1.{FlinkDeployment, FlinkDeploymentSpec, FlinkSessionJob, FlinkSessionJobSpec}
-import zio.{IO, Ref, Schedule, UIO}
+import zio.{IO, Ref, Schedule, UIO, ZIO}
import zio.ZIO.logInfo
import zio.concurrent.{ConcurrentMap, ConcurrentSet}
import zio.stream.ZStream
/** Flink Kubernetes resource observer. */
-sealed trait FlinkK8sObserver {
+sealed trait FlinkK8sObserverTrait {
/** Start tracking resources. */
def track(key: TrackKey): UIO[Unit]
@@ -37,6 +37,14 @@ sealed trait FlinkK8sObserver {
/** Stop tracking resources. */
def untrack(key: TrackKey): UIO[Unit]
+ /** Stop tracking resources by TrackKey.id. */
+ def untrackById(appId: Long): UIO[Unit] = {
+ trackedKeys.find(_.id == appId).flatMap {
+ case Some(key) => untrack(key)
+ case None => ZIO.unit
+ }
+ }
+
/** All tracked key in observer. */
def trackedKeys: ConcurrentSet[TrackKey]
@@ -88,7 +96,7 @@ sealed trait FlinkK8sObserver {
}
-object FlinkK8sObserver extends FlinkK8sObserver {
+object FlinkK8sObserver extends FlinkK8sObserverTrait {
// The following is a visible external snapshot.
val trackedKeys = ConcurrentSet.empty[TrackKey].runUIO
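A sketch of the new id-based untracking (the applicationId value is hypothetical; the Java side reaches the same effect through ZIOJavaUtil.runUIO, as in ApplicationManageServiceImpl above):

  import org.apache.streampark.common.zio.ZIOExt.UIOOps
  import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver

  val applicationId: Long = 100001L
  // Finds the TrackKey whose id matches and untracks it; a no-op when absent.
  FlinkK8sObserver.untrackById(applicationId).runUIO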
diff --git a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/FlinkK8sOperator.scala b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/FlinkK8sOperator.scala
index d07b60540..ac5d5b4e4 100644
--- a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/FlinkK8sOperator.scala
+++ b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/FlinkK8sOperator.scala
@@ -33,7 +33,7 @@ import zio.stream.ZStream
* When deploying or deleting flink resources, the FlinkK8sOperator will automatically
* handle the related tracing.
*/
-sealed trait FlinkK8sOperator {
+sealed trait FlinkK8sOperatorTrait {
/** Directly operate Flink Kubernetes CR. */
val k8sCrOpr: CROperator.type = CROperator
@@ -95,7 +95,7 @@ sealed trait FlinkK8sOperator {
}
-object FlinkK8sOperator extends FlinkK8sOperator {
+object FlinkK8sOperator extends FlinkK8sOperatorTrait {
private val obr = FlinkK8sObserver
private val flinkRest = FlinkRestRequest
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
index e9a3b06e1..21975d248 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala
@@ -17,6 +17,7 @@
package org.apache.streampark.flink.kubernetes
+import org.apache.streampark.common.conf.K8sFlinkConfig
import org.apache.streampark.flink.kubernetes.enums.FlinkJobState
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, SESSION}
import org.apache.streampark.flink.kubernetes.event.{BuildInEvent, FlinkJobStateEvent, FlinkJobStatusChangeEvent}
@@ -64,13 +65,15 @@ class DefaultFlinkK8sWatcher(conf: FlinkTrackConfig = FlinkTrackConfig.defaultCo
}
def doWatching(trackId: TrackId): Unit = {
- if (trackId.isLegal) {
+ if (!K8sFlinkConfig.isV2Enabled && trackId.isLegal) {
watchController.trackIds.set(trackId)
}
}
def unWatching(trackId: TrackId): Unit = {
- watchController.canceling.set(trackId)
+ if (!K8sFlinkConfig.isV2Enabled) {
+ watchController.canceling.set(trackId)
+ }
}
override def isInWatching(trackId: TrackId): Boolean = watchController.isInWatching(trackId)
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatcherLazyStartAop.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatcherLazyStartAop.scala
index 1d8a1f554..f544ae4ff 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatcherLazyStartAop.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatcherLazyStartAop.scala
@@ -17,6 +17,7 @@
package org.apache.streampark.flink.kubernetes
+import org.apache.streampark.common.conf.K8sFlinkConfig
import org.apache.streampark.flink.kubernetes.event.BuildInEvent
import org.apache.streampark.flink.kubernetes.model.{ClusterKey, FlinkMetricCV, JobStatusCV, TrackId}
@@ -27,37 +28,37 @@ import org.apache.streampark.flink.kubernetes.model.{ClusterKey, FlinkMetricCV,
trait FlinkK8sWatcherLazyStartAop extends FlinkK8sWatcher {
abstract override def doWatching(trackId: TrackId): Unit = {
- start()
+ if (!K8sFlinkConfig.isV2Enabled) start()
super.doWatching(trackId)
}
abstract override def unWatching(trackId: TrackId): Unit = {
- start()
+ if (!K8sFlinkConfig.isV2Enabled) start()
super.unWatching(trackId)
}
abstract override def isInWatching(trackId: TrackId): Boolean = {
- start()
+ if (!K8sFlinkConfig.isV2Enabled) start()
super.isInWatching(trackId)
}
abstract override def getAllWatchingIds: Set[TrackId] = {
- start()
+ if (!K8sFlinkConfig.isV2Enabled) start()
super.getAllWatchingIds
}
abstract override def getJobStatus(trackId: TrackId): Option[JobStatusCV] = {
- start()
+ if (!K8sFlinkConfig.isV2Enabled) start()
super.getJobStatus(trackId)
}
abstract override def getJobStatus(trackIds: Set[TrackId]): Map[CacheKey, JobStatusCV] = {
- start()
+ if (!K8sFlinkConfig.isV2Enabled) start()
super.getJobStatus(trackIds)
}
abstract override def getAllJobStatus: Map[CacheKey, JobStatusCV] = {
- start()
+ if (!K8sFlinkConfig.isV2Enabled) start()
super.getAllJobStatus
}
@@ -72,12 +73,12 @@ trait FlinkK8sWatcherLazyStartAop extends FlinkK8sWatcher {
}
abstract override def checkIsInRemoteCluster(trackId: TrackId): Boolean = {
- start()
+ if (!K8sFlinkConfig.isV2Enabled) start()
super.checkIsInRemoteCluster(trackId)
}
abstract override def postEvent(event: BuildInEvent, sync: Boolean): Unit = {
- start()
+ if (!K8sFlinkConfig.isV2Enabled) start()
super.postEvent(event, sync)
}
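These guards rely on Scala's stackable-trait pattern: abstract override lets the AOP trait intercept each call, conditionally run start(), and then delegate via super to the concrete watcher. A self-contained toy version of the mechanism (all names illustrative):

  trait Watcher { def watch(id: String): Unit }

  class BaseWatcher extends Watcher {
    def watch(id: String): Unit = println(s"watching $id")
  }

  trait LazyStart extends Watcher {
    private var started = false
    def start(): Unit = if (!started) { started = true; println("started") }

    // abstract override requires a concrete watch() earlier in the
    // linearization; super resolves to BaseWatcher.watch at mix-in time
    abstract override def watch(id: String): Unit = {
      start()
      super.watch(id)
    }
  }

  object Demo extends App {
    val watcher = new BaseWatcher with LazyStart
    watcher.watch("job-1") // prints "started", then "watching job-1"
  }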
diff --git a/streampark-flink/streampark-flink-packer/pom.xml b/streampark-flink/streampark-flink-packer/pom.xml
index a6e2a0716..d5377465a 100644
--- a/streampark-flink/streampark-flink-packer/pom.xml
+++ b/streampark-flink/streampark-flink-packer/pom.xml
@@ -48,6 +48,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+ <artifactId>streampark-flink-kubernetes-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<!-- maven build tools -->
<dependency>
<groupId>org.apache.maven.plugins</groupId>
diff --git a/streampark-flink/streampark-flink-packer/src/main/java/org/apache/streampark/flink/packer/pipeline/PipelineType.java b/streampark-flink/streampark-flink-packer/src/main/java/org/apache/streampark/flink/packer/pipeline/PipelineType.java
index 50c235e2d..0c0e8e4b1 100644
--- a/streampark-flink/streampark-flink-packer/src/main/java/org/apache/streampark/flink/packer/pipeline/PipelineType.java
+++ b/streampark-flink/streampark-flink-packer/src/main/java/org/apache/streampark/flink/packer/pipeline/PipelineType.java
@@ -42,7 +42,7 @@ public enum PipelineType {
/** flink native kubernetes application mode */
FLINK_NATIVE_K8S_APPLICATION(
2,
- "flink native kubernetes session mode task building pipeline",
+ "flink native kubernetes application mode task building pipeline",
ImmutableMap.<Integer, String>builder()
.put(1, "Create building workspace")
.put(2, "Export kubernetes pod template")
@@ -70,7 +70,16 @@ public enum PipelineType {
.put(2, "Resolve maven dependencies")
.put(3, "upload jar to yarn.provided.lib.dirs")
.build(),
- SimpleBuildResponse.class);
+ SimpleBuildResponse.class),
+
+ FLINK_K8S_APPLICATION_V2(
+ 5,
+ "flink kubernetes application mode task building pipeline v2",
+ ImmutableMap.<Integer, String>builder()
+ .put(1, "Create building workspace")
+ .put(2, "Build shaded flink app jar")
+ .build(),
+ K8sAppModeBuildResponse.class);
private final Integer code;
/** short description of pipeline type. */
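The new constant follows the enum's existing shape: a numeric code, a short description, and an ordered step map used to report build progress. A rough Scala mirror of that shape, purely illustrative and not part of the patch:

  final case class PipelineTypeDef(code: Int, desc: String, steps: Map[Int, String])

  val flinkK8sApplicationV2 = PipelineTypeDef(
    code = 5,
    desc = "flink kubernetes application mode task building pipeline v2",
    steps = Map(
      1 -> "Create building workspace",
      2 -> "Build shaded flink app jar"))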
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildResponse.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildResponse.scala
index e376d561f..7efc2d95b 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildResponse.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildResponse.scala
@@ -66,3 +66,19 @@ case class DockerImageBuildResponse(
s"dockerInnerMainJarPath: $dockerInnerMainJarPath, " +
s"pass: $pass }"
}
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+case class K8sAppModeBuildResponse(
+ workspacePath: String,
+ flinkBaseImage: String,
+ mainJarPath: String,
+ extraLibJarPaths: Set[String],
+ pass: Boolean = false
+) extends FlinkBuildResult {
+ override def toString: String =
+ s"{ workspacePath: $workspacePath, " +
+ s"flinkBaseImage: $flinkBaseImage, " +
+ s"mainJarPath: $mainJarPath, " +
+ s"extraLibJarPaths: ${extraLibJarPaths.mkString(",")}, " +
+ s"pass: $pass }"
+}
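Since K8sAppModeBuildResponse is a plain case class, downstream consumers such as the submission client can construct and pattern-match it directly. A construction sketch with made-up values:

  val resp = K8sAppModeBuildResponse(
    workspacePath = "/workspace/myjob@default",
    flinkBaseImage = "flink:1.17",
    mainJarPath = "/workspace/myjob@default/streampark-flinkjob_myjob.jar",
    extraLibJarPaths = Set("/opt/flink/lib/udf.jar"),
    pass = true)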
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipelineV2.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipelineV2.scala
new file mode 100644
index 000000000..6f59f6f02
--- /dev/null
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipelineV2.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.packer.pipeline.impl
+
+import org.apache.streampark.common.enums.DevelopmentMode
+import org.apache.streampark.common.fs.LfsOperator
+import org.apache.streampark.flink.packer.maven.MavenTool
+import org.apache.streampark.flink.packer.pipeline._
+
+import scala.language.postfixOps
+
+/**
+ * Building pipeline V2 (based on the flink-kubernetes-operator) for Flink kubernetes-native application mode
+ */
+class FlinkK8sApplicationBuildPipelineV2(request: FlinkK8sApplicationBuildRequest)
+ extends BuildPipeline {
+
+ override def pipeType: PipelineType = PipelineType.FLINK_K8S_APPLICATION_V2
+
+ @throws[Throwable]
+ override protected def buildProcess(): K8sAppModeBuildResponse = {
+
+ // Step-1: init the build workspace of the flink job
+ // the sub-workspace dir is like: APP_WORKSPACE/k8s-clusterId@k8s-namespace/
+ val buildWorkspace =
+ execStep(1) {
+ val buildWorkspace = s"${request.workspace}/${request.clusterId}@${request.k8sNamespace}"
+ LfsOperator.mkCleanDirs(buildWorkspace)
+ logInfo(s"recreate building workspace: $buildWorkspace")
+ buildWorkspace
+ }.getOrElse(throw getError.exception)
+
+ // Step-2: build shaded flink job jar and handle extra jars
+ // the output shaded jar file name is like: streampark-flinkjob_myjob-test.jar
+ val (shadedJar, extJarLibs) =
+ execStep(2) {
+ val shadedJarOutputPath = request.getShadedJarPath(buildWorkspace)
+ val extJarLibs = request.developmentMode match {
+ case DevelopmentMode.FLINK_SQL => request.dependencyInfo.extJarLibs
+ case DevelopmentMode.CUSTOM_CODE => Set[String]()
+ }
+ val shadedJar =
+ MavenTool.buildFatJar(request.mainClass, request.providedLibs, shadedJarOutputPath)
+ logInfo(s"output shaded flink job jar: ${shadedJar.getAbsolutePath}")
+ shadedJar -> extJarLibs
+ }.getOrElse(throw getError.exception)
+
+ K8sAppModeBuildResponse(
+ workspacePath = buildWorkspace,
+ flinkBaseImage = request.flinkBaseImage,
+ mainJarPath = shadedJar.getAbsolutePath,
+ extraLibJarPaths = extJarLibs)
+ }
+
+ override protected def offerBuildParam: FlinkK8sApplicationBuildRequest = request
+}
+
+object FlinkK8sApplicationBuildPipelineV2 {
+ def of(request: FlinkK8sApplicationBuildRequest): FlinkK8sApplicationBuildPipelineV2 =
+ new FlinkK8sApplicationBuildPipelineV2(request)
+
+}
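Callers would obtain the pipeline through the factory and run it. Assuming BuildPipeline exposes the same launch() entry point as the existing v1 pipelines (an assumption; the entry point is not shown in this diff), usage looks roughly like:

  // hypothetical wiring; `request` is a fully populated FlinkK8sApplicationBuildRequest
  val pipeline = FlinkK8sApplicationBuildPipelineV2.of(request)
  pipeline.launch() match { // launch() assumed, mirroring the v1 pipelines
    case resp: K8sAppModeBuildResponse if resp.pass => // hand the response to the submit client
    case _ => // surface the build failure
  }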