This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch yarn-session
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit a199c211ac0fa50131e8e110fb60bdf98ca2de58 Author: benjobs <[email protected]> AuthorDate: Fri Nov 25 15:08:36 2022 +0800 [Bug] flink-job archivefile parse bug fixed --- .../common/util/SystemPropertyUtils.scala | 3 ++ .../helper/KubernetesDeploymentHelper.scala | 16 ++++-- .../kubernetes/watcher/FlinkJobStatusWatcher.scala | 36 +++++-------- .../d933fa6c785f0db6dccc6cc05dd43bab.json | 1 + .../flink/kubernetes/FlinkRestJsonTest.scala | 62 ++++++++++++++++++++++ .../flink/core/FlinkStreamingInitializer.scala | 2 - 6 files changed, 92 insertions(+), 28 deletions(-) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala index 211e04683..ce1f7c94b 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala @@ -119,4 +119,7 @@ object SystemPropertyUtils extends Logger { SystemPropertyUtils.set(key, appHome) } } + + def getTmpdir() : String = get("java.io.tmpdir", "temp") + } diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala index e9922385b..994fdf220 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala @@ -83,8 +83,7 @@ object KubernetesDeploymentHelper extends Logger { def watchDeploymentLog(nameSpace: String, jobName: String, jobId: String): String = { 
tryWithResource(KubernetesRetriever.newK8sClient()) { client => - val projectPath = SystemPropertyUtils.get("java.io.tmpdir", "temp") - val path = s"$projectPath/$jobId.log" + val path = KubernetesDeploymentHelper.getJobLog(jobId) val file = new File(path) val log = client.apps.deployments.inNamespace(nameSpace).withName(jobName).getLog Files.asCharSink(file, Charsets.UTF_8).write(log) @@ -96,8 +95,7 @@ object KubernetesDeploymentHelper extends Logger { tryWithResource(KubernetesRetriever.newK8sClient()) { client => Try { val podName = getPods(nameSpace, jobName).head.getMetadata.getName - val projectPath = SystemPropertyUtils.get("java.io.tmpdir", "temp") - val path = s"$projectPath/${jobId}_err.log" + val path = KubernetesDeploymentHelper.getJobErrorLog(jobId) val file = new File(path) val log = client.pods.inNamespace(nameSpace).withName(podName).terminated().withPrettyOutput.getLog Files.asCharSink(file, Charsets.UTF_8).write(log) @@ -118,4 +116,14 @@ object KubernetesDeploymentHelper extends Logger { } } + private[kubernetes] def getJobLog(jobId: String): String = { + val tmpPath = SystemPropertyUtils.getTmpdir() + s"$tmpPath/$jobId.log" + } + + private[kubernetes] def getJobErrorLog(jobId: String): String = { + val tmpPath = SystemPropertyUtils.getTmpdir() + s"$tmpPath/${jobId}_err.log" + } + } diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala index 037e68258..7a12aa4f2 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala @@ -18,12 +18,11 @@ package org.apache.streampark.flink.kubernetes.watcher 
import com.google.common.base.Charsets -import com.google.common.io.{FileWriteMode, Files} +import com.google.common.io.Files import org.apache.commons.collections.CollectionUtils import org.apache.flink.core.fs.Path import org.apache.flink.runtime.history.FsJobArchivist -import org.apache.flink.runtime.webmonitor.history.ArchivedJson -import org.apache.streampark.common.util.{Logger, SystemPropertyUtils} +import org.apache.streampark.common.util.Logger import org.apache.streampark.flink.kubernetes.enums.FlinkJobState import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, SESSION} import org.apache.streampark.flink.kubernetes.event.FlinkJobStatusChangeEvent @@ -39,7 +38,6 @@ import org.json4s.jackson.JsonMethods.parse import java.io.File import java.nio.charset.StandardCharsets -import java.util import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit} import javax.annotation.Nonnull import javax.annotation.concurrent.ThreadSafe @@ -96,7 +94,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi * single flink job status tracking task */ override def doWatch(): Unit = { - this.synchronized{ + this.synchronized { logInfo("[FlinkJobStatusWatcher]: Status monitoring process begins - " + Thread.currentThread().getName) // get all legal tracking ids val trackIds = Try(trackController.collectAllTrackIds()).filter(_.nonEmpty).getOrElse(return) @@ -444,17 +442,11 @@ private[kubernetes] object FlinkHistoryArchives { if (a.getPath == s"/jobs/$jobId/exceptions") { Try(parse(a.getJson)) match { case Success(ok) => - ok \ "root-exception" match { - case JNothing | JNull => - case JArray(arr) => - arr.foreach(x => { - val projectPath = SystemPropertyUtils.get("java.io.tmpdir", "temp") - val path = s"${projectPath}/${jobId}_err.log" - val file = new File(path) - val log = (x \ "root-exception").extractOpt[String].orNull - Files.asCharSink(file, Charsets.UTF_8).write(log) - }) - } + val path = 
KubernetesDeploymentHelper.getJobErrorLog(jobId) + val file = new File(path) + val log = (ok \ "root-exception").extractOpt[String].orNull + Files.asCharSink(file, Charsets.UTF_8).write(log) + case _ => } } else if (a.getPath == "/jobs/overview") { Try(parse(a.getJson)) match { @@ -462,12 +454,12 @@ private[kubernetes] object FlinkHistoryArchives { ok \ "jobs" match { case JNothing | JNull => case JArray(arr) => - arr.foreach(x => { - val jid = (x \ "jid").extractOpt[String].orNull - if (jid == jobId) { - state = (x \ "state").extractOpt[String].orNull - } - }) + arr.foreach(x => { + val jid = (x \ "jid").extractOpt[String].orNull + if (jid == jobId) { + state = (x \ "state").extractOpt[String].orNull + } + }) case _ => } case Failure(_) => diff --git a/streampark-flink/streampark-flink-kubernetes/src/test/resources/d933fa6c785f0db6dccc6cc05dd43bab.json b/streampark-flink/streampark-flink-kubernetes/src/test/resources/d933fa6c785f0db6dccc6cc05dd43bab.json new file mode 100644 index 000000000..af2c1f6b6 --- /dev/null +++ b/streampark-flink/streampark-flink-kubernetes/src/test/resources/d933fa6c785f0db6dccc6cc05dd43bab.json @@ -0,0 +1 @@ +{"archive":[{"path":"/jobs/overview","json":"{\"jobs\":[{\"jid\":\"d933fa6c785f0db6dccc6cc05dd43bab\",\"name\":\"test555\",\"state\":\"FAILED\",\"start-time\":1669348683915,\"end-time\":1669348702523,\"duration\":18608,\"last-modification\":1669348702523,\"tasks\":{\"total\":1,\"created\":0,\"scheduled\":0,\"deploying\":0,\"running\":0,\"finished\":0,\"canceling\":0,\"canceled\":0,\"failed\":1,\"reconciling\":0,\"initializing\":0}}]}"},{"path":"/jobs/d933fa6c785f0db6dccc6cc05dd43bab/conf [...] 
\ No newline at end of file diff --git a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala index 300989d20..b0d3ed301 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala @@ -16,8 +16,22 @@ */ package org.apache.streampark.flink.kubernetes +import com.google.common.base.Charsets +import com.google.common.io.Files +import org.apache.commons.collections.CollectionUtils +import org.apache.flink.core.fs.Path +import org.apache.flink.runtime.history.FsJobArchivist +import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper import org.apache.streampark.flink.kubernetes.watcher.{Checkpoint, FlinkRestJmConfigItem, FlinkRestOverview, JobDetails} +import org.json4s.DefaultFormats import org.junit.jupiter.api.Test +import org.json4s.{JNothing, JNull} +import org.json4s.JsonAST.JArray +import org.json4s.jackson.JsonMethods.parse + +import java.io.File +import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable` +import scala.util.{Failure, Success, Try} // scalastyle:off println class FlinkRestJsonTest { @@ -280,4 +294,52 @@ class FlinkRestJsonTest { println(ingressMeta.get) } + @Test def testHistoryArchives(): Unit = { + + @transient + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + + + val state = Try { + val archivePath = new Path("src/test/resources/d933fa6c785f0db6dccc6cc05dd43bab.json") + val jobId = "d933fa6c785f0db6dccc6cc05dd43bab" + val archivedJson = FsJobArchivist.getArchivedJsons(archivePath) + var state: String = "FAILED" + if (CollectionUtils.isNotEmpty(archivedJson)) { + 
archivedJson.foreach { a => + if (a.getPath == s"/jobs/$jobId/exceptions") { + Try(parse(a.getJson)) match { + case Success(ok) => + val path = KubernetesDeploymentHelper.getJobErrorLog(jobId) + val file = new File(path) + val log = (ok \ "root-exception").extractOpt[String].orNull + Files.asCharSink(file, Charsets.UTF_8).write(log) + println(" error path: " + path) + case _ => + } + } else if (a.getPath == "/jobs/overview") { + Try(parse(a.getJson)) match { + case Success(ok) => + ok \ "jobs" match { + case JNothing | JNull => + case JArray(arr) => + arr.foreach(x => { + val jid = (x \ "jid").extractOpt[String].orNull + if (jid == jobId) { + state = (x \ "state").extractOpt[String].orNull + } + }) + case _ => + } + case Failure(_) => + } + } + } + } + state + }.getOrElse("FAILED") + + println(state) + } + } diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala index 8189a034c..3e9676d54 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala @@ -81,9 +81,7 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api def initParameter(): FlinkConfiguration = { val argsMap = ParameterTool.fromArgs(args) val config = argsMap.get(KEY_APP_CONF(), null) match { - // scalastyle:off throwerror case null | "" => throw new ExceptionInInitializerError("[StreamPark] Usage:can't fond config,please set \"--conf $path \" in main arguments") - // scalastyle:on throwerror case file => file } val configMap = parseConfig(config)
