This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new da9dcae7d [Improve] code style improvement
da9dcae7d is described below
commit da9dcae7da9ca432eb61a2f536a36076ac5206b3
Author: benjobs <[email protected]>
AuthorDate: Thu Jan 11 23:06:00 2024 +0800
[Improve] code style improvement
---
.../console/core/task/FlinkK8sWatcherWrapper.java | 9 +++++++
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 31 +++++++++++++++-------
2 files changed, 31 insertions(+), 9 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index d54580032..6a113a56e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -102,6 +102,15 @@ public class FlinkK8sWatcherWrapper {
if (CollectionUtils.isEmpty(k8sApplication)) {
return Lists.newArrayList();
}
+ // correct corrupted data
+ List<Application> correctApps =
+ k8sApplication.stream()
+ .filter(app -> !Bridge.toTrackId(app).isLegal())
+ .collect(Collectors.toList());
+ if (CollectionUtils.isNotEmpty(correctApps)) {
+ applicationService.saveOrUpdateBatch(correctApps);
+ }
+
// filter out the application that should be tracking
return k8sApplication.stream()
.filter(
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index eec41ebd6..36bac3d68 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -170,13 +170,22 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
*/
def touchSessionJob(@Nonnull trackId: TrackId): Option[JobStatusCV] = {
val pollEmitTime = System.currentTimeMillis
- val clusterId = trackId.clusterId
- val namespace = trackId.namespace
- val appId = trackId.appId
- val jobId = trackId.jobId
- val rsMap = touchSessionAllJob(clusterId, namespace, appId, trackId.groupId).toMap
- val id = TrackId.onSession(namespace, clusterId, appId, jobId, trackId.groupId)
+ val id = TrackId.onSession(
+ trackId.namespace,
+ trackId.clusterId,
+ trackId.appId,
+ trackId.jobId,
+ trackId.groupId
+ )
+
+ val rsMap = touchSessionAllJob(
+ trackId.namespace,
+ trackId.clusterId,
+ trackId.appId,
+ trackId.groupId
+ ).toMap
+
val jobState = rsMap.get(id).filter(_.jobState != FlinkJobState.SILENT).getOrElse {
val preCache = watchController.jobStatuses.get(id)
val state = inferSilentOrLostFromPreCache(preCache)
@@ -207,22 +216,26 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
* result.
*/
private def touchSessionAllJob(
- @Nonnull clusterId: String,
@Nonnull namespace: String,
+ @Nonnull clusterId: String,
@Nonnull appId: Long,
@Nonnull groupId: String): Array[(TrackId, JobStatusCV)] = {
+
lazy val defaultResult = Array.empty[(TrackId, JobStatusCV)]
val pollEmitTime = System.currentTimeMillis
+
val jobDetails = listJobsDetails(ClusterKey(SESSION, namespace, clusterId))
.getOrElse(return defaultResult)
.jobs
+
if (jobDetails.isEmpty) {
defaultResult
} else {
jobDetails.map {
d =>
- TrackId.onSession(namespace, clusterId, appId, d.jid, groupId) ->
- d.toJobStatusCV(pollEmitTime, System.currentTimeMillis)
+ val trackId = TrackId.onSession(namespace, clusterId, appId, d.jid, groupId)
+ val jobStatus = d.toJobStatusCV(pollEmitTime, System.currentTimeMillis)
+ trackId -> jobStatus
}
}
}