This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
     new 65ffd73af [Improve] k8s session mode ingress config improvement
65ffd73af is described below

commit 65ffd73af0c615782a40dd9b3144923a835f426c
Author: benjobs <[email protected]>
AuthorDate: Sat Jan 20 11:18:52 2024 +0800

    [Improve] k8s session mode ingress config improvement
---
 .../{LogClientService.java => ServiceHelper.java}  | 27 ++++++++++++------
 .../core/service/impl/ApplicationServiceImpl.java  | 32 ++++++++++------------
 .../core/service/impl/FlinkClusterServiceImpl.java | 16 ++++++++++-
 .../kubernetes/ingress/IngressController.scala     |  1 -
 .../watcher/FlinkCheckpointWatcher.scala           |  6 ++--
 .../kubernetes/watcher/FlinkK8sEventWatcher.scala  | 26 +++++++++++-------
 .../kubernetes/watcher/FlinkMetricsWatcher.scala   |  2 +-
 7 files changed, 67 insertions(+), 43 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/LogClientService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ServiceHelper.java
similarity index 67%
rename from streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/LogClientService.java
rename to streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ServiceHelper.java
index e60a5115c..048a2549a 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/LogClientService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ServiceHelper.java
@@ -18,30 +18,31 @@
 package org.apache.streampark.console.core.service;
 
 import org.apache.streampark.console.base.exception.ApiDetailException;
+import org.apache.streampark.flink.kubernetes.ingress.IngressController;
 
-import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import java.io.File;
 import java.nio.file.Files;
 import java.nio.file.Paths;
-import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-/** log client */
-@Slf4j
 @Component
