This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
     new da9dcae7d [Improve] code style improvement
da9dcae7d is described below

commit da9dcae7da9ca432eb61a2f536a36076ac5206b3
Author: benjobs <[email protected]>
AuthorDate: Thu Jan 11 23:06:00 2024 +0800

    [Improve] code style improvement
---
 .../console/core/task/FlinkK8sWatcherWrapper.java  |  9 +++++++
 .../kubernetes/watcher/FlinkJobStatusWatcher.scala | 31 +++++++++++++++-------
 2 files changed, 31 insertions(+), 9 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index d54580032..6a113a56e 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -102,6 +102,15 @@ public class FlinkK8sWatcherWrapper {
     if (CollectionUtils.isEmpty(k8sApplication)) {
       return Lists.newArrayList();
     }
+    // correct corrupted data
+    List<Application> correctApps =
+        k8sApplication.stream()
+            .filter(app -> !Bridge.toTrackId(app).isLegal())
+            .collect(Collectors.toList());
+    if (CollectionUtils.isNotEmpty(correctApps)) {
+      applicationService.saveOrUpdateBatch(correctApps);
+    }
+
     // filter out the application that should be tracking
     return k8sApplication.stream()
         .filter(
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index eec41ebd6..36bac3d68 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -170,13 +170,22 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig 
= JobStatusWatcherConfi
    */
   def touchSessionJob(@Nonnull trackId: TrackId): Option[JobStatusCV] = {
     val pollEmitTime = System.currentTimeMillis
-    val clusterId = trackId.clusterId
-    val namespace = trackId.namespace
-    val appId = trackId.appId
-    val jobId = trackId.jobId
 
-    val rsMap = touchSessionAllJob(clusterId, namespace, appId, 
trackId.groupId).toMap
-    val id = TrackId.onSession(namespace, clusterId, appId, jobId, 
trackId.groupId)
+    val id = TrackId.onSession(
+      trackId.namespace,
+      trackId.clusterId,
+      trackId.appId,
+      trackId.jobId,
+      trackId.groupId
+    )
+
+    val rsMap = touchSessionAllJob(
+      trackId.namespace,
+      trackId.clusterId,
+      trackId.appId,
+      trackId.groupId
+    ).toMap
+
     val jobState = rsMap.get(id).filter(_.jobState != 
FlinkJobState.SILENT).getOrElse {
       val preCache = watchController.jobStatuses.get(id)
       val state = inferSilentOrLostFromPreCache(preCache)
@@ -207,22 +216,26 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig 
= JobStatusWatcherConfi
    * result.
    */
   private def touchSessionAllJob(
-      @Nonnull clusterId: String,
       @Nonnull namespace: String,
+      @Nonnull clusterId: String,
       @Nonnull appId: Long,
       @Nonnull groupId: String): Array[(TrackId, JobStatusCV)] = {
+
     lazy val defaultResult = Array.empty[(TrackId, JobStatusCV)]
     val pollEmitTime = System.currentTimeMillis
+
     val jobDetails = listJobsDetails(ClusterKey(SESSION, namespace, clusterId))
       .getOrElse(return defaultResult)
       .jobs
+
     if (jobDetails.isEmpty) {
       defaultResult
     } else {
       jobDetails.map {
         d =>
-          TrackId.onSession(namespace, clusterId, appId, d.jid, groupId) ->
-            d.toJobStatusCV(pollEmitTime, System.currentTimeMillis)
+          val trackId = TrackId.onSession(namespace, clusterId, appId, d.jid, 
groupId)
+          val jobStatus = d.toJobStatusCV(pollEmitTime, 
System.currentTimeMillis)
+          trackId -> jobStatus
       }
     }
   }

Reply via email to