qinf commented on code in PR #1003:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/1003#discussion_r2255715719


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -560,6 +560,12 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, 
Configuration conf) {
                     && e.getMessage().contains("Checkpointing has not been 
enabled")) {
                 LOG.warn("Checkpointing not enabled for job {}", jobId, e);
                 return Optional.empty();
+            } else if (e instanceof ExecutionException
+                    && e.getMessage() != null
+                    && e.getMessage()
+                            .contains(String.format("Job %s not found", 
jobId.toString()))) {
+                LOG.warn("Job {} not found", jobId, e);

Review Comment:
   @1996fanrui Thanks for the quick feedback, I  have added the background. I 
also add a example here.
   For FINISHED job `6f4257aaf163d19a6fa1519479795c11`:
   1.  when request `GET /jobs/:jobid` -> `GET  
/jobs/6f4257aaf163d19a6fa1519479795c11`, the response is:
     ```json
   {
       "jid": "6f4257aaf163d19a6fa1519479795c11",
       "name": "app442315instance1204699",
       "isStoppable": false,
       "state": "FINISHED",
       "start-time": 1754445330065,
       "end-time": 1754445366256,
       "duration": 36191,
       "maxParallelism": -1,
       "now": 1754445409626,
       "timestamps": {
       "CREATED": 1754445338012,
       "RUNNING": 1754445338616,
       "CANCELED": 0,
       "INITIALIZING": 1754445330065,
       "RECONCILING": 0,
       "FINISHED": 1754445366256,
       "FAILED": 0,
       "CANCELLING": 0,
       "FAILING": 0,
       "RESTARTING": 0,
       "SUSPENDED": 0
       },
     ......
   }
     ```
   2. when  request `GET /jobs/:jobid/checkpoints` -> `GET 
/jobs/6f4257aaf163d19a6fa1519479795c11/checkpoints`,  the  response is:
   ```json
   {
   "errors": [
   "org.apache.flink.runtime.rest.NotFoundException: Job 
6f4257aaf163d19a6fa1519479795c11 not found\n\tat 
org.apache.flink.runtime.rest.handler.job.checkpoints.AbstractCheckpointStatsHandler.lambda$handleRequest$2(AbstractCheckpointStatsHandler.java:102)\n\tat
 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)\n\tat
 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)\n\tat
 ..."
   }
   ```
   Maybe the current solution catching the exception in `getLastCheckpoint()` 
is somewhat unintuitive here. Another solution is catching the exception `Job  
not found` in `observeLatestCheckpoint()`
   ```Java
   private void observeLatestCheckpoint(FlinkResourceContext<CR> ctx, String 
jobId) {
           var status = ctx.getResource().getStatus();
           var jobStatus = status.getJobStatus();
           ctx.getFlinkService()
                   .getLastCheckpoint(JobID.fromHexString(jobId), 
ctx.getObserveConfig())
                   .ifPresentOrElse(
                           snapshot -> 
jobStatus.setUpgradeSavepointPath(snapshot.getLocation()),
                           () -> {
                               if (ReconciliationUtils.isJobCancelled(status)) {
                                   // For cancelled jobs the observed savepoint 
is always definite,
                                   // so if empty we know the job doesn't have 
any
                                   // checkpoints/savepoints
                                   jobStatus.setUpgradeSavepointPath(null);
                               }
                           });
       }
   ```
   And can check the job is FINISHED by
   ```Java
   JobStatus.FINISHED == resource.getStatus().getJobStatus().getState();
   ```
   Do you have any suggestions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to