This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
     new cd7013ef3 [Improve] KubernetesWatcher improvement
cd7013ef3 is described below

commit cd7013ef30a9f53b304a341c8447853daf605dc9
Author: benjobs <[email protected]>
AuthorDate: Sat Jan 6 20:08:45 2024 +0800

    [Improve] KubernetesWatcher improvement
---
 .../flink/kubernetes/KubernetesRetriever.scala       | 18 +++++++++++++++---
 .../kubernetes/helper/KubernetesWatcherHelper.scala  | 20 +++++++++-----------
 .../kubernetes/watcher/FlinkJobStatusWatcher.scala   |  1 +
 3 files changed, 25 insertions(+), 14 deletions(-)

diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index 79ba51812..d2e3a8236 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -37,6 +37,7 @@ import javax.annotation.Nullable
 import java.time.Duration
 
 import scala.collection.JavaConverters._
+import scala.util
 import scala.util.{Failure, Success, Try}
 
 object KubernetesRetriever extends Logger {
@@ -105,10 +106,10 @@ object KubernetesRetriever extends Logger {
   /**
    * check whether deployment exists on kubernetes cluster
    *
-   * @param name
-   *   deployment name
    * @param namespace
    *   deployment namespace
+   * @param deploymentName
+   *   deployment name
    */
   def isDeploymentExists(namespace: String, deploymentName: String): Boolean = 
{
     using(KubernetesRetriever.newK8sClient()) {
@@ -122,7 +123,18 @@ object KubernetesRetriever extends Logger {
           .getItems
           .asScala
           .exists(_.getMetadata.getName == deploymentName)
-    }(_ => false)
+    } {
+      e =>
+        logError(
+          s"""
+             |[StreamPark] check deploymentExists error,
+             |namespace: $namespace,
+             |deploymentName: $deploymentName,
+             |error: $e
+             |""".stripMargin
+        )
+        false
+    }
   }
 
   /** retrieve flink jobManager rest url */
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesWatcherHelper.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesWatcherHelper.scala
index 90ae67c1a..fa975cc6b 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesWatcherHelper.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesWatcherHelper.scala
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.flink.kubernetes.helper
 
+import org.apache.streampark.common.util.Logger
 import org.apache.streampark.flink.kubernetes.{DefaultFlinkK8sWatcher, 
FlinkK8sWatcher}
 
 import java.util.{Timer, TimerTask}
@@ -24,7 +25,7 @@ import java.util.{Timer, TimerTask}
 import scala.language.implicitConversions
 
 /** Debug helper for FlinkTrackMonitor, only for streampark development, 
debugging scenarios. */
-object KubernetesWatcherHelper {
+object KubernetesWatcherHelper extends Logger {
 
   implicit private def funcToTimerTask(fun: () => Unit): TimerTask = new 
TimerTask() {
     def run(): Unit = fun()
@@ -35,7 +36,7 @@ object KubernetesWatcherHelper {
   def watchJobStatusCacheSize(implicit k8sWatcher: FlinkK8sWatcher): Unit =
     new Timer().scheduleAtFixedRate(
       () =>
-        println(
+        logInfo(
           s"[flink-k8s][status-size]-${System.currentTimeMillis} => " +
             s"${k8sWatcher.getAllJobStatus.size}"),
       0,
@@ -45,7 +46,7 @@ object KubernetesWatcherHelper {
   def watchAggClusterMetricsCache(implicit k8sWatcher: FlinkK8sWatcher): Unit =
     new Timer().scheduleAtFixedRate(
       () =>
-        println(
+        logInfo(
           s"[flink-k8s][agg-metric]-${System.currentTimeMillis} => " +
             s"${k8sWatcher.getAccGroupMetrics()}"),
       0,
@@ -55,7 +56,7 @@ object KubernetesWatcherHelper {
   def watchClusterMetricsCache(implicit k8sWatcher: FlinkK8sWatcher): Unit =
     new Timer().scheduleAtFixedRate(
       () =>
-        println(s"[flink-k8s][metric]-${System.currentTimeMillis} => " +
+        logInfo(s"[flink-k8s][metric]-${System.currentTimeMillis} => " +
           
s"count=${k8sWatcher.asInstanceOf[DefaultFlinkK8sWatcher].watchController.flinkMetrics.asMap().size}
 | " +
           
s"${k8sWatcher.asInstanceOf[DefaultFlinkK8sWatcher].watchController.flinkMetrics.asMap().mkString(",")}"),
       0,
@@ -66,7 +67,7 @@ object KubernetesWatcherHelper {
   def watchJobStatusCache(implicit k8sWatcher: FlinkK8sWatcher): Unit =
     new Timer().scheduleAtFixedRate(
       () =>
-        println(
+        logInfo(
           s"[flink-k8s][status]-${System.currentTimeMillis} =>" +
             s"count=${k8sWatcher.getAllJobStatus.size} | " +
             s" ${k8sWatcher.getAllJobStatus.mkString(", ")}"),
@@ -78,35 +79,32 @@ object KubernetesWatcherHelper {
   def watchTrackIdsCache(implicit k8sWatcher: FlinkK8sWatcher): Unit = {
     new Timer().scheduleAtFixedRate(
       () =>
-        println(
+        logInfo(
           s"[flink-k8s][trackIds]-${System.currentTimeMillis} => " +
             s"${k8sWatcher.getAllWatchingIds.mkString(",")}"),
       0,
       1500)
   }
 
-  // print trackId cache size info
   def watchTrackIdsCacheSize(implicit k8sWatcher: FlinkK8sWatcher): Unit = {
     new Timer().scheduleAtFixedRate(
       () =>
-        println(
+        logInfo(
           s"[flink-k8s][trackIds-size]-${System.currentTimeMillis} => " +
             s"${k8sWatcher.getAllWatchingIds.size}"),
       0,
       1500)
   }
 
-  // print k8s event cache detail
   def watchK8sEventCache(implicit k8sWatcher: FlinkK8sWatcher): Unit = {
     new Timer().scheduleAtFixedRate(
       () =>
-        println(s"[flink-k8s][k8s-event]-${System.currentTimeMillis} => " +
+        logInfo(s"[flink-k8s][k8s-event]-${System.currentTimeMillis} => " +
           
s"count=${k8sWatcher.asInstanceOf[DefaultFlinkK8sWatcher].watchController.k8sDeploymentEvents.asMap().size}
 | " +
           
s"${k8sWatcher.asInstanceOf[DefaultFlinkK8sWatcher].watchController.k8sDeploymentEvents.asMap().mkString(",")}"),
       0,
       1500
     )
   }
-  // scalastyle:on println
 
 }
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 2879f17bb..6d731fe83 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -130,6 +130,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
                 trackId.namespace,
                 trackId.clusterId
               )
+
               if (FlinkJobState.isEndState(jobState.jobState) && 
!deployExists) {
                 // remove trackId from cache of job that needs to be untracked
                 watchController.unWatching(trackId)

Reply via email to