This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.4
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.4 by this push:
     new e67bc43e5 [Improve] k8s deployment check exists improvement
e67bc43e5 is described below

commit e67bc43e56de8009f7fc205efbfb18cc90d924db
Author: benjobs <[email protected]>
AuthorDate: Sun Apr 14 13:30:15 2024 +0800

    [Improve] k8s deployment check exists improvement
---
 .../core/service/impl/AppBuildPipeServiceImpl.java |  10 +-
 .../core/service/impl/ApplicationServiceImpl.java  | 129 +++++++++------------
 .../core/service/impl/SavePointServiceImpl.java    |  15 ++-
 .../core/task/FlinkK8sChangeEventListener.java     |   6 +
 .../console/core/task/FlinkK8sWatcherWrapper.java  |  20 +---
 .../flink/kubernetes/KubernetesRetriever.scala     |  30 ++++-
 6 files changed, 102 insertions(+), 108 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index d54aff194..c37fd5e8e 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -338,13 +338,9 @@ public class AppBuildPipeServiceImpl
         log.info("Submit params to building pipeline : {}", buildRequest);
         return FlinkRemoteBuildPipeline.of(buildRequest);
       case KUBERNETES_NATIVE_SESSION:
-        String k8sNamespace = app.getK8sNamespace();
-        String clusterId = app.getClusterId();
-        if (app.getFlinkClusterId() != null) {
-          FlinkCluster flinkCluster = 
flinkClusterService.getById(app.getFlinkClusterId());
-          k8sNamespace = flinkCluster.getK8sNamespace();
-          clusterId = flinkCluster.getClusterId();
-        }
+        FlinkCluster flinkCluster = 
flinkClusterService.getById(app.getFlinkClusterId());
+        String k8sNamespace = flinkCluster.getK8sNamespace();
+        String clusterId = flinkCluster.getClusterId();
         FlinkK8sSessionBuildRequest k8sSessionBuildRequest =
             new FlinkK8sSessionBuildRequest(
                 app.getJobName(),
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 9be5f05d4..21e646d92 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -517,6 +517,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
       }
     }
     this.baseMapper.page(page, appParam);
+
     List<Application> records = page.getRecords();
     long now = System.currentTimeMillis();
 
@@ -524,55 +525,43 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     Map<Long, PipelineStatus> pipeStates = 
appBuildPipeService.listPipelineStatus(appIds);
 
     // add building pipeline status info and app control info
-    records =
-        records.stream()
-            .peek(
-                record -> {
-                  // 1) running Duration
-                  if (record.getTracking() == 1) {
-                    FlinkAppState state = record.getFlinkAppStateEnum();
-                    if (state == FlinkAppState.RUNNING
-                        || state == FlinkAppState.CANCELLING
-                        || state == FlinkAppState.MAPPING) {
-                      record.setDuration(now - 
record.getStartTime().getTime());
-                    }
-                  }
-                  // 2) k8s restURL
-                  if (record.isKubernetesModeJob()) {
-                    // set duration
-                    String restUrl =
-                        
flinkK8sWatcher.getRemoteRestUrl(k8sWatcherWrapper.toTrackId(record));
-                    record.setFlinkRestUrl(restUrl);
-                  }
-                })
-            .peek(
-                record -> {
-                  // 3) buildStatus
-                  if (pipeStates.containsKey(record.getId())) {
-                    
record.setBuildStatus(pipeStates.get(record.getId()).getCode());
-                  }
-                })
-            .peek(
-                record -> {
-                  // 4) appControl
-                  AppControl appControl =
-                      new AppControl()
-                          .setAllowBuild(
-                              record.getBuildStatus() == null
-                                  || !PipelineStatus.running
-                                      .getCode()
-                                      .equals(record.getBuildStatus()))
-                          .setAllowStart(
-                              !record.shouldBeTrack()
-                                  && PipelineStatus.success
-                                      .getCode()
-                                      .equals(record.getBuildStatus()))
-                          .setAllowStop(record.isRunning());
-                  record.setAppControl(appControl);
-                })
-            .collect(Collectors.toList());
-
-    page.setRecords(records);
+    records.forEach(
+        record -> {
+          // 1) running Duration
+          if (record.getTracking() == 1) {
+            FlinkAppState state = record.getFlinkAppStateEnum();
+            if (state == FlinkAppState.RUNNING
+                || state == FlinkAppState.CANCELLING
+                || state == FlinkAppState.MAPPING) {
+              record.setDuration(now - record.getStartTime().getTime());
+            }
+            // 2) k8s restURL
+            if (record.isKubernetesModeJob()) {
+              // set duration
+              String restUrl =
+                  
flinkK8sWatcher.getRemoteRestUrl(k8sWatcherWrapper.toTrackId(record));
+              record.setFlinkRestUrl(restUrl);
+            }
+          }
+
+          // 3) buildStatus
+          if (pipeStates.containsKey(record.getId())) {
+            record.setBuildStatus(pipeStates.get(record.getId()).getCode());
+          }
+
+          // 4) appControl
+          AppControl appControl =
+              new AppControl()
+                  .setAllowBuild(
+                      record.getBuildStatus() == null
+                          || 
!PipelineStatus.running.getCode().equals(record.getBuildStatus()))
+                  .setAllowStart(
+                      !record.shouldBeTrack()
+                          && 
PipelineStatus.success.getCode().equals(record.getBuildStatus()))
+                  .setAllowStop(record.isRunning());
+          record.setAppControl(appControl);
+        });
+
     return page;
   }
 
