This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new b2ec2cd8d ClusterKey code optimization (#2920)
b2ec2cd8d is described below
commit b2ec2cd8de714387a1f723523bd088fad5c4eef0
Author: ChengJie1053 <[email protected]>
AuthorDate: Sat Aug 5 12:19:49 2023 +0800
ClusterKey code optimization (#2920)
* ClusterKey code optimization
* FlinkCheckpointWatcher code optimization
---
.../org/apache/streampark/flink/kubernetes/model/ClusterKey.scala | 3 ++-
.../scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala | 3 ++-
.../streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala | 3 +--
.../streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala | 3 +--
.../streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala | 3 +--
5 files changed, 7 insertions(+), 8 deletions(-)
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/ClusterKey.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/ClusterKey.scala
index 991a9647d..3a2c72aa8 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/ClusterKey.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/ClusterKey.scala
@@ -17,12 +17,13 @@
package org.apache.streampark.flink.kubernetes.model
+import org.apache.streampark.common.conf.K8sFlinkConfig
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
/** flink cluster identifier on kubernetes */
case class ClusterKey(
executeMode: FlinkK8sExecuteMode.Value,
- namespace: String = "default",
+ namespace: String = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE,
clusterId: String)
object ClusterKey {
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
index fba47eee6..410836e6a 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
@@ -17,6 +17,7 @@
package org.apache.streampark.flink.kubernetes.model
+import org.apache.streampark.common.conf.K8sFlinkConfig
import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
@@ -25,7 +26,7 @@ import scala.util.Try
/** tracking identifier for flink on kubernetes */
case class TrackId(
executeMode: FlinkK8sExecuteMode.Value,
- namespace: String = "default",
+ namespace: String = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE,
clusterId: String,
appId: Long,
jobId: String,
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
index d031e82d5..5ac00db54 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
@@ -77,8 +77,7 @@ class FlinkCheckpointWatcher(conf: MetricWatcherConfig =
MetricWatcherConfig.def
// get all legal tracking cluster key
val trackIds: Set[TrackId] = Try(watchController.getActiveWatchingIds())
.filter(_.nonEmpty)
- .getOrElse(return
- )
+ .getOrElse(return None)
// retrieve flink metrics in thread pool
val futures: Set[Future[Option[CheckpointCV]]] =
trackIds.map(
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index f00b1d42d..e6b0d7923 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -99,8 +99,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig =
JobStatusWatcherConfi
// get all legal tracking ids
val trackIds = Try(watchController.getAllWatchingIds())
.filter(_.nonEmpty)
- .getOrElse(return
- )
+ .getOrElse(return None)
// retrieve flink job status in thread pool
val tracksFuture: Set[Future[Option[JobStatusCV]]] = trackIds.map {
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
index 748bc8862..8fc80a33a 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
@@ -77,8 +77,7 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig =
MetricWatcherConfig.default
// get all legal tracking cluster key
val trackIds: Set[TrackId] = Try(watchController.getActiveWatchingIds())
.filter(_.nonEmpty)
- .getOrElse(return
- )
+ .getOrElse(return None)
// retrieve flink metrics in thread pool
val futures: Set[Future[Option[FlinkMetricCV]]] =
trackIds.map(