This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.6
in repository https://gitbox.apache.org/repos/asf/streampark.git
The following commit(s) were added to refs/heads/dev-2.1.6 by this push:
new 2b7651cca [Bug] set kubernetes pod template parameter bug fixed.
2b7651cca is described below
commit 2b7651cca1049aa6cdec59062e5ff1f63337eeae
Author: benjobs <[email protected]>
AuthorDate: Sun Mar 30 22:20:19 2025 +0800
[Bug] set kubernetes pod template parameter bug fixed.
---
.../src/views/flink/app/Detail.vue | 1 -
.../client/trait/KubernetesNativeClientTrait.scala | 12 ++--
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 80 ++++++++++++----------
3 files changed, 52 insertions(+), 41 deletions(-)
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/Detail.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/Detail.vue
index 2b718ac19..772b4d9f7 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/Detail.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/Detail.vue
@@ -197,7 +197,6 @@
:redirect="link.renderedLinkUrl"
:color="link.badgeColor"
:message="link.badgeName"
- :disabled="appNotRunning"
/>
</div>
</Space>
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
index 72ba662e3..10976204c 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
@@ -19,6 +19,7 @@ package org.apache.streampark.flink.client.`trait`
import org.apache.streampark.common.enums.{ExecutionMode, FlinkK8sRestExposedType}
import org.apache.streampark.flink.client.bean._
+import org.apache.streampark.flink.kubernetes.PodTemplateTool
import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse
import org.apache.commons.lang3.StringUtils
@@ -53,10 +54,13 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait {
val buildResult = submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse]
buildResult.podTemplatePaths.foreach(
p => {
- flinkConfig
- .safeSet(KubernetesConfigOptions.KUBERNETES_POD_TEMPLATE, p._2)
- .safeSet(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE, p._2)
- .safeSet(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE, p._2)
+ if (PodTemplateTool.KUBERNETES_POD_TEMPLATE.key.equals(p._1)) {
+ flinkConfig.safeSet(KubernetesConfigOptions.KUBERNETES_POD_TEMPLATE, p._2)
+ } else if (PodTemplateTool.KUBERNETES_JM_POD_TEMPLATE.key.equals(p._1)) {
+ flinkConfig.safeSet(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE, p._2)
+ } else if (PodTemplateTool.KUBERNETES_TM_POD_TEMPLATE.key.equals(p._1)) {
+ flinkConfig.safeSet(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE, p._2)
+ }
})
}
}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 05835252f..39114d197 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -494,52 +494,60 @@ private[kubernetes] object FlinkHistoryArchives extends Logger {
private[this] val FAILED_STATE = "FAILED"
- def getJobStateFromArchiveFile(trackId: TrackId): String = Try {
+ def getJobStateFromArchiveFile(trackId: TrackId): String = {
+ require(trackId.jobId != null, "[StreamPark] getJobStateFromArchiveFile: JobId cannot be null.")
val archiveDir = trackId.properties.getProperty(JobManagerOptions.ARCHIVE_DIR.key)
if (archiveDir == null) {
FAILED_STATE
} else {
val archivePath = new Path(archiveDir, trackId.jobId)
- FsJobArchivist.getArchivedJsons(archivePath) match {
- case r if r.isEmpty => FAILED_STATE
- case r =>
- r.foreach {
- a =>
- if (a.getPath == s"/jobs/${trackId.jobId}/exceptions") {
- Try(parse(a.getJson)) match {
- case Success(ok) =>
- val log = (ok \ "root-exception").extractOpt[String].orNull
- if (log != null) {
- val path = KubernetesDeploymentHelper.getJobErrorLog(trackId.jobId)
- val file = new File(path)
- Files.asCharSink(file, Charsets.UTF_8).write(log)
- println(" error path: " + path)
- }
- case _ =>
+ Try(FsJobArchivist.getArchivedJsons(archivePath)) match {
+ case Failure(e) =>
+ throw new RuntimeException(
+ s"[StreamPark] getJobStateFromArchiveFile: Failed to get archived jsons, error: ${e.getMessage}")
+ case Success(r) =>
+ if (r.isEmpty) {
+ FAILED_STATE
+ } else {
+ r.foreach {
+ a =>
+ if (a.getPath == "/jobs/overview") {
+ Try(parse(a.getJson)) match {
+ case Success(ok) =>
+ ok \ "jobs" match {
+ case JNothing | JNull =>
+ case JArray(arr) =>
+ arr.foreach(
+ x => {
+ val jid = (x \ "jid").extractOpt[String].orNull
+ if (jid == trackId.jobId) {
+ return (x \ "state").extractOpt[String].orNull
+ }
+ })
+ case _ =>
+ }
+ case Failure(_) =>
+ }
}
- } else if (a.getPath == "/jobs/overview") {
- Try(parse(a.getJson)) match {
- case Success(ok) =>
- ok \ "jobs" match {
- case JNothing | JNull =>
- case JArray(arr) =>
- arr.foreach(
- x => {
- val jid = (x \ "jid").extractOpt[String].orNull
- if (jid == trackId.jobId) {
- return (x \ "state").extractOpt[String].orNull
- }
- })
- case _ =>
- }
- case Failure(_) =>
+ if (a.getPath == s"/jobs/${trackId.jobId}/exceptions") {
+ Try(parse(a.getJson)) match {
+ case Success(ok) =>
+ val log = (ok \ "root-exception").extractOpt[String].orNull
+ if (log != null) {
+ val path = KubernetesDeploymentHelper.getJobErrorLog(trackId.jobId)
+ val file = new File(path)
+ Files.asCharSink(file, Charsets.UTF_8).write(log)
+ println(" error path: " + path)
+ }
+ case _ =>
+ }
}
- }
+ }
+ FAILED_STATE
}
- FAILED_STATE
}
}
- }.getOrElse(FAILED_STATE)
+ }
}