This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.4
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.4 by this push:
new b4c40508b [Improve] get flink job state from FsJobArchivist improvement
b4c40508b is described below
commit b4c40508b45954328c07194972ab899621ca86a0
Author: benjobs <[email protected]>
AuthorDate: Thu Apr 18 22:40:51 2024 +0800
[Improve] get flink job state from FsJobArchivist improvement
---
.../apache/streampark/common/conf/Workspace.scala | 3 -
.../streampark/console/core/entity/FlinkEnv.java | 10 +++
.../core/service/impl/ApplicationServiceImpl.java | 13 ++--
.../core/service/impl/SavePointServiceImpl.java | 4 +-
.../console/core/task/FlinkK8sWatcherWrapper.java | 27 ++++++-
.../flink/kubernetes/model/TrackId.scala | 19 +++--
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 87 ++++++++++++----------
7 files changed, 102 insertions(+), 61 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
index c33baec0b..a1778f9ef 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
@@ -56,9 +56,6 @@ object Workspace {
/** project build log path. */
lazy val PROJECT_BUILD_LOG_PATH = s"$LOG_LOCAL_PATH/build_logs"
- /** project archives path */
- lazy val ARCHIVES_FILE_PATH = s"${remote.WORKSPACE}/historyserver/archive"
-
}
case class Workspace(storageType: StorageType) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
index f9aefa9bd..6decd5de5 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
@@ -35,6 +35,7 @@ import java.io.File;
import java.io.Serializable;
import java.util.Date;
import java.util.Map;
+import java.util.Properties;
@Getter
@Setter
@@ -91,6 +92,15 @@ public class FlinkEnv implements Serializable {
return PropertiesUtils.loadFlinkConfYaml(flinkYamlString);
}
+ @JsonIgnore
+ public Properties getFlinkConfig() {
+ String flinkYamlString = DeflaterUtils.unzipString(flinkConf);
+ Properties flinkConfig = new Properties();
+ Map<String, String> config =
PropertiesUtils.loadFlinkConfYaml(flinkYamlString);
+ flinkConfig.putAll(config);
+ return flinkConfig;
+ }
+
@JsonIgnore
public FlinkVersion getFlinkVersion() {
if (this.flinkVersion == null) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 4ba413501..3cb86d1d1 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1635,7 +1635,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
SubmitRequest.apply(
flinkEnv.getFlinkVersion(),
ExecutionMode.of(application.getExecutionMode()),
- getProperties(application),
+ getProperties(application, flinkEnv),
flinkEnv.getFlinkConf(),
DevelopmentMode.of(application.getJobType()),
application.getId(),
@@ -1749,7 +1749,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
});
}
- private Map<String, Object> getProperties(Application application) {
+ private Map<String, Object> getProperties(Application application, FlinkEnv
flinkEnv) {
Map<String, Object> properties = application.getOptionMap();
if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
FlinkCluster cluster =
flinkClusterService.getById(application.getFlinkClusterId());
@@ -1787,11 +1787,10 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
}
if
(ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
- try {
- HadoopUtils.yarnClient();
- properties.put(JobManagerOptions.ARCHIVE_DIR.key(),
Workspace.ARCHIVES_FILE_PATH());
- } catch (Exception e) {
- // skip
+ String archiveDir =
+
flinkEnv.getFlinkConfig().getProperty(JobManagerOptions.ARCHIVE_DIR.key());
+ if (archiveDir != null) {
+ properties.put(JobManagerOptions.ARCHIVE_DIR.key(), archiveDir);
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 5b59c9ff4..af0271437 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -150,7 +150,7 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint
}
if (cpThreshold == 0) {
- String flinkConfNumRetained =
flinkEnv.convertFlinkYamlAsMap().get(numRetainedKey);
+ String flinkConfNumRetained =
flinkEnv.getFlinkConfig().getProperty(numRetainedKey);
int numRetainedDefaultValue =
CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
if (flinkConfNumRetained != null) {
try {
@@ -273,7 +273,7 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint
// flink
FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
savepointPath =
-
flinkEnv.convertFlinkYamlAsMap().get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
+
flinkEnv.getFlinkConfig().getProperty(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
}
return savepointPath;
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
index f63f21b37..c449b4e21 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java
@@ -18,10 +18,13 @@
package org.apache.streampark.console.core.task;
import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.PropertiesUtils;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.entity.FlinkEnv;
import org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.FlinkClusterService;
+import org.apache.streampark.console.core.service.FlinkEnvService;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcherFactory;
import org.apache.streampark.flink.kubernetes.FlinkTrackConfig;
@@ -29,6 +32,7 @@ import org.apache.streampark.flink.kubernetes.enums.FlinkJobState;
import org.apache.streampark.flink.kubernetes.model.TrackId;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.configuration.JobManagerOptions;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Lists;
@@ -40,6 +44,8 @@ import org.springframework.context.annotation.Lazy;
import javax.annotation.Nonnull;
import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import java.util.stream.Collectors;
import static
org.apache.streampark.console.core.enums.FlinkAppState.Bridge.toK8sFlinkJobState;
@@ -63,6 +69,8 @@ public class FlinkK8sWatcherWrapper {
@Lazy @Autowired private FlinkClusterService flinkClusterService;
+ @Lazy @Autowired private FlinkEnvService flinkEnvService;
+
/** Register FlinkTrackMonitor bean for tracking flink job on kubernetes. */
@Bean(destroyMethod = "close")
public FlinkK8sWatcher registerFlinkK8sWatcher() {
@@ -110,19 +118,34 @@ public class FlinkK8sWatcherWrapper {
}
public TrackId toTrackId(Application app) {
+ FlinkEnv flinkEnv = flinkEnvService.getById(app.getVersionId());
+ Properties properties = flinkEnv.getFlinkConfig();
+
+ Map<String, String> dynamicProperties =
+
PropertiesUtils.extractDynamicPropertiesAsJava(app.getDynamicProperties());
+ String archiveDir =
dynamicProperties.get(JobManagerOptions.ARCHIVE_DIR.key());
+ if (archiveDir != null) {
+ properties.put(JobManagerOptions.ARCHIVE_DIR.key(), archiveDir);
+ }
if (app.getExecutionModeEnum() ==
ExecutionMode.KUBERNETES_NATIVE_APPLICATION) {
return TrackId.onApplication(
app.getK8sNamespace(),
app.getJobName(),
app.getId(),
app.getJobId(),
- app.getTeamId().toString());
+ app.getTeamId().toString(),
+ properties);
} else if (app.getExecutionModeEnum() ==
ExecutionMode.KUBERNETES_NATIVE_SESSION) {
FlinkCluster flinkCluster =
flinkClusterService.getById(app.getFlinkClusterId());
String namespace = flinkCluster.getK8sNamespace();
String clusterId = flinkCluster.getClusterId();
return TrackId.onSession(
- namespace, clusterId, app.getId(), app.getJobId(),
app.getTeamId().toString());
+ namespace,
+ clusterId,
+ app.getId(),
+ app.getJobId(),
+ app.getTeamId().toString(),
+ properties);
} else {
throw new IllegalArgumentException("Illegal K8sExecuteMode, mode=" +
app.getExecutionMode());
}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
index a7147ce67..7dd7dd393 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
@@ -21,6 +21,7 @@ import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
import java.lang.{Boolean => JavaBool, Long => JavaLong}
+import java.util.Properties
import scala.util.Try
@@ -31,7 +32,8 @@ case class TrackId(
clusterId: String,
appId: JavaLong = null,
jobId: String,
- groupId: String) {
+ groupId: String,
+ properties: Properties) {
def isLegal: Boolean = {
executeMode match {
@@ -50,7 +52,7 @@ case class TrackId(
def toClusterKey: ClusterKey = ClusterKey(executeMode, namespace, clusterId)
override def hashCode(): Int = {
- Utils.hashCode(executeMode, clusterId, namespace, appId, jobId, groupId)
+ Utils.hashCode(executeMode, clusterId, namespace, appId, jobId, groupId,
properties)
}
override def equals(obj: Any): Boolean = {
@@ -61,7 +63,8 @@ case class TrackId(
this.namespace == that.namespace &&
this.appId == that.appId &&
this.jobId == that.jobId &&
- this.groupId == that.groupId
+ this.groupId == that.groupId &&
+ this.properties == that.properties
case _ => false
}
}
@@ -74,8 +77,9 @@ object TrackId {
clusterId: String,
appId: Long,
jobId: String,
- groupId: String): TrackId = {
- this(FlinkK8sExecuteMode.SESSION, namespace, clusterId, appId, jobId,
groupId)
+ groupId: String,
+ properties: Properties): TrackId = {
+ this(FlinkK8sExecuteMode.SESSION, namespace, clusterId, appId, jobId,
groupId, properties)
}
def onApplication(
@@ -83,7 +87,8 @@ object TrackId {
clusterId: String,
appId: Long,
jobId: String = null,
- groupId: String): TrackId = {
- this(FlinkK8sExecuteMode.APPLICATION, namespace, clusterId, appId, jobId,
groupId)
+ groupId: String,
+ properties: Properties): TrackId = {
+ this(FlinkK8sExecuteMode.APPLICATION, namespace, clusterId, appId, jobId,
groupId, properties)
}
}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 9243e10f7..59f2e37a6 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -28,6 +28,7 @@ import org.apache.streampark.flink.kubernetes.model._
import com.google.common.base.Charsets
import com.google.common.io.Files
+import org.apache.flink.configuration.JobManagerOptions
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.history.FsJobArchivist
import org.apache.hc.client5.http.fluent.Request
@@ -341,7 +342,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
}
} else if (isConnection) {
logger.info("The deployment is deleted and enters the task failure
process.")
-
FlinkJobState.of(FlinkHistoryArchives.getJobStateFromArchiveFile(trackId.jobId))
+
FlinkJobState.of(FlinkHistoryArchives.getJobStateFromArchiveFile(trackId))
} else {
inferFromPreCache(latest)
}
@@ -495,52 +496,58 @@ private[kubernetes] object JobDetails {
}
-private[kubernetes] object FlinkHistoryArchives {
+private[kubernetes] object FlinkHistoryArchives extends Logger {
@transient
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
private[this] val FAILED_STATE = "FAILED"
- def getJobStateFromArchiveFile(jobId: String): String = Try {
- require(jobId != null, "[StreamPark] getJobStateFromArchiveFile: JobId
cannot be null.")
- val archivePath = new Path(Workspace.ARCHIVES_FILE_PATH, jobId)
- FsJobArchivist.getArchivedJsons(archivePath) match {
- case r if r.isEmpty => FAILED_STATE
- case r =>
- r.foreach {
- a =>
- if (a.getPath == s"/jobs/$jobId/exceptions") {
- Try(parse(a.getJson)) match {
- case Success(ok) =>
- val log = (ok \ "root-exception").extractOpt[String].orNull
- if (log != null) {
- val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
- val file = new File(path)
- Files.asCharSink(file, Charsets.UTF_8).write(log)
- }
- case _ =>
- }
- } else if (a.getPath == "/jobs/overview") {
- Try(parse(a.getJson)) match {
- case Success(ok) =>
- ok \ "jobs" match {
- case JNothing | JNull =>
- case JArray(arr) =>
- arr.foreach(
- x => {
- val jid = (x \ "jid").extractOpt[String].orNull
- if (jid == jobId) {
- return (x \ "state").extractOpt[String].orNull
- }
- })
- case _ =>
- }
- case Failure(_) =>
+ def getJobStateFromArchiveFile(trackId: TrackId): String = Try {
+ require(trackId.jobId != null, "[StreamPark] getJobStateFromArchiveFile:
JobId cannot be null.")
+ val archiveDir =
trackId.properties.getProperty(JobManagerOptions.ARCHIVE_DIR.key)
+ if (archiveDir == null) {
+ FAILED_STATE
+ } else {
+ val archivePath = new Path(archiveDir, trackId.jobId)
+ FsJobArchivist.getArchivedJsons(archivePath) match {
+ case r if r.isEmpty => FAILED_STATE
+ case r =>
+ r.foreach {
+ a =>
+ if (a.getPath == s"/jobs/${trackId.jobId}/exceptions") {
+ Try(parse(a.getJson)) match {
+ case Success(ok) =>
+ val log = (ok \ "root-exception").extractOpt[String].orNull
+ if (log != null) {
+ val path =
KubernetesDeploymentHelper.getJobErrorLog(trackId.jobId)
+ val file = new File(path)
+ Files.asCharSink(file, Charsets.UTF_8).write(log)
+ println(" error path: " + path)
+ }
+ case _ =>
+ }
+ } else if (a.getPath == "/jobs/overview") {
+ Try(parse(a.getJson)) match {
+ case Success(ok) =>
+ ok \ "jobs" match {
+ case JNothing | JNull =>
+ case JArray(arr) =>
+ arr.foreach(
+ x => {
+ val jid = (x \ "jid").extractOpt[String].orNull
+ if (jid == trackId.jobId) {
+ return (x \ "state").extractOpt[String].orNull
+ }
+ })
+ case _ =>
+ }
+ case Failure(_) =>
+ }
}
- }
- }
- FAILED_STATE
+ }
+ FAILED_STATE
+ }
}
}.getOrElse(FAILED_STATE)