This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.4
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.4 by this push:
     new b4c40508b [Improve] get flink job state from FsJobArchivist improvement
b4c40508b is described below

commit b4c40508b45954328c07194972ab899621ca86a0
Author: benjobs <[email protected]>
AuthorDate: Thu Apr 18 22:40:51 2024 +0800

    [Improve] get flink job state from FsJobArchivist improvement
---
 .../apache/streampark/common/conf/Workspace.scala  |  3 -
 .../streampark/console/core/entity/FlinkEnv.java   | 10 +++
 .../core/service/impl/ApplicationServiceImpl.java  | 13 ++--
 .../core/service/impl/SavePointServiceImpl.java    |  4 +-
 .../console/core/task/FlinkK8sWatcherWrapper.java  | 27 ++++++-
 .../flink/kubernetes/model/TrackId.scala           | 19 +++--
 .../kubernetes/watcher/FlinkJobStatusWatcher.scala | 87 ++++++++++++----------
 7 files changed, 102 insertions(+), 61 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
index c33baec0b..a1778f9ef 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
@@ -56,9 +56,6 @@ object Workspace {
   /** project build log path. */
   lazy val PROJECT_BUILD_LOG_PATH = s"$LOG_LOCAL_PATH/build_logs"
 
-  /** project archives path */
-  lazy val ARCHIVES_FILE_PATH = s"${remote.WORKSPACE}/historyserver/archive"
-
 }
 
 case class Workspace(storageType: StorageType) {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
index f9aefa9bd..6decd5de5 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
@@ -35,6 +35,7 @@ import java.io.File;
 import java.io.Serializable;
 import java.util.Date;
 import java.util.Map;
+import java.util.Properties;
 
 @Getter
 @Setter
@@ -91,6 +92,15 @@ public class FlinkEnv implements Serializable {
     return PropertiesUtils.loadFlinkConfYaml(flinkYamlString);
   }
 
+  @JsonIgnore
+  public Properties getFlinkConfig() {
+    String flinkYamlString = DeflaterUtils.unzipString(flinkConf);
+    Properties flinkConfig = new Properties();
+    Map<String, String> config = 
PropertiesUtils.loadFlinkConfYaml(flinkYamlString);
+    flinkConfig.putAll(config);
+    return flinkConfig;
+  }
+
   @JsonIgnore
   public FlinkVersion getFlinkVersion() {
     if (this.flinkVersion == null) {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 4ba413501..3cb86d1d1 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1635,7 +1635,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
         SubmitRequest.apply(
             flinkEnv.getFlinkVersion(),
             ExecutionMode.of(application.getExecutionMode()),
-            getProperties(application),
+            getProperties(application, flinkEnv),
             flinkEnv.getFlinkConf(),
             DevelopmentMode.of(application.getJobType()),
             application.getId(),
@@ -1749,7 +1749,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
         });
   }
 
-  private Map<String, Object> getProperties(Application application) {
+  private Map<String, Object> getProperties(Application application, FlinkEnv 
flinkEnv) {
     Map<String, Object> properties = application.getOptionMap();
     if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
       FlinkCluster cluster = 
flinkClusterService.getById(application.getFlinkClusterId());
@@ -1787,11 +1787,10 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     }
 
     if 
(ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
-      try {
-        HadoopUtils.yarnClient();
-        properties.put(JobManagerOptions.ARCHIVE_DIR.key(), 
Workspace.ARCHIVES_FILE_PATH());
-      } catch (Exception e) {
-        // skip
+      String archiveDir =
+          
flinkEnv.getFlinkConfig().getProperty(JobManagerOptions.ARCHIVE_DIR.key());
+      if (archiveDir != null) {
+        properties.put(JobManagerOptions.ARCHIVE_DIR.key(), archiveDir);
       }
     }
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 5b59c9ff4..af0271437 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -150,7 +150,7 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
     }
 
     if (cpThreshold == 0) {
-      String flinkConfNumRetained = 
flinkEnv.convertFlinkYamlAsMap().get(numRetainedKey);
+      String flinkConfNumRetained = 
flinkEnv.getFlinkConfig().getProperty(numRetainedKey);
       int numRetainedDefaultValue = 
CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
       if (flinkConfNumRetained != null) {
         try {
@@ -273,7 +273,7 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
       // flink
       FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
       savepointPath =
-          
flinkEnv.convertFlinkYamlAsMap().get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
+          
flinkEnv.getFlinkConfig().getProperty(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
     }
 
     return savepointPath;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index f63f21b37..c449b4e21 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -18,10 +18,13 @@
 package org.apache.streampark.console.core.task;
 
 import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.PropertiesUtils;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.entity.FlinkEnv;
 import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.FlinkClusterService;
+import org.apache.streampark.console.core.service.FlinkEnvService;
 import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
 import org.apache.streampark.flink.kubernetes.FlinkK8sWatcherFactory;
 import org.apache.streampark.flink.kubernetes.FlinkTrackConfig;
@@ -29,6 +32,7 @@ import 
org.apache.streampark.flink.kubernetes.enums.FlinkJobState;
 import org.apache.streampark.flink.kubernetes.model.TrackId;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.configuration.JobManagerOptions;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.google.common.collect.Lists;
@@ -40,6 +44,8 @@ import org.springframework.context.annotation.Lazy;
 import javax.annotation.Nonnull;
 
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.stream.Collectors;
 
 import static 
org.apache.streampark.console.core.enums.FlinkAppState.Bridge.toK8sFlinkJobState;
@@ -63,6 +69,8 @@ public class FlinkK8sWatcherWrapper {
 
   @Lazy @Autowired private FlinkClusterService flinkClusterService;
 
+  @Lazy @Autowired private FlinkEnvService flinkEnvService;
+
   /** Register FlinkTrackMonitor bean for tracking flink job on kubernetes. */
   @Bean(destroyMethod = "close")
   public FlinkK8sWatcher registerFlinkK8sWatcher() {
@@ -110,19 +118,34 @@ public class FlinkK8sWatcherWrapper {
   }
 
   public TrackId toTrackId(Application app) {
+    FlinkEnv flinkEnv = flinkEnvService.getById(app.getVersionId());
+    Properties properties = flinkEnv.getFlinkConfig();
+
+    Map<String, String> dynamicProperties =
+        
PropertiesUtils.extractDynamicPropertiesAsJava(app.getDynamicProperties());
+    String archiveDir = 
dynamicProperties.get(JobManagerOptions.ARCHIVE_DIR.key());
+    if (archiveDir != null) {
+      properties.put(JobManagerOptions.ARCHIVE_DIR.key(), archiveDir);
+    }
     if (app.getExecutionModeEnum() == 
ExecutionMode.KUBERNETES_NATIVE_APPLICATION) {
       return TrackId.onApplication(
           app.getK8sNamespace(),
           app.getJobName(),
           app.getId(),
           app.getJobId(),
-          app.getTeamId().toString());
+          app.getTeamId().toString(),
+          properties);
     } else if (app.getExecutionModeEnum() == 
ExecutionMode.KUBERNETES_NATIVE_SESSION) {
       FlinkCluster flinkCluster = 
flinkClusterService.getById(app.getFlinkClusterId());
       String namespace = flinkCluster.getK8sNamespace();
       String clusterId = flinkCluster.getClusterId();
       return TrackId.onSession(
-          namespace, clusterId, app.getId(), app.getJobId(), 
app.getTeamId().toString());
+          namespace,
+          clusterId,
+          app.getId(),
+          app.getJobId(),
+          app.getTeamId().toString(),
+          properties);
     } else {
       throw new IllegalArgumentException("Illegal K8sExecuteMode, mode=" + 
app.getExecutionMode());
     }
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
index a7147ce67..7dd7dd393 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
@@ -21,6 +21,7 @@ import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
 
 import java.lang.{Boolean => JavaBool, Long => JavaLong}
+import java.util.Properties
 
 import scala.util.Try
 
@@ -31,7 +32,8 @@ case class TrackId(
     clusterId: String,
     appId: JavaLong = null,
     jobId: String,
-    groupId: String) {
+    groupId: String,
+    properties: Properties) {
 
   def isLegal: Boolean = {
     executeMode match {
@@ -50,7 +52,7 @@ case class TrackId(
   def toClusterKey: ClusterKey = ClusterKey(executeMode, namespace, clusterId)
 
   override def hashCode(): Int = {
-    Utils.hashCode(executeMode, clusterId, namespace, appId, jobId, groupId)
+    Utils.hashCode(executeMode, clusterId, namespace, appId, jobId, groupId, 
properties)
   }
 
   override def equals(obj: Any): Boolean = {
@@ -61,7 +63,8 @@ case class TrackId(
         this.namespace == that.namespace &&
         this.appId == that.appId &&
         this.jobId == that.jobId &&
-        this.groupId == that.groupId
+        this.groupId == that.groupId &&
+        this.properties == that.properties
       case _ => false
     }
   }
@@ -74,8 +77,9 @@ object TrackId {
       clusterId: String,
       appId: Long,
       jobId: String,
-      groupId: String): TrackId = {
-    this(FlinkK8sExecuteMode.SESSION, namespace, clusterId, appId, jobId, 
groupId)
+      groupId: String,
+      properties: Properties): TrackId = {
+    this(FlinkK8sExecuteMode.SESSION, namespace, clusterId, appId, jobId, 
groupId, properties)
   }
 
   def onApplication(
@@ -83,7 +87,8 @@ object TrackId {
       clusterId: String,
       appId: Long,
       jobId: String = null,
-      groupId: String): TrackId = {
-    this(FlinkK8sExecuteMode.APPLICATION, namespace, clusterId, appId, jobId, 
groupId)
+      groupId: String,
+      properties: Properties): TrackId = {
+    this(FlinkK8sExecuteMode.APPLICATION, namespace, clusterId, appId, jobId, 
groupId, properties)
   }
 }
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 9243e10f7..59f2e37a6 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -28,6 +28,7 @@ import org.apache.streampark.flink.kubernetes.model._
 
 import com.google.common.base.Charsets
 import com.google.common.io.Files
+import org.apache.flink.configuration.JobManagerOptions
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.history.FsJobArchivist
 import org.apache.hc.client5.http.fluent.Request
@@ -341,7 +342,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = 
JobStatusWatcherConfi
           }
         } else if (isConnection) {
           logger.info("The deployment is deleted and enters the task failure 
process.")
-          
FlinkJobState.of(FlinkHistoryArchives.getJobStateFromArchiveFile(trackId.jobId))
+          
FlinkJobState.of(FlinkHistoryArchives.getJobStateFromArchiveFile(trackId))
         } else {
           inferFromPreCache(latest)
         }
@@ -495,52 +496,58 @@ private[kubernetes] object JobDetails {
 
 }
 
-private[kubernetes] object FlinkHistoryArchives {
+private[kubernetes] object FlinkHistoryArchives extends Logger {
 
   @transient
   implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
 
   private[this] val FAILED_STATE = "FAILED"
 
-  def getJobStateFromArchiveFile(jobId: String): String = Try {
-    require(jobId != null, "[StreamPark] getJobStateFromArchiveFile: JobId 
cannot be null.")
-    val archivePath = new Path(Workspace.ARCHIVES_FILE_PATH, jobId)
-    FsJobArchivist.getArchivedJsons(archivePath) match {
-      case r if r.isEmpty => FAILED_STATE
-      case r =>
-        r.foreach {
-          a =>
-            if (a.getPath == s"/jobs/$jobId/exceptions") {
-              Try(parse(a.getJson)) match {
-                case Success(ok) =>
-                  val log = (ok \ "root-exception").extractOpt[String].orNull
-                  if (log != null) {
-                    val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
-                    val file = new File(path)
-                    Files.asCharSink(file, Charsets.UTF_8).write(log)
-                  }
-                case _ =>
-              }
-            } else if (a.getPath == "/jobs/overview") {
-              Try(parse(a.getJson)) match {
-                case Success(ok) =>
-                  ok \ "jobs" match {
-                    case JNothing | JNull =>
-                    case JArray(arr) =>
-                      arr.foreach(
-                        x => {
-                          val jid = (x \ "jid").extractOpt[String].orNull
-                          if (jid == jobId) {
-                            return (x \ "state").extractOpt[String].orNull
-                          }
-                        })
-                    case _ =>
-                  }
-                case Failure(_) =>
+  def getJobStateFromArchiveFile(trackId: TrackId): String = Try {
+    require(trackId.jobId != null, "[StreamPark] getJobStateFromArchiveFile: 
JobId cannot be null.")
+    val archiveDir = 
trackId.properties.getProperty(JobManagerOptions.ARCHIVE_DIR.key)
+    if (archiveDir == null) {
+      FAILED_STATE
+    } else {
+      val archivePath = new Path(archiveDir, trackId.jobId)
+      FsJobArchivist.getArchivedJsons(archivePath) match {
+        case r if r.isEmpty => FAILED_STATE
+        case r =>
+          r.foreach {
+            a =>
+              if (a.getPath == s"/jobs/${trackId.jobId}/exceptions") {
+                Try(parse(a.getJson)) match {
+                  case Success(ok) =>
+                    val log = (ok \ "root-exception").extractOpt[String].orNull
+                    if (log != null) {
+                      val path = 
KubernetesDeploymentHelper.getJobErrorLog(trackId.jobId)
+                      val file = new File(path)
+                      Files.asCharSink(file, Charsets.UTF_8).write(log)
+                      println(" error path: " + path)
+                    }
+                  case _ =>
+                }
+              } else if (a.getPath == "/jobs/overview") {
+                Try(parse(a.getJson)) match {
+                  case Success(ok) =>
+                    ok \ "jobs" match {
+                      case JNothing | JNull =>
+                      case JArray(arr) =>
+                        arr.foreach(
+                          x => {
+                            val jid = (x \ "jid").extractOpt[String].orNull
+                            if (jid == trackId.jobId) {
+                              return (x \ "state").extractOpt[String].orNull
+                            }
+                          })
+                      case _ =>
+                    }
+                  case Failure(_) =>
+                }
               }
-            }
-        }
-        FAILED_STATE
+          }
+          FAILED_STATE
+      }
     }
   }.getOrElse(FAILED_STATE)
 

Reply via email to