yuqi1129 commented on code in PR #7936:
URL: https://github.com/apache/gravitino/pull/7936#discussion_r2256078937


##########
core/src/main/java/org/apache/gravitino/job/JobManager.java:
##########
@@ -388,7 +472,102 @@ public JobEntity cancelJob(String metalake, String jobId) throws NoSuchJobExcept
   @Override
   public void close() throws IOException {
     jobExecutor.close();
-    // TODO(jerry). Implement any necessary cleanup logic for the JobManager.
+    statusPullExecutor.shutdownNow();
+    cleanUpExecutor.shutdownNow();
+  }
+
+  public void pullAndUpdateJobStatus() {

Review Comment:
   This method can be package-private; it does not need to be `public`.
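
   A minimal sketch of the suggested access level, assuming the method is only called from inside `org.apache.gravitino.job` (for example by the scheduled executor and by tests in the same package). `JobManagerSketch`, `pulls()`, and `isPackagePrivate` are hypothetical names used only for illustration; they are not part of the PR:

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Hypothetical stand-in for JobManager; only the access level matters here.
class JobManagerSketch {
  private int pulls = 0;

  // No access modifier means package-private: callers in the same package
  // (including tests) can invoke it, but it is not part of the public API.
  void pullAndUpdateJobStatus() {
    pulls++;
  }

  // Number of times the status pull has run (illustrative only).
  int pulls() {
    return pulls;
  }

  // Returns true when the named no-arg method is neither public, protected, nor private.
  static boolean isPackagePrivate(String methodName) {
    try {
      Method m = JobManagerSketch.class.getDeclaredMethod(methodName);
      int mods = m.getModifiers();
      return !Modifier.isPublic(mods)
          && !Modifier.isProtected(mods)
          && !Modifier.isPrivate(mods);
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException("No such method: " + methodName, e);
    }
  }
}
```

   If the method were ever needed from another package, it would have to become `public` again, so this only fits if all callers stay in the same package.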



##########
core/src/main/java/org/apache/gravitino/job/JobManager.java:
##########
@@ -388,7 +472,102 @@ public JobEntity cancelJob(String metalake, String jobId) throws NoSuchJobExcept
   @Override
   public void close() throws IOException {
     jobExecutor.close();
-    // TODO(jerry). Implement any necessary cleanup logic for the JobManager.
+    statusPullExecutor.shutdownNow();
+    cleanUpExecutor.shutdownNow();
+  }
+
+  public void pullAndUpdateJobStatus() {
+    List<String> metalakes = listInUseMetalakes(entityStore);
+    for (String metalake : metalakes) {
+      // This unnecessary list all the jobs, we need to improve the code to only list the active
+      // jobs.
+      List<JobEntity> activeJobs =
+          listJobs(metalake, Optional.empty()).stream()
+              .filter(
+                  job ->
+                      job.status() == JobHandle.Status.QUEUED
+                          || job.status() == JobHandle.Status.STARTED
+                          || job.status() == JobHandle.Status.CANCELLING)
+              .toList();
+
+      activeJobs.forEach(
+          job -> {
+            JobHandle.Status newStatus = jobExecutor.getJobStatus(job.jobExecutionId());
+            if (newStatus != job.status()) {
+              JobEntity newJobEntity =
+                  JobEntity.builder()
+                      .withId(job.id())
+                      .withJobExecutionId(job.jobExecutionId())
+                      .withJobTemplateName(job.jobTemplateName())
+                      .withStatus(newStatus)
+                      .withNamespace(job.namespace())
+                      .withAuditInfo(
+                          AuditInfo.builder()
+                              .withCreator(job.auditInfo().creator())
+                              .withCreateTime(job.auditInfo().createTime())
+                              .withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
+                              .withLastModifiedTime(Instant.now())
+                              .build())
+                      .build();
+
+              // Update the job entity with new status.
+              TreeLockUtils.doWithTreeLock(
+                  NameIdentifierUtil.ofJob(metalake, job.name()),
+                  LockType.WRITE,
+                  () -> {
+                    try {
+                      entityStore.put(newJobEntity, true /* overwrite */);
+                      return null;
+                    } catch (IOException e) {
+                      throw new RuntimeException(
+                          String.format(
+                              "Failed to update job entity %s to status %s",
+                              newJobEntity, newStatus),
+                          e);
+                    }
+                  });
+            }
+          });
+    }
+  }
+
+  public void cleanUpStagingDirs() {
+    List<String> metalakes = listInUseMetalakes(entityStore);
+
+    for (String metalake : metalakes) {
+      List<JobEntity> finishedJobs =
+          listJobs(metalake, Optional.empty()).stream()
+              .filter(
+                  job ->
+                      job.status() == JobHandle.Status.CANCELLED
+                          || job.status() == JobHandle.Status.SUCCEEDED
+                          || job.status() == JobHandle.Status.FAILED)
+              .filter(
+                  job ->
+                      job.finishedAt() > 0

Review Comment:
   When would a job's status be cancelled, succeeded, or failed while `finishedAt` is still `0`?