@@ -1307,8 +1296,10 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     applicationLog.setYarnAppId(application.getClusterId());
 
     if (appParam.getSavePointed()) {
-      FlinkAppHttpWatcher.addSavepoint(application.getId());
-      application.setOptionState(OptionState.SAVEPOINTING.getValue());
+      if (!application.isKubernetesModeJob()) {
+        FlinkAppHttpWatcher.addSavepoint(application.getId());
+        application.setOptionState(OptionState.SAVEPOINTING.getValue());
+      }
     } else {
       application.setOptionState(OptionState.CANCELLING.getValue());
     }
@@ -1620,17 +1611,10 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     String k8sClusterId;
     FlinkK8sRestExposedType exposedType = null;
     if (application.getExecutionModeEnum() == 
ExecutionMode.KUBERNETES_NATIVE_SESSION) {
-      // For compatibility with historical versions
-      if (application.getFlinkClusterId() == null) {
-        k8sClusterId = application.getClusterId();
-        k8sNamespace = application.getK8sNamespace();
-        exposedType = application.getK8sRestExposedTypeEnum();
-      } else {
-        FlinkCluster cluster = 
flinkClusterService.getById(application.getFlinkClusterId());
-        k8sClusterId = cluster.getClusterId();
-        k8sNamespace = cluster.getK8sNamespace();
-        exposedType = cluster.getK8sRestExposedTypeEnum();
-      }
+      FlinkCluster cluster = 
flinkClusterService.getById(application.getFlinkClusterId());
+      k8sClusterId = cluster.getClusterId();
+      k8sNamespace = cluster.getK8sNamespace();
+      exposedType = cluster.getK8sRestExposedTypeEnum();
     } else if (application.getExecutionModeEnum() == 
ExecutionMode.KUBERNETES_NATIVE_APPLICATION) {
       k8sClusterId = application.getJobName();
       k8sNamespace = application.getK8sNamespace();
@@ -1969,19 +1953,14 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
         k8sNamespace = application.getK8sNamespace();
         break;
       case KUBERNETES_NATIVE_SESSION:
-        if (application.getFlinkClusterId() == null) {
-          clusterId = application.getClusterId();
-          k8sNamespace = application.getK8sNamespace();
-        } else {
-          cluster = 
flinkClusterService.getById(application.getFlinkClusterId());
-          ApiAlertException.throwIfNull(
-              cluster,
-              String.format(
-                  "The Kubernetes session clusterId=%s can't found, maybe the 
clusterId is wrong or the cluster has been deleted. Please contact the Admin.",
-                  application.getFlinkClusterId()));
-          clusterId = cluster.getClusterId();
-          k8sNamespace = cluster.getK8sNamespace();
-        }
+        cluster = flinkClusterService.getById(application.getFlinkClusterId());
+        ApiAlertException.throwIfNull(
+            cluster,
+            String.format(
+                "The Kubernetes session clusterId=%s can't found, maybe the 
clusterId is wrong or the cluster has been deleted. Please contact the Admin.",
+                application.getFlinkClusterId()));
+        clusterId = cluster.getClusterId();
+        k8sNamespace = cluster.getK8sNamespace();
         break;
       default:
         break;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index bee73b6b8..b380483cc 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -291,12 +291,15 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
     applicationLog.setOptionTime(new Date());
     applicationLog.setYarnAppId(application.getClusterId());
 
-    FlinkAppHttpWatcher.addSavepoint(application.getId());
-
-    application.setOptionState(OptionState.SAVEPOINTING.getValue());
-    application.setOptionTime(new Date());
-    this.applicationService.updateById(application);
-    flinkAppHttpWatcher.initialize();
+    if (!application.isKubernetesModeJob()) {
+      FlinkAppHttpWatcher.addSavepoint(application.getId());
+      application.setOptionState(OptionState.SAVEPOINTING.getValue());
+      application.setOptionTime(new Date());
+      this.applicationService.updateById(application);
+      flinkAppHttpWatcher.initialize();
+    } else {
+      this.applicationService.updateById(application);
+    }
 
     FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index e211e57f5..47129b165 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -34,6 +34,8 @@ import 
org.apache.streampark.flink.kubernetes.model.JobStatusCV;
 import org.apache.streampark.flink.kubernetes.model.TrackId;
 import org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.eventbus.AllowConcurrentEvents;
 import com.google.common.eventbus.Subscribe;
 import lombok.extern.slf4j.Slf4j;
@@ -42,6 +44,7 @@ import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Component;
 
 import java.util.Date;
+import java.util.concurrent.TimeUnit;
 
 import scala.Enumeration;
 
@@ -59,6 +62,9 @@ public class FlinkK8sChangeEventListener {
 
   @Lazy @Autowired private CheckpointProcessor checkpointProcessor;
 
+  private static final Cache<Long, Byte> SAVEPOINT_CACHE =
+      Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build();
+
   /**
    * Catch FlinkJobStatusChangeEvent then storage it persistently to db. 
Actually update
    * org.apache.streampark.console.core.entity.Application records.
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index 71724e41b..bcacd21ba 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -25,7 +25,6 @@ import 
org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
 import org.apache.streampark.flink.kubernetes.FlinkK8sWatcherFactory;
 import org.apache.streampark.flink.kubernetes.FlinkTrackConfig;
-import org.apache.streampark.flink.kubernetes.KubernetesRetriever;
 import org.apache.streampark.flink.kubernetes.enums.FlinkJobState;
 import org.apache.streampark.flink.kubernetes.model.TrackId;
 
@@ -105,14 +104,7 @@ public class FlinkK8sWatcherWrapper {
     }
     // filter out the application that should be tracking
     return k8sApplication.stream()
-        .filter(
-            app -> {
-              boolean isEndState =
-                  
FlinkJobState.isEndState(toK8sFlinkJobState(app.getFlinkAppStateEnum()));
-              boolean deploymentExists =
-                  
KubernetesRetriever.isDeploymentExists(app.getK8sNamespace(), 
app.getClusterId());
-              return !isEndState || deploymentExists;
-            })
+        .filter(app -> 
FlinkJobState.isEndState(toK8sFlinkJobState(app.getFlinkAppStateEnum())))
         .map(this::toTrackId)
         .collect(Collectors.toList());
   }
@@ -126,13 +118,9 @@ public class FlinkK8sWatcherWrapper {
           app.getJobId(),
           app.getTeamId().toString());
     } else if (app.getExecutionModeEnum() == 
ExecutionMode.KUBERNETES_NATIVE_SESSION) {
-      String namespace = app.getK8sNamespace();
-      String clusterId = app.getClusterId();
-      if (app.getFlinkClusterId() != null) {
-        FlinkCluster flinkCluster = 
flinkClusterService.getById(app.getFlinkClusterId());
-        namespace = flinkCluster.getK8sNamespace();
-        clusterId = flinkCluster.getClusterId();
-      }
+      FlinkCluster flinkCluster = 
flinkClusterService.getById(app.getFlinkClusterId());
+      String namespace = flinkCluster.getK8sNamespace();
+      String clusterId = flinkCluster.getClusterId();
       return TrackId.onSession(
           namespace, clusterId, app.getId(), app.getJobId(), 
app.getTeamId().toString());
     } else {
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index c93a84923..47bfd62ea 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.kubernetes
 
-import org.apache.streampark.common.util.{Logger, Utils}
+import org.apache.streampark.common.util.{DateUtils, Logger, Utils}
 import org.apache.streampark.common.util.Utils.using
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
 import org.apache.streampark.flink.kubernetes.ingress.IngressController
@@ -47,6 +47,8 @@ object KubernetesRetriever extends Logger {
   val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout =
     Timeout.ofMilliseconds(RestOptions.AWAIT_LEADER_TIMEOUT.defaultValue())
 
+  private val DEPLOYMENT_LOST_TIME = collection.mutable.Map[String, Long]()
+
   /** get new KubernetesClient */
   @throws(classOf[KubernetesClientException])
   def newK8sClient(): KubernetesClient = {
@@ -122,15 +124,35 @@ object KubernetesRetriever extends Logger {
           .exists(_.getMetadata.getName == deploymentName)
     } {
       e =>
-        logError(
+        logWarn(
           s"""
-             |[StreamPark] check deploymentExists error,
+             |[StreamPark] check deploymentExists WARN,
              |namespace: $namespace,
              |deploymentName: $deploymentName,
              |error: $e
              |""".stripMargin
         )
-        true
+        val key = s"${namespace}_$deploymentName"
+        DEPLOYMENT_LOST_TIME.get(key) match {
+          case Some(time) =>
+            val timeOut = 1000 * 60 * 3L
+            if (System.currentTimeMillis() - time >= timeOut) {
+              logError(
+                s"""
+                   |[StreamPark] check deploymentExists Failed,
+                   |namespace: $namespace,
+                   |deploymentName: $deploymentName,
+                   |detail: deployment: $deploymentName Not Found more than 3 
minutes, $e
+                   |""".stripMargin
+              )
+              DEPLOYMENT_LOST_TIME -= key
+              return false
+            }
+            return true
+          case _ =>
+            DEPLOYMENT_LOST_TIME += key -> System.currentTimeMillis()
+            true
+        }
     }
   }
 

Reply via email to the mailing list.