This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new c3c6f3dd4 [Feat][Flink-K8s-V2] Adaptation of snapshot tracking for Flink on Kubernetes (#3048)
c3c6f3dd4 is described below
commit c3c6f3dd4063a505ab926c3c209f390266695bf1
Author: Linying Assad <[email protected]>
AuthorDate: Wed Sep 13 01:12:20 2023 -0500
[Feat][Flink-K8s-V2] Adaptation of snapshot tracking for Flink on Kubernetes (#3048)

* [Feat][flink-k8s-v2] Optimize the Flink state subscription code. #2885
* [Feat][flink-k8s-v2] Process Flink K8s status changes in parallel groups to alleviate backpressure. #2885
* [Feat][flink-k8s-v2] Optimize the persistent-record update logic for Flink job status. #2885
* [Feat][flink-k8s-v2] Interrupt all subscription fibers when the bean is destroyed. #2885
* [Feat][flink-k8s-v2] Adapt to obtain aggregated Flink metrics by teamId. #2885
* [Feat][flink-k8s-v2] Format code. #2885
* [Feat][flink-k8s-v2] Subscribe to FlinkCluster status. #2885
* [Feat][flink-k8s-v2] Solve cross-compilation issues between Java and Scala code in the streampark-console-service module. #2885
* [Feat][flink-k8s-v2] Provide tracking adaptation for FlinkCluster on K8s. (#2885)
* [Feat][flink-k8s-v2] Add a field to Application to identify the K8s CR name. (#2885)
* [Feat][flink-k8s-v2] Restore track keys from persistent storage when FlinkK8sObserverBroker is initializing. (#2885)
* [Feat][flink-k8s-v2] Watch the mapped Flink job. (#2885)
* [Improve] EnvUtils minor improvement (#3040)
* Flink yarn application: support pyflink maven dependency (#3042)
* [Feature] [Flink-K8s-V2] Refactor the lifecycle control of Flink session-mode jobs on Kubernetes (#3041)
* [Feature] [Flink-K8s-V2] Refactor the lifecycle control of Flink session-mode jobs on Kubernetes
* add Apache license header
* [Improvement] add support for SinkRequest that has multiple statements (#3004)
* add support for SinkRequest that has multiple statements
* Update imports of ClickHouseWriterTask.scala
* add unit test for SinkRequest
* add Apache license
---------
Co-authored-by: lenon <[email protected]>
* [Feat][flink-k8s-v2] Add a field to Application to identify the K8s CR name. (#2885)
* [Feat][flink-k8s-v2] Correct code issues from PR #3041
* Remove zio-cache dependency.
* Resolve some compile issues
---------
Co-authored-by: benjobs <[email protected]>
Co-authored-by: ChengJie1053 <[email protected]>
Co-authored-by: caicancai <[email protected]>
Co-authored-by: lenonhere <[email protected]>
Co-authored-by: lenon <[email protected]>
---
pom.xml | 12 +-
.../org/apache/streampark/common/zio/ZIOExt.scala | 12 +-
.../org/apache/streampark/common/zio/package.scala | 9 +-
.../streampark-console-service/pom.xml | 31 ++
.../main/assembly/script/schema/mysql-schema.sql | 1 +
.../main/assembly/script/schema/pgsql-schema.sql | 1 +
.../main/assembly/script/upgrade/mysql/2.2.2.sql | 23 +-
.../main/assembly/script/upgrade/pgsql/2.2.2.sql | 19 +
.../console/core/bean/AlertTemplate.java | 2 +-
.../console/core/entity/Application.java | 31 +-
.../console/core/enums/FlinkAppState.java | 2 +-
.../impl/ApplicationActionServiceImpl.java | 1 +
.../impl/ApplicationInfoServiceImpl.java | 15 +-
.../impl/ApplicationManageServiceImpl.java | 40 ++-
.../core/service/impl/FlinkClusterServiceImpl.java | 37 +-
.../console/core/task/FlinkClusterWatcher.java | 4 +-
.../core/task/FlinkK8sChangeEventListener.java | 9 +-
.../console/core/task/FlinkK8sObserverStub.java | 61 ++++
.../console/core/task/FlinkK8sWatcherWrapper.java | 2 -
.../core/utils/FlinkK8sDataTypeConverterStub.java} | 22 +-
.../src/main/resources/db/schema-h2.sql | 1 +
.../resources/mapper/core/ApplicationMapper.xml | 1 +
.../console/core/task/FlinkK8sChangeListener.scala | 231 ------------
.../console/core/task/FlinkK8sObserverBroker.scala | 389 +++++++++++++++++++++
.../core/task/FlinkK8sObserverBrokerSidecar.scala | 99 ++++++
.../core/utils/FlinkK8sDataTypeConverter.scala | 123 +++++++
.../console/core/utils/MybatisScalaExt.scala | 59 ++++
.../flink/client/bean/KubernetesSubmitParam.scala | 7 +-
.../flink/client/FlinkClientHandler.scala | 5 +-
.../impl/KubernetesApplicationClientV2.scala | 7 +-
.../client/impl/KubernetesSessionClientV2.scala | 93 ++---
.../streampark-flink-kubernetes-v2/pom.xml | 2 +-
.../flink/kubernetes/v2/model/ClusterMetrics.scala | 22 +-
.../flink/kubernetes/v2/model/JobState.scala | 11 +-
.../flink/kubernetes/v2/model/JobStatus.scala | 8 +-
.../kubernetes/v2/observer/FlinkK8sObserver.scala | 55 ++-
.../v2/observer/RawClusterObserver.scala | 19 +-
.../flink/kubernetes/v2/operator/OprError.scala | 3 -
.../impl/FlinkK8sApplicationBuildPipelineV2.scala | 3 +-
39 files changed, 1080 insertions(+), 392 deletions(-)
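
For orientation before the diff: the "parallel group processing" mentioned in
the commit message is a ZStream pattern in which status events are grouped by
appId, distinct groups run in parallel, and events within one group stay
serial, so a slow job cannot backpressure the others. A minimal,
self-contained sketch of the pattern (simplified stand-in types, not code from
this commit):

    import zio._
    import zio.stream.ZStream

    final case class JobSnapshot(appId: Long, state: String) // simplified stand-in

    def consume(events: ZStream[Any, Nothing, JobSnapshot]): UIO[Unit] =
      events
        .groupByKey(_.appId) { case (_, substream) =>
          // serial per appId: persist snapshots in arrival order
          substream.mapZIO(snap => ZIO.logInfo(s"persist ${snap.appId} -> ${snap.state}"))
        }
        .runDrain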
diff --git a/pom.xml b/pom.xml
index fef4ec416..d23981124 100644
--- a/pom.xml
+++ b/pom.xml
@@ -33,7 +33,8 @@
<url>https://streampark.apache.org/</url>
<description>
- StreamPark, Make stream processing easier! easy-to-use streaming
application development framework and operation platform
+ StreamPark, Make stream processing easier! easy-to-use streaming
application development framework and operation
+ platform
</description>
<organization>
@@ -597,7 +598,9 @@
<style>GOOGLE</style>
</googleJavaFormat>
<importOrder>
-
<order>org.apache.streampark,org.apache.streampark.shaded,org.apache,,javax,java,scala,\#</order>
+ <order>
+
org.apache.streampark,org.apache.streampark.shaded,org.apache,,javax,java,scala,\#
+ </order>
</importOrder>
<replaceRegex>
<name>Remove wildcard imports</name>
@@ -614,7 +617,7 @@
<searchRegex>import\s+org\.junit\.[^jupiter][^\*\s]*(|\*);(\r\n|\r|\n)</searchRegex>
<replacement>$1</replacement>
</replaceRegex>
- <removeUnusedImports />
+ <removeUnusedImports/>
</java>
<scala>
<scalafmt>
@@ -690,7 +693,8 @@
<configuration>
<shadedArtifactId>true</shadedArtifactId>
<createDependencyReducedPom>true</createDependencyReducedPom>
-
<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
+
<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml
+ </dependencyReducedPomLocation>
<artifactSet>
<includes>
<!--
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOExt.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOExt.scala
index 7baa195d3..1aa2a7f40 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOExt.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOExt.scala
@@ -18,7 +18,7 @@
package org.apache.streampark.common.zio
import zio.{FiberFailure, IO, Runtime, Unsafe, ZIO}
-import zio.stream.ZStream
+import zio.stream.{UStream, ZStream}
import scala.util.Try
@@ -110,4 +110,14 @@ object ZIOExt {
.map { case (_, cur) => cur }
}
+  implicit class ZStreamOptionEffectOps[R, E, A](zstream: ZStream[R, E, Option[A]]) {
+
+    /** Filter the Some values and flatten them. */
+    @inline def filterSome: ZStream[R, E, A] = zstream.filter(_.isDefined).map(_.get)
+  }
+
+ implicit class IterableZStreamConverter[A](iter: Iterable[A]) {
+ @inline def asZStream: UStream[A] = ZStream.fromIterable(iter)
+ }
+
}
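
A quick usage sketch of the two helpers added above (illustrative only,
assuming the implicits are imported from ZIOExt):

    import org.apache.streampark.common.zio.ZIOExt.{IterableZStreamConverter, ZStreamOptionEffectOps}
    import zio.stream.UStream

    val raw: Iterable[Option[Int]] = Seq(Some(1), None, Some(3))
    // Iterable -> UStream, then drop the Nones and unwrap the Somes.
    val values: UStream[Int] = raw.asZStream.filterSome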
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/zio/package.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/zio/package.scala
index 673773b9e..82ff0f360 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/zio/package.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/zio/package.scala
@@ -18,14 +18,15 @@
package org.apache.streampark.common
import scala.language.implicitConversions
+import scala.util.Try
package object zio {
/** Similar to python's pprint, format print any type of instance. */
- @inline def toPrettyString(value: Any): String = value match {
+ @inline def toPrettyString(value: Any): String = Try(value match {
case v: String => v
case v => pprint.apply(v, height = 2000, showFieldNames = true).render
- }
+ }).getOrElse(value.toString)
implicit class PrettyStringOps(value: Any) {
@inline def prettyStr: String = toPrettyString(value)
@@ -34,4 +35,8 @@ package object zio {
/** Automatically converts value to Some value. */
implicit def liftValueAsSome[A](value: A): Option[A] = Some(value)
+ implicit class OptionTraversableOps[A](iter: Traversable[Option[A]]) {
+ def filterSome: Traversable[A] = iter.filter(_.isDefined).map(_.get)
+ }
+
}
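
Likewise, a small sketch of the package-object additions (illustrative, not
part of the commit):

    import org.apache.streampark.common.zio._

    // prettyStr now degrades gracefully: if pprint throws, it falls back to toString.
    final case class Metrics(totalTm: Int, totalSlot: Int)
    println(Metrics(2, 8).prettyStr)

    // filterSome works on any Traversable[Option[A]].
    val ids: Traversable[Long] = Seq(Some(1L), None, Some(5L)).filterSome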
diff --git a/streampark-console/streampark-console-service/pom.xml
b/streampark-console/streampark-console-service/pom.xml
index 2e6156603..090b8ea76 100644
--- a/streampark-console/streampark-console-service/pom.xml
+++ b/streampark-console/streampark-console-service/pom.xml
@@ -588,6 +588,37 @@
</executions>
</plugin>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <configuration>
+ <compileOrder>JavaThenScala</compileOrder>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>com.diffplug.spotless</groupId>
+ <artifactId>spotless-maven-plugin</artifactId>
+ <version>${maven-spotless-plugin.version}</version>
+ <configuration>
+ <scala>
+ <scalafmt>
+ <version>${spotless.scalafmt.version}</version>
+            <file>${maven.multiModuleProjectDirectory}/.scalafmt-fp.conf</file>
+ </scalafmt>
+ </scala>
+ </configuration>
+ <executions>
+ <execution>
+ <id>spotless-check</id>
+ <phase>validate</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
</plugins>
</build>
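
The JavaThenScala compile order added above appears intended to let Scala
sources reference classes produced by javac first: the Scala
FlinkK8sObserverBroker later in this diff implements the new Java interface
FlinkK8sObserverStub. A toy shape of that dependency, with a hypothetical
ObserverStub standing in for the real interface:

    // Java side, compiled first under JavaThenScala:
    //   public interface ObserverStub { void unWatchById(Long id); }
    // Scala side, compiled second:
    class ExampleBroker extends ObserverStub {
      override def unWatchById(id: java.lang.Long): Unit = println(s"unwatch $id")
    }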
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
index 52b885c7d..08a1693a3 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
@@ -64,6 +64,7 @@ create table `t_flink_app` (
`job_manager_url` varchar(255) collate utf8mb4_general_ci default null,
`version_id` bigint default null,
`cluster_id` varchar(45) collate utf8mb4_general_ci default null,
+ `k8s_name` varchar(63) collate utf8mb4_general_ci default null,
`k8s_namespace` varchar(63) collate utf8mb4_general_ci default null,
`flink_image` varchar(128) collate utf8mb4_general_ci default null,
`state` int default null,
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
index a9e5ffa24..eef89390c 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
@@ -212,6 +212,7 @@ create table "public"."t_flink_app" (
"job_manager_url" varchar(255) collate "pg_catalog"."default",
"version_id" int8,
"cluster_id" varchar(45) collate "pg_catalog"."default",
+ "k8s_name" varchar(63) collate "pg_catalog"."default",
"k8s_namespace" varchar(63) collate "pg_catalog"."default",
"flink_image" varchar(128) collate "pg_catalog"."default",
"state" int2,
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/ClusterMetrics.scala
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.2.sql
similarity index 63%
copy from
streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/ClusterMetrics.scala
copy to
streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.2.sql
index 7ef4bb7da..9fe81443d 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/ClusterMetrics.scala
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.2.sql
@@ -15,20 +15,11 @@
* limitations under the License.
*/
-package org.apache.streampark.flink.kubernetes.v2.model
+use streampark;
+
+set names utf8mb4;
+set foreign_key_checks = 0;
+
+alter table `t_flink_app`
+ add column `k8s_name` varchar(63) collate utf8mb4_general_ci default null;
-/**
- * Flink custer metrics.
- *
- * see: [[org.apache.streampark.flink.kubernetes.model.FlinkMetricCV]]
- */
-case class ClusterMetrics(
- totalJmMemory: Integer = 0,
- totalTmMemory: Integer = 0,
- totalTm: Integer = 0,
- totalSlot: Integer = 0,
- availableSlot: Integer = 0,
- runningJob: Integer = 0,
- finishedJob: Integer = 0,
- cancelledJob: Integer = 0,
- failedJob: Integer = 0)
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.2.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.2.sql
new file mode 100644
index 000000000..b535a7832
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.2.sql
@@ -0,0 +1,19 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+alter table "public"."t_flink_app"
+ add column "k8s_name" varchar(63) collate "pg_catalog"."default";
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
index 1607ccb1b..733fa020c 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
@@ -89,7 +89,7 @@ public class AlertTemplate implements Serializable {
return new AlertTemplateBuilder()
.setDuration(cluster.getStartTime(), cluster.getEndTime())
.setJobName(cluster.getClusterName())
- .setLink(ExecutionMode.YARN_SESSION, cluster.getClusterId())
+ .setLink(cluster.getExecutionModeEnum(), cluster.getClusterId())
.setStartTime(cluster.getStartTime())
.setEndTime(cluster.getEndTime())
.setType(3)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 8eb0e39ed..049af6f13 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -100,9 +100,26 @@ public class Application implements Serializable {
/** flink docker base image */
private String flinkImage;
+  /** The resource name of the flink job on k8s, equivalent to clusterId in application mode. */
+ private String k8sName;
+
/** k8s namespace */
private String k8sNamespace = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE();
+  /** The exposed type of the K8s rest service (kubernetes.rest-service.exposed.type) */
+ private Integer k8sRestExposedType;
+ /** flink kubernetes pod template */
+ private String k8sPodTemplate;
+
+ private String k8sJmPodTemplate;
+ private String k8sTmPodTemplate;
+
+ private String ingressTemplate;
+ private String defaultModeIngress;
+
+ /** flink-hadoop integration on flink-k8s mode */
+ private Boolean k8sHadoopIntegration;
+
private Integer state;
/** task release status */
@TableField("`release`")
@@ -189,23 +206,9 @@ public class Application implements Serializable {
private Date modifyTime;
- /** The exposed type of the rest service of
K8s(kubernetes.rest-service.exposed.type) */
- private Integer k8sRestExposedType;
- /** flink kubernetes pod template */
- private String k8sPodTemplate;
-
- private String k8sJmPodTemplate;
- private String k8sTmPodTemplate;
-
- private String ingressTemplate;
- private String defaultModeIngress;
-
/** 1: cicd (build from csv) 2: upload (upload local jar job) */
private Integer resourceFrom;
- /** flink-hadoop integration on flink-k8s mode */
- private Boolean k8sHadoopIntegration;
-
private String tags;
/** running job */
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
index 87c7493f3..84bf447f3 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
@@ -139,7 +139,7 @@ public enum FlinkAppState {
/**
* Type conversion bridging Deprecated, see {@link
- * org.apache.streampark.console.core.utils.FlinkAppStateConverter}
+ * org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverter}
*/
@Deprecated
public static class Bridge {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 4c836469f..e9d5d6906 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -419,6 +419,7 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
KubernetesSubmitParam kubernetesSubmitParam =
KubernetesSubmitParam.apply(
application.getClusterId(),
+ application.getK8sName(),
application.getK8sNamespace(),
application.getFlinkImage(),
application.getK8sRestExposedTypeEnum());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
index b26de031f..41930d903 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.streampark.console.core.service.application.impl;
+import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.fs.LfsOperator;
@@ -41,6 +42,7 @@ import
org.apache.streampark.console.core.service.SavePointService;
import
org.apache.streampark.console.core.service.application.ApplicationInfoService;
import org.apache.streampark.console.core.task.FlinkClusterWatcher;
import org.apache.streampark.console.core.task.FlinkHttpWatcher;
+import org.apache.streampark.console.core.task.FlinkK8sObserverStub;
import org.apache.streampark.flink.core.conf.ParameterCli;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
import
org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper;
@@ -96,6 +98,8 @@ public class ApplicationInfoServiceImpl extends
ServiceImpl<ApplicationMapper, A
@Autowired private FlinkK8sWatcher k8SFlinkTrackMonitor;
+ @Autowired private FlinkK8sObserverStub flinkK8sObserver;
+
@Autowired private FlinkClusterService flinkClusterService;
@Autowired private FlinkClusterWatcher flinkClusterWatcher;
@@ -149,7 +153,12 @@ public class ApplicationInfoServiceImpl extends
ServiceImpl<ApplicationMapper, A
}
// merge metrics from flink kubernetes cluster
- FlinkMetricCV k8sMetric =
k8SFlinkTrackMonitor.getAccGroupMetrics(teamId.toString());
+ FlinkMetricCV k8sMetric;
+ if (K8sFlinkConfig.isV2Enabled()) {
+ k8sMetric = flinkK8sObserver.getAggClusterMetricCV(teamId);
+ } else {
+ k8sMetric = k8SFlinkTrackMonitor.getAccGroupMetrics(teamId.toString());
+ }
if (k8sMetric != null) {
totalJmMemory += k8sMetric.totalJmMemory();
totalTmMemory += k8sMetric.totalTmMemory();
@@ -452,7 +461,11 @@ public class ApplicationInfoServiceImpl extends
ServiceImpl<ApplicationMapper, A
boolean mapping = this.baseMapper.mapping(appParam);
Application application = getById(appParam.getId());
if (isKubernetesApp(application)) {
+ // todo mark
k8SFlinkTrackMonitor.doWatching(toTrackId(application));
+ if (K8sFlinkConfig.isV2Enabled()) {
+ flinkK8sObserver.watchApplication(application);
+ }
} else {
FlinkHttpWatcher.doWatching(application);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
index 11b0f59a1..8f8ada3ac 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
@@ -23,7 +23,6 @@ import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.enums.StorageType;
import org.apache.streampark.common.fs.HdfsOperator;
import org.apache.streampark.common.util.DeflaterUtils;
-import org.apache.streampark.common.zio.ZIOJavaUtil;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
@@ -55,8 +54,9 @@ import
org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.service.YarnQueueService;
import
org.apache.streampark.console.core.service.application.ApplicationManageService;
import org.apache.streampark.console.core.task.FlinkHttpWatcher;
+import org.apache.streampark.console.core.task.FlinkK8sObserverStub;
+import org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverterStub;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
-import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver;
import org.apache.streampark.flink.packer.pipeline.PipelineStatus;
import org.apache.commons.lang3.StringUtils;
@@ -127,6 +127,10 @@ public class ApplicationManageServiceImpl extends
ServiceImpl<ApplicationMapper,
@Autowired private ResourceService resourceService;
+ @Autowired private FlinkK8sObserverStub flinkK8sObserver;
+
+ @Autowired private FlinkK8sDataTypeConverterStub flinkK8sDataTypeConverter;
+
@PostConstruct
public void resetOptionState() {
this.baseMapper.resetOptionState();
@@ -182,7 +186,7 @@ public class ApplicationManageServiceImpl extends
ServiceImpl<ApplicationMapper,
if (isKubernetesApp(application)) {
k8SFlinkTrackMonitor.unWatching(toTrackId(application));
if (K8sFlinkConfig.isV2Enabled()) {
- ZIOJavaUtil.runUIO(FlinkK8sObserver.untrackById(application.getId()));
+ flinkK8sObserver.unWatchById(application.getId());
}
} else {
FlinkHttpWatcher.unWatching(appParam.getId());
@@ -314,6 +318,18 @@ public class ApplicationManageServiceImpl extends
ServiceImpl<ApplicationMapper,
appParam.setJarCheckSum(org.apache.commons.io.FileUtils.checksumCRC32(new
File(jarPath)));
}
+ if (shouldHandleK8sName(appParam)) {
+ switch (appParam.getExecutionModeEnum()) {
+ case KUBERNETES_NATIVE_APPLICATION:
+ appParam.setK8sName(appParam.getClusterId());
+ break;
+ case KUBERNETES_NATIVE_SESSION:
+          appParam.setK8sName(flinkK8sDataTypeConverter.genSessionJobK8sCRName(appParam.getClusterId()));
+ break;
+ }
+ }
+
if (save(appParam)) {
if (appParam.isFlinkSqlJobOrPyFlinkJob()) {
FlinkSql flinkSql = new FlinkSql(appParam);
@@ -328,6 +344,10 @@ public class ApplicationManageServiceImpl extends
ServiceImpl<ApplicationMapper,
}
}
+ private boolean shouldHandleK8sName(Application app) {
+    return K8sFlinkConfig.isV2Enabled() && ExecutionMode.isKubernetesMode(app.getExecutionMode());
+ }
+
private boolean existsByJobName(String jobName) {
return baseMapper.exists(
new LambdaQueryWrapper<Application>().eq(Application::getJobName,
jobName));
@@ -528,6 +548,20 @@ public class ApplicationManageServiceImpl extends
ServiceImpl<ApplicationMapper,
break;
}
+ if (shouldHandleK8sName(appParam)) {
+ switch (appParam.getExecutionModeEnum()) {
+ case KUBERNETES_NATIVE_APPLICATION:
+ application.setK8sName(appParam.getClusterId());
+ break;
+ case KUBERNETES_NATIVE_SESSION:
+          if (!Objects.equals(application.getClusterId(), appParam.getClusterId())) {
+            application.setK8sName(flinkK8sDataTypeConverter.genSessionJobK8sCRName(appParam.getClusterId()));
+ }
+ break;
+ }
+ }
+
// Flink Sql job...
if (application.isFlinkSqlJob()) {
updateFlinkSqlJob(application, appParam);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 3bbe93d94..cf6221cb7 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.streampark.console.core.service.impl;
+import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.util.ThreadUtils;
@@ -32,6 +33,7 @@ import
org.apache.streampark.console.core.service.FlinkEnvService;
import org.apache.streampark.console.core.service.YarnQueueService;
import
org.apache.streampark.console.core.service.application.ApplicationInfoService;
import org.apache.streampark.console.core.task.FlinkClusterWatcher;
+import org.apache.streampark.console.core.task.FlinkK8sObserverStub;
import org.apache.streampark.flink.client.FlinkClient;
import org.apache.streampark.flink.client.bean.DeployRequest;
import org.apache.streampark.flink.client.bean.DeployResponse;
@@ -96,6 +98,8 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
@Autowired private FlinkClusterWatcher flinkClusterWatcher;
+ @Autowired private FlinkK8sObserverStub flinkK8sObserver;
+
@Override
public ResponseResult check(FlinkCluster cluster) {
ResponseResult result = new ResponseResult();
@@ -159,6 +163,9 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
if (ret && ExecutionMode.isRemoteMode(flinkCluster.getExecutionMode())) {
FlinkClusterWatcher.addWatching(flinkCluster);
}
+ if (shouldWatchForK8s(flinkCluster)) {
+ flinkK8sObserver.watchFlinkCluster(flinkCluster);
+ }
return ret;
}
@@ -186,6 +193,9 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
flinkCluster.setEndTime(null);
updateById(flinkCluster);
FlinkClusterWatcher.addWatching(flinkCluster);
+ if (shouldWatchForK8s(flinkCluster)) {
+ flinkK8sObserver.watchFlinkCluster(flinkCluster);
+ }
} catch (Exception e) {
log.error(e.getMessage(), e);
flinkCluster.setClusterState(ClusterState.FAILED.getValue());
@@ -225,6 +235,9 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
flinkCluster.setFlinkImage(paramOfCluster.getFlinkImage());
flinkCluster.setYarnQueue(paramOfCluster.getYarnQueue());
}
+ if (shouldWatchForK8s(flinkCluster)) {
+ flinkK8sObserver.watchFlinkCluster(flinkCluster);
+ }
updateById(flinkCluster);
}
@@ -260,11 +273,17 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
checkActiveIfNeeded(flinkCluster);
// 3) check job if running on cluster
- boolean existsRunningJob =
- applicationInfoService.existsRunningByClusterId(flinkCluster.getId());
- ApiAlertException.throwIfTrue(
- existsRunningJob, "Some app is running on this cluster, the cluster
cannot be shutdown");
-
+    if (shouldWatchForK8s(flinkCluster)) {
+      boolean existActiveJobs = flinkK8sObserver.existActiveJobsOnFlinkCluster(flinkCluster);
+      ApiAlertException.throwIfTrue(
+          existActiveJobs,
+          "Due to the presence of active jobs on the cluster, the cluster cannot be shut down");
+    } else {
+      boolean existsRunningJob =
+          applicationInfoService.existsRunningByClusterId(flinkCluster.getId());
+      ApiAlertException.throwIfTrue(
+          existsRunningJob, "Some app is running on this cluster, the cluster cannot be shut down");
+    }
return true;
}
@@ -334,6 +353,9 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
ClusterState.isRunning(flinkCluster.getClusterStateEnum()),
"Flink cluster is running, cannot be delete, please check.");
}
+ if (shouldWatchForK8s(flinkCluster)) {
+ flinkK8sObserver.unwatchFlinkCluster(flinkCluster);
+ }
ApiAlertException.throwIfTrue(
applicationInfoService.existsByClusterId(id),
@@ -454,4 +476,9 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
}
return null;
}
+
+ private boolean shouldWatchForK8s(FlinkCluster flinkCluster) {
+    return K8sFlinkConfig.isV2Enabled()
+        && ExecutionMode.isKubernetesSessionMode(flinkCluster.getExecutionMode());
+ }
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
index 5924aa815..200a548a8 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
@@ -100,6 +100,7 @@ public class FlinkClusterWatcher {
flinkClusterService.list(
new LambdaQueryWrapper<FlinkCluster>()
.eq(FlinkCluster::getClusterState,
ClusterState.RUNNING.getValue())
+ // excluding flink clusters on kubernetes
.notIn(FlinkCluster::getExecutionMode,
ExecutionMode.getKubernetesMode()));
flinkClusters.forEach(cluster -> WATCHER_CLUSTERS.put(cluster.getId(),
cluster));
}
@@ -264,7 +265,8 @@ public class FlinkClusterWatcher {
* @param flinkCluster
*/
public static void addWatching(FlinkCluster flinkCluster) {
- if (!WATCHER_CLUSTERS.containsKey(flinkCluster.getId())) {
+ if (!ExecutionMode.isKubernetesMode(flinkCluster.getExecutionModeEnum())
+ && !WATCHER_CLUSTERS.containsKey(flinkCluster.getId())) {
log.info("add the cluster with id:{} to watcher cluster cache",
flinkCluster.getId());
WATCHER_CLUSTERS.put(flinkCluster.getId(), flinkCluster);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index 5da80337b..550a9a41e 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -54,7 +54,14 @@ import scala.Enumeration;
import static
org.apache.streampark.console.core.enums.FlinkAppState.Bridge.fromK8sFlinkJobState;
import static
org.apache.streampark.console.core.enums.FlinkAppState.Bridge.toK8sFlinkJobState;
-/** Event Listener for K8sFlinkTrackMonitor */
+/**
+ * Event Listener for K8sFlinkTrackMonitor.
+ *
+ * <p>Deprecated: use {@link org.apache.streampark.console.core.task.FlinkK8sObserverBroker}
+ * instead.
+ */
+@Deprecated
@Component
public class FlinkK8sChangeEventListener {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sObserverStub.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sObserverStub.java
new file mode 100644
index 000000000..ffc282026
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sObserverStub.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.task;
+
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV;
+import org.apache.streampark.flink.kubernetes.v2.model.ClusterMetrics;
+
+/**
+ * Stub that exposes methods of FlinkK8sObserver to the Java code of
+ * streampark-console-service.
+ */
+public interface FlinkK8sObserverStub {
+
+  /** Stub method: Get the aggregated metrics of all Flink jobs on K8s clusters by team id. */
+  ClusterMetrics getAggClusterMetric(Long teamId);
+
+  /** Stub method: Compatible with the old flink-k8s-v1 code. */
+  FlinkMetricCV getAggClusterMetricCV(Long teamId);
+
+  /** Stub method: Whether there are active jobs on the Flink cluster. */
+  boolean existActiveJobsOnFlinkCluster(FlinkCluster flinkCluster);
+
+  /**
+   * Stub method: Add a FlinkCluster to the watchlist. Normally, after a successful deployment
+   * of a Flink cluster, the relevant resources are tracked automatically.
+   */
+  void watchFlinkCluster(FlinkCluster flinkCluster);
+
+  /**
+   * Stub method: Add an Application to the watchlist. Normally, after a successful deployment
+   * of a Flink app, the relevant resources are tracked automatically.
+   */
+  void watchApplication(Application app);
+
+  /**
+   * Stub method: Notify FlinkK8sObserver to remove a FlinkCluster from the watchlist. If
+   * SessionJobs are still associated with the FlinkCluster, its tracking is not actually
+   * removed.
+   */
+  void unwatchFlinkCluster(FlinkCluster flinkCluster);
+
+  /** Stub method: Notify FlinkK8sObserver to stop tracking resources by TrackKey.id. */
+  void unWatchById(Long id);
+}
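
Call sites gate on the v2 feature flag before using this stub, as seen in
ApplicationInfoServiceImpl earlier in this diff. A condensed Scala sketch of
that pattern (the beans are assumed to be injected; the method name
aggregatedMetrics is illustrative):

    import org.apache.streampark.common.conf.K8sFlinkConfig
    import org.apache.streampark.console.core.task.FlinkK8sObserverStub
    import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher
    import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV

    def aggregatedMetrics(
        teamId: java.lang.Long,
        observer: FlinkK8sObserverStub, // v2 path
        watcher: FlinkK8sWatcher        // legacy v1 path
    ): FlinkMetricCV =
      if (K8sFlinkConfig.isV2Enabled) observer.getAggClusterMetricCV(teamId)
      else watcher.getAccGroupMetrics(teamId.toString)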
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index 32f7a2391..00901367e 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -89,8 +89,6 @@ public class FlinkK8sWatcherWrapper {
// recovery tracking list
List<TrackId> k8sApp = getK8sWatchingApps();
k8sApp.forEach(trackMonitor::doWatching);
- } else {
- // TODO [flink-k8s-v2] Recovery tracking list and invoke
FlinkK8sObserver.track()
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/FlinkAppStateConverter.scala
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/FlinkK8sDataTypeConverterStub.java
similarity index 50%
rename from
streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/FlinkAppStateConverter.scala
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/FlinkK8sDataTypeConverterStub.java
index fc9449a97..c9d9bf8a8 100644
---
a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/FlinkAppStateConverter.scala
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/FlinkK8sDataTypeConverterStub.java
@@ -15,24 +15,10 @@
* limitations under the License.
*/
-package org.apache.streampark.console.core.utils
+package org.apache.streampark.console.core.utils;
-import org.apache.streampark.console.core.enums.FlinkAppState
-import org.apache.streampark.flink.kubernetes.v2.model.EvalJobState
-import
org.apache.streampark.flink.kubernetes.v2.model.EvalJobState.EvalJobState
-
-import scala.util.Try
-
-object FlinkAppStateConverter {
-
- /** Convert [[EvalJobState]] to [[FlinkAppState]]. */
- def k8sEvalJobStateToFlinkAppState(jobState: EvalJobState): FlinkAppState = {
-
Try(FlinkAppState.valueOf(jobState.toString)).getOrElse(FlinkAppState.OTHER)
- }
-
- /** Convert [[FlinkAppState]] to [[EvalJobState]]. */
- def flinkAppStateToK8sEvalJobState(jobState: FlinkAppState): EvalJobState = {
- EvalJobState.values.find(e => e.toString ==
jobState.toString).getOrElse(EvalJobState.OTHER)
- }
+public interface FlinkK8sDataTypeConverterStub {
+  /** Generate the default name of the Flink SessionJob CR, for k8s-native compatibility. */
+ String genSessionJobK8sCRName(String clusterId);
}
diff --git
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index 16b6503f1..f6b88d4b4 100644
---
a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++
b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -57,6 +57,7 @@ create table if not exists `t_flink_app` (
`job_manager_url` varchar(255) default null,
`version_id` bigint default null,
`cluster_id` varchar(45) default null,
+ `k8s_name` varchar(63) default null,
`k8s_namespace` varchar(63) default null,
`flink_image` varchar(128) default null,
`state` int default null,
diff --git
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index 1e4cdb585..045a8f850 100644
---
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -32,6 +32,7 @@
<result column="cluster_id" jdbcType="VARCHAR" property="clusterId"/>
<result column="flink_cluster_id" jdbcType="BIGINT"
property="flinkClusterId"/>
<result column="flink_image" jdbcType="VARCHAR" property="flinkImage"/>
+ <result column="k8s_name" jdbcType="VARCHAR" property="k8sName"/>
<result column="k8s_namespace" jdbcType="VARCHAR"
property="k8sNamespace"/>
<result column="app_type" jdbcType="INTEGER" property="appType"/>
<result column="job_type" jdbcType="INTEGER" property="jobType"/>
diff --git
a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sChangeListener.scala
b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sChangeListener.scala
deleted file mode 100644
index f4e00effe..000000000
---
a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sChangeListener.scala
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.console.core.task
-
-import
org.apache.streampark.common.zio.ZIOContainerSubscription.{ConcurrentMapExtension,
RefMapExtension}
-import org.apache.streampark.common.zio.ZIOExt.IOOps
-import org.apache.streampark.console.core.bean.AlertTemplate
-import org.apache.streampark.console.core.entity.{Application, FlinkCluster}
-import org.apache.streampark.console.core.enums.{FlinkAppState, OptionState}
-import org.apache.streampark.console.core.service.FlinkClusterService
-import org.apache.streampark.console.core.service.alert.AlertService
-import
org.apache.streampark.console.core.service.application.{ApplicationInfoService,
ApplicationManageService}
-import org.apache.streampark.console.core.utils.FlinkAppStateConverter
-import org.apache.streampark.flink.kubernetes.v2.model._
-import org.apache.streampark.flink.kubernetes.v2.observer.{FlinkK8sObserver,
Name, Namespace}
-import
org.apache.streampark.flink.kubernetes.v2.operator.OprError.TrackKeyNotFound
-
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.context.annotation.Lazy
-import org.springframework.stereotype.Component
-import zio.{UIO, ZIO}
-import zio.ZIO.logError
-import zio.ZIOAspect.annotated
-
-import java.util.Date
-
-@Component
-class FlinkK8sChangeListener {
- @Lazy @Autowired
- private var applicationManageService: ApplicationManageService = _
-
- @Lazy
- @Autowired
- private var applicationInfoService: ApplicationInfoService = _
-
- @Lazy @Autowired
- private var flinkClusterService: FlinkClusterService = _
- @Lazy @Autowired
- private var alertService: AlertService = _
-
- subscribeJobStatusChange.forkDaemon.runIO
- subscribeMetricsChange.forkDaemon.runIO
- subscribeRestSvcEndpointChange.forkDaemon.runIO
-
- private val alterStateList =
- Array(
- FlinkAppState.FAILED,
- FlinkAppState.LOST,
- FlinkAppState.RESTARTING,
- FlinkAppState.FINISHED)
-
- def subscribeJobStatusChange: UIO[Unit] = {
- FlinkK8sObserver.evaluatedJobSnaps
- .flatSubscribeValues()
- // Get Application records and convert JobSnapshot to Application
- .mapZIO {
- jobSnap =>
- ZIO
- .attemptBlocking {
- Option(applicationManageService.getById(jobSnap.appId))
- .map(app => setByJobStatusCV(app, jobSnap))
- }
- .catchAll {
- err =>
- logError(s"Fail to get Application records: ${err.getMessage}")
- .as(None) @@ annotated("appId" -> jobSnap.appId.toString)
- }
- }
- .filter(_.nonEmpty)
- .map(_.get)
- // Save Application records
- .tap {
- app =>
- ZIO
- .attemptBlocking(applicationInfoService.persistMetrics(app))
- .retryN(3)
- .tapError(err => logError(s"Fail to persist Application status:
${err.getMessage}"))
- .ignore @@ annotated("appId" -> app.getAppId)
- }
- // Alert for unhealthy states in parallel
- .mapZIOPar(10) {
- app =>
- val state = FlinkAppState.of(app.getState)
- ZIO
- .attemptBlocking(alertService.alert(app.getAlertId(),
AlertTemplate.of(app, state)))
- .when(alterStateList.contains(state))
- .retryN(3)
- .tapError(
- err => logError(s"Fail to alter unhealthy application state:
${err.getMessage}"))
- .ignore @@ annotated("appId" -> app.getAppId, "state" ->
state.toString)
- }
- .runDrain
- }
-
- def subscribeMetricsChange: UIO[Unit] = {
- FlinkK8sObserver.clusterMetricsSnaps
- .flatSubscribe()
- .mapZIO {
- metricsSnap =>
- ZIO
- .attemptBlocking {
- val namespaceAndName: (Namespace, Name) = metricsSnap._1
- val trackKey: ZIO[Any, TrackKeyNotFound, TrackKey] =
FlinkK8sObserver.trackedKeys
- .find(
- trackedKey =>
- trackedKey.clusterNamespace == namespaceAndName._1 &&
trackedKey.clusterName == namespaceAndName._2)
- .someOrFail(TrackKeyNotFound(namespaceAndName._1,
namespaceAndName._2))
-
- Option(applicationManageService.getById(trackKey.map(_.id)),
metricsSnap._2)
- }
- .catchAll {
- err =>
- logError(s"Fail to get Application records: ${err.getMessage}")
- .as(None) @@ annotated("name" -> metricsSnap._1._2)
- }
- }
- .filter(_.nonEmpty)
- .map(_.get)
- .tap {
- appAndMetrics =>
- ZIO
- .attemptBlocking {
- val app: Application = appAndMetrics._1
- val clusterMetrics: ClusterMetrics = appAndMetrics._2
- app.setJmMemory(clusterMetrics.totalJmMemory)
- app.setTmMemory(clusterMetrics.totalTmMemory)
- app.setTotalTM(clusterMetrics.totalTm)
- app.setTotalSlot(clusterMetrics.totalSlot)
- app.setAvailableSlot(clusterMetrics.availableSlot)
- applicationInfoService.persistMetrics(app)
- }
- .retryN(3)
- .tapError(err => logError(s"Fail to persist Application Metrics:
${err.getMessage}"))
- .ignore @@ annotated("appId" -> appAndMetrics._1.getAppId)
- }
- .runDrain
- }
-
- def subscribeRestSvcEndpointChange: UIO[Unit] = {
- FlinkK8sObserver.restSvcEndpointSnaps
- .flatSubscribe()
- .foreach {
- restSvcEndpointSnap =>
- ZIO
- .attempt {
-
- val namespaceAndName: (Namespace, Name) = restSvcEndpointSnap._1
- val trackKey: ZIO[Any, TrackKeyNotFound, TrackKey] =
FlinkK8sObserver.trackedKeys
- .find(
- trackedKey =>
- trackedKey.clusterNamespace == namespaceAndName._1 &&
trackedKey.clusterName == namespaceAndName._2)
- .someOrFail(TrackKeyNotFound(namespaceAndName._1,
namespaceAndName._2))
- val restSvcEndpoint: RestSvcEndpoint = restSvcEndpointSnap._2
-
- val app: Application =
applicationManageService.getById(trackKey.map(_.id))
-
- val flinkCluster: FlinkCluster =
flinkClusterService.getById(app.getFlinkClusterId)
-
- if (restSvcEndpoint == null || restSvcEndpoint.ipRest == null)
return ZIO.unit
- val url = restSvcEndpoint.ipRest
- app.setFlinkRestUrl(url)
- applicationInfoService.persistMetrics(app)
-
- flinkCluster.setAddress(url)
- flinkClusterService.update(flinkCluster)
-
- }
- .retryN(3)
- .ignore
- }
- }
-
- private def setByJobStatusCV(app: Application, jobSnapshot: JobSnapshot):
Application = { // infer the final flink job state
- val state: FlinkAppState =
-
FlinkAppStateConverter.k8sEvalJobStateToFlinkAppState(jobSnapshot.evalState)
- val jobStatusOption: Option[JobStatus] = jobSnapshot.jobStatus
-
- if (jobStatusOption.nonEmpty) {
- val jobStatus: JobStatus = jobStatusOption.get
- // corrective start-time / end-time / duration
- val preStartTime: Long =
- if (app.getStartTime != null) app.getStartTime.getTime
- else 0
-
- val startTime: Long = Math.max(jobStatus.startTs, preStartTime)
- val preEndTime: Long =
- if (app.getEndTime != null) app.getEndTime.getTime
- else 0
- var endTime: Long = Math.max(jobStatus.endTs.getOrElse(-1L), preEndTime)
- var duration: Long = if (app.getDuration != null) app.getDuration else 0
- if (FlinkAppState.isEndState(state.getValue)) {
- if (endTime < startTime) endTime = System.currentTimeMillis
- if (duration <= 0) duration = endTime - startTime
- }
- app.setJobId(jobStatus.jobId)
- val totalTask = if (jobStatus.tasks.nonEmpty) jobStatus.tasks.get.total
else 0
- app.setTotalTask(totalTask)
- app.setStartTime(
- new Date(
- if (startTime > 0) startTime
- else 0))
- app.setEndTime(
- if (endTime > 0 && endTime >= startTime) new Date(endTime)
- else null)
- app.setDuration(
- if (duration > 0) duration
- else 0)
- }
-
- app.setState(state.getValue)
- // when a flink job status change event can be received, it means
- // that the operation command sent by streampark has been completed.
- app.setOptionState(OptionState.NONE.getValue)
- app
- }
-}
diff --git
a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBroker.scala
b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBroker.scala
new file mode 100644
index 000000000..66251914c
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBroker.scala
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.task
+
+import org.apache.streampark.common.conf.K8sFlinkConfig
+import org.apache.streampark.common.enums.{ClusterState, ExecutionMode}
+import org.apache.streampark.common.zio.{OptionTraversableOps, PrettyStringOps}
+import
org.apache.streampark.common.zio.ZIOContainerSubscription.{ConcurrentMapExtension,
RefMapExtension}
+import org.apache.streampark.common.zio.ZIOExt.{IterableZStreamConverter,
OptionZIOOps, UIOOps, ZStreamOptionEffectOps}
+import org.apache.streampark.console.core.bean.AlertTemplate
+import org.apache.streampark.console.core.entity.{Application, FlinkCluster}
+import org.apache.streampark.console.core.enums.{FlinkAppState, OptionState}
+import org.apache.streampark.console.core.service.FlinkClusterService
+import org.apache.streampark.console.core.service.alert.AlertService
+import
org.apache.streampark.console.core.service.application.ApplicationInfoService
+import org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverter
+import
org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverter.{applicationToTrackKey,
clusterMetricsToFlinkMetricCV, flinkClusterToClusterKey,
k8sDeployStateToClusterState}
+import org.apache.streampark.console.core.utils.MybatisScalaExt.{lambdaQuery,
lambdaUpdate, LambdaQueryOps, LambdaUpdateOps}
+import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV
+import org.apache.streampark.flink.kubernetes.v2.model._
+import
org.apache.streampark.flink.kubernetes.v2.model.TrackKey.{ApplicationJobKey,
ClusterKey}
+import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver
+import
org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserverSnapSubscriptionHelper.{ClusterMetricsSnapsSubscriptionOps,
DeployCRSnapsSubscriptionOps, RestSvcEndpointSnapsSubscriptionOps}
+
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.stereotype.Component
+import zio.{Fiber, Ref, UIO, ZIO}
+import zio.ZIO.{interruptible, logInfo}
+import zio.ZIOAspect.annotated
+import zio.stream.UStream
+
+import javax.annotation.{PostConstruct, PreDestroy}
+
+import java.lang
+import java.util.Date
+
+import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter
+
+/** Broker of FlinkK8sObserver, the observer for Flink on Kubernetes. */
+@Component
+class FlinkK8sObserverBroker @Autowired() (
+ var applicationInfoService: ApplicationInfoService,
+ var flinkClusterService: FlinkClusterService,
+ var alertService: AlertService)
+ extends FlinkK8sObserverStub
+ with FlinkK8sObserverBrokerSidecar { it =>
+
+ private val observer = FlinkK8sObserver
+
+ private val alertJobStateList: Array[FlinkAppState] = Array(
+ FlinkAppState.FAILED,
+ FlinkAppState.LOST,
+ FlinkAppState.RESTARTING,
+ FlinkAppState.FINISHED
+ )
+
+ private val alertClusterStateList = Array(
+ ClusterState.FAILED,
+ ClusterState.UNKNOWN,
+ ClusterState.LOST,
+ ClusterState.KILLED
+ )
+
+ private lazy val allDaemonEffects: Array[UIO[Unit]] = Array(
+ subscribeJobStatusChange,
+ subscribeApplicationJobMetricsChange,
+ subscribeApplicationJobRestSvcEndpointChange,
+ subscribeGlobalClusterMetricChange,
+ subscribeClusterRestSvcEndpointChange,
+ subscribeClusterStateChange
+ )
+
+  // All subscription fibers forked as daemons.
+  private val daemonFibers = Ref.make(Array.empty[Fiber.Runtime[Nothing, Unit]]).runUIO
+ // Aggregated flink cluster metrics by teamId
+ private val aggFlinkMetric = Ref.make(Map.empty[Long, ClusterMetrics]).runUIO
+
+ @PostConstruct
+ def init(): Unit = {
+ val effect: UIO[Unit] = for {
+ // launch all subscription fibers
+ fibers <-
ZIO.foreach(allDaemonEffects)(interruptible(_).forkDaemon)
+ // async restore track keys from persistent storage
+ restoreFiber <- restoreTrackKeyRecords.forkDaemon
+ _ <- daemonFibers.set(fibers :+ restoreFiber)
+      _            <- logInfo("Launch FlinkK8sObserverBroker.")
+ } yield ()
+ effect.when(K8sFlinkConfig.isV2Enabled).runUIO
+ }
+
+ @PreDestroy
+ def destroy(): Unit = {
+ daemonFibers.get.flatMap(fibers => ZIO.foreach(fibers)(_.interrupt)).runUIO
+ }
+
+ /** Restore track list from persistent storage into FlinkK8sObserver. */
+ private def restoreTrackKeyRecords: UIO[Unit] = {
+
+    val fromApplicationRecords: UIO[Unit] = it
+      .safeFindApplication(
+        lambdaQuery[Application].typedIn(_.getExecutionMode, ExecutionMode.getKubernetesMode.asScala)
+      )(10)
+      .map(apps => apps.map(app => applicationToTrackKey(app)).filterSome.toVector)
+      .tap(keys => logInfo(s"Restore Flink K8s track-keys from Application records:\n${keys.prettyStr}"))
+      .flatMap(keys => ZIO.foreachDiscard(keys)(observer.track))
+
+    val fromFlinkClusterRecords: UIO[Unit] = it
+      .safeFindFlinkClusterRecord(
+        lambdaQuery[FlinkCluster].typedEq(_.getExecutionMode, ExecutionMode.KUBERNETES_NATIVE_SESSION.getMode)
+      )(10)
+      .map(clusters => clusters.map(e => TrackKey.cluster(e.getId, e.getK8sNamespace, e.getClusterId)))
+      .tap(keys => logInfo(s"Restore Flink K8s track-keys from FlinkCluster records:\n${keys.prettyStr}"))
+      .flatMap(keys => ZIO.foreachDiscard(keys)(observer.track))
+
+    fromApplicationRecords <&> fromFlinkClusterRecords
+ }
+
+ /** Subscribe Flink job status change from FlinkK8sObserver */
+ private def subscribeJobStatusChange: UIO[Unit] = {
+
+ def process(subStream: UStream[JobSnapshot]): UStream[Unit] = subStream
+ // Convert EvalJobState to FlinkAppState
+      .map(snap => snap -> FlinkK8sDataTypeConverter.k8sEvalJobStateToFlinkAppState(snap.evalState))
+ // Update the corresponding columns of Application record
+ .tap { case (snap: JobSnapshot, convertedState: FlinkAppState) =>
+ safeUpdateApplicationRecord(snap.appId) {
+
+ var update = lambdaUpdate[Application]
+ .typedSet(_.getState, convertedState.getValue)
+ .typedSet(_.getOptions, OptionState.NONE.getValue)
+ // update JobStatus related columns
+ snap.jobStatus.foreach { status =>
+ update = update
+ .typedSet(_.getJobId, status.jobId)
+ .typedSet(_.getStartTime, new Date(status.startTs))
+ .typedSet(_.getEndTime, status.endTs.map(new Date(_)).orNull)
+ .typedSet(_.getDuration, status.duration)
+ .typedSet(_.getTotalTask, status.tasks.map(_.total).getOrElse(0))
+ }
+          // Copy the logic from resources/mapper/core/ApplicationMapper.xml:persistMetrics
+          if (FlinkAppState.isEndState(convertedState.getValue)) {
+            update = update
+              .typedSet(_.getTotalTM, null)
+              .typedSet(_.getTotalSlot, null)
+              .typedSet(_.getAvailableSlot, null)
+              .typedSet(_.getJmMemory, null)
+              .typedSet(_.getTmMemory, null)
+          }
+ update
+ }
+ }
+ // Alert for unhealthy job states in parallel
+ .filter { case (_, state) => alertJobStateList.contains(state) }
+ .mapZIOPar(5) { case (snap, state) =>
+ safeGetApplicationRecord(snap.appId).someOrUnitZIO { app =>
+ ZIO
+            .attemptBlocking(alertService.alert(app.getAlertId, AlertTemplate.of(app, state)))
+            .retryN(3)
+            .tapError(err => logInfo(s"Fail to alert unhealthy state: ${err.getMessage}"))
+ .ignore @@
+ annotated("appId" -> app.getId.toString, "state" -> state.toString)
+ }
+ }
+
+ observer.evaluatedJobSnaps
+ .flatSubscribeValues()
+      // Handle events grouped by appId in parallel, while events within each appId group are processed serially.
+ .groupByKey(_.appId) { case (_, substream) => process(substream) }
+ .runDrain
+ }
+
+ /** Subscribe Flink application job metrics change from FlinkK8sObserver */
+ private def subscribeApplicationJobMetricsChange: UIO[Unit] = {
+ observer.clusterMetricsSnaps
+ .flatSubscribe()
+ // Combine with the corresponding ApplicationJobKey
+ .combineWithTypedTrackKey[ApplicationJobKey]
+ .filterSome
+ .groupByKey(_._1.id) { case (_, substream) =>
+ // Update metrics info of the corresponding Application record
+        substream.mapZIO { case (trackKey: ApplicationJobKey, metrics: ClusterMetrics) =>
+ safeUpdateApplicationRecord(trackKey.id)(
+ lambdaUpdate[Application]
+ .typedSet(_.getJmMemory, metrics.totalJmMemory)
+ .typedSet(_.getTmMemory, metrics.totalTmMemory)
+ .typedSet(_.getTotalTM, metrics.totalTm)
+ .typedSet(_.getTotalSlot, metrics.totalSlot)
+ .typedSet(_.getAvailableSlot, metrics.availableSlot))
+ }
+ }
+ .runDrain
+ }
+
+ /** Subscribe Flink cluster status change from FlinkK8sObserver */
+ private def subscribeClusterStateChange: UIO[Unit] = {
+
+    def process(substream: UStream[(ClusterKey, (DeployCRStatus, Option[JobStatus]))]): UStream[Unit] = {
+ substream
+ // Convert K8s CR status to ClusterState
+ .map { case (key, (crStatus, _)) =>
+ (key.id, k8sDeployStateToClusterState(crStatus), crStatus.error)
+ }
+ // Update the corresponding FlinkCluster record
+ .tap { case (id, state, error) =>
+ safeUpdateFlinkClusterRecord(id)(
+ lambdaUpdate[FlinkCluster]
+ .typedSet(_.getClusterState, state.getValue)
+            .typedSet(error.isDefined, _.getException, error.orNull))
+ }
+      // Alert for unhealthy states in parallel
+ .filter { case (_, state, _) => alertClusterStateList.contains(state) }
+ .mapZIOPar(5) { case (id, state, _) =>
+ safeGetFlinkClusterRecord(id).someOrUnitZIO { flinkCluster =>
+ ZIO
+            .attemptBlocking(alertService.alert(flinkCluster.getAlertId, AlertTemplate.of(flinkCluster, state)))
+            .retryN(5)
+            .tapError(err => logInfo(s"Fail to alert unhealthy state: ${err.getMessage}"))
+            .ignore @@
+            annotated("FlinkCluster.id" -> id.toString, "state" -> state.toString)
+ }
+ }
+ }
+
+ observer.deployCRSnaps
+ .flatSubscribe()
+ // Combine with the corresponding ClusterKey
+ .combineWithTypedTrackKey[ClusterKey]
+ .filterSome
+      // Handle events grouped by id in parallel, while events within each group are processed serially.
+ .groupByKey(_._1) { case (_, substream) => process(substream) }
+ .runDrain
+ }
+
+  /** Subscribe K8s rest endpoint of Flink Application mode job from FlinkK8sObserver. */
+ private def subscribeApplicationJobRestSvcEndpointChange: UIO[Unit] = {
+ observer.restSvcEndpointSnaps
+ .flatSubscribe()
+ // Combine with the corresponding ApplicationJobKey
+ .combineWithTypedTrackKey[ApplicationJobKey]
+ .filterSome
+ .groupByKey(_._1.id) { case (_, substream) =>
+ // Update jobManagerUrl of the corresponding Application record
+        substream.mapZIO { case (key: ApplicationJobKey, endpoint: RestSvcEndpoint) =>
+          safeUpdateApplicationRecord(key.id)(
+            lambdaUpdate[Application].typedSet(_.getJobManagerUrl, endpoint.ipRest))
+        }
+ }
+ .runDrain
+ }
+
+ /** Subscribe K8s rest endpoint of Flink cluster from FlinkK8sObserver. */
+ private def subscribeClusterRestSvcEndpointChange: UIO[Unit] = {
+ observer.restSvcEndpointSnaps
+ .flatSubscribe()
+ // Combine with the corresponding ClusterKey
+ .combineWithTypedTrackKey[ClusterKey]
+ .filterSome
+ .groupByKey(_._1) { case (_, substream) =>
+ substream.mapZIO { case (key: ClusterKey, endpoint: RestSvcEndpoint) =>
+ safeUpdateFlinkClusterRecord(key.id)(
+ lambdaUpdate[FlinkCluster]
+ .typedSet(_.getAddress, endpoint.ipRest)
+ .typedSet(_.getJobManagerUrl, endpoint.ipRest))
+ }
+ }
+ .runDrain
+ }
+
+  /** Subscribe Flink cluster metrics change from FlinkK8sObserver and aggregate them by teamId. */
+ private def subscribeGlobalClusterMetricChange: UIO[Unit] = {
+ observer.clusterMetricsSnaps
+ .subscribe()
+ .mapZIO { metricItems =>
+ for {
+ // Combine with appIds
+ trackKeys <- FlinkK8sObserver.trackedKeys.toSet
+          metricWithAppIds = metricItems.map { case ((ns, name), metric) =>
+            metric -> trackKeys.filter(k => k.clusterNamespace == ns && k.clusterName == name).map(_.id)
+          }
+          // Combine with teamId from persistent application records in parallel
+          teamIdWithMetrics <- metricWithAppIds.asZStream
+            .flatMapPar(10) { case (metric, appIds) =>
+              appIds.asZStream
+                .mapZIOParUnordered(10)(appId => safeGetApplicationRecord(appId))
+                .map(_.flatMap(app => Option(app.getTeamId)))
+                .filterSome
+                .map(teamId => teamId.toLong -> metric)
+            }
+            .runCollect
+          // Group ClusterMetrics by teamId and aggregate each group
+          aggMetricsMap = teamIdWithMetrics
+            .groupBy { case (teamId, _) => teamId }
+            .map { case (teamId, metrics) =>
+              teamId -> metrics.map(_._2).foldLeft(ClusterMetrics.empty)((acc, e) => acc + e)
+            }
+ // Update aggFlinkMetric cache
+ _ <- aggFlinkMetric.set(aggMetricsMap)
+ } yield ()
+ }
+ .runDrain
+ }
+
+  /** Stub method: Get aggregated metrics of all Flink jobs on K8s clusters by teamId. */
+ override def getAggClusterMetric(teamId: lang.Long): ClusterMetrics = {
+ for {
+ metrics <- aggFlinkMetric.get
+ result = metrics.get(teamId)
+ } yield result.getOrElse(ClusterMetrics.empty)
+ }.runUIO
+
+ override def getAggClusterMetricCV(teamId: lang.Long): FlinkMetricCV = {
+ clusterMetricsToFlinkMetricCV(getAggClusterMetric(teamId))
+ }
+
+ /** Stub method: Add Application to the watchlist. */
+ override def watchApplication(app: Application): Unit = {
+ ZIO
+ .succeed(applicationToTrackKey(app))
+ .someOrUnitZIO { key =>
+ observer.track(key) *>
+ logInfo("Add Application into k8s observer tracking list") @@
+            annotated("appId" -> app.getId.toString)
+ }
+ .runUIO
+ }
+
+ /** Stub method: Add FlinkCluster to the watchlist. */
+ override def watchFlinkCluster(flinkCluster: FlinkCluster): Unit = {
+ ZIO
+ .succeed(flinkClusterToClusterKey(flinkCluster))
+ .someOrUnitZIO { trackKey =>
+ observer.track(trackKey) *>
+ logInfo("Add FlinkCluster into k8s observer tracking list") @@
+ annotated("id" -> flinkCluster.getId.toString)
+ }
+ .runUIO
+ }
+
+  /**
+   * Stub method: Remove FlinkCluster from the watchlist. When there are SessionJobs still associated
+   * with the FlinkCluster, its tracking will not actually be removed.
+   */
+ override def unwatchFlinkCluster(flinkCluster: FlinkCluster): Unit = {
+ ZIO
+ .succeed(flinkClusterToClusterKey(flinkCluster))
+ .someOrUnitZIO(trackKey => FlinkK8sObserver.untrack(trackKey))
+ .runUIO
+ }
+
+  /** Stub method: Notify FlinkK8sObserver to remove tracking resources by TrackKey.id. */
+  override def unWatchById(id: lang.Long): Unit = {
+    observer.trackedKeys
+      .find(_.id == id)
+      .someOrUnitZIO(key => observer.untrack(key))
+      .runUIO
+  }
+
+ /** Stub method: Whether there are active jobs on the Flink cluster. */
+  override def existActiveJobsOnFlinkCluster(flinkCluster: FlinkCluster): Boolean = {
+ if (flinkCluster.getClusterId == null) false
+ else {
+ observer.clusterJobStatusSnaps
+        .get(flinkCluster.getClusterId, Option(flinkCluster.getK8sNamespace).getOrElse("default"))
+        .map {
+          case None => false
+          case Some(statusList) =>
+            statusList.exists(status => JobState.activeStates.contains(status.state))
+ }
+ .runUIO
+ }
+ }
+
+}
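
Note: the listener above leans on ZStream's groupByKey to keep per-app event
ordering while still processing different apps in parallel. A minimal
standalone sketch of that pattern (the Event type is hypothetical, not part of
this patch):

    import zio._
    import zio.stream._

    object GroupedProcessingSketch extends ZIOAppDefault {
      // Hypothetical event type, standing in for JobSnapshot.
      final case class Event(appId: Long, payload: String)

      def run =
        ZStream
          .fromIterable(Seq(Event(1, "a"), Event(2, "b"), Event(1, "c")))
          // Groups run in parallel; events within one appId group stay ordered.
          .groupByKey(_.appId) { case (appId, substream) =>
            substream.mapZIO(e => ZIO.logInfo(s"appId=$appId -> ${e.payload}"))
          }
          .runDrain
    }
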
diff --git
a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBrokerSidecar.scala
b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBrokerSidecar.scala
new file mode 100644
index 000000000..69125acd8
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBrokerSidecar.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.task
+
+import org.apache.streampark.console.core.entity.{Application, FlinkCluster}
+import org.apache.streampark.console.core.service.FlinkClusterService
+import org.apache.streampark.console.core.service.application.ApplicationInfoService
+import org.apache.streampark.console.core.utils.MybatisScalaExt.LambdaUpdateOps
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper
+import zio.{UIO, ZIO}
+import zio.ZIO.logError
+import zio.ZIOAspect.annotated
+
+import scala.jdk.CollectionConverters._
+
+trait FlinkK8sObserverBrokerSidecar {
+
+ def applicationInfoService: ApplicationInfoService
+ def flinkClusterService: FlinkClusterService
+
+ // Get Application record by appId from persistent storage.
+  protected def safeGetApplicationRecord(appId: Long): UIO[Option[Application]] = {
+    ZIO
+      .attemptBlocking(Option(applicationInfoService.getById(appId)))
+      .retryN(2)
+      .catchAll(err => logError(s"Fail to get Application record: ${err.getMessage}").as(None))
+  } @@ annotated("appId" -> appId.toString)
+
+ // Update Application record by appId into persistent storage.
+  protected def safeUpdateApplicationRecord(appId: Long)(update: LambdaUpdateWrapper[Application]): UIO[Unit] = {
+    ZIO
+      .attemptBlocking(applicationInfoService.update(null, update.typedEq(_.getId, appId)))
+      .retryN(2)
+      .tapError(err => logError(s"Fail to update Application record: ${err.getMessage}"))
+      .ignore
+  } @@ annotated("appId" -> appId.toString)
+
+  // Get FlinkCluster record by id from persistent storage.
+  protected def safeGetFlinkClusterRecord(id: Long): UIO[Option[FlinkCluster]] = {
+    ZIO
+      .attemptBlocking(Option(flinkClusterService.getById(id)))
+      .retryN(3)
+      .catchAll(err => logError(s"Fail to get FlinkCluster record: ${err.getMessage}").as(None))
+  } @@ annotated("id" -> id.toString)
+
+ // Update FlinkCluster record by id into persistent storage.
+  protected def safeUpdateFlinkClusterRecord(id: Long)(update: LambdaUpdateWrapper[FlinkCluster]): UIO[Unit] = {
+    ZIO
+      .attemptBlocking(flinkClusterService.update(null, update.typedEq(_.getId, id)))
+      .retryN(3)
+      .tapError(err => logError(s"Fail to update FlinkCluster record: ${err.getMessage}"))
+      .ignore
+  } @@ annotated("id" -> id.toString)
+
+  // Find Application records.
+  protected def safeFindApplication(query: LambdaQueryWrapper[Application])(retryN: Int): UIO[Vector[Application]] = {
+    ZIO
+      .attemptBlocking {
+        val result = applicationInfoService.getBaseMapper.selectList(query)
+        if (result == null) Vector.empty[Application] else result.asScala.toVector
+      }
+      .retryN(retryN)
+      .catchAll { err =>
+        logError(s"Fail to list Application records: ${err.getMessage}").as(Vector.empty[Application])
+      }
+  }
+
+  // Find FlinkCluster records.
+  protected def safeFindFlinkClusterRecord(query: LambdaQueryWrapper[FlinkCluster])(
+      retryN: Int): UIO[Vector[FlinkCluster]] = {
+    ZIO
+      .attemptBlocking {
+        val result = flinkClusterService.getBaseMapper.selectList(query)
+        if (result == null) Vector.empty[FlinkCluster] else result.asScala.toVector
+      }
+      .retryN(retryN)
+      .catchAll { err =>
+        logError(s"Fail to list FlinkCluster records: ${err.getMessage}").as(Vector.empty[FlinkCluster])
+      }
+  }
+
+}
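
Note: each helper above follows the same shape: run the blocking MyBatis call
on ZIO's blocking pool, retry a few times, then degrade to a logged no-op
instead of failing the subscription stream. A condensed sketch of that shape,
with a hypothetical dao function:

    import zio.{UIO, ZIO}
    import zio.ZIO.logError

    // Hypothetical lookup: any blocking call that may return null or throw.
    def safeLookup[A](dao: Long => A)(id: Long): UIO[Option[A]] =
      ZIO
        .attemptBlocking(Option(dao(id))) // runs on the blocking executor
        .retryN(2)                        // retry transient failures
        .catchAll(err => logError(s"lookup($id) failed: ${err.getMessage}").as(None))
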
diff --git
a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/FlinkK8sDataTypeConverter.scala
b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/FlinkK8sDataTypeConverter.scala
new file mode 100644
index 000000000..0813b5373
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/FlinkK8sDataTypeConverter.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.utils
+
+import org.apache.streampark.common.enums.{ClusterState, ExecutionMode}
+import org.apache.streampark.console.core.entity.{Application, FlinkCluster}
+import org.apache.streampark.console.core.enums.FlinkAppState
+import org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverter.genSessionJobCRName
+import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV
+import org.apache.streampark.flink.kubernetes.v2.model._
+import org.apache.streampark.flink.kubernetes.v2.model.EvalJobState.EvalJobState
+import org.apache.streampark.flink.kubernetes.v2.model.TrackKey.ClusterKey
+
+import org.apache.commons.lang3.StringUtils
+import org.springframework.stereotype.Component
+
+import java.util.UUID
+
+import scala.util.Try
+
+@Component
+class FlinkK8sDataTypeConverter() extends FlinkK8sDataTypeConverterStub {
+  override def genSessionJobK8sCRName(clusterId: String): String = genSessionJobCRName(clusterId)
+}
+
+object FlinkK8sDataTypeConverter {
+
+  /** Create default name for Flink SessionJob CR for k8s-native compatibility. */
+ def genSessionJobCRName(clusterId: String): String = {
+ s"$clusterId-${UUID.randomUUID().toString.replace("-", "").take(8)}"
+ }
+
+ /** Convert [[EvalJobState]] to [[FlinkAppState]]. */
+  def k8sEvalJobStateToFlinkAppState(jobState: EvalJobState): FlinkAppState = {
+    Try(FlinkAppState.valueOf(jobState.toString)).getOrElse(FlinkAppState.OTHER)
+  }
+
+ /** Convert [[DeployCRStatus]] to [[ClusterState]]. */
+ def k8sDeployStateToClusterState(crState: DeployCRStatus): ClusterState = {
+ crState.evalState match {
+ case EvalState.DEPLOYING => ClusterState.STARTING
+ case EvalState.READY => ClusterState.RUNNING
+ case EvalState.SUSPENDED => ClusterState.CANCELED
+ case EvalState.FAILED => ClusterState.FAILED
+ case EvalState.DELETED => ClusterState.KILLED
+ case _ => ClusterState.UNKNOWN
+ }
+ }
+
+ /** Convert [[ClusterMetrics]] to [[FlinkMetricCV]]. */
+ def clusterMetricsToFlinkMetricCV(metrics: ClusterMetrics): FlinkMetricCV = {
+ FlinkMetricCV(
+ totalJmMemory = metrics.totalJmMemory,
+ totalTmMemory = metrics.totalTmMemory,
+ totalTm = metrics.totalTm,
+      totalSlot = metrics.totalSlot,
+ availableSlot = metrics.availableSlot,
+ runningJob = metrics.runningJob,
+ finishedJob = metrics.finishedJob,
+ cancelledJob = metrics.cancelledJob,
+ failedJob = metrics.failedJob,
+ pollAckTime = 0L
+ )
+ }
+
+ /** Convert [[FlinkCluster]] to [[ClusterKey]]. */
+  def flinkClusterToClusterKey(flinkCluster: FlinkCluster): Option[ClusterKey] = {
+ val isLegal = {
+ flinkCluster != null &&
+ flinkCluster.getId != null &&
+ ExecutionMode.isKubernetesSessionMode(flinkCluster.getExecutionMode) &&
+ StringUtils.isNoneBlank(flinkCluster.getClusterId) &&
+ StringUtils.isNoneBlank(flinkCluster.getK8sNamespace)
+ }
+    if (isLegal) Some(ClusterKey(flinkCluster.getId, flinkCluster.getK8sNamespace, flinkCluster.getClusterId))
+ else None
+ }
+
+ /** Convert [[Application]] to [[TrackKey]]. */
+ def applicationToTrackKey(app: Application): Option[TrackKey] = {
+ import ExecutionMode._
+
+    val isLegal = {
+      app != null &&
+      app.getId != null &&
+      ExecutionMode.isKubernetesMode(app.getExecutionMode) &&
+      StringUtils.isNoneBlank(app.getClusterId) &&
+      StringUtils.isNoneBlank(app.getK8sNamespace)
+    }
+
+    if (!isLegal) None
+    else
+      app.getExecutionModeEnum match {
+        case KUBERNETES_NATIVE_APPLICATION =>
+          Some(TrackKey.appJob(app.getId, app.getK8sNamespace, app.getClusterId))
+        case KUBERNETES_NATIVE_SESSION =>
+          Option(app.getK8sName) match {
+            case Some(name) => Some(TrackKey.sessionJob(app.getId, app.getK8sNamespace, name, app.getClusterId))
+            case None =>
+              Option(app.getJobId) match {
+                case Some(jid) =>
+                  Some(TrackKey.unmanagedSessionJob(app.getId, app.getK8sNamespace, app.getClusterId, jid))
+                case None => None
+              }
+          }
+        case _ => None
+      }
+  }
+
+}
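
Note: a quick sense of the converters above, as a hedged sketch (example
values only; the generated suffix differs per call):

    import org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverter
    import org.apache.streampark.flink.kubernetes.v2.model.EvalJobState

    // e.g. "my-session-cluster-1a2b3c4d": 8 random hex chars keep repeated
    // submissions from colliding on SessionJob CR names.
    val crName = FlinkK8sDataTypeConverter.genSessionJobCRName("my-session-cluster")

    // Unknown eval states degrade to FlinkAppState.OTHER rather than throwing.
    val appState = FlinkK8sDataTypeConverter.k8sEvalJobStateToFlinkAppState(EvalJobState.FAILED)
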
diff --git
a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/MybatisScalaExt.scala
b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/MybatisScalaExt.scala
new file mode 100644
index 000000000..08a103b32
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/MybatisScalaExt.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.utils
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper
+import com.baomidou.mybatisplus.core.toolkit.support.SFunction
+
+import scala.jdk.CollectionConverters._
+import scala.language.implicitConversions
+
+/** MyBatis scala extension. */
+object MybatisScalaExt {
+
+ def lambdaQuery[E]: LambdaQueryWrapper[E] = new LambdaQueryWrapper[E]()
+ def lambdaUpdate[E]: LambdaUpdateWrapper[E] = new LambdaUpdateWrapper[E]()
+
+ // noinspection DuplicatedCode
+ implicit class LambdaQueryOps[E](wrapper: LambdaQueryWrapper[E]) {
+    def typedIn[V](column: E => V, values: Iterable[V]): LambdaQueryWrapper[E] = {
+      wrapper.in(new SFunction[E, V] { override def apply(t: E): V = column(t) }, values.asJavaCollection)
+    }
+
+    def typedEq[V](column: E => V, value: V): LambdaQueryWrapper[E] = {
+      wrapper.eq(new SFunction[E, V] { override def apply(t: E): V = column(t) }, value)
+    }
+ }
+
+ // noinspection DuplicatedCode
+ implicit class LambdaUpdateOps[E](wrapper: LambdaUpdateWrapper[E]) {
+    def typedSet[V](column: E => V, value: V): LambdaUpdateWrapper[E] = {
+      wrapper.set(new SFunction[E, V] { override def apply(t: E): V = column(t) }, value)
+    }
+
+    def typedSet[V](cond: Boolean, column: E => V, value: V): LambdaUpdateWrapper[E] = {
+      wrapper.set(cond, new SFunction[E, V] { override def apply(t: E): V = column(t) }, value)
+    }
+
+    def typedEq[V](column: E => V, value: V): LambdaUpdateWrapper[E] = {
+      wrapper.eq(new SFunction[E, V] { override def apply(t: E): V = column(t) }, value)
+    }
+ }
+
+}
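
Note: these extensions exist so Scala call sites can pass plain lambdas where
MyBatis-Plus expects its SFunction SAM type. A usage sketch against the
Application entity used elsewhere in this patch (values are illustrative):

    import org.apache.streampark.console.core.entity.Application
    import org.apache.streampark.console.core.utils.MybatisScalaExt._

    // Build a typed update: SET job_manager_url = ... WHERE id = 42
    val wrapper = lambdaUpdate[Application]
      .typedSet(_.getJobManagerUrl, "http://flink-rest:8081")
      .typedEq(_.getId, 42L)
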
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/KubernetesSubmitParam.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/KubernetesSubmitParam.scala
index 07832841c..f959b0c32 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/KubernetesSubmitParam.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/KubernetesSubmitParam.scala
@@ -33,10 +33,10 @@ import java.util.{Map => JMap}
* The logic of conversion is located at:
*
[[org.apache.streampark.flink.client.impl.KubernetesApplicationClientV2#genFlinkDeployDef]]
*/
-// todo split into Application mode and SessionJob mode
case class KubernetesSubmitParam(
clusterId: String,
kubernetesNamespace: String,
+ kubernetesName: Option[String],
baseImage: Option[String] = None,
imagePullPolicy: Option[String] = None,
serviceAccount: Option[String] = None,
@@ -67,12 +67,15 @@ object KubernetesSubmitParam {
*/
def apply(
clusterId: String,
+ kubernetesName: String,
kubernetesNamespace: String,
baseImage: String,
@Nullable flinkRestExposedType: FlinkK8sRestExposedType):
KubernetesSubmitParam =
KubernetesSubmitParam(
clusterId = clusterId,
kubernetesNamespace = kubernetesNamespace,
+ kubernetesName = Option(kubernetesName),
baseImage = Some(baseImage),
- flinkRestExposedType = Option(flinkRestExposedType))
+ flinkRestExposedType = Option(flinkRestExposedType)
+ )
}
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
index 3da974d7b..bdd1f1f2a 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
@@ -80,7 +80,10 @@ object FlinkClientHandler {
def shutdown(request: ShutDownRequest): ShutDownResponse = {
request.executionMode match {
case YARN_SESSION => YarnSessionClient.shutdown(request)
-      case KUBERNETES_NATIVE_SESSION => KubernetesNativeSessionClient.shutdown(request)
+      case KUBERNETES_NATIVE_SESSION =>
+        if (K8sFlinkConfig.isV2Enabled) KubernetesSessionClientV2.shutdown(request)
+        else KubernetesNativeSessionClient.shutdown(request)
case _ =>
throw new UnsupportedOperationException(
s"Unsupported ${request.executionMode} shutdown cluster ")
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
index 130dcd17e..20f8d6aa5 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
@@ -39,7 +39,7 @@ import scala.util.{Failure, Success, Try}
/** Flink K8s application mode task operation client via Flink K8s Operator */
object KubernetesApplicationClientV2 extends KubernetesClientV2Trait with
Logger {
- @throws[Exception]
+ @throws[Throwable]
override def doSubmit(
submitRequest: SubmitRequest,
flinkConfig: Configuration): SubmitResponse = {
@@ -88,9 +88,10 @@ object KubernetesApplicationClientV2 extends
KubernetesClientV2Trait with Logger
val namespace = Option(submitReq.k8sSubmitParam.kubernetesNamespace)
.getOrElse("default")
- val name = Option(submitReq.k8sSubmitParam.clusterId)
+ val name = submitReq.k8sSubmitParam.kubernetesName
+ .orElse(Option(submitReq.k8sSubmitParam.clusterId))
.filter(str => StringUtils.isNotBlank(str))
- .getOrElse(return Left("cluster-id should not be empty"))
+ .getOrElse(return Left("Kubernetes CR name should not be empty"))
val image = submitReq.k8sSubmitParam.baseImage
.orElse(Option(buildResult.flinkBaseImage))
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
index d5a9f51db..0b0807a55 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
@@ -20,22 +20,21 @@ import org.apache.streampark.common.util.Logger
import org.apache.streampark.common.zio.ZIOExt.IOOps
import org.apache.streampark.flink.client.`trait`.KubernetesClientV2Trait
import org.apache.streampark.flink.client.bean._
-import org.apache.streampark.flink.kubernetes.v2.model.{FlinkSessionJobDef, JobManagerDef, TaskManagerDef}
+import org.apache.streampark.flink.kubernetes.v2.model.FlinkSessionJobDef
import org.apache.streampark.flink.kubernetes.v2.operator.FlinkK8sOperator
-import org.apache.streampark.flink.packer.pipeline.K8sAppModeBuildResponse
+import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
import org.apache.commons.lang3.StringUtils
import org.apache.flink.client.deployment.application.ApplicationConfiguration
import org.apache.flink.configuration._
-import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
-import scala.collection.mutable
import scala.jdk.CollectionConverters.mapAsScalaMapConverter
-import scala.util.{Failure, Success, Try}
+import scala.util.{Failure, Success}
+/** Flink K8s session mode app operation client via Flink K8s Operator */
object KubernetesSessionClientV2 extends KubernetesClientV2Trait with Logger {
- @throws[Exception]
+ @throws[Throwable]
override def doSubmit(
submitRequest: SubmitRequest,
flinkConfig: Configuration): SubmitResponse = {
@@ -43,7 +42,7 @@ object KubernetesSessionClientV2 extends
KubernetesClientV2Trait with Logger {
val richMsg: String => String =
s"[flink-submit][appId=${submitRequest.id}] " + _
submitRequest.checkBuildResult()
-    val buildResult = submitRequest.buildResult.asInstanceOf[K8sAppModeBuildResponse]
+    val buildResult = submitRequest.buildResult.asInstanceOf[ShadedBuildResponse]
// Convert to FlinkSessionJobDef CR definition
val flinkSessionJobDef = genFlinkSessionJobDef(submitRequest, flinkConfig,
buildResult) match {
@@ -76,82 +75,62 @@ object KubernetesSessionClientV2 extends
KubernetesClientV2Trait with Logger {
private def genFlinkSessionJobDef(
submitReq: SubmitRequest,
originFlinkConfig: Configuration,
-    buildResult: K8sAppModeBuildResponse): Either[FailureMessage, FlinkSessionJobDef] = {
+    buildResult: ShadedBuildResponse): Either[FailureMessage, FlinkSessionJobDef] = {
val flinkConfObj = originFlinkConfig.clone()
val namespace = Option(submitReq.k8sSubmitParam.kubernetesNamespace)
.getOrElse("default")
- val name = Option(submitReq.k8sSubmitParam.clusterId)
+ val name = submitReq.k8sSubmitParam.kubernetesName
.filter(str => StringUtils.isNotBlank(str))
- .getOrElse(return Left("cluster-id should not be empty"))
+ .getOrElse(return Left("Kubernetes CR name should not be empty"))
- val deploymentName = Option(submitReq.developmentMode.name())
+ val deploymentName = Option(submitReq.k8sSubmitParam.clusterId)
.filter(str => StringUtils.isNotBlank(str))
-      .getOrElse(return Left("deploymentName should not be empty"))
+      .getOrElse(return Left("Target FlinkDeployment CR name should not be empty"))
-    val jobDef = genJobDef(flinkConfObj, jarUriHint = Some(buildResult.mainJarPath))
+    val jobDef = genJobDef(flinkConfObj, jarUriHint = Some(buildResult.shadedJarPath))
.getOrElse(return Left("Invalid job definition"))
- val extraFlinkConfiguration = {
- // Remove conflicting configuration items
- val result: mutable.Map[String, String] = flinkConfObj
- .remove(DeploymentOptions.TARGET)
- .remove(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY)
- .remove(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT)
- .remove(JobManagerOptions.TOTAL_PROCESS_MEMORY)
- .remove(TaskManagerOptions.TOTAL_PROCESS_MEMORY)
- .remove(PipelineOptions.JARS)
- .remove(CoreOptions.DEFAULT_PARALLELISM)
- .remove(ApplicationConfiguration.APPLICATION_ARGS)
- .remove(ApplicationConfiguration.APPLICATION_MAIN_CLASS)
- .remove(SavepointConfigOptions.SAVEPOINT_PATH)
- .remove(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)
- .toMap
- .asScala
- .removeKey(KUBERNETES_JM_CPU_AMOUNT_KEY)
- .removeKey(KUBERNETES_TM_CPU_KEY)
- .removeKey(KUBERNETES_JM_CPU_AMOUNT_KEY)
- .removeKey(KUBERNETES_JM_CPU_KEY)
- // Set kubernetes.rest-service.exposed.type configuration for
compatibility with native-k8s
- submitReq.k8sSubmitParam.flinkRestExposedType.foreach {
- exposedType => result += KUBERNETES_REST_SERVICE_EXPORTED_TYPE_KEY ->
exposedType.getName
- }
- result.toMap
- }
+ // Remove conflicting configuration items
+ val extraFlinkConfiguration = flinkConfObj
+ .remove(DeploymentOptions.TARGET)
+ .remove(PipelineOptions.JARS)
+ .remove(CoreOptions.DEFAULT_PARALLELISM)
+ .remove(ApplicationConfiguration.APPLICATION_ARGS)
+ .remove(ApplicationConfiguration.APPLICATION_MAIN_CLASS)
+ .remove(SavepointConfigOptions.SAVEPOINT_PATH)
+ .remove(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)
+ .toMap
+ .asScala
+ .toMap
- // TODO Migrate the construction logic of ingress to here and set it into
FlinkDeploymentDef.ingress
- // See:
org.apache.streampark.flink.packer.pipeline.impl.FlinkK8sApplicationBuildPipeline
Step-8
Right(
FlinkSessionJobDef(
namespace = namespace,
name = name,
deploymentName = deploymentName,
flinkConfiguration = extraFlinkConfiguration,
- job = jobDef,
- restartNonce = None
+ job = jobDef
))
}
+ /** Shutdown Flink cluster. */
+ @throws[Throwable]
def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = {
- def richMsg: String => String =
s"[flink-delete-][appId=${shutDownRequest.clusterId}] " + _
-
- FlinkK8sOperator.k8sCrOpr
- .deleteSessionJob(
- shutDownRequest.kubernetesDeployParam.kubernetesNamespace,
- shutDownRequest.clusterId)
- .runIOAsTry match {
- case Success(rsp) =>
- logInfo(richMsg("Delete flink job successfully."))
- rsp
+ val name = shutDownRequest.clusterId
+ val namespace = shutDownRequest.kubernetesDeployParam.kubernetesNamespace
+    def richMsg: String => String = s"[flink-shutdown][clusterId=$name][namespace=$namespace] " + _
+
+    FlinkK8sOperator.k8sCrOpr.deleteSessionJob(namespace, name).runIOAsTry match {
+ case Success(_) =>
+ logInfo(richMsg("Shutdown Flink cluster successfully."))
+ ShutDownResponse()
case Failure(err) =>
- logError(
- richMsg(s"delete flink job fail in
${shutDownRequest.executionMode.getName}_V2 mode!"),
- err)
+        logError(richMsg("Fail to shutdown Flink cluster"), err)
throw err
}
- ShutDownResponse()
}
}
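
Note: both V2 clients resolve their ZIO effects at the Java boundary through
ZIOExt's runIOAsTry, then fold the Try into log-and-return or log-and-rethrow.
A condensed sketch of that shape (the effect and messages are hypothetical):

    import org.apache.streampark.common.zio.ZIOExt.IOOps
    import zio.ZIO
    import scala.util.{Failure, Success}

    def deleteOrThrow(): Unit =
      ZIO.attempt(/* hypothetical blocking K8s CR deletion */ ()).runIOAsTry match {
        case Success(_) =>
          println("Shutdown Flink cluster successfully.")
        case Failure(err) =>
          println(s"Fail to shutdown Flink cluster: ${err.getMessage}")
          throw err
      }
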
diff --git a/streampark-flink/streampark-flink-kubernetes-v2/pom.xml
b/streampark-flink/streampark-flink-kubernetes-v2/pom.xml
index cb9db9502..50a822401 100644
--- a/streampark-flink/streampark-flink-kubernetes-v2/pom.xml
+++ b/streampark-flink/streampark-flink-kubernetes-v2/pom.xml
@@ -47,7 +47,7 @@
<scala>
<scalafmt>
<version>${spotless.scalafmt.version}</version>
- <file>.scalafmt-fp.conf</file>
+
<file>${maven.multiModuleProjectDirectory}/.scalafmt-fp.conf</file>
</scalafmt>
</scala>
</configuration>
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/ClusterMetrics.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/ClusterMetrics.scala
index 7ef4bb7da..326504f4f 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/ClusterMetrics.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/ClusterMetrics.scala
@@ -31,4 +31,24 @@ case class ClusterMetrics(
runningJob: Integer = 0,
finishedJob: Integer = 0,
cancelledJob: Integer = 0,
- failedJob: Integer = 0)
+ failedJob: Integer = 0) {
+
+  lazy val totalJob: Integer = runningJob + finishedJob + cancelledJob + failedJob
+
+ def +(another: ClusterMetrics): ClusterMetrics =
+ ClusterMetrics(
+ totalJmMemory = another.totalJmMemory + totalJmMemory,
+ totalTmMemory = another.totalTmMemory + totalTmMemory,
+ totalTm = another.totalTm + totalTm,
+ totalSlot = another.totalSlot + totalSlot,
+ availableSlot = another.availableSlot + availableSlot,
+ runningJob = another.runningJob + runningJob,
+ finishedJob = another.finishedJob + finishedJob,
+ cancelledJob = another.cancelledJob + cancelledJob,
+ failedJob = another.failedJob + failedJob
+ )
+}
+
+object ClusterMetrics {
+ lazy val empty: ClusterMetrics = ClusterMetrics()
+}
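
Note: the new `+` operator makes ClusterMetrics a simple field-wise sum, which
is what lets subscribeGlobalClusterMetricChange collapse per-cluster metrics
into one team-level value with a foldLeft. A small sketch (numbers are
illustrative):

    import org.apache.streampark.flink.kubernetes.v2.model.ClusterMetrics

    // Field-wise aggregation: 2 TMs + 3 TMs => 5 TMs, and so on per field.
    val perCluster = Seq(ClusterMetrics(totalTm = 2, totalSlot = 4), ClusterMetrics(totalTm = 3, totalSlot = 6))
    val teamTotal  = perCluster.foldLeft(ClusterMetrics.empty)(_ + _)
    // teamTotal.totalTm == 5, teamTotal.totalSlot == 10
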
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobState.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobState.scala
index 300be154d..8fc355a8e 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobState.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobState.scala
@@ -32,7 +32,11 @@ object JobState extends Enumeration {
val UNKNOWN = Value
def valueOf(raw: String): JobState = values.find(_.toString ==
raw).getOrElse(UNKNOWN)
-      val maybeDeploying = Set(INITIALIZING, CREATED, RESTARTING, RECONCILING)
+
+ val maybeDeploying = Set(INITIALIZING, CREATED, RESTARTING, RECONCILING)
+
+ lazy val activeStates =
+    Set(INITIALIZING, CREATED, RUNNING, FAILING, CANCELLING, RESTARTING, RECONCILING)
}
/**
@@ -50,11 +54,6 @@ object EvalJobState extends Enumeration {
// copy from [[org.apache.streampark.console.core.enums.FlinkAppState]]
val LOST, TERMINATED, OTHER = Value
-  // ending flink states, the tracking monitor will stop tracking these states of flink job.
- val endingStates = Seq(FAILED, CANCELED, FINISHED, TERMINATED, LOST)
-
- val effectEndStates = endingStates.filter(_ != LOST)
-
def of(state: JobState): EvalJobState = values.find(e => e.toString ==
state.toString).getOrElse(OTHER)
}
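
Note: activeStates is the predicate behind existActiveJobsOnFlinkCluster
above: a session cluster only blocks shutdown while at least one of its jobs
sits in a non-terminal state. Sketch:

    import org.apache.streampark.flink.kubernetes.v2.model.JobState

    // FINISHED is terminal, RUNNING is not => the cluster still has active jobs.
    val states    = Seq(JobState.FINISHED, JobState.RUNNING)
    val hasActive = states.exists(JobState.activeStates.contains) // true
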
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobStatus.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobStatus.scala
index 54e5b04e3..5dce775df 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobStatus.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobStatus.scala
@@ -43,7 +43,13 @@ case class JobStatus(
startTs: Long,
endTs: Option[Long] = None,
tasks: Option[TaskStats] = None,
- updatedTs: Long)
+ updatedTs: Long) {
+
+ lazy val duration: Long = {
+ if (startTs <= 0L) 0L
+ else endTs.getOrElse(System.currentTimeMillis()) - startTs
+ }
+}
object JobStatus {
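
Note: the duration rule added above is self-contained: an unknown start yields
zero, a running job measures against the current clock, a finished job against
its end timestamp. The same logic as a standalone function, assuming only the
two inputs:

    def duration(startTs: Long, endTs: Option[Long]): Long =
      if (startTs <= 0L) 0L
      else endTs.getOrElse(System.currentTimeMillis()) - startTs
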
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala
index 7f8496839..299062cee 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala
@@ -26,7 +26,7 @@ import org.apache.flink.v1beta1.{FlinkDeployment,
FlinkDeploymentSpec, FlinkSess
import zio.{IO, Ref, Schedule, UIO, ZIO}
import zio.ZIO.logInfo
import zio.concurrent.{ConcurrentMap, ConcurrentSet}
-import zio.stream.ZStream
+import zio.stream.{UStream, ZStream}
/** Flink Kubernetes resource observer. */
sealed trait FlinkK8sObserverTrait {
@@ -37,14 +37,6 @@ sealed trait FlinkK8sObserverTrait {
/** Stop tracking resources. */
def untrack(key: TrackKey): UIO[Unit]
- /** Stop tracking resources by TrackKey.id. */
- def untrackById(appId: Long): UIO[Unit] = {
- trackedKeys.find(_.id == appId).flatMap {
- case Some(key) => untrack(key)
- case None => ZIO.unit
- }
- }
-
/** All tracked key in observer. */
def trackedKeys: ConcurrentSet[TrackKey]
@@ -289,3 +281,48 @@ object FlinkK8sObserver extends FlinkK8sObserverTrait {
}
}
+
+object FlinkK8sObserverSnapSubscriptionHelper {
+
+  import scala.reflect.{classTag, ClassTag}
+
+  implicit class ClusterMetricsSnapsSubscriptionOps(stream: UStream[((Namespace, Name), ClusterMetrics)]) {
+    def combineWithTrackKey: UStream[Option[(TrackKey, ClusterMetrics)]] =
+      combineValueWithTrackKey[ClusterMetrics](stream)
+
+    def combineWithTypedTrackKey[Key <: TrackKey: ClassTag]: UStream[Option[(Key, ClusterMetrics)]] =
+      combineValueWithTypedTrackKey[Key, ClusterMetrics](stream)
+  }
+
+  implicit class RestSvcEndpointSnapsSubscriptionOps(stream: UStream[((Namespace, Name), RestSvcEndpoint)]) {
+    def combineWithTrackKey: UStream[Option[(TrackKey, RestSvcEndpoint)]] =
+      combineValueWithTrackKey[RestSvcEndpoint](stream)
+
+    def combineWithTypedTrackKey[Key <: TrackKey: ClassTag]: UStream[Option[(Key, RestSvcEndpoint)]] =
+      combineValueWithTypedTrackKey[Key, RestSvcEndpoint](stream)
+  }
+
+  implicit class DeployCRSnapsSubscriptionOps(
+      stream: UStream[((Namespace, Name), (DeployCRStatus, Option[JobStatus]))]) {
+    def combineWithTrackKey: UStream[Option[(TrackKey, (DeployCRStatus, Option[JobStatus]))]] =
+      combineValueWithTrackKey[(DeployCRStatus, Option[JobStatus])](stream)
+
+    def combineWithTypedTrackKey[Key <: TrackKey: ClassTag]: UStream[Option[(Key, (DeployCRStatus, Option[JobStatus]))]] =
+      combineValueWithTypedTrackKey[Key, (DeployCRStatus, Option[JobStatus])](stream)
+  }
+
+  private[this] def combineValueWithTrackKey[Value](
+      stream: UStream[((Namespace, Name), Value)]): UStream[Option[(TrackKey, Value)]] = stream.mapZIO {
+    case ((namespace, name), value) =>
+      FlinkK8sObserver.trackedKeys
+        .find(key => key.clusterNamespace == namespace && key.clusterName == name)
+        .map(key => key.map(_ -> value))
+  }
+
+  // Use a ClassTag-based runtime check: a bare key.isInstanceOf[Key] would be
+  // erased to TrackKey and match every key type.
+  private[this] def combineValueWithTypedTrackKey[Key <: TrackKey: ClassTag, Value](
+      stream: UStream[((Namespace, Name), Value)]): UStream[Option[(Key, Value)]] = stream.mapZIO {
+    case ((namespace, name), value) =>
+      FlinkK8sObserver.trackedKeys
+        .find(key => classTag[Key].runtimeClass.isInstance(key) && key.clusterNamespace == namespace && key.clusterName == name)
+        .map(key => key.map(_.asInstanceOf[Key] -> value))
+  }
+
+}
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/RawClusterObserver.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/RawClusterObserver.scala
index d4a7e12b7..7045d3bed 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/RawClusterObserver.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/RawClusterObserver.scala
@@ -108,15 +108,22 @@ case class RawClusterObserver(
.map(e => Try(e.toInt).getOrElse(0))
.getOrElse(0)
+ val totalTm = Option(clusterOv.taskManagers).getOrElse(0)
+ val totalSlot = Option(clusterOv.slotsTotal).getOrElse(0)
+ val availableSlot = Option(clusterOv.slotsAvailable).getOrElse(0)
+ val runningJob = Option(clusterOv.jobsRunning).getOrElse(0)
+ val cancelledJob = Option(clusterOv.jobsFinished).getOrElse(0)
+ val failedJob = Option(clusterOv.jobsFailed).getOrElse(0)
+
ClusterMetrics(
totalJmMemory = totalJmMemory,
totalTmMemory = totalTmMemory,
- totalTm = clusterOv.taskManagers,
- totalSlot = clusterOv.slotsTotal,
- availableSlot = clusterOv.slotsAvailable,
- runningJob = clusterOv.jobsRunning,
- cancelledJob = clusterOv.jobsFinished,
- failedJob = clusterOv.jobsFailed
+ totalTm = totalTm,
+ totalSlot = totalSlot,
+ availableSlot = availableSlot,
+ runningJob = runningJob,
+ cancelledJob = cancelledJob,
+ failedJob = failedJob
)
}
.tap(metrics => clusterMetricsSnaps.put((namespace, name), metrics))
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprError.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprError.scala
index 9d832e676..5225905f4 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprError.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprError.scala
@@ -28,9 +28,6 @@ object OprError {
case class FlinkRestEndpointNotFound(namespace: String, name: String)
extends Exception(s"Flink cluster rest endpoint not found:
namespace=$namespace, name=$name")
- case class TrackKeyNotFound(namespace: String, name: String)
- extends Exception(s"TrackKey not found: namespace=$namespace, name=$name")
-
case class FlinkDeploymentCRDNotFound()
extends Exception("The FlinkDeployment CRD is not currently deployed in
the kubernetes cluster")
}
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipelineV2.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipelineV2.scala
index 6f59f6f02..ba9b9c6bc 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipelineV2.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipelineV2.scala
@@ -52,7 +52,8 @@ class FlinkK8sApplicationBuildPipelineV2(request:
FlinkK8sApplicationBuildReques
val shadedJarOutputPath = request.getShadedJarPath(buildWorkspace)
val extJarLibs = request.developmentMode match {
case DevelopmentMode.FLINK_SQL => request.dependencyInfo.extJarLibs
- case DevelopmentMode.CUSTOM_CODE => Set[String]()
+ case DevelopmentMode.CUSTOM_CODE => Set.empty[String]
+ case _ => Set.empty[String]
}
val shadedJar =
      MavenTool.buildFatJar(request.mainClass, request.providedLibs, shadedJarOutputPath)