-public class LogClientService {
+public class ServiceHelper {
+
+  @Autowired private SettingService settingService;
+
   public String rollViewLog(String path, int offset, int limit) {
     try {
       File file = new File(path);
       if (file.exists() && file.isFile()) {
         try (Stream<String> stream = Files.lines(Paths.get(path))) {
-          List<String> lines = stream.skip(offset).limit(limit).collect(Collectors.toList());
-          StringBuilder builder = new StringBuilder();
-          lines.forEach(line -> builder.append(line).append("\r\n"));
-          return builder.toString();
+          return stream.skip(offset).limit(limit).collect(Collectors.joining("\r\n"));
         }
       }
       return null;
@@ -49,4 +50,12 @@ public class LogClientService {
       throw new ApiDetailException("roll view log error: " + e);
     }
   }
+
+  public void configureIngress(String clusterId, String namespace)
+      throws KubernetesClientException {
+    String domainName = settingService.getIngressModeDefault();
+    if (StringUtils.isNotBlank(domainName)) {
+      IngressController.configureIngress(domainName, clusterId, namespace);
+    }
+  }
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 9e827b5f8..1d26029c1 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -73,9 +73,9 @@ import org.apache.streampark.console.core.service.EffectiveService;
 import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.FlinkEnvService;
 import org.apache.streampark.console.core.service.FlinkSqlService;
-import org.apache.streampark.console.core.service.LogClientService;
 import org.apache.streampark.console.core.service.ProjectService;
 import org.apache.streampark.console.core.service.SavePointService;
+import org.apache.streampark.console.core.service.ServiceHelper;
 import org.apache.streampark.console.core.service.SettingService;
 import org.apache.streampark.console.core.service.VariableService;
 import org.apache.streampark.console.core.service.YarnQueueService;
@@ -89,7 +89,6 @@ import org.apache.streampark.flink.client.bean.SubmitResponse;
 import org.apache.streampark.flink.core.conf.ParameterCli;
 import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
 import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper;
-import org.apache.streampark.flink.kubernetes.ingress.IngressController;
 import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV;
 import org.apache.streampark.flink.kubernetes.model.TrackId;
 import org.apache.streampark.flink.packer.pipeline.BuildResult;
@@ -219,12 +218,12 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
 
   @Autowired private VariableService variableService;
 
-  @Autowired private LogClientService logClient;
-
   @Autowired private YarnQueueService yarnQueueService;
 
   @Autowired private FlinkK8sWatcherWrapper k8sWatcherWrapper;
 
+  @Autowired private ServiceHelper serviceHelper;
+
   @PostConstruct
   public void resetOptionState() {
     this.baseMapper.resetOptionState();
@@ -642,7 +641,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
                   future.cancel(true);
                 }
                 if (path != null) {
-                  return logClient.rollViewLog(path, offset, limit);
+                  return serviceHelper.rollViewLog(path, offset, limit);
                 }
                 return null;
               })
@@ -1681,19 +1680,16 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             flinkK8sWatcher.doWatching(trackId);
 
             if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
-              String domainName = settingService.getIngressModeDefault();
-              if (StringUtils.isNotBlank(domainName)) {
-                try {
-                  IngressController.configureIngress(
-                      domainName, application.getClusterId(), application.getK8sNamespace());
-                } catch (KubernetesClientException e) {
-                  log.info("Failed to create ingress, stack info:{}", e.getMessage());
-                  applicationLog.setException(e.getMessage());
-                  applicationLog.setSuccess(false);
-                  applicationLogService.save(applicationLog);
-                  application.setState(FlinkAppState.FAILED.getValue());
-                  application.setOptionState(OptionState.NONE.getValue());
-                }
+              try {
+                serviceHelper.configureIngress(
+                    application.getClusterId(), application.getK8sNamespace());
+              } catch (KubernetesClientException e) {
+                log.info("Failed to create ingress, stack info:{}", e.getMessage());
+                applicationLog.setException(e.getMessage());
+                applicationLog.setSuccess(false);
+                applicationLogService.save(applicationLog);
+                application.setState(FlinkAppState.FAILED.getValue());
+                application.setOptionState(OptionState.NONE.getValue());
               }
             }
           } else {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 079f400b3..4b73ef388 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -32,6 +32,7 @@ import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.CommonService;
 import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.FlinkEnvService;
+import org.apache.streampark.console.core.service.ServiceHelper;
 import org.apache.streampark.console.core.service.YarnQueueService;
 import org.apache.streampark.flink.client.FlinkClient;
 import org.apache.streampark.flink.client.bean.DeployRequest;
@@ -45,6 +46,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.google.common.annotations.VisibleForTesting;
+import io.fabric8.kubernetes.client.KubernetesClientException;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -88,6 +90,8 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
 
   @Autowired private YarnQueueService yarnQueueService;
 
+  @Autowired private ServiceHelper serviceHelper;
+
   @Override
   public ResponseResult check(FlinkCluster cluster) {
     ResponseResult result = new ResponseResult();
@@ -162,12 +166,13 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     }
 
     try {
+      // 1) deployRequest
       DeployRequest deployRequest = getDeployRequest(flinkCluster);
       log.info("deploy cluster request: " + deployRequest);
 
       Future<DeployResponse> future =
           executorService.submit(() -> FlinkClient.deploy(deployRequest));
-      DeployResponse deployResponse = future.get(60, TimeUnit.SECONDS);
+      DeployResponse deployResponse = future.get(5, TimeUnit.SECONDS);
       if (deployResponse.error() != null) {
         throw new ApiDetailException(
             "deploy cluster "
@@ -175,6 +180,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
                 + "failed, exception:\n"
                 + Utils.stringifyException(deployResponse.error()));
       } else {
+        // 2) set address.
         if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
           String address =
               YarnUtils.getRMWebAppURL(true) + "/proxy/" + deployResponse.clusterId() + "/";
@@ -186,6 +192,14 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
         flinkCluster.setClusterState(ClusterState.STARTED.getValue());
         flinkCluster.setException(null);
         updateById(flinkCluster);
+
+        // 3) k8s session ingress
+        try {
+          serviceHelper.configureIngress(
+              flinkCluster.getClusterId(), flinkCluster.getK8sNamespace());
+        } catch (KubernetesClientException e) {
+          log.info("Failed to create ingress: {}", e.getMessage());
+        }
       }
     } catch (Exception e) {
       log.error(e.getMessage(), e);
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
index e06ae5fac..d443a22bd 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
@@ -32,7 +32,6 @@ object IngressController extends Logger {
   private lazy val ingressStrategy: IngressStrategy = {
     using(new DefaultKubernetesClient()) {
       client =>
-        val versionInfo = client.getVersion
         val version = VERSION_REGEXP.findFirstIn(client.getVersion.getGitVersion).get.toDouble
         if (version >= 1.19) {
           new IngressStrategyV1()
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
index 3d627ea1f..0b344d764 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
@@ -161,10 +161,10 @@ object Checkpoint {
           case _ =>
             val cp = Checkpoint(
               id = (completed \ "id").extractOpt[Long].getOrElse(0L),
-              status = (completed \ "status").extractOpt[String].getOrElse(null),
-              externalPath = (completed \ "external_path").extractOpt[String].getOrElse(null),
+              status = (completed \ "status").extractOpt[String].orNull,
+              externalPath = (completed \ "external_path").extractOpt[String].orNull,
               isSavepoint = (completed \ "is_savepoint").extractOpt[Boolean].getOrElse(false),
-              checkpointType = (completed \ "checkpoint_type").extractOpt[String].getOrElse(null),
+              checkpointType = (completed \ "checkpoint_type").extractOpt[String].orNull,
               triggerTimestamp = (completed \ "trigger_timestamp").extractOpt[Long].getOrElse(0L)
             )
             Some(cp)
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala
index 8e9c348d1..7151f671c 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala
@@ -27,7 +27,7 @@ import org.apache.flink.kubernetes.kubeclient.resources.{CompatibleKubernetesWat
 
 import javax.annotation.concurrent.ThreadSafe
 
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
 
 /**
  * K8s Event Watcher for Flink Native-K8s Mode. Currently only flink-native-application mode events
@@ -63,15 +63,21 @@ class FlinkK8sEventWatcher(implicit watchController: FlinkK8sWatchController)
 
   override def doWatch(): Unit = {
     // watch k8s deployment events
-    k8sClient
-      .apps()
-      .deployments()
-      .withLabel("type", "flink-native-kubernetes")
-      .watch(new CompatibleKubernetesWatcher[Deployment, CompKubernetesDeployment] {
-        override def eventReceived(action: Watcher.Action, event: Deployment): Unit = {
-          handleDeploymentEvent(action, event)
-        }
-      })
+    Try {
+      k8sClient
+        .apps()
+        .deployments()
+        .withLabel("type", "flink-native-kubernetes")
+        .watch(new CompatibleKubernetesWatcher[Deployment, CompKubernetesDeployment] {
+          override def eventReceived(action: Watcher.Action, event: Deployment): Unit = {
+            handleDeploymentEvent(action, event)
+          }
+        })
+    } match {
+      case Failure(e) =>
+        logError(s"k8sClient error: $e")
+      case _ =>
+    }
   }
 
   private def handleDeploymentEvent(action: Watcher.Action, event: Deployment): Unit = {
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
index 6f5ff0095..4dac87f53 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
@@ -117,7 +117,7 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.default
    * This method can be called directly from outside, without affecting the current cachePool
    * result.
    */
-  def collectMetrics(id: TrackId): Option[FlinkMetricCV] = {
+  private def collectMetrics(id: TrackId): Option[FlinkMetricCV] = {
     // get flink rest api
     val clusterKey: ClusterKey = ClusterKey.of(id)
     val flinkJmRestUrl =

Reply via email to