This is an automated email from the ASF dual-hosted git repository.
monster pushed a commit to branch log
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/log by this push:
new bb2e111a0 [Improve] Log archive retention for failed tasks
bb2e111a0 is described below
commit bb2e111a0f7822394e6b39ba0de51f9fd51e0d1e
Author: Monster <[email protected]>
AuthorDate: Thu Nov 3 17:29:49 2022 +0800
[Improve] Log archive retention for failed tasks
---
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 21 +++++++++++++++++++++
1 file changed, 21 insertions(+)
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index acaec9594..6bf452ea2 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -17,6 +17,8 @@
package org.apache.streampark.flink.kubernetes.watcher
+import com.google.common.base.Charsets
+import com.google.common.io.{FileWriteMode, Files}
import org.apache.commons.collections.CollectionUtils
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.history.FsJobArchivist
@@ -34,6 +36,7 @@ import org.json4s.{DefaultFormats, JNothing, JNull}
import org.json4s.JsonAST.JArray
import org.json4s.jackson.JsonMethods.parse
+import java.io.File
import java.nio.charset.StandardCharsets
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
import javax.annotation.Nonnull
@@ -434,6 +437,24 @@ private[kubernetes] object FlinkHistoryArchives {
val archivePath = new Path(Workspace.ARCHIVES_FILE_PATH, jobId)
val archivedJson = FsJobArchivist.getArchivedJsons(archivePath)
if (CollectionUtils.isNotEmpty(archivedJson)) {
+ archivedJson.stream
+ .filter( a => a.getPath.equals(s"/jobs/$jobId/exceptions"))
+ .map( a => {
+ Try(parse(a.getJson)) match {
+ case Success(ok) =>
+ ok \ "jobs" match {
+ case JNothing | JNull => _
+ case JArray(arr) =>
+ arr.foreach(x => {
+ val projectPath = new File("").getCanonicalPath
+ val path = s"$projectPath/$jobId.log"
+ val file = new File(path)
+ val log = (x \ "root-exception").extractOpt[String].orNull
+ Files.asCharSink(file, Charsets.UTF_8,
FileWriteMode.APPEND).write(log)
+ })
+ }
+ }
+ })
archivedJson.foreach { a =>
if (a.getPath == "/jobs/overview") {
Try(parse(a.getJson)) match {