This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3169beacd Solve the jobid is empty cause canceled inaccurate (#1600)
3169beacd is described below

commit 3169beacd251bf95a028dad3c85d6e418d7ee2cb
Author: monster <[email protected]>
AuthorDate: Wed Sep 14 21:17:05 2022 +0800

    Fix inaccurate job cancellation caused by an empty jobId (#1600)
    
    * Fix inaccurate job cancellation caused by an empty jobId
---
 .../streampark/common/conf/ConfigConst.scala       |  2 ++
 .../console/core/entity/Application.java           |  1 -
 .../core/service/impl/ApplicationServiceImpl.java  |  3 ++
 .../console/core/task/FlinkTrackingTask.java       |  1 -
 .../core/task/K8sFlinkChangeEventListener.java     |  1 -
 .../kubernetes/watcher/FlinkJobStatusWatcher.scala | 42 ++++++++++++----------
 .../flink/submit/bean/SubmitRequest.scala          |  2 ++
 .../flink/submit/trait/FlinkSubmitTrait.scala      |  2 +-
 8 files changed, 32 insertions(+), 22 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
index 832c153a9..bc49cee95 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
@@ -40,6 +40,8 @@ object ConfigConst {
 
   val KEY_JOB_ID = "jobId"
 
+  val KEY_FLINK_JOB_ID = "flinkJobId"
+
   val KEY_SEMANTIC = "semantic"
 
   /**
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 88c78c051..93e714102 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -96,7 +96,6 @@ public class Application implements Serializable {
     @TableField(updateStrategy = FieldStrategy.IGNORED)
     private String appId;
 
-    @TableField(updateStrategy = FieldStrategy.IGNORED)
     private String jobId;
 
     /**
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 3dee66b95..1c0d60251 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -101,6 +101,7 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -573,6 +574,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
         appParam.setLaunch(LaunchState.NEED_LAUNCH.get());
         appParam.setOptionState(OptionState.NONE.getValue());
         appParam.setCreateTime(new Date());
+        appParam.setJobId(new JobID().toHexString());
         appParam.doSetHotParams();
         if (appParam.isUploadJob()) {
             String jarPath = 
WebUtils.getAppTempDir().getAbsolutePath().concat("/").concat(appParam.getJar());
@@ -1262,6 +1264,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
 
         Map<String, Object> extraParameter = new HashMap<>(0);
         extraParameter.put(ConfigConst.KEY_JOB_ID(), application.getId());
+        extraParameter.put(ConfigConst.KEY_FLINK_JOB_ID(), 
application.getJobId());
 
         if (appParam.getAllowNonRestored()) {
             
extraParameter.put(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(),
 true);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
index 62f7c53d8..e709f6713 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
@@ -333,7 +333,6 @@ public class FlinkTrackingTask {
 
         // 2) overview,刚启动第一次获取Overview信息.
         if (STARTING_CACHE.getIfPresent(application.getId()) != null) {
-            application.setJobId(jobOverview.getId());
             application.setTotalTask(jobOverview.getTasks().getTotal());
             application.setOverview(jobOverview.getTasks());
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkChangeEventListener.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkChangeEventListener.java
index 858999b53..cca06a2b5 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkChangeEventListener.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkChangeEventListener.java
@@ -162,7 +162,6 @@ public class K8sFlinkChangeEventListener {
         app.setState(fromK8sFlinkJobState(state).getValue());
 
         // update relevant fields of Application from JobStatusCV
-        app.setJobId(jobStatus.jobId());
         app.setTotalTask(jobStatus.taskTotal());
         if (FlinkJobState.isEndState(state)) {
             app.setOptionState(OptionState.NONE.getValue());
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 46e13c2fc..339bf1bac 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -196,17 +196,13 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig 
= JobStatusWatcherConfi
     implicit val pollEmitTime: Long = System.currentTimeMillis
     val clusterId = trackId.clusterId
     val namespace = trackId.namespace
-    logger.info("Enter the touchApplicationJob logic")
+    logger.debug("Enter the touchApplicationJob logic.")
     val jobDetails = listJobsDetails(ClusterKey(APPLICATION, namespace, 
clusterId))
-    lazy val k8sInferResult = 
inferApplicationFlinkJobStateFromK8sEvent(trackId)
-    jobDetails match {
-      case Some(details) =>
-        if (details.jobs.isEmpty) k8sInferResult; else {
-          // just receive the first result
-          val jobDetail = details.jobs.head.toJobStatusCV(pollEmitTime, 
System.currentTimeMillis)
-          Some(jobDetail)
-        }
-      case _ => k8sInferResult
+    if (jobDetails.isEmpty || jobDetails.get.jobs.isEmpty) {
+      logger.debug("The normal acquisition fails and the speculative logic is 
used.")
+      inferApplicationFlinkJobStateFromK8sEvent(trackId)
+    } else {
+      Some(jobDetails.get.jobs.head.toJobStatusCV(pollEmitTime, 
System.currentTimeMillis))
     }
   }
 
@@ -218,13 +214,18 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig 
= JobStatusWatcherConfi
     Try {
       val clusterRestUrl = 
trackController.getClusterRestUrl(clusterKey).filter(_.nonEmpty).getOrElse(return
 None)
       // list flink jobs from rest api
-      callJobsOverviewsApi(clusterRestUrl)
+      val v = callJobsOverviewsApi(clusterRestUrl)
+      logger.debug(s"The first visit was successful.")
+      v
     }.getOrElse {
+      logger.debug("Failed to visit remote flink jobs on 
kubernetes-native-mode cluster, and the retry access logic is performed.")
       val clusterRestUrl = 
trackController.refreshClusterRestUrl(clusterKey).getOrElse(return None)
       Try(callJobsOverviewsApi(clusterRestUrl)) match {
-        case Success(s) => s
+        case Success(s) =>
+          logger.debug("The retry is successful.")
+          s
         case Failure(e) =>
-          logError(s"failed to visit remote flink jobs on 
kubernetes-native-mode cluster, errorStack=${e.getMessage}")
+          logger.debug(s"The retry fetch failed, final status failed, 
errorStack=${e.getMessage}.")
           None
       }
     }
@@ -234,12 +235,15 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig 
= JobStatusWatcherConfi
    * list flink jobs details from rest api
    */
   private def callJobsOverviewsApi(restUrl: String): Option[JobDetails] = {
-    JobDetails.as(
+    logger.debug(s"Try to access flink's service via 
http:${restUrl}/jobs/overview.")
+    val jobDetails = JobDetails.as(
       Request.get(s"$restUrl/jobs/overview")
         
.connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
         
.responseTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC))
         .execute.returnContent().asString(StandardCharsets.UTF_8)
     )
+    logger.debug(s"Access flink's service through http success 
jobDetail:${jobDetails.toString}.")
+    jobDetails
   }
 
   /**
@@ -250,8 +254,10 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = 
JobStatusWatcherConfi
   private def inferApplicationFlinkJobStateFromK8sEvent(@Nonnull trackId: 
TrackId)
                                                        (implicit pollEmitTime: 
Long): Option[JobStatusCV] = {
 
+    logger.debug("Inaccessible to flink the logic to judge the state.")
     // infer from k8s deployment and event
     val latest: JobStatusCV = trackController.jobStatuses.get(trackId)
+    logger.debug(s"Query the local cache 
result:${trackController.canceling.has(trackId).toString},trackId 
${trackId.toString}.")
     val jobState = {
       if (trackController.canceling.has(trackId)) FlinkJobState.CANCELED else {
         // whether deployment exists on kubernetes cluster
@@ -260,20 +266,20 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig 
= JobStatusWatcherConfi
         val isConnection = 
KubernetesDeploymentHelper.isTheK8sConnectionNormal()
 
         if (isDeployExists && !deployStateOfTheError) {
-          logger.info("task Enter the initialization process")
+          logger.debug("Task Enter the initialization process.")
           FlinkJobState.K8S_INITIALIZING
         } else if (isDeployExists && deployStateOfTheError && isConnection) {
           KubernetesDeploymentHelper.watchPodTerminatedLog(trackId.namespace, 
trackId.clusterId)
           KubernetesDeploymentHelper.deleteTaskDeployment(trackId.namespace, 
trackId.clusterId)
           IngressController.deleteIngress(trackId.namespace, trackId.clusterId)
-          logger.info("Enter the task failure deletion process")
+          logger.debug("Enter the task failure deletion process.")
           FlinkJobState.FAILED
         } else if (!isDeployExists && isConnection) {
-          logger.info("The deployment is deleted and enters the task failure 
process")
+          logger.debug("The deployment is deleted and enters the task failure 
process.")
           FlinkJobState.FAILED
         }
         else {
-          logger.info("Enter the disconnected state process")
+          logger.debug("Enter the disconnected state process.")
           // determine if the state should be SILENT or LOST
           inferSilentOrLostFromPreCache(latest)
         }
diff --git 
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
 
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
index e8461fd43..c501aacc4 100644
--- 
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
+++ 
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
@@ -74,6 +74,8 @@ case class SubmitRequest(flinkVersion: FlinkVersion,
 
   lazy val jobID: String = extraParameter.get(KEY_JOB_ID).toString
 
+  lazy val flinkJobID: String = extraParameter.get(KEY_FLINK_JOB_ID).toString
+
   lazy val savepointRestoreSettings: SavepointRestoreSettings = {
     lazy val allowNonRestoredState = 
Try(extraParameter.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean).getOrElse(false)
     savePoint match {
diff --git 
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
 
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
index 57ec45112..719f02dfc 100644
--- 
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
+++ 
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
@@ -98,7 +98,7 @@ trait FlinkSubmitTrait extends Logger {
       .safeSet(CoreOptions.CLASSLOADER_RESOLVE_ORDER, 
submitRequest.resolveOrder.getName)
       .safeSet(ApplicationConfiguration.APPLICATION_MAIN_CLASS, 
submitRequest.appMain)
       .safeSet(ApplicationConfiguration.APPLICATION_ARGS, 
extractProgramArgs(submitRequest))
-      .safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, new 
JobID().toHexString)
+      .safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, 
submitRequest.flinkJobID)
 
     val flinkDefaultConfiguration = 
getFlinkDefaultConfiguration(submitRequest.flinkVersion.flinkHome)
     //state.checkpoints.num-retained

Reply via email to