gyfora commented on code in PR #260:
URL: https://github.com/apache/flink-kubernetes-operator/pull/260#discussion_r893132426


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java:
##########
@@ -94,14 +91,64 @@ private void ifRunningMoveToReconciling(JobStatus jobStatus, String previousJobS
     protected abstract void onTimeout(CTX ctx);
 
     /**
-     * Find and update previous job status based on the job list from the cluster and return the
-     * target status.
+     * Filter the target job status message by the job list from the cluster.
      *
-     * @param status the target job status to be updated.
+     * @param status the target job status.
      * @param clusterJobStatuses the candidate cluster jobs.
-     * @return The target status of the job. If no matched job found, {@code Optional.empty()} will
+     * @return The target job status message. If no matched job found, {@code Optional.empty()} will
      *     be returned.
      */
-    protected abstract Optional<String> updateJobStatus(
+    protected abstract Optional<JobStatusMessage> filterTargetJob(
             JobStatus status, List<JobStatusMessage> clusterJobStatuses);
+
+    /**
+     * Update the status in CR according to the cluster job status.
+     *
+     * @param status the target job status
+     * @param clusterJobStatus the status fetch from the cluster.
+     * @param deployedConfig Deployed job config.
+     */
+    private void updateJobStatus(
+            CommonStatus<SPEC> status,
+            JobStatusMessage clusterJobStatus,
+            Configuration deployedConfig) {
+        var jobStatus = status.getJobStatus();
+        var previousJobStatus = jobStatus.getState();
+
+        jobStatus.setState(clusterJobStatus.getJobState().name());
+        jobStatus.setJobName(clusterJobStatus.getJobName());
+        jobStatus.setJobId(clusterJobStatus.getJobId().toHexString());
+        jobStatus.setStartTime(String.valueOf(clusterJobStatus.getStartTime()));
+
+        if (jobStatus.getState().equals(previousJobStatus)) {
+            LOG.info("Job status ({}) unchanged", previousJobStatus);
+        } else {
+            jobStatus.setUpdateTime(String.valueOf(System.currentTimeMillis()));
+            LOG.info(
+                    "Job status successfully updated from {} to {}",
+                    previousJobStatus,
+                    jobStatus.getState());
+        }
+
+        if (clusterJobStatus.getJobState() == org.apache.flink.api.common.JobStatus.FAILED) {
+            try {
+                var result =
+                        flinkService.requestJobResult(deployedConfig, clusterJobStatus.getJobId());
+                result.getSerializedThrowable()
+                        .ifPresent(
+                                t -> {
+                                    var error = t.getFullStringifiedStackTrace();
+                                    if (error != null && !error.equals(status.getError())) {

Review Comment:
   oh I see, sry



