This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new b2ec2cd8d ClusterKey code optimization (#2920)
b2ec2cd8d is described below

commit b2ec2cd8de714387a1f723523bd088fad5c4eef0
Author: ChengJie1053 <[email protected]>
AuthorDate: Sat Aug 5 12:19:49 2023 +0800

    ClusterKey code optimization (#2920)
    
    * ClusterKey code optimization
    
    * FlinkCheckpointWatcher code optimization
---
 .../org/apache/streampark/flink/kubernetes/model/ClusterKey.scala      | 3 ++-
 .../scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala   | 3 ++-
 .../streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala   | 3 +--
 .../streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala    | 3 +--
 .../streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala      | 3 +--
 5 files changed, 7 insertions(+), 8 deletions(-)

diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/ClusterKey.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/ClusterKey.scala
index 991a9647d..3a2c72aa8 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/ClusterKey.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/ClusterKey.scala
@@ -17,12 +17,13 @@
 
 package org.apache.streampark.flink.kubernetes.model
 
+import org.apache.streampark.common.conf.K8sFlinkConfig
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
 
 /** flink cluster identifier on kubernetes */
 case class ClusterKey(
     executeMode: FlinkK8sExecuteMode.Value,
-    namespace: String = "default",
+    namespace: String = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE,
     clusterId: String)
 
 object ClusterKey {
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
index fba47eee6..410836e6a 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.flink.kubernetes.model
 
+import org.apache.streampark.common.conf.K8sFlinkConfig
 import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
 
@@ -25,7 +26,7 @@ import scala.util.Try
 /** tracking identifier for flink on kubernetes */
 case class TrackId(
     executeMode: FlinkK8sExecuteMode.Value,
-    namespace: String = "default",
+    namespace: String = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE,
     clusterId: String,
     appId: Long,
     jobId: String,
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
index d031e82d5..5ac00db54 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
@@ -77,8 +77,7 @@ class FlinkCheckpointWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.def
     // get all legal tracking cluster key
     val trackIds: Set[TrackId] = Try(watchController.getActiveWatchingIds())
       .filter(_.nonEmpty)
-      .getOrElse(return
-      )
+      .getOrElse(return None)
     // retrieve flink metrics in thread pool
     val futures: Set[Future[Option[CheckpointCV]]] =
       trackIds.map(
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index f00b1d42d..e6b0d7923 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -99,8 +99,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
       // get all legal tracking ids
       val trackIds = Try(watchController.getAllWatchingIds())
         .filter(_.nonEmpty)
-        .getOrElse(return
-        )
+        .getOrElse(return None)
 
       // retrieve flink job status in thread pool
       val tracksFuture: Set[Future[Option[JobStatusCV]]] = trackIds.map {
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
index 748bc8862..8fc80a33a 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
@@ -77,8 +77,7 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.default
     // get all legal tracking cluster key
     val trackIds: Set[TrackId] = Try(watchController.getActiveWatchingIds())
       .filter(_.nonEmpty)
-      .getOrElse(return
-      )
+      .getOrElse(return None)
     // retrieve flink metrics in thread pool
     val futures: Set[Future[Option[FlinkMetricCV]]] =
       trackIds.map(

Reply via email to