This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.4
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.4 by this push:
new e67bc43e5 [Improve] k8s deployment check exists improvement
e67bc43e5 is described below
commit e67bc43e56de8009f7fc205efbfb18cc90d924db
Author: benjobs <[email protected]>
AuthorDate: Sun Apr 14 13:30:15 2024 +0800
[Improve] k8s deployment check exists improvement
---
.../core/service/impl/AppBuildPipeServiceImpl.java | 10 +-
.../core/service/impl/ApplicationServiceImpl.java | 129 +++++++++------------
.../core/service/impl/SavePointServiceImpl.java | 15 ++-
.../core/task/FlinkK8sChangeEventListener.java | 6 +
.../console/core/task/FlinkK8sWatcherWrapper.java | 20 +---
.../flink/kubernetes/KubernetesRetriever.scala | 30 ++++-
6 files changed, 102 insertions(+), 108 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index d54aff194..c37fd5e8e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -338,13 +338,9 @@ public class AppBuildPipeServiceImpl
log.info("Submit params to building pipeline : {}", buildRequest);
return FlinkRemoteBuildPipeline.of(buildRequest);
case KUBERNETES_NATIVE_SESSION:
- String k8sNamespace = app.getK8sNamespace();
- String clusterId = app.getClusterId();
- if (app.getFlinkClusterId() != null) {
- FlinkCluster flinkCluster =
flinkClusterService.getById(app.getFlinkClusterId());
- k8sNamespace = flinkCluster.getK8sNamespace();
- clusterId = flinkCluster.getClusterId();
- }
+ FlinkCluster flinkCluster =
flinkClusterService.getById(app.getFlinkClusterId());
+ String k8sNamespace = flinkCluster.getK8sNamespace();
+ String clusterId = flinkCluster.getClusterId();
FlinkK8sSessionBuildRequest k8sSessionBuildRequest =
new FlinkK8sSessionBuildRequest(
app.getJobName(),
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 9be5f05d4..21e646d92 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -517,6 +517,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
}
}
this.baseMapper.page(page, appParam);
+
List<Application> records = page.getRecords();
long now = System.currentTimeMillis();
@@ -524,55 +525,43 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
Map<Long, PipelineStatus> pipeStates =
appBuildPipeService.listPipelineStatus(appIds);
// add building pipeline status info and app control info
- records =
- records.stream()
- .peek(
- record -> {
- // 1) running Duration
- if (record.getTracking() == 1) {
- FlinkAppState state = record.getFlinkAppStateEnum();
- if (state == FlinkAppState.RUNNING
- || state == FlinkAppState.CANCELLING
- || state == FlinkAppState.MAPPING) {
- record.setDuration(now -
record.getStartTime().getTime());
- }
- }
- // 2) k8s restURL
- if (record.isKubernetesModeJob()) {
- // set duration
- String restUrl =
-
flinkK8sWatcher.getRemoteRestUrl(k8sWatcherWrapper.toTrackId(record));
- record.setFlinkRestUrl(restUrl);
- }
- })
- .peek(
- record -> {
- // 3) buildStatus
- if (pipeStates.containsKey(record.getId())) {
-
record.setBuildStatus(pipeStates.get(record.getId()).getCode());
- }
- })
- .peek(
- record -> {
- // 4) appControl
- AppControl appControl =
- new AppControl()
- .setAllowBuild(
- record.getBuildStatus() == null
- || !PipelineStatus.running
- .getCode()
- .equals(record.getBuildStatus()))
- .setAllowStart(
- !record.shouldBeTrack()
- && PipelineStatus.success
- .getCode()
- .equals(record.getBuildStatus()))
- .setAllowStop(record.isRunning());
- record.setAppControl(appControl);
- })
- .collect(Collectors.toList());
-
- page.setRecords(records);
+ records.forEach(
+ record -> {
+ // 1) running Duration
+ if (record.getTracking() == 1) {
+ FlinkAppState state = record.getFlinkAppStateEnum();
+ if (state == FlinkAppState.RUNNING
+ || state == FlinkAppState.CANCELLING
+ || state == FlinkAppState.MAPPING) {
+ record.setDuration(now - record.getStartTime().getTime());
+ }
+ // 2) k8s restURL
+ if (record.isKubernetesModeJob()) {
+ // set duration
+ String restUrl =
+
flinkK8sWatcher.getRemoteRestUrl(k8sWatcherWrapper.toTrackId(record));
+ record.setFlinkRestUrl(restUrl);
+ }
+ }
+
+ // 3) buildStatus
+ if (pipeStates.containsKey(record.getId())) {
+ record.setBuildStatus(pipeStates.get(record.getId()).getCode());
+ }
+
+ // 4) appControl
+ AppControl appControl =
+ new AppControl()
+ .setAllowBuild(
+ record.getBuildStatus() == null
+ ||
!PipelineStatus.running.getCode().equals(record.getBuildStatus()))
+ .setAllowStart(
+ !record.shouldBeTrack()
+ &&
PipelineStatus.success.getCode().equals(record.getBuildStatus()))
+ .setAllowStop(record.isRunning());
+ record.setAppControl(appControl);
+ });
+
return page;
}
@@ -1307,8 +1296,10 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
applicationLog.setYarnAppId(application.getClusterId());
if (appParam.getSavePointed()) {
- FlinkAppHttpWatcher.addSavepoint(application.getId());
- application.setOptionState(OptionState.SAVEPOINTING.getValue());
+ if (!application.isKubernetesModeJob()) {
+ FlinkAppHttpWatcher.addSavepoint(application.getId());
+ application.setOptionState(OptionState.SAVEPOINTING.getValue());
+ }
} else {
application.setOptionState(OptionState.CANCELLING.getValue());
}
@@ -1620,17 +1611,10 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
String k8sClusterId;
FlinkK8sRestExposedType exposedType = null;
if (application.getExecutionModeEnum() ==
ExecutionMode.KUBERNETES_NATIVE_SESSION) {
- // For compatibility with historical versions
- if (application.getFlinkClusterId() == null) {
- k8sClusterId = application.getClusterId();
- k8sNamespace = application.getK8sNamespace();
- exposedType = application.getK8sRestExposedTypeEnum();
- } else {
- FlinkCluster cluster =
flinkClusterService.getById(application.getFlinkClusterId());
- k8sClusterId = cluster.getClusterId();
- k8sNamespace = cluster.getK8sNamespace();
- exposedType = cluster.getK8sRestExposedTypeEnum();
- }
+ FlinkCluster cluster =
flinkClusterService.getById(application.getFlinkClusterId());
+ k8sClusterId = cluster.getClusterId();
+ k8sNamespace = cluster.getK8sNamespace();
+ exposedType = cluster.getK8sRestExposedTypeEnum();
} else if (application.getExecutionModeEnum() ==
ExecutionMode.KUBERNETES_NATIVE_APPLICATION) {
k8sClusterId = application.getJobName();
k8sNamespace = application.getK8sNamespace();
@@ -1969,19 +1953,14 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
k8sNamespace = application.getK8sNamespace();
break;
case KUBERNETES_NATIVE_SESSION:
- if (application.getFlinkClusterId() == null) {
- clusterId = application.getClusterId();
- k8sNamespace = application.getK8sNamespace();
- } else {
- cluster =
flinkClusterService.getById(application.getFlinkClusterId());
- ApiAlertException.throwIfNull(
- cluster,
- String.format(
- "The Kubernetes session clusterId=%s can't found, maybe the
clusterId is wrong or the cluster has been deleted. Please contact the Admin.",
- application.getFlinkClusterId()));
- clusterId = cluster.getClusterId();
- k8sNamespace = cluster.getK8sNamespace();
- }
+ cluster = flinkClusterService.getById(application.getFlinkClusterId());
+ ApiAlertException.throwIfNull(
+ cluster,
+ String.format(
+ "The Kubernetes session clusterId=%s can't found, maybe the
clusterId is wrong or the cluster has been deleted. Please contact the Admin.",
+ application.getFlinkClusterId()));
+ clusterId = cluster.getClusterId();
+ k8sNamespace = cluster.getK8sNamespace();
break;
default:
break;
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index bee73b6b8..b380483cc 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -291,12 +291,15 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint
applicationLog.setOptionTime(new Date());
applicationLog.setYarnAppId(application.getClusterId());
- FlinkAppHttpWatcher.addSavepoint(application.getId());
-
- application.setOptionState(OptionState.SAVEPOINTING.getValue());
- application.setOptionTime(new Date());
- this.applicationService.updateById(application);
- flinkAppHttpWatcher.initialize();
+ if (!application.isKubernetesModeJob()) {
+ FlinkAppHttpWatcher.addSavepoint(application.getId());
+ application.setOptionState(OptionState.SAVEPOINTING.getValue());
+ application.setOptionTime(new Date());
+ this.applicationService.updateById(application);
+ flinkAppHttpWatcher.initialize();
+ } else {
+ this.applicationService.updateById(application);
+ }
FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index e211e57f5..47129b165 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -34,6 +34,8 @@ import org.apache.streampark.flink.kubernetes.model.JobStatusCV;
import org.apache.streampark.flink.kubernetes.model.TrackId;
import org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import lombok.extern.slf4j.Slf4j;
@@ -42,6 +44,7 @@ import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import java.util.Date;
+import java.util.concurrent.TimeUnit;
import scala.Enumeration;
@@ -59,6 +62,9 @@ public class FlinkK8sChangeEventListener {
@Lazy @Autowired private CheckpointProcessor checkpointProcessor;
+ private static final Cache<Long, Byte> SAVEPOINT_CACHE =
+ Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build();
+
/**
* Catch FlinkJobStatusChangeEvent then storage it persistently to db.
Actually update
* org.apache.streampark.console.core.entity.Application records.
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index 71724e41b..bcacd21ba 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -25,7 +25,6 @@ import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcherFactory;
import org.apache.streampark.flink.kubernetes.FlinkTrackConfig;
-import org.apache.streampark.flink.kubernetes.KubernetesRetriever;
import org.apache.streampark.flink.kubernetes.enums.FlinkJobState;
import org.apache.streampark.flink.kubernetes.model.TrackId;
@@ -105,14 +104,7 @@ public class FlinkK8sWatcherWrapper {
}
// filter out the application that should be tracking
return k8sApplication.stream()
- .filter(
- app -> {
- boolean isEndState =
-
FlinkJobState.isEndState(toK8sFlinkJobState(app.getFlinkAppStateEnum()));
- boolean deploymentExists =
-
KubernetesRetriever.isDeploymentExists(app.getK8sNamespace(),
app.getClusterId());
- return !isEndState || deploymentExists;
- })
+ .filter(app ->
FlinkJobState.isEndState(toK8sFlinkJobState(app.getFlinkAppStateEnum())))
.map(this::toTrackId)
.collect(Collectors.toList());
}
@@ -126,13 +118,9 @@ public class FlinkK8sWatcherWrapper {
app.getJobId(),
app.getTeamId().toString());
} else if (app.getExecutionModeEnum() ==
ExecutionMode.KUBERNETES_NATIVE_SESSION) {
- String namespace = app.getK8sNamespace();
- String clusterId = app.getClusterId();
- if (app.getFlinkClusterId() != null) {
- FlinkCluster flinkCluster =
flinkClusterService.getById(app.getFlinkClusterId());
- namespace = flinkCluster.getK8sNamespace();
- clusterId = flinkCluster.getClusterId();
- }
+ FlinkCluster flinkCluster =
flinkClusterService.getById(app.getFlinkClusterId());
+ String namespace = flinkCluster.getK8sNamespace();
+ String clusterId = flinkCluster.getClusterId();
return TrackId.onSession(
namespace, clusterId, app.getId(), app.getJobId(),
app.getTeamId().toString());
} else {
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index c93a84923..47bfd62ea 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.kubernetes
-import org.apache.streampark.common.util.{Logger, Utils}
+import org.apache.streampark.common.util.{DateUtils, Logger, Utils}
import org.apache.streampark.common.util.Utils.using
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
import org.apache.streampark.flink.kubernetes.ingress.IngressController
@@ -47,6 +47,8 @@ object KubernetesRetriever extends Logger {
val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout =
Timeout.ofMilliseconds(RestOptions.AWAIT_LEADER_TIMEOUT.defaultValue())
+ private val DEPLOYMENT_LOST_TIME = collection.mutable.Map[String, Long]()
+
/** get new KubernetesClient */
@throws(classOf[KubernetesClientException])
def newK8sClient(): KubernetesClient = {
@@ -122,15 +124,35 @@ object KubernetesRetriever extends Logger {
.exists(_.getMetadata.getName == deploymentName)
} {
e =>
- logError(
+ logWarn(
s"""
- |[StreamPark] check deploymentExists error,
+ |[StreamPark] check deploymentExists WARN,
|namespace: $namespace,
|deploymentName: $deploymentName,
|error: $e
|""".stripMargin
)
- true
+ val key = s"${namespace}_$deploymentName"
+ DEPLOYMENT_LOST_TIME.get(key) match {
+ case Some(time) =>
+ val timeOut = 1000 * 60 * 3L
+ if (System.currentTimeMillis() - time >= timeOut) {
+ logError(
+ s"""
+ |[StreamPark] check deploymentExists Failed,
+ |namespace: $namespace,
+ |deploymentName: $deploymentName,
+ |detail: deployment: $deploymentName Not Found more than 3
minutes, $e
+ |""".stripMargin
+ )
+ DEPLOYMENT_LOST_TIME -= key
+ return false
+ }
+ return true
+ case _ =>
+ DEPLOYMENT_LOST_TIME += key -> System.currentTimeMillis()
+ true
+ }
}
}