This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 3169beacd Solve the jobid is empty cause canceled inaccurate (#1600)
3169beacd is described below
commit 3169beacd251bf95a028dad3c85d6e418d7ee2cb
Author: monster <[email protected]>
AuthorDate: Wed Sep 14 21:17:05 2022 +0800
Solve the jobid is empty cause canceled inaccurate (#1600)
* Solve the jobid is empty cause canceled inaccurate
---
.../streampark/common/conf/ConfigConst.scala | 2 ++
.../console/core/entity/Application.java | 1 -
.../core/service/impl/ApplicationServiceImpl.java | 3 ++
.../console/core/task/FlinkTrackingTask.java | 1 -
.../core/task/K8sFlinkChangeEventListener.java | 1 -
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 42 ++++++++++++----------
.../flink/submit/bean/SubmitRequest.scala | 2 ++
.../flink/submit/trait/FlinkSubmitTrait.scala | 2 +-
8 files changed, 32 insertions(+), 22 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
index 832c153a9..bc49cee95 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
@@ -40,6 +40,8 @@ object ConfigConst {
val KEY_JOB_ID = "jobId"
+ val KEY_FLINK_JOB_ID = "flinkJobId"
+
val KEY_SEMANTIC = "semantic"
/**
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 88c78c051..93e714102 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -96,7 +96,6 @@ public class Application implements Serializable {
@TableField(updateStrategy = FieldStrategy.IGNORED)
private String appId;
- @TableField(updateStrategy = FieldStrategy.IGNORED)
private String jobId;
/**
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 3dee66b95..1c0d60251 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -101,6 +101,7 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.springframework.beans.factory.annotation.Autowired;
@@ -573,6 +574,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
appParam.setLaunch(LaunchState.NEED_LAUNCH.get());
appParam.setOptionState(OptionState.NONE.getValue());
appParam.setCreateTime(new Date());
+ appParam.setJobId(new JobID().toHexString());
appParam.doSetHotParams();
if (appParam.isUploadJob()) {
String jarPath =
WebUtils.getAppTempDir().getAbsolutePath().concat("/").concat(appParam.getJar());
@@ -1262,6 +1264,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
Map<String, Object> extraParameter = new HashMap<>(0);
extraParameter.put(ConfigConst.KEY_JOB_ID(), application.getId());
+ extraParameter.put(ConfigConst.KEY_FLINK_JOB_ID(),
application.getJobId());
if (appParam.getAllowNonRestored()) {
extraParameter.put(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(),
true);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
index 62f7c53d8..e709f6713 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
@@ -333,7 +333,6 @@ public class FlinkTrackingTask {
// 2) overview,刚启动第一次获取Overview信息.
if (STARTING_CACHE.getIfPresent(application.getId()) != null) {
- application.setJobId(jobOverview.getId());
application.setTotalTask(jobOverview.getTasks().getTotal());
application.setOverview(jobOverview.getTasks());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkChangeEventListener.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkChangeEventListener.java
index 858999b53..cca06a2b5 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkChangeEventListener.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkChangeEventListener.java
@@ -162,7 +162,6 @@ public class K8sFlinkChangeEventListener {
app.setState(fromK8sFlinkJobState(state).getValue());
// update relevant fields of Application from JobStatusCV
- app.setJobId(jobStatus.jobId());
app.setTotalTask(jobStatus.taskTotal());
if (FlinkJobState.isEndState(state)) {
app.setOptionState(OptionState.NONE.getValue());
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 46e13c2fc..339bf1bac 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -196,17 +196,13 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
implicit val pollEmitTime: Long = System.currentTimeMillis
val clusterId = trackId.clusterId
val namespace = trackId.namespace
- logger.info("Enter the touchApplicationJob logic")
+ logger.debug("Enter the touchApplicationJob logic.")
val jobDetails = listJobsDetails(ClusterKey(APPLICATION, namespace,
clusterId))
- lazy val k8sInferResult =
inferApplicationFlinkJobStateFromK8sEvent(trackId)
- jobDetails match {
- case Some(details) =>
- if (details.jobs.isEmpty) k8sInferResult; else {
- // just receive the first result
- val jobDetail = details.jobs.head.toJobStatusCV(pollEmitTime,
System.currentTimeMillis)
- Some(jobDetail)
- }
- case _ => k8sInferResult
+ if (jobDetails.isEmpty || jobDetails.get.jobs.isEmpty) {
+ logger.debug("The normal acquisition fails and the speculative logic is
used.")
+ inferApplicationFlinkJobStateFromK8sEvent(trackId)
+ } else {
+ Some(jobDetails.get.jobs.head.toJobStatusCV(pollEmitTime,
System.currentTimeMillis))
}
}
@@ -218,13 +214,18 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
Try {
val clusterRestUrl =
trackController.getClusterRestUrl(clusterKey).filter(_.nonEmpty).getOrElse(return
None)
// list flink jobs from rest api
- callJobsOverviewsApi(clusterRestUrl)
+ val v = callJobsOverviewsApi(clusterRestUrl)
+ logger.debug(s"The first visit was successful.")
+ v
}.getOrElse {
+ logger.debug("Failed to visit remote flink jobs on
kubernetes-native-mode cluster, and the retry access logic is performed.")
val clusterRestUrl =
trackController.refreshClusterRestUrl(clusterKey).getOrElse(return None)
Try(callJobsOverviewsApi(clusterRestUrl)) match {
- case Success(s) => s
+ case Success(s) =>
+ logger.debug("The retry is successful.")
+ s
case Failure(e) =>
- logError(s"failed to visit remote flink jobs on
kubernetes-native-mode cluster, errorStack=${e.getMessage}")
+ logger.debug(s"The retry fetch failed, final status failed,
errorStack=${e.getMessage}.")
None
}
}
@@ -234,12 +235,15 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
* list flink jobs details from rest api
*/
private def callJobsOverviewsApi(restUrl: String): Option[JobDetails] = {
- JobDetails.as(
+ logger.debug(s"Try to access flink's service via
http:${restUrl}/jobs/overview.")
+ val jobDetails = JobDetails.as(
Request.get(s"$restUrl/jobs/overview")
.connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
.responseTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC))
.execute.returnContent().asString(StandardCharsets.UTF_8)
)
+ logger.debug(s"Access flink's service through http success
jobDetail:${jobDetails.toString}.")
+ jobDetails
}
/**
@@ -250,8 +254,10 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
private def inferApplicationFlinkJobStateFromK8sEvent(@Nonnull trackId:
TrackId)
(implicit pollEmitTime:
Long): Option[JobStatusCV] = {
+ logger.debug("Inaccessible to flink the logic to judge the state.")
// infer from k8s deployment and event
val latest: JobStatusCV = trackController.jobStatuses.get(trackId)
+ logger.debug(s"Query the local cache
result:${trackController.canceling.has(trackId).toString},trackId
${trackId.toString}.")
val jobState = {
if (trackController.canceling.has(trackId)) FlinkJobState.CANCELED else {
// whether deployment exists on kubernetes cluster
@@ -260,20 +266,20 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
val isConnection =
KubernetesDeploymentHelper.isTheK8sConnectionNormal()
if (isDeployExists && !deployStateOfTheError) {
- logger.info("task Enter the initialization process")
+ logger.debug("Task Enter the initialization process.")
FlinkJobState.K8S_INITIALIZING
} else if (isDeployExists && deployStateOfTheError && isConnection) {
KubernetesDeploymentHelper.watchPodTerminatedLog(trackId.namespace,
trackId.clusterId)
KubernetesDeploymentHelper.deleteTaskDeployment(trackId.namespace,
trackId.clusterId)
IngressController.deleteIngress(trackId.namespace, trackId.clusterId)
- logger.info("Enter the task failure deletion process")
+ logger.debug("Enter the task failure deletion process.")
FlinkJobState.FAILED
} else if (!isDeployExists && isConnection) {
- logger.info("The deployment is deleted and enters the task failure
process")
+ logger.debug("The deployment is deleted and enters the task failure
process.")
FlinkJobState.FAILED
}
else {
- logger.info("Enter the disconnected state process")
+ logger.debug("Enter the disconnected state process.")
// determine if the state should be SILENT or LOST
inferSilentOrLostFromPreCache(latest)
}
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
index e8461fd43..c501aacc4 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
@@ -74,6 +74,8 @@ case class SubmitRequest(flinkVersion: FlinkVersion,
lazy val jobID: String = extraParameter.get(KEY_JOB_ID).toString
+ lazy val flinkJobID: String = extraParameter.get(KEY_FLINK_JOB_ID).toString
+
lazy val savepointRestoreSettings: SavepointRestoreSettings = {
lazy val allowNonRestoredState =
Try(extraParameter.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean).getOrElse(false)
savePoint match {
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
index 57ec45112..719f02dfc 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
@@ -98,7 +98,7 @@ trait FlinkSubmitTrait extends Logger {
.safeSet(CoreOptions.CLASSLOADER_RESOLVE_ORDER,
submitRequest.resolveOrder.getName)
.safeSet(ApplicationConfiguration.APPLICATION_MAIN_CLASS,
submitRequest.appMain)
.safeSet(ApplicationConfiguration.APPLICATION_ARGS,
extractProgramArgs(submitRequest))
- .safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, new
JobID().toHexString)
+ .safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
submitRequest.flinkJobID)
val flinkDefaultConfiguration =
getFlinkDefaultConfiguration(submitRequest.flinkVersion.flinkHome)
//state.checkpoints.num-retained