This is an automated email from the ASF dual-hosted git repository.

linying pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new dd4061d72 [Feature] [Flink-K8s-V2] Refactor the state tracking of Flink on Kubernetes (#2989)
dd4061d72 is described below

commit dd4061d7214a42143f3265e967fde50f295a0d4a
Author: ChengJie1053 <[email protected]>
AuthorDate: Mon Sep 11 13:53:52 2023 +0800

    [Feature] [Flink-K8s-V2] Refactor the state tracking of Flink on Kubernetes (#2989)
    
    * Refactor the state tracking of Flink on Kubernetes
    
    * Refactor the state tracking of Flink on Kubernetes
    
    * Modify subscribeJobStatusChange
    
    * Modify subscribeMetricsChange
    
    * Merge dev
---
 .../console/core/task/FlinkK8sChangeListener.scala | 231 +++++++++++++++++++++
 .../core/utils/FlinkAppStateConverter.scala        |   6 +
 .../flink/kubernetes/v2/model/JobState.scala       |   5 +
 .../flink/kubernetes/v2/operator/OprError.scala    |   3 +
 4 files changed, 245 insertions(+)
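
For context, the new FlinkK8sChangeListener below starts each subscription as a background daemon fiber at construction time (subscribeJobStatusChange.forkDaemon.runIO, and so on). The following is a minimal, self-contained sketch of that daemon-fiber wiring pattern in plain ZIO; the ZIOAppDefault entry point and the polling schedule are illustrative assumptions, not StreamPark's actual ZIOExt or observer helpers.

import zio._

// Minimal sketch (assumed names): run a long-lived subscription as a daemon
// fiber, mirroring the listener's `subscribeXxx.forkDaemon.runIO` wiring.
// ZIOAppDefault stands in for StreamPark's ZIOExt.IOOps runIO helper.
object DaemonSubscriptionSketch extends ZIOAppDefault {

  // Placeholder subscription that "polls" forever on a fixed schedule.
  val subscribeJobStatusChange: UIO[Unit] =
    ZIO.logInfo("poll job status snapshots").repeat(Schedule.fixed(5.seconds)).unit

  def run =
    for {
      _ <- subscribeJobStatusChange.forkDaemon // detach; the fiber outlives this scope
      _ <- ZIO.logInfo("listener constructed; subscription keeps running in the background")
      _ <- ZIO.sleep(12.seconds)               // keep the demo alive briefly
    } yield ()
}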

diff --git a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sChangeListener.scala b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sChangeListener.scala
new file mode 100644
index 000000000..f4e00effe
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sChangeListener.scala
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.task
+
+import org.apache.streampark.common.zio.ZIOContainerSubscription.{ConcurrentMapExtension, RefMapExtension}
+import org.apache.streampark.common.zio.ZIOExt.IOOps
+import org.apache.streampark.console.core.bean.AlertTemplate
+import org.apache.streampark.console.core.entity.{Application, FlinkCluster}
+import org.apache.streampark.console.core.enums.{FlinkAppState, OptionState}
+import org.apache.streampark.console.core.service.FlinkClusterService
+import org.apache.streampark.console.core.service.alert.AlertService
+import org.apache.streampark.console.core.service.application.{ApplicationInfoService, ApplicationManageService}
+import org.apache.streampark.console.core.utils.FlinkAppStateConverter
+import org.apache.streampark.flink.kubernetes.v2.model._
+import org.apache.streampark.flink.kubernetes.v2.observer.{FlinkK8sObserver, Name, Namespace}
+import org.apache.streampark.flink.kubernetes.v2.operator.OprError.TrackKeyNotFound
+
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.context.annotation.Lazy
+import org.springframework.stereotype.Component
+import zio.{UIO, ZIO}
+import zio.ZIO.logError
+import zio.ZIOAspect.annotated
+
+import java.util.Date
+
+@Component
+class FlinkK8sChangeListener {
+  @Lazy @Autowired
+  private var applicationManageService: ApplicationManageService = _
+
+  @Lazy
+  @Autowired
+  private var applicationInfoService: ApplicationInfoService = _
+
+  @Lazy @Autowired
+  private var flinkClusterService: FlinkClusterService = _
+  @Lazy @Autowired
+  private var alertService: AlertService = _
+
+  subscribeJobStatusChange.forkDaemon.runIO
+  subscribeMetricsChange.forkDaemon.runIO
+  subscribeRestSvcEndpointChange.forkDaemon.runIO
+
+  private val alterStateList =
+    Array(
+      FlinkAppState.FAILED,
+      FlinkAppState.LOST,
+      FlinkAppState.RESTARTING,
+      FlinkAppState.FINISHED)
+
+  def subscribeJobStatusChange: UIO[Unit] = {
+    FlinkK8sObserver.evaluatedJobSnaps
+      .flatSubscribeValues()
+      // Get Application records and convert JobSnapshot to Application
+      .mapZIO {
+        jobSnap =>
+          ZIO
+            .attemptBlocking {
+              Option(applicationManageService.getById(jobSnap.appId))
+                .map(app => setByJobStatusCV(app, jobSnap))
+            }
+            .catchAll {
+              err =>
+                logError(s"Fail to get Application records: ${err.getMessage}")
+                  .as(None) @@ annotated("appId" -> jobSnap.appId.toString)
+            }
+      }
+      .filter(_.nonEmpty)
+      .map(_.get)
+      // Save Application records
+      .tap {
+        app =>
+          ZIO
+            .attemptBlocking(applicationInfoService.persistMetrics(app))
+            .retryN(3)
+            .tapError(err => logError(s"Fail to persist Application status: ${err.getMessage}"))
+            .ignore @@ annotated("appId" -> app.getAppId)
+      }
+      // Alert for unhealthy states in parallel
+      .mapZIOPar(10) {
+        app =>
+          val state = FlinkAppState.of(app.getState)
+          ZIO
+            .attemptBlocking(alertService.alert(app.getAlertId(), AlertTemplate.of(app, state)))
+            .when(alterStateList.contains(state))
+            .retryN(3)
+            .tapError(
+              err => logError(s"Fail to alter unhealthy application state: ${err.getMessage}"))
+            .ignore @@ annotated("appId" -> app.getAppId, "state" -> state.toString)
+      }
+      .runDrain
+  }
+
+  def subscribeMetricsChange: UIO[Unit] = {
+    FlinkK8sObserver.clusterMetricsSnaps
+      .flatSubscribe()
+      .mapZIO {
+        metricsSnap =>
+          ZIO
+            .attemptBlocking {
+              val namespaceAndName: (Namespace, Name) = metricsSnap._1
+              val trackKey: ZIO[Any, TrackKeyNotFound, TrackKey] = FlinkK8sObserver.trackedKeys
+                .find(
+                  trackedKey =>
+                    trackedKey.clusterNamespace == namespaceAndName._1 && trackedKey.clusterName == namespaceAndName._2)
+                .someOrFail(TrackKeyNotFound(namespaceAndName._1, namespaceAndName._2))
+
+              Option(applicationManageService.getById(trackKey.map(_.id)), metricsSnap._2)
+            }
+            .catchAll {
+              err =>
+                logError(s"Fail to get Application records: ${err.getMessage}")
+                  .as(None) @@ annotated("name" -> metricsSnap._1._2)
+            }
+      }
+      .filter(_.nonEmpty)
+      .map(_.get)
+      .tap {
+        appAndMetrics =>
+          ZIO
+            .attemptBlocking {
+              val app: Application = appAndMetrics._1
+              val clusterMetrics: ClusterMetrics = appAndMetrics._2
+              app.setJmMemory(clusterMetrics.totalJmMemory)
+              app.setTmMemory(clusterMetrics.totalTmMemory)
+              app.setTotalTM(clusterMetrics.totalTm)
+              app.setTotalSlot(clusterMetrics.totalSlot)
+              app.setAvailableSlot(clusterMetrics.availableSlot)
+              applicationInfoService.persistMetrics(app)
+            }
+            .retryN(3)
+            .tapError(err => logError(s"Fail to persist Application Metrics: ${err.getMessage}"))
+            .ignore @@ annotated("appId" -> appAndMetrics._1.getAppId)
+      }
+      .runDrain
+  }
+
+  def subscribeRestSvcEndpointChange: UIO[Unit] = {
+    FlinkK8sObserver.restSvcEndpointSnaps
+      .flatSubscribe()
+      .foreach {
+        restSvcEndpointSnap =>
+          ZIO
+            .attempt {
+
+              val namespaceAndName: (Namespace, Name) = restSvcEndpointSnap._1
+              val trackKey: ZIO[Any, TrackKeyNotFound, TrackKey] = FlinkK8sObserver.trackedKeys
+                .find(
+                  trackedKey =>
+                    trackedKey.clusterNamespace == namespaceAndName._1 && trackedKey.clusterName == namespaceAndName._2)
+                .someOrFail(TrackKeyNotFound(namespaceAndName._1, namespaceAndName._2))
+              val restSvcEndpoint: RestSvcEndpoint = restSvcEndpointSnap._2
+
+              val app: Application = applicationManageService.getById(trackKey.map(_.id))
+
+              val flinkCluster: FlinkCluster = flinkClusterService.getById(app.getFlinkClusterId)
+
+              if (restSvcEndpoint == null || restSvcEndpoint.ipRest == null) return ZIO.unit
+              val url = restSvcEndpoint.ipRest
+              app.setFlinkRestUrl(url)
+              applicationInfoService.persistMetrics(app)
+
+              flinkCluster.setAddress(url)
+              flinkClusterService.update(flinkCluster)
+
+            }
+            .retryN(3)
+            .ignore
+      }
+  }
+
+  private def setByJobStatusCV(app: Application, jobSnapshot: JobSnapshot): Application = { // infer the final flink job state
+    val state: FlinkAppState =
+      FlinkAppStateConverter.k8sEvalJobStateToFlinkAppState(jobSnapshot.evalState)
+    val jobStatusOption: Option[JobStatus] = jobSnapshot.jobStatus
+
+    if (jobStatusOption.nonEmpty) {
+      val jobStatus: JobStatus = jobStatusOption.get
+      // corrective start-time / end-time / duration
+      val preStartTime: Long =
+        if (app.getStartTime != null) app.getStartTime.getTime
+        else 0
+
+      val startTime: Long = Math.max(jobStatus.startTs, preStartTime)
+      val preEndTime: Long =
+        if (app.getEndTime != null) app.getEndTime.getTime
+        else 0
+      var endTime: Long = Math.max(jobStatus.endTs.getOrElse(-1L), preEndTime)
+      var duration: Long = if (app.getDuration != null) app.getDuration else 0
+      if (FlinkAppState.isEndState(state.getValue)) {
+        if (endTime < startTime) endTime = System.currentTimeMillis
+        if (duration <= 0) duration = endTime - startTime
+      }
+      app.setJobId(jobStatus.jobId)
+      val totalTask = if (jobStatus.tasks.nonEmpty) jobStatus.tasks.get.total else 0
+      app.setTotalTask(totalTask)
+      app.setStartTime(
+        new Date(
+          if (startTime > 0) startTime
+          else 0))
+      app.setEndTime(
+        if (endTime > 0 && endTime >= startTime) new Date(endTime)
+        else null)
+      app.setDuration(
+        if (duration > 0) duration
+        else 0)
+    }
+
+    app.setState(state.getValue)
+    // when a flink job status change event can be received, it means
+    // that the operation command sent by streampark has been completed.
+    app.setOptionState(OptionState.NONE.getValue)
+    app
+  }
+}
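
The start-time / end-time / duration correction inside setByJobStatusCV above can be summarized as a pure function. Below is a dependency-free sketch of that rule; the case class and parameter names are illustrative stand-ins, not the actual console entities.

object TimeCorrectionSketch {
  final case class Corrected(startTime: Long, endTime: Long, duration: Long)

  // Mirrors setByJobStatusCV: prefer the larger of the reported and previously
  // persisted timestamps, and only backfill end time / duration for end states.
  def correct(
      reportedStart: Long,    // jobStatus.startTs
      reportedEnd: Long,      // jobStatus.endTs.getOrElse(-1L)
      previousStart: Long,    // app.getStartTime, or 0 if unset
      previousEnd: Long,      // app.getEndTime, or 0 if unset
      previousDuration: Long, // app.getDuration, or 0 if unset
      isEndState: Boolean): Corrected = {
    val start = math.max(reportedStart, previousStart)
    var end = math.max(reportedEnd, previousEnd)
    var duration = previousDuration
    if (isEndState) {
      if (end < start) end = System.currentTimeMillis()
      if (duration <= 0) duration = end - start
    }
    Corrected(start, end, duration)
  }
}
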
diff --git a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/FlinkAppStateConverter.scala b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/FlinkAppStateConverter.scala
index 93c3cfc6f..fc9449a97 100644
--- a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/FlinkAppStateConverter.scala
+++ b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/FlinkAppStateConverter.scala
@@ -18,6 +18,7 @@
 package org.apache.streampark.console.core.utils
 
 import org.apache.streampark.console.core.enums.FlinkAppState
+import org.apache.streampark.flink.kubernetes.v2.model.EvalJobState
 import org.apache.streampark.flink.kubernetes.v2.model.EvalJobState.EvalJobState
 
 import scala.util.Try
@@ -29,4 +30,9 @@ object FlinkAppStateConverter {
     Try(FlinkAppState.valueOf(jobState.toString)).getOrElse(FlinkAppState.OTHER)
   }
 
+  /** Convert [[FlinkAppState]] to [[EvalJobState]]. */
+  def flinkAppStateToK8sEvalJobState(jobState: FlinkAppState): EvalJobState = {
+    EvalJobState.values.find(e => e.toString == jobState.toString).getOrElse(EvalJobState.OTHER)
+  }
+
 }
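
The converter above matches states purely by enumeration name and falls back to OTHER. A dependency-free sketch of that pattern, with stand-in enumerations rather than the real FlinkAppState / EvalJobState:

object StateMappingSketch {
  object AppState  extends Enumeration { val RUNNING, FAILED, OTHER = Value }
  object EvalState extends Enumeration { val RUNNING, FAILED, LOST, OTHER = Value }

  // Name-based lookup with an OTHER fallback, as in flinkAppStateToK8sEvalJobState.
  def appToEval(s: AppState.Value): EvalState.Value =
    EvalState.values.find(_.toString == s.toString).getOrElse(EvalState.OTHER)

  def main(args: Array[String]): Unit = {
    println(appToEval(AppState.RUNNING)) // RUNNING
    println(appToEval(AppState.OTHER))   // OTHER
  }
}
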
diff --git a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobState.scala b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobState.scala
index 9463cf093..300be154d 100644
--- a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobState.scala
+++ b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobState.scala
@@ -50,6 +50,11 @@ object EvalJobState extends Enumeration {
   // copy from [[org.apache.streampark.console.core.enums.FlinkAppState]]
   val LOST, TERMINATED, OTHER = Value
 
+  // Ending flink states: the tracking monitor stops tracking a flink job once it reaches one of these states.
+  val endingStates = Seq(FAILED, CANCELED, FINISHED, TERMINATED, LOST)
+
+  val effectEndStates = endingStates.filter(_ != LOST)
+
   def of(state: JobState): EvalJobState = values.find(e => e.toString == state.toString).getOrElse(OTHER)
 
 }
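
The new endingStates / effectEndStates values classify terminal job states so the tracking monitor knows when to stop tracking. A small stand-alone sketch of that classification (the enumeration values mirror EvalJobState; the stop-tracking decision itself is an assumed illustration):

object EndStateSketch {
  object EvalJobState extends Enumeration {
    val PENDING, RUNNING, FAILED, CANCELED, FINISHED, TERMINATED, LOST, OTHER = Value
    // Terminal states; LOST is excluded from the "effective" set, as in JobState.scala.
    val endingStates: Seq[Value]    = Seq(FAILED, CANCELED, FINISHED, TERMINATED, LOST)
    val effectEndStates: Seq[Value] = endingStates.filter(_ != LOST)
  }

  def shouldStopTracking(state: EvalJobState.Value): Boolean =
    EvalJobState.endingStates.contains(state)

  def main(args: Array[String]): Unit = {
    println(shouldStopTracking(EvalJobState.RUNNING)) // false
    println(shouldStopTracking(EvalJobState.LOST))    // true
  }
}
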
diff --git a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprError.scala b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprError.scala
index 5225905f4..9d832e676 100644
--- a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprError.scala
+++ b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprError.scala
@@ -28,6 +28,9 @@ object OprError {
   case class FlinkRestEndpointNotFound(namespace: String, name: String)
     extends Exception(s"Flink cluster rest endpoint not found: namespace=$namespace, name=$name")
 
+  case class TrackKeyNotFound(namespace: String, name: String)
+    extends Exception(s"TrackKey not found: namespace=$namespace, name=$name")
+
   case class FlinkDeploymentCRDNotFound()
     extends Exception("The FlinkDeployment CRD is not currently deployed in the kubernetes cluster")
 }
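
The new TrackKeyNotFound error is consumed in FlinkK8sChangeListener via ZIO's someOrFail, turning a missing lookup into a typed failure. A minimal sketch of that pattern (the TrackKey shape and the in-memory key list are assumptions for illustration):

import zio._

object TrackKeyLookupSketch extends ZIOAppDefault {
  final case class TrackKeyNotFound(namespace: String, name: String)
    extends Exception(s"TrackKey not found: namespace=$namespace, name=$name")

  final case class TrackKey(id: Long, clusterNamespace: String, clusterName: String)

  // Stand-in for FlinkK8sObserver.trackedKeys.
  private val trackedKeys = List(TrackKey(1L, "flink", "session-a"))

  // A missing key surfaces as a typed TrackKeyNotFound error instead of an Option.
  def findTrackKey(ns: String, name: String): IO[TrackKeyNotFound, TrackKey] =
    ZIO
      .succeed(trackedKeys.find(k => k.clusterNamespace == ns && k.clusterName == name))
      .someOrFail(TrackKeyNotFound(ns, name))

  def run =
    findTrackKey("flink", "missing")
      .flatMap(key => ZIO.logInfo(s"found appId=${key.id}"))
      .catchAll(err => ZIO.logWarning(err.getMessage))
}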
