This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.6
in repository https://gitbox.apache.org/repos/asf/streampark.git


The following commit(s) were added to refs/heads/dev-2.1.6 by this push:
     new 2b7651cca [Bug] set kubernetes pod template parameter bug fixed.
2b7651cca is described below

commit 2b7651cca1049aa6cdec59062e5ff1f63337eeae
Author: benjobs <[email protected]>
AuthorDate: Sun Mar 30 22:20:19 2025 +0800

    [Bug] set kubernetes pod template parameter bug fixed.
---
 .../src/views/flink/app/Detail.vue                 |  1 -
 .../client/trait/KubernetesNativeClientTrait.scala | 12 ++--
 .../kubernetes/watcher/FlinkJobStatusWatcher.scala | 80 ++++++++++++----------
 3 files changed, 52 insertions(+), 41 deletions(-)

diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/Detail.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/Detail.vue
index 2b718ac19..772b4d9f7 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/Detail.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/Detail.vue
@@ -197,7 +197,6 @@
               :redirect="link.renderedLinkUrl"
               :color="link.badgeColor"
               :message="link.badgeName"
-              :disabled="appNotRunning"
             />
           </div>
         </Space>
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
index 72ba662e3..10976204c 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
@@ -19,6 +19,7 @@ package org.apache.streampark.flink.client.`trait`
 
 import org.apache.streampark.common.enums.{ExecutionMode, FlinkK8sRestExposedType}
 import org.apache.streampark.flink.client.bean._
+import org.apache.streampark.flink.kubernetes.PodTemplateTool
 import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse
 
 import org.apache.commons.lang3.StringUtils
@@ -53,10 +54,13 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait {
         val buildResult = submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse]
         buildResult.podTemplatePaths.foreach(
           p => {
-            flinkConfig
-              .safeSet(KubernetesConfigOptions.KUBERNETES_POD_TEMPLATE, p._2)
-              .safeSet(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE, p._2)
-              .safeSet(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE, p._2)
+            if (PodTemplateTool.KUBERNETES_POD_TEMPLATE.key.equals(p._1)) {
+              flinkConfig.safeSet(KubernetesConfigOptions.KUBERNETES_POD_TEMPLATE, p._2)
+            } else if (PodTemplateTool.KUBERNETES_JM_POD_TEMPLATE.key.equals(p._1)) {
+              flinkConfig.safeSet(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE, p._2)
+            } else if (PodTemplateTool.KUBERNETES_TM_POD_TEMPLATE.key.equals(p._1)) {
+              flinkConfig.safeSet(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE, p._2)
+            }
           })
       }
     }
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 05835252f..39114d197 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -494,52 +494,60 @@ private[kubernetes] object FlinkHistoryArchives extends Logger {
 
   private[this] val FAILED_STATE = "FAILED"
 
-  def getJobStateFromArchiveFile(trackId: TrackId): String = Try {
+  def getJobStateFromArchiveFile(trackId: TrackId): String = {
+
     require(trackId.jobId != null, "[StreamPark] getJobStateFromArchiveFile: JobId cannot be null.")
     val archiveDir = trackId.properties.getProperty(JobManagerOptions.ARCHIVE_DIR.key)
     if (archiveDir == null) {
       FAILED_STATE
     } else {
       val archivePath = new Path(archiveDir, trackId.jobId)
-      FsJobArchivist.getArchivedJsons(archivePath) match {
-        case r if r.isEmpty => FAILED_STATE
-        case r =>
-          r.foreach {
-            a =>
-              if (a.getPath == s"/jobs/${trackId.jobId}/exceptions") {
-                Try(parse(a.getJson)) match {
-                  case Success(ok) =>
-                    val log = (ok \ "root-exception").extractOpt[String].orNull
-                    if (log != null) {
-                      val path = KubernetesDeploymentHelper.getJobErrorLog(trackId.jobId)
-                      val file = new File(path)
-                      Files.asCharSink(file, Charsets.UTF_8).write(log)
-                      println(" error path: " + path)
-                    }
-                  case _ =>
+      Try(FsJobArchivist.getArchivedJsons(archivePath)) match {
+        case Failure(e) =>
+          throw new RuntimeException(
+            s"[StreamPark] getJobStateFromArchiveFile: Failed to get archived jsons, error: ${e.getMessage}")
+        case Success(r) =>
+          if (r.isEmpty) {
+            FAILED_STATE
+          } else {
+            r.foreach {
+              a =>
+                if (a.getPath == "/jobs/overview") {
+                  Try(parse(a.getJson)) match {
+                    case Success(ok) =>
+                      ok \ "jobs" match {
+                        case JNothing | JNull =>
+                        case JArray(arr) =>
+                          arr.foreach(
+                            x => {
+                              val jid = (x \ "jid").extractOpt[String].orNull
+                              if (jid == trackId.jobId) {
+                                return (x \ "state").extractOpt[String].orNull
+                              }
+                            })
+                        case _ =>
+                      }
+                    case Failure(_) =>
+                  }
                 }
-              } else if (a.getPath == "/jobs/overview") {
-                Try(parse(a.getJson)) match {
-                  case Success(ok) =>
-                    ok \ "jobs" match {
-                      case JNothing | JNull =>
-                      case JArray(arr) =>
-                        arr.foreach(
-                          x => {
-                            val jid = (x \ "jid").extractOpt[String].orNull
-                            if (jid == trackId.jobId) {
-                              return (x \ "state").extractOpt[String].orNull
-                            }
-                          })
-                      case _ =>
-                    }
-                  case Failure(_) =>
+                if (a.getPath == s"/jobs/${trackId.jobId}/exceptions") {
+                  Try(parse(a.getJson)) match {
+                    case Success(ok) =>
+                      val log = (ok \ "root-exception").extractOpt[String].orNull
+                      if (log != null) {
+                        val path = KubernetesDeploymentHelper.getJobErrorLog(trackId.jobId)
+                        val file = new File(path)
+                        Files.asCharSink(file, Charsets.UTF_8).write(log)
+                        println(" error path: " + path)
+                      }
+                    case _ =>
+                  }
                 }
-              }
+            }
+            FAILED_STATE
           }
-          FAILED_STATE
       }
     }
-  }.getOrElse(FAILED_STATE)
+  }
 
 }

Reply via email to