qinf commented on code in PR #1003:
URL: https://github.com/apache/flink-kubernetes-operator/pull/1003#discussion_r2255715719

##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -560,6 +560,12 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) {
                     && e.getMessage().contains("Checkpointing has not been enabled")) {
                 LOG.warn("Checkpointing not enabled for job {}", jobId, e);
                 return Optional.empty();
+            } else if (e instanceof ExecutionException
+                    && e.getMessage() != null
+                    && e.getMessage()
+                            .contains(String.format("Job %s not found", jobId.toString()))) {
+                LOG.warn("Job {} not found", jobId, e);

Review Comment:
   @1996fanrui Thanks for the quick feedback. I have added the background, and I have also added an example here.

   For the FINISHED job `6f4257aaf163d19a6fa1519479795c11`:

   1. Requesting `GET /jobs/:jobid` -> `GET /jobs/6f4257aaf163d19a6fa1519479795c11` returns:
   ```json
   {
     "jid": "6f4257aaf163d19a6fa1519479795c11",
     "name": "app442315instance1204699",
     "isStoppable": false,
     "state": "FINISHED",
     "start-time": 1754445330065,
     "end-time": 1754445366256,
     "duration": 36191,
     "maxParallelism": -1,
     "now": 1754445409626,
     "timestamps": {
       "CREATED": 1754445338012,
       "RUNNING": 1754445338616,
       "CANCELED": 0,
       "INITIALIZING": 1754445330065,
       "RECONCILING": 0,
       "FINISHED": 1754445366256,
       "FAILED": 0,
       "CANCELLING": 0,
       "FAILING": 0,
       "RESTARTING": 0,
       "SUSPENDED": 0
     },
     ......
   }
   ```
   2. Requesting `GET /jobs/:jobid/checkpoints` -> `GET /jobs/6f4257aaf163d19a6fa1519479795c11/checkpoints` returns:
   ```json
   {
     "errors": [
       "org.apache.flink.runtime.rest.NotFoundException: Job 6f4257aaf163d19a6fa1519479795c11 not found\n\tat org.apache.flink.runtime.rest.handler.job.checkpoints.AbstractCheckpointStatsHandler.lambda$handleRequest$2(AbstractCheckpointStatsHandler.java:102)\n\tat java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)\n\tat java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)\n\tat ..."
     ]
   }
   ```

   Catching the exception in `getLastCheckpoint()`, as the current change does, may be somewhat unintuitive. An alternative is to catch the `Job not found` exception in `observeLatestCheckpoint()`:
   ```Java
   private void observeLatestCheckpoint(FlinkResourceContext<CR> ctx, String jobId) {
       var status = ctx.getResource().getStatus();
       var jobStatus = status.getJobStatus();

       ctx.getFlinkService()
               .getLastCheckpoint(JobID.fromHexString(jobId), ctx.getObserveConfig())
               .ifPresentOrElse(
                       snapshot -> jobStatus.setUpgradeSavepointPath(snapshot.getLocation()),
                       () -> {
                           if (ReconciliationUtils.isJobCancelled(status)) {
                               // For cancelled jobs the observed savepoint is always definite,
                               // so if empty we know the job doesn't have any
                               // checkpoints/savepoints
                               jobStatus.setUpgradeSavepointPath(null);
                           }
                       });
   }
   ```
   There we can check that the job is FINISHED with:
   ```Java
   JobStatus.FINISHED == resource.getStatus().getJobStatus().getState();
   ```

   Do you have any suggestions?
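
   To make the alternative above easier to discuss, here is a minimal, self-contained sketch of the decision it implies: treat a failed checkpoint lookup as "no checkpoint" only when the error reports `Job <id> not found` and the job is already known to be FINISHED. The class `JobNotFoundSketch` and the methods `isJobNotFound` and `treatAsNoCheckpoint` are hypothetical names used only for illustration and are not part of the operator. Walking the cause chain is also an assumption of this sketch; the diff above only inspects the top-level message.

   ```java
   import java.util.concurrent.ExecutionException;

   /** Hypothetical helper for illustration only; not part of the operator code base. */
   final class JobNotFoundSketch {

       /** Returns true if any exception in the cause chain reports "Job <id> not found". */
       static boolean isJobNotFound(Throwable error, String jobId) {
           String needle = String.format("Job %s not found", jobId);
           for (Throwable t = error; t != null; t = t.getCause()) {
               String message = t.getMessage();
               if (message != null && message.contains(needle)) {
                   return true;
               }
           }
           return false;
       }

       /**
        * Assumed policy: swallow the error only for jobs already observed as FINISHED,
        * so that a missing job in any other state still surfaces as a failure.
        */
       static boolean treatAsNoCheckpoint(Throwable error, String jobId, boolean jobFinished) {
           return jobFinished && isJobNotFound(error, jobId);
       }

       public static void main(String[] args) {
           String jobId = "6f4257aaf163d19a6fa1519479795c11";
           // Mimics a REST failure wrapped by a future, as in the response shown above.
           Throwable wrapped =
                   new ExecutionException(new RuntimeException("Job " + jobId + " not found"));
           System.out.println(treatAsNoCheckpoint(wrapped, jobId, true));  // true
           System.out.println(treatAsNoCheckpoint(wrapped, jobId, false)); // false
       }
   }
   ```

   In the operator itself, the `jobFinished` flag would presumably come from the `JobStatus.FINISHED` comparison shown earlier. Matching on the message text stays brittle either way, since it depends on the exact wording of the REST error.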