This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new 65ffd73af [Improve] k8s session mode ingress config improvement
65ffd73af is described below
commit 65ffd73af0c615782a40dd9b3144923a835f426c
Author: benjobs <[email protected]>
AuthorDate: Sat Jan 20 11:18:52 2024 +0800
[Improve] k8s session mode ingress config improvement
---
.../{LogClientService.java => ServiceHelper.java} | 27 ++++++++++++------
.../core/service/impl/ApplicationServiceImpl.java | 32 ++++++++++------------
.../core/service/impl/FlinkClusterServiceImpl.java | 16 ++++++++++-
.../kubernetes/ingress/IngressController.scala | 1 -
.../watcher/FlinkCheckpointWatcher.scala | 6 ++--
.../kubernetes/watcher/FlinkK8sEventWatcher.scala | 26 +++++++++++-------
.../kubernetes/watcher/FlinkMetricsWatcher.scala | 2 +-
7 files changed, 67 insertions(+), 43 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/LogClientService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ServiceHelper.java
similarity index 67%
rename from
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/LogClientService.java
rename to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ServiceHelper.java
index e60a5115c..048a2549a 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/LogClientService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ServiceHelper.java
@@ -18,30 +18,31 @@
package org.apache.streampark.console.core.service;
import org.apache.streampark.console.base.exception.ApiDetailException;
+import org.apache.streampark.flink.kubernetes.ingress.IngressController;
-import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
-import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-/** log client */
-@Slf4j
@Component
-public class LogClientService {
+public class ServiceHelper {
+
+ @Autowired private SettingService settingService;
+
public String rollViewLog(String path, int offset, int limit) {
try {
File file = new File(path);
if (file.exists() && file.isFile()) {
try (Stream<String> stream = Files.lines(Paths.get(path))) {
- List<String> lines =
stream.skip(offset).limit(limit).collect(Collectors.toList());
- StringBuilder builder = new StringBuilder();
- lines.forEach(line -> builder.append(line).append("\r\n"));
- return builder.toString();
+ return
stream.skip(offset).limit(limit).collect(Collectors.joining("\r\n"));
}
}
return null;
@@ -49,4 +50,12 @@ public class LogClientService {
throw new ApiDetailException("roll view log error: " + e);
}
}
+
+ public void configureIngress(String clusterId, String namespace)
+ throws KubernetesClientException {
+ String domainName = settingService.getIngressModeDefault();
+ if (StringUtils.isNotBlank(domainName)) {
+ IngressController.configureIngress(domainName, clusterId, namespace);
+ }
+ }
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 9e827b5f8..1d26029c1 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -73,9 +73,9 @@ import
org.apache.streampark.console.core.service.EffectiveService;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
import org.apache.streampark.console.core.service.FlinkSqlService;
-import org.apache.streampark.console.core.service.LogClientService;
import org.apache.streampark.console.core.service.ProjectService;
import org.apache.streampark.console.core.service.SavePointService;
+import org.apache.streampark.console.core.service.ServiceHelper;
import org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.service.VariableService;
import org.apache.streampark.console.core.service.YarnQueueService;
@@ -89,7 +89,6 @@ import org.apache.streampark.flink.client.bean.SubmitResponse;
import org.apache.streampark.flink.core.conf.ParameterCli;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
import
org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper;
-import org.apache.streampark.flink.kubernetes.ingress.IngressController;
import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV;
import org.apache.streampark.flink.kubernetes.model.TrackId;
import org.apache.streampark.flink.packer.pipeline.BuildResult;
@@ -219,12 +218,12 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
@Autowired private VariableService variableService;
- @Autowired private LogClientService logClient;
-
@Autowired private YarnQueueService yarnQueueService;
@Autowired private FlinkK8sWatcherWrapper k8sWatcherWrapper;
+ @Autowired private ServiceHelper serviceHelper;
+
@PostConstruct
public void resetOptionState() {
this.baseMapper.resetOptionState();
@@ -642,7 +641,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
future.cancel(true);
}
if (path != null) {
- return logClient.rollViewLog(path, offset, limit);
+ return serviceHelper.rollViewLog(path, offset, limit);
}
return null;
})
@@ -1681,19 +1680,16 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
flinkK8sWatcher.doWatching(trackId);
if
(ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
- String domainName = settingService.getIngressModeDefault();
- if (StringUtils.isNotBlank(domainName)) {
- try {
- IngressController.configureIngress(
- domainName, application.getClusterId(),
application.getK8sNamespace());
- } catch (KubernetesClientException e) {
- log.info("Failed to create ingress, stack info:{}",
e.getMessage());
- applicationLog.setException(e.getMessage());
- applicationLog.setSuccess(false);
- applicationLogService.save(applicationLog);
- application.setState(FlinkAppState.FAILED.getValue());
- application.setOptionState(OptionState.NONE.getValue());
- }
+ try {
+ serviceHelper.configureIngress(
+ application.getClusterId(), application.getK8sNamespace());
+ } catch (KubernetesClientException e) {
+ log.info("Failed to create ingress, stack info:{}",
e.getMessage());
+ applicationLog.setException(e.getMessage());
+ applicationLog.setSuccess(false);
+ applicationLogService.save(applicationLog);
+ application.setState(FlinkAppState.FAILED.getValue());
+ application.setOptionState(OptionState.NONE.getValue());
}
}
} else {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 079f400b3..4b73ef388 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -32,6 +32,7 @@ import
org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.CommonService;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
+import org.apache.streampark.console.core.service.ServiceHelper;
import org.apache.streampark.console.core.service.YarnQueueService;
import org.apache.streampark.flink.client.FlinkClient;
import org.apache.streampark.flink.client.bean.DeployRequest;
@@ -45,6 +46,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.annotations.VisibleForTesting;
+import io.fabric8.kubernetes.client.KubernetesClientException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -88,6 +90,8 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
@Autowired private YarnQueueService yarnQueueService;
+ @Autowired private ServiceHelper serviceHelper;
+
@Override
public ResponseResult check(FlinkCluster cluster) {
ResponseResult result = new ResponseResult();
@@ -162,12 +166,13 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
}
try {
+ // 1) deployRequest
DeployRequest deployRequest = getDeployRequest(flinkCluster);
log.info("deploy cluster request: " + deployRequest);
Future<DeployResponse> future =
executorService.submit(() -> FlinkClient.deploy(deployRequest));
- DeployResponse deployResponse = future.get(60, TimeUnit.SECONDS);
+ DeployResponse deployResponse = future.get(5, TimeUnit.SECONDS);
if (deployResponse.error() != null) {
throw new ApiDetailException(
"deploy cluster "
@@ -175,6 +180,7 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
+ "failed, exception:\n"
+ Utils.stringifyException(deployResponse.error()));
} else {
+ // 2) set address.
if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
String address =
YarnUtils.getRMWebAppURL(true) + "/proxy/" +
deployResponse.clusterId() + "/";
@@ -186,6 +192,14 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
flinkCluster.setClusterState(ClusterState.STARTED.getValue());
flinkCluster.setException(null);
updateById(flinkCluster);
+
+ // 3) k8s session ingress
+ try {
+ serviceHelper.configureIngress(
+ flinkCluster.getClusterId(), flinkCluster.getK8sNamespace());
+ } catch (KubernetesClientException e) {
+ log.info("Failed to create ingress: {}", e.getMessage());
+ }
}
} catch (Exception e) {
log.error(e.getMessage(), e);
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
index e06ae5fac..d443a22bd 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
@@ -32,7 +32,6 @@ object IngressController extends Logger {
private lazy val ingressStrategy: IngressStrategy = {
using(new DefaultKubernetesClient()) {
client =>
- val versionInfo = client.getVersion
val version =
VERSION_REGEXP.findFirstIn(client.getVersion.getGitVersion).get.toDouble
if (version >= 1.19) {
new IngressStrategyV1()
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
index 3d627ea1f..0b344d764 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
@@ -161,10 +161,10 @@ object Checkpoint {
case _ =>
val cp = Checkpoint(
id = (completed \ "id").extractOpt[Long].getOrElse(0L),
- status = (completed \
"status").extractOpt[String].getOrElse(null),
- externalPath = (completed \
"external_path").extractOpt[String].getOrElse(null),
+ status = (completed \ "status").extractOpt[String].orNull,
+ externalPath = (completed \
"external_path").extractOpt[String].orNull,
isSavepoint = (completed \
"is_savepoint").extractOpt[Boolean].getOrElse(false),
- checkpointType = (completed \
"checkpoint_type").extractOpt[String].getOrElse(null),
+ checkpointType = (completed \
"checkpoint_type").extractOpt[String].orNull,
triggerTimestamp = (completed \
"trigger_timestamp").extractOpt[Long].getOrElse(0L)
)
Some(cp)
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala
index 8e9c348d1..7151f671c 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala
@@ -27,7 +27,7 @@ import
org.apache.flink.kubernetes.kubeclient.resources.{CompatibleKubernetesWat
import javax.annotation.concurrent.ThreadSafe
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
/**
* K8s Event Watcher for Flink Native-K8s Mode. Currently only
flink-native-application mode events
@@ -63,15 +63,21 @@ class FlinkK8sEventWatcher(implicit watchController:
FlinkK8sWatchController)
override def doWatch(): Unit = {
// watch k8s deployment events
- k8sClient
- .apps()
- .deployments()
- .withLabel("type", "flink-native-kubernetes")
- .watch(new CompatibleKubernetesWatcher[Deployment,
CompKubernetesDeployment] {
- override def eventReceived(action: Watcher.Action, event: Deployment):
Unit = {
- handleDeploymentEvent(action, event)
- }
- })
+ Try {
+ k8sClient
+ .apps()
+ .deployments()
+ .withLabel("type", "flink-native-kubernetes")
+ .watch(new CompatibleKubernetesWatcher[Deployment,
CompKubernetesDeployment] {
+ override def eventReceived(action: Watcher.Action, event:
Deployment): Unit = {
+ handleDeploymentEvent(action, event)
+ }
+ })
+ } match {
+ case Failure(e) =>
+ logError(s"k8sClient error: $e")
+ case _ =>
+ }
}
private def handleDeploymentEvent(action: Watcher.Action, event:
Deployment): Unit = {
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
index 6f5ff0095..4dac87f53 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
@@ -117,7 +117,7 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig =
MetricWatcherConfig.default
* This method can be called directly from outside, without affecting the
current cachePool
* result.
*/
- def collectMetrics(id: TrackId): Option[FlinkMetricCV] = {
+ private def collectMetrics(id: TrackId): Option[FlinkMetricCV] = {
// get flink rest api
val clusterKey: ClusterKey = ClusterKey.of(id)
val flinkJmRestUrl =