This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new cd7013ef3 [Improve] KubernetesWatcher improvement
cd7013ef3 is described below
commit cd7013ef30a9f53b304a341c8447853daf605dc9
Author: benjobs <[email protected]>
AuthorDate: Sat Jan 6 20:08:45 2024 +0800
[Improve] KubernetesWatcher improvement
---
.../flink/kubernetes/KubernetesRetriever.scala | 18 +++++++++++++++---
.../kubernetes/helper/KubernetesWatcherHelper.scala | 20 +++++++++-----------
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 1 +
3 files changed, 25 insertions(+), 14 deletions(-)
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index 79ba51812..d2e3a8236 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -37,6 +37,7 @@ import javax.annotation.Nullable
import java.time.Duration
import scala.collection.JavaConverters._
+import scala.util
import scala.util.{Failure, Success, Try}
object KubernetesRetriever extends Logger {
@@ -105,10 +106,10 @@ object KubernetesRetriever extends Logger {
/**
* check whether deployment exists on kubernetes cluster
*
- * @param name
- * deployment name
* @param namespace
* deployment namespace
+ * @param deploymentName
+ * deployment name
*/
def isDeploymentExists(namespace: String, deploymentName: String): Boolean = {
using(KubernetesRetriever.newK8sClient()) {
@@ -122,7 +123,18 @@ object KubernetesRetriever extends Logger {
.getItems
.asScala
.exists(_.getMetadata.getName == deploymentName)
- }(_ => false)
+ } {
+ e =>
+ logError(
+ s"""
+ |[StreamPark] check deploymentExists error,
+ |namespace: $namespace,
+ |deploymentName: $deploymentName,
+ |error: $e
+ |""".stripMargin
+ )
+ false
+ }
}
/** retrieve flink jobManager rest url */
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesWatcherHelper.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesWatcherHelper.scala
index 90ae67c1a..fa975cc6b 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesWatcherHelper.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesWatcherHelper.scala
@@ -17,6 +17,7 @@
package org.apache.streampark.flink.kubernetes.helper
+import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.kubernetes.{DefaultFlinkK8sWatcher, FlinkK8sWatcher}
import java.util.{Timer, TimerTask}
@@ -24,7 +25,7 @@ import java.util.{Timer, TimerTask}
import scala.language.implicitConversions
/** Debug helper for FlinkTrackMonitor, only for streampark development, debugging scenarios. */
-object KubernetesWatcherHelper {
+object KubernetesWatcherHelper extends Logger {
implicit private def funcToTimerTask(fun: () => Unit): TimerTask = new TimerTask() {
def run(): Unit = fun()
@@ -35,7 +36,7 @@ object KubernetesWatcherHelper {
def watchJobStatusCacheSize(implicit k8sWatcher: FlinkK8sWatcher): Unit =
new Timer().scheduleAtFixedRate(
() =>
- println(
+ logInfo(
s"[flink-k8s][status-size]-${System.currentTimeMillis} => " +
s"${k8sWatcher.getAllJobStatus.size}"),
0,
@@ -45,7 +46,7 @@ object KubernetesWatcherHelper {
def watchAggClusterMetricsCache(implicit k8sWatcher: FlinkK8sWatcher): Unit =
new Timer().scheduleAtFixedRate(
() =>
- println(
+ logInfo(
s"[flink-k8s][agg-metric]-${System.currentTimeMillis} => " +
s"${k8sWatcher.getAccGroupMetrics()}"),
0,
@@ -55,7 +56,7 @@ object KubernetesWatcherHelper {
def watchClusterMetricsCache(implicit k8sWatcher: FlinkK8sWatcher): Unit =
new Timer().scheduleAtFixedRate(
() =>
- println(s"[flink-k8s][metric]-${System.currentTimeMillis} => " +
+ logInfo(s"[flink-k8s][metric]-${System.currentTimeMillis} => " +
s"count=${k8sWatcher.asInstanceOf[DefaultFlinkK8sWatcher].watchController.flinkMetrics.asMap().size} | " +
s"${k8sWatcher.asInstanceOf[DefaultFlinkK8sWatcher].watchController.flinkMetrics.asMap().mkString(",")}"),
0,
@@ -66,7 +67,7 @@ object KubernetesWatcherHelper {
def watchJobStatusCache(implicit k8sWatcher: FlinkK8sWatcher): Unit =
new Timer().scheduleAtFixedRate(
() =>
- println(
+ logInfo(
s"[flink-k8s][status]-${System.currentTimeMillis} =>" +
s"count=${k8sWatcher.getAllJobStatus.size} | " +
s" ${k8sWatcher.getAllJobStatus.mkString(", ")}"),
@@ -78,35 +79,32 @@ object KubernetesWatcherHelper {
def watchTrackIdsCache(implicit k8sWatcher: FlinkK8sWatcher): Unit = {
new Timer().scheduleAtFixedRate(
() =>
- println(
+ logInfo(
s"[flink-k8s][trackIds]-${System.currentTimeMillis} => " +
s"${k8sWatcher.getAllWatchingIds.mkString(",")}"),
0,
1500)
}
- // print trackId cache size info
def watchTrackIdsCacheSize(implicit k8sWatcher: FlinkK8sWatcher): Unit = {
new Timer().scheduleAtFixedRate(
() =>
- println(
+ logInfo(
s"[flink-k8s][trackIds-size]-${System.currentTimeMillis} => " +
s"${k8sWatcher.getAllWatchingIds.size}"),
0,
1500)
}
- // print k8s event cache detail
def watchK8sEventCache(implicit k8sWatcher: FlinkK8sWatcher): Unit = {
new Timer().scheduleAtFixedRate(
() =>
- println(s"[flink-k8s][k8s-event]-${System.currentTimeMillis} => " +
+ logInfo(s"[flink-k8s][k8s-event]-${System.currentTimeMillis} => " +
s"count=${k8sWatcher.asInstanceOf[DefaultFlinkK8sWatcher].watchController.k8sDeploymentEvents.asMap().size} | " +
s"${k8sWatcher.asInstanceOf[DefaultFlinkK8sWatcher].watchController.k8sDeploymentEvents.asMap().mkString(",")}"),
0,
1500
)
}
- // scalastyle:on println
}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 2879f17bb..6d731fe83 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -130,6 +130,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
trackId.namespace,
trackId.clusterId
)
+
if (FlinkJobState.isEndState(jobState.jobState) &&
!deployExists) {
// remove trackId from cache of job that needs to be untracked
watchController.unWatching(trackId)