This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch k8s-deploy
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/k8s-deploy by this push:
new 2c68e376a [Improve] k8s mode job state bug fixed.
2c68e376a is described below
commit 2c68e376a12e5705364041717dcd87bce4e4f3cf
Author: benjobs <[email protected]>
AuthorDate: Sun Jan 14 23:14:55 2024 +0800
[Improve] k8s mode job state bug fixed.
---
.../console/core/entity/Application.java | 4 +
.../core/service/impl/ApplicationServiceImpl.java | 26 ++---
.../core/service/impl/FlinkClusterServiceImpl.java | 15 +--
.../flink/kubernetes/model/JobStatusCV.scala | 10 +-
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 107 +++++++++------------
5 files changed, 80 insertions(+), 82 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 4b03ee57b..f75bc8dbd 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -483,6 +483,10 @@ public class Application implements Serializable {
return this.getAppType() == ApplicationType.STREAMPARK_FLINK.getType();
}
+ public boolean isKubernetesModeJob() {
+ return ExecutionMode.isKubernetesMode(this.getExecutionModeEnum());
+ }
+
@JsonIgnore
@SneakyThrows
public MavenDependency getMavenDependency() {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 61bd08a90..b1c3ef872 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -157,7 +157,6 @@ import java.util.stream.Collectors;
import static org.apache.streampark.common.enums.StorageType.LFS;
import static org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper.Bridge.toTrackId;
-import static org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper.isKubernetesApp;
@Slf4j
@Service
@@ -515,7 +514,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
record -> {
// status of flink job on kubernetes mode had been
automatically persisted to db
// in time.
- if (isKubernetesApp(record)) {
+ if (record.isKubernetesModeJob()) {
// set duration
String restUrl =
flinkK8sWatcher.getRemoteRestUrl(toTrackId(record));
record.setFlinkRestUrl(restUrl);
@@ -1143,7 +1142,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
CompletableFuture<SubmitResponse> startFuture =
startFutureMap.remove(app.getId());
CompletableFuture<CancelResponse> cancelFuture =
cancelFutureMap.remove(app.getId());
Application application = this.baseMapper.getApp(app);
- if (isKubernetesApp(application)) {
+ if (application.isKubernetesModeJob()) {
KubernetesDeploymentHelper.watchPodTerminatedLog(
application.getK8sNamespace(), application.getJobName(),
application.getJobId());
}
@@ -1194,7 +1193,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
}
}
// add flink web url info for k8s-mode
- if (isKubernetesApp(application)) {
+ if (application.isKubernetesModeJob()) {
String restUrl =
flinkK8sWatcher.getRemoteRestUrl(toTrackId(application));
application.setFlinkRestUrl(restUrl);
@@ -1231,7 +1230,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
public boolean mapping(Application appParam) {
boolean mapping = this.baseMapper.mapping(appParam);
Application application = getById(appParam.getId());
- if (isKubernetesApp(application)) {
+ if (application.isKubernetesModeJob()) {
flinkK8sWatcher.doWatching(toTrackId(application));
} else {
FlinkRESTAPIWatcher.doWatching(application);
@@ -1329,7 +1328,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
cancelFutureMap.put(application.getId(), cancelFuture);
- TrackId trackId = isKubernetesApp(application) ? toTrackId(application) :
null;
+ TrackId trackId = application.isKubernetesModeJob() ?
toTrackId(application) : null;
cancelFuture.whenComplete(
(cancelResponse, throwable) -> {
@@ -1353,7 +1352,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
savePointService.expire(application.getId());
}
// re-tracking flink job on kubernetes and logging exception
- if (isKubernetesApp(application)) {
+ if (application.isKubernetesModeJob()) {
flinkK8sWatcher.unWatching(trackId);
} else {
FlinkRESTAPIWatcher.unWatching(application.getId());
@@ -1379,7 +1378,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
savePointService.save(savePoint);
}
- if (isKubernetesApp(application)) {
+ if (application.isKubernetesModeJob()) {
flinkK8sWatcher.unWatching(trackId);
}
});
@@ -1590,7 +1589,6 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
application.getK8sNamespace(),
application.getK8sRestExposedTypeEnum());
- TrackId trackId = isKubernetesApp(application) ? toTrackId(application) :
null;
CompletableFuture<SubmitResponse> future =
CompletableFuture.supplyAsync(
() -> FlinkClientHandler.submit(submitRequest), executorService);
@@ -1616,7 +1614,8 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
app.setState(FlinkAppState.FAILED.getValue());
app.setOptionState(OptionState.NONE.getValue());
updateById(app);
- if (isKubernetesApp(app)) {
+ if (app.isKubernetesModeJob()) {
+ TrackId trackId = toTrackId(application);
flinkK8sWatcher.unWatching(trackId);
} else {
FlinkRESTAPIWatcher.unWatching(appParam.getId());
@@ -1651,13 +1650,16 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
application.setEndTime(null);
// if start completed, will be added task to tracking queue
- if (isKubernetesApp(application)) {
+ if (application.isKubernetesModeJob()) {
log.info(
"start job {} on {} success, doWatching...",
application.getJobName(),
application.getExecutionModeEnum().getName());
application.setRelease(ReleaseState.DONE.get());
+
+ TrackId trackId = toTrackId(application);
flinkK8sWatcher.doWatching(trackId);
+
if
(ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
String domainName = settingService.getIngressModeDefault();
if (StringUtils.isNotBlank(domainName)) {
@@ -1754,7 +1756,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
updateById(application);
savePointService.expire(application.getId());
// re-tracking flink job on kubernetes and logging exception
- if (isKubernetesApp(application)) {
+ if (application.isKubernetesModeJob()) {
TrackId id = toTrackId(application);
flinkK8sWatcher.doWatching(id);
} else {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index a207d28ff..d309eb5d3 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -151,14 +151,17 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
@Transactional(rollbackFor = {Exception.class})
public void start(Long id) {
FlinkCluster flinkCluster = getById(id);
- ApiAlertException.throwIfTrue(
-
!applicationService.getYARNApplication(flinkCluster.getClusterName()).isEmpty(),
- "The same job name: "
- + flinkCluster.getClusterName()
- + " is already running in the yarn queue");
+ ApiAlertException.throwIfTrue(flinkCluster == null, "Invalid id, no
related cluster found.");
+ ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
+ if (executionModeEnum == ExecutionMode.YARN_SESSION) {
+ ApiAlertException.throwIfTrue(
+
!applicationService.getYARNApplication(flinkCluster.getClusterName()).isEmpty(),
+ "The application name: "
+ + flinkCluster.getClusterName()
+ + " is already running in the yarn queue, please check!");
+ }
try {
- ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
DeployRequest deployRequest = getDeployRequest(flinkCluster);
log.info("deploy cluster request: " + deployRequest);
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/JobStatusCV.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/JobStatusCV.scala
index 7bd5bd52a..7b0e599fe 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/JobStatusCV.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/JobStatusCV.scala
@@ -42,4 +42,12 @@ case class JobStatusCV(
duration: Long = 0,
taskTotal: Int = 0,
pollEmitTime: Long,
- pollAckTime: Long)
+ pollAckTime: Long) {
+
+ def eq(that: JobStatusCV): Boolean = {
+ that == null ||
+ that.jobState != this.jobState ||
+ that.jobId != this.jobId
+ }
+
+}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 36bac3d68..aaaaf9bea 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.kubernetes.watcher
import org.apache.streampark.common.conf.Workspace
import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.kubernetes.{ChangeEventBus,
FlinkK8sWatchController, JobStatusWatcherConfig, KubernetesRetriever}
-import org.apache.streampark.flink.kubernetes.enums.FlinkJobState
+import org.apache.streampark.flink.kubernetes.enums.{FlinkJobState, FlinkK8sExecuteMode}
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, SESSION}
import org.apache.streampark.flink.kubernetes.event.FlinkJobStatusChangeEvent
import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
@@ -115,13 +115,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig =
JobStatusWatcherConfi
case Some(jobState) =>
val trackId = id.copy(jobId = jobState.jobId)
val latest: JobStatusCV =
watchController.jobStatuses.get(trackId)
-
- val eventChanged = latest == null ||
- latest.jobState != jobState.jobState ||
- latest.jobId != jobState.jobId
-
- if (eventChanged) {
- logInfo(s"eventChanged.....$trackId")
+ if (jobState.eq(latest)) {
// put job status to cache
watchController.jobStatuses.put(trackId, jobState)
// set jobId to trackIds
@@ -168,44 +162,36 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig
= JobStatusWatcherConfi
* This method can be called directly from outside, without affecting the
current cachePool
* result.
*/
- def touchSessionJob(@Nonnull trackId: TrackId): Option[JobStatusCV] = {
+ def touchSessionJob(@Nonnull idArg: TrackId): Option[JobStatusCV] = {
val pollEmitTime = System.currentTimeMillis
- val id = TrackId.onSession(
- trackId.namespace,
- trackId.clusterId,
- trackId.appId,
- trackId.jobId,
- trackId.groupId
- )
+ val id = idArg.copy(executeMode = FlinkK8sExecuteMode.SESSION)
- val rsMap = touchSessionAllJob(
- trackId.namespace,
- trackId.clusterId,
- trackId.appId,
- trackId.groupId
- ).toMap
-
- val jobState = rsMap.get(id).filter(_.jobState !=
FlinkJobState.SILENT).getOrElse {
- val preCache = watchController.jobStatuses.get(id)
- val state = inferSilentOrLostFromPreCache(preCache)
- val nonFirstSilent =
- state == FlinkJobState.SILENT && preCache != null && preCache.jobState
== FlinkJobState.SILENT
- if (nonFirstSilent) {
- JobStatusCV(
- jobState = state,
- jobId = id.jobId,
- pollEmitTime = preCache.pollEmitTime,
- pollAckTime = preCache.pollAckTime)
- } else {
- JobStatusCV(
- jobState = state,
- jobId = id.jobId,
- pollEmitTime = pollEmitTime,
- pollAckTime = System.currentTimeMillis)
- }
+ touchSessionAllJob(id).get(id).filter(_.jobState != FlinkJobState.SILENT)
match {
+ case Some(state) => Option(state)
+ case _ =>
+ val preCache = watchController.jobStatuses.get(id)
+ val state = inferSilentOrLostFromPreCache(preCache)
+
+ val nonFirstSilent = state == FlinkJobState.SILENT &&
+ preCache != null &&
+ preCache.jobState == FlinkJobState.SILENT
+
+ val jobState = if (nonFirstSilent) {
+ JobStatusCV(
+ jobState = state,
+ jobId = id.jobId,
+ pollEmitTime = preCache.pollEmitTime,
+ pollAckTime = preCache.pollAckTime)
+ } else {
+ JobStatusCV(
+ jobState = state,
+ jobId = id.jobId,
+ pollEmitTime = pollEmitTime,
+ pollAckTime = System.currentTimeMillis)
+ }
+ Option(jobState)
}
- Some(jobState)
}
/**
@@ -215,28 +201,23 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig
= JobStatusWatcherConfi
* This method can be called directly from outside, without affecting the
current cachePool
* result.
*/
- private def touchSessionAllJob(
- @Nonnull namespace: String,
- @Nonnull clusterId: String,
- @Nonnull appId: Long,
- @Nonnull groupId: String): Array[(TrackId, JobStatusCV)] = {
-
- lazy val defaultResult = Array.empty[(TrackId, JobStatusCV)]
+ private def touchSessionAllJob(trackId: TrackId): Map[TrackId, JobStatusCV]
= {
val pollEmitTime = System.currentTimeMillis
-
- val jobDetails = listJobsDetails(ClusterKey(SESSION, namespace, clusterId))
- .getOrElse(return defaultResult)
- .jobs
-
- if (jobDetails.isEmpty) {
- defaultResult
- } else {
- jobDetails.map {
- d =>
- val trackId = TrackId.onSession(namespace, clusterId, appId, d.jid,
groupId)
- val jobStatus = d.toJobStatusCV(pollEmitTime,
System.currentTimeMillis)
- trackId -> jobStatus
- }
+ val jobDetails = listJobsDetails(ClusterKey(SESSION, trackId.namespace,
trackId.clusterId))
+ jobDetails match {
+ case Some(details) if details.jobs.nonEmpty =>
+ details.jobs.map {
+ d =>
+ val id = TrackId.onSession(
+ trackId.namespace,
+ trackId.clusterId,
+ trackId.appId,
+ d.jid,
+ trackId.groupId)
+ val jobStatus = d.toJobStatusCV(pollEmitTime,
System.currentTimeMillis)
+ id -> jobStatus
+ }.toMap
+ case None => Map.empty
}
}