This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new eba4808c2e [#7840] feat(core): Add job status puller and orphan file
cleaner (#7936)
eba4808c2e is described below
commit eba4808c2eac97dafdc0fd8bb64cc6df751f117f
Author: Jerry Shao <[email protected]>
AuthorDate: Thu Aug 7 10:00:08 2025 +0800
[#7840] feat(core): Add job status puller and orphan file cleaner (#7936)
### What changes were proposed in this pull request?
Add a background job status puller thread and an orphan staging file cleaner thread
to the job manager.
### Why are the changes needed?
Fix: #7840
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added unit tests.
---
.../main/java/org/apache/gravitino/Configs.java | 21 ++
.../java/org/apache/gravitino/job/JobManager.java | 219 ++++++++++++++++++++-
.../java/org/apache/gravitino/meta/JobEntity.java | 13 ++
.../gravitino/storage/relational/JDBCBackend.java | 2 +
.../storage/relational/mapper/JobMetaMapper.java | 3 +
.../mapper/JobMetaSQLProviderFactory.java | 4 +
.../provider/base/JobMetaBaseSQLProvider.java | 8 +
.../postgresql/JobMetaPostgreSQLProvider.java | 8 +
.../gravitino/storage/relational/po/JobPO.java | 1 +
.../storage/relational/service/JobMetaService.java | 16 +-
.../org/apache/gravitino/job/TestJobManager.java | 74 ++++++-
.../relational/service/TestJobMetaService.java | 42 ++++
12 files changed, 398 insertions(+), 13 deletions(-)
diff --git a/core/src/main/java/org/apache/gravitino/Configs.java
b/core/src/main/java/org/apache/gravitino/Configs.java
index d90e069bc2..829f6330bf 100644
--- a/core/src/main/java/org/apache/gravitino/Configs.java
+++ b/core/src/main/java/org/apache/gravitino/Configs.java
@@ -424,4 +424,25 @@ public class Configs {
.stringConf()
.checkValue(StringUtils::isNotBlank,
ConfigConstants.NOT_BLANK_ERROR_MSG)
.createWithDefault("local");
+
+  /**
+   * How long, in milliseconds, the staging directory of a finished job is kept before the job
+   * manager's background cleaner removes it. Must be positive. The job manager logs a warning when
+   * it is below 10 minutes and runs its cleanup sweep every one tenth of this value.
+   */
+  public static final ConfigEntry<Long> JOB_STAGING_DIR_KEEP_TIME_IN_MS =
+      new ConfigBuilder("gravitino.job.stagingDirKeepTimeInMs")
+          .doc(
+              "The time in milliseconds to keep the staging files of the finished job in the job"
+                  + " staging directory. The minimum recommended value is 10 minutes if you're "
+                  + "not testing.")
+          .version(ConfigConstants.VERSION_1_0_0)
+          .longConf()
+          .checkValue(value -> value > 0, ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
+          .createWithDefault(7 * 24 * 3600 * 1000L); // Default is 7 days
+
+  /**
+   * Interval, in milliseconds, at which the job manager polls the job executor for the status of
+   * jobs that are not yet finished. Must be positive. The job manager logs a warning when it is
+   * below 1 minute.
+   */
+  public static final ConfigEntry<Long> JOB_STATUS_PULL_INTERVAL_IN_MS =
+      new ConfigBuilder("gravitino.job.statusPullIntervalInMs")
+          .doc(
+              "The interval in milliseconds to pull the job status from the job executor. The "
+                  + "minimum recommended value is 1 minute if you're not testing.")
+          .version(ConfigConstants.VERSION_1_0_0)
+          .longConf()
+          .checkValue(value -> value > 0, ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
+          .createWithDefault(5 * 60 * 1000L); // Default is 5 minutes
}
diff --git a/core/src/main/java/org/apache/gravitino/job/JobManager.java
b/core/src/main/java/org/apache/gravitino/job/JobManager.java
index 4ecb5066b9..e32bb3752c 100644
--- a/core/src/main/java/org/apache/gravitino/job/JobManager.java
+++ b/core/src/main/java/org/apache/gravitino/job/JobManager.java
@@ -19,9 +19,11 @@
package org.apache.gravitino.job;
+import static org.apache.gravitino.Metalake.PROPERTY_IN_USE;
import static org.apache.gravitino.metalake.MetalakeManager.checkMetalake;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.net.URI;
@@ -30,6 +32,9 @@ import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -52,17 +57,35 @@ import
org.apache.gravitino.exceptions.NoSuchJobTemplateException;
import org.apache.gravitino.lock.LockType;
import org.apache.gravitino.lock.TreeLockUtils;
import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.BaseMetalake;
import org.apache.gravitino.meta.JobEntity;
import org.apache.gravitino.meta.JobTemplateEntity;
import org.apache.gravitino.storage.IdGenerator;
import org.apache.gravitino.utils.NameIdentifierUtil;
import org.apache.gravitino.utils.NamespaceUtil;
import org.apache.gravitino.utils.PrincipalUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class JobManager implements JobOperationDispatcher {
+  private static final Logger LOG = LoggerFactory.getLogger(JobManager.class);
+
private static final Pattern PLACEHOLDER_PATTERN =
Pattern.compile("\\{\\{([\\w.-]+)\\}\\}");
+  /**
+   * Format of a single job's staging path, appended to the absolute staging directory. The three
+   * {@code %s} placeholders are, in order, the metalake name, the job template name and the job
+   * id. The job id segment is prefixed with {@link JobHandle#JOB_ID_PREFIX}, and segments are
+   * separated by {@link File#separator}.
+   */
+  private static final String JOB_STAGING_DIR =
+      File.separator
+          + "%s"
+          + File.separator
+          + "%s"
+          + File.separator
+          + JobHandle.JOB_ID_PREFIX
+          + "%s";
+
+  /**
+   * Keep times below this value (10 minutes) make the constructor log a warning about overly
+   * frequent staging directory cleanup.
+   */
+  private static final long JOB_STAGING_DIR_CLEANUP_MIN_TIME_IN_MS =
+      TimeUnit.MINUTES.toMillis(10);
+
+  /**
+   * Status pull intervals below this value (1 minute) make the constructor log a warning about
+   * overly frequent polling of the job executor.
+   */
+  private static final long JOB_STATUS_PULL_MIN_INTERVAL_IN_MS = TimeUnit.MINUTES.toMillis(1);
+
private static final int TIMEOUT_IN_MS = 30 * 1000; // 30 seconds
private final EntityStore entityStore;
@@ -73,6 +96,12 @@ public class JobManager implements JobOperationDispatcher {
private final IdGenerator idGenerator;
+ private final long jobStagingDirKeepTimeInMs;
+
+ private final ScheduledExecutorService cleanUpExecutor;
+
+ private final ScheduledExecutorService statusPullExecutor;
+
public JobManager(Config config, EntityStore entityStore, IdGenerator
idGenerator) {
this(config, entityStore, idGenerator, JobExecutorFactory.create(config));
}
@@ -102,6 +131,56 @@ public class JobManager implements JobOperationDispatcher {
String.format("Failed to create staging directory %s",
stagingDirPath));
}
}
+
+ this.jobStagingDirKeepTimeInMs =
config.get(Configs.JOB_STAGING_DIR_KEEP_TIME_IN_MS);
+ if (jobStagingDirKeepTimeInMs < JOB_STAGING_DIR_CLEANUP_MIN_TIME_IN_MS) {
+ LOG.warn(
+ "The job staging directory keep time is set to {} ms, the number is
too small, "
+ + "which will cause frequent cleanup, please set it to a value
larger than {} if "
+ + "you're not using it to do the test.",
+ jobStagingDirKeepTimeInMs,
+ JOB_STAGING_DIR_CLEANUP_MIN_TIME_IN_MS);
+ }
+
+ this.cleanUpExecutor =
+ Executors.newSingleThreadScheduledExecutor(
+ runnable -> {
+ Thread thread = new Thread(runnable, "job-staging-dir-cleanup");
+ thread.setDaemon(true);
+ return thread;
+ });
+ long scheduleInterval = jobStagingDirKeepTimeInMs / 10;
+ Preconditions.checkArgument(
+ scheduleInterval != 0,
+ "The schedule interval for "
+ + "job staging directory cleanup cannot be zero, please set the
job staging directory "
+ + "keep time to a value larger than %s ms",
+ JOB_STAGING_DIR_CLEANUP_MIN_TIME_IN_MS);
+
+ cleanUpExecutor.scheduleAtFixedRate(
+ this::cleanUpStagingDirs, scheduleInterval, scheduleInterval,
TimeUnit.MILLISECONDS);
+
+ long jobStatusPullIntervalInMs =
config.get(Configs.JOB_STATUS_PULL_INTERVAL_IN_MS);
+ if (jobStatusPullIntervalInMs < JOB_STATUS_PULL_MIN_INTERVAL_IN_MS) {
+ LOG.warn(
+ "The job status pull interval is set to {} ms, the number is too
small, "
+ + "which will cause frequent job status pull from external job
executor, please set "
+ + "it to a value larger than {} if you're not using it to do the
test.",
+ jobStatusPullIntervalInMs,
+ JOB_STATUS_PULL_MIN_INTERVAL_IN_MS);
+ }
+ this.statusPullExecutor =
+ Executors.newSingleThreadScheduledExecutor(
+ runnable -> {
+ Thread thread = new Thread(runnable, "job-status-pull");
+ thread.setDaemon(true);
+ return thread;
+ });
+ statusPullExecutor.scheduleAtFixedRate(
+ this::pullAndUpdateJobStatus,
+ jobStatusPullIntervalInMs,
+ jobStatusPullIntervalInMs,
+ TimeUnit.MILLISECONDS);
}
@Override
@@ -187,6 +266,18 @@ public class JobManager implements JobOperationDispatcher {
jobTemplateName, metalake);
}
+ // Delete all the job staging directories associated with the job template.
+ String jobTemplateStagingPath =
+ stagingDir.getAbsolutePath() + File.separator + metalake +
File.separator + jobTemplateName;
+ File jobTemplateStagingDir = new File(jobTemplateStagingPath);
+ if (jobTemplateStagingDir.exists()) {
+ try {
+ FileUtils.deleteDirectory(jobTemplateStagingDir);
+ } catch (IOException e) {
+ LOG.error("Failed to delete job template staging directory: {}",
jobTemplateStagingPath, e);
+ }
+ }
+
// Delete the job template entity as well as all the jobs associated with
it.
return TreeLockUtils.doWithTreeLock(
NameIdentifier.of(NamespaceUtil.ofJobTemplate(metalake).levels()),
@@ -275,15 +366,10 @@ public class JobManager implements JobOperationDispatcher
{
JobTemplateEntity jobTemplateEntity = getJobTemplate(metalake,
jobTemplateName);
// Create staging directory.
- // TODO(jerry). The job staging directory will be deleted using a
background thread.
long jobId = idGenerator.nextId();
String jobStagingPath =
stagingDir.getAbsolutePath()
- + File.separator
- + metalake
- + File.separator
- + JobHandle.JOB_ID_PREFIX
- + jobId;
+ + String.format(JOB_STAGING_DIR, metalake, jobTemplateName, jobId);
File jobStagingDir = new File(jobStagingPath);
if (!jobStagingDir.mkdirs()) {
throw new RuntimeException(
@@ -351,8 +437,6 @@ public class JobManager implements JobOperationDispatcher {
String.format("Failed to cancel job with ID %s under metalake %s",
jobId, metalake), e);
}
- // TODO(jerry). Implement a background thread to monitor the job status
and update it. Also,
- // we should delete the finished job entities after a certain period of
time.
// Update the job status to CANCELING
JobEntity newJobEntity =
JobEntity.builder()
@@ -388,7 +472,104 @@ public class JobManager implements JobOperationDispatcher
{
@Override
public void close() throws IOException {
jobExecutor.close();
- // TODO(jerry). Implement any necessary cleanup logic for the JobManager.
+ statusPullExecutor.shutdownNow();
+ cleanUpExecutor.shutdownNow();
+ }
+
+  /**
+   * Polls the job executor for the status of every job that is still QUEUED, STARTED or
+   * CANCELLING in each in-use metalake. When the reported status differs from the stored one, the
+   * stored job entity is overwritten.
+   *
+   * <p>This method is run by a fixed-rate scheduled executor, and a run that throws would suppress
+   * all later runs of that schedule. Failures are therefore logged instead of propagated. They are
+   * also isolated per metalake and per job, so one failing job does not block status updates for
+   * the remaining jobs.
+   */
+  @VisibleForTesting
+  void pullAndUpdateJobStatus() {
+    List<String> metalakes;
+    try {
+      metalakes = listInUseMetalakes(entityStore);
+    } catch (RuntimeException e) {
+      LOG.error("Failed to list in-use metalakes, skipping this round of job status pull", e);
+      return;
+    }
+
+    for (String metalake : metalakes) {
+      // This unnecessarily lists all the jobs; it should be improved to list only the active
+      // jobs.
+      List<JobEntity> activeJobs;
+      try {
+        activeJobs =
+            listJobs(metalake, Optional.empty()).stream()
+                .filter(
+                    job ->
+                        job.status() == JobHandle.Status.QUEUED
+                            || job.status() == JobHandle.Status.STARTED
+                            || job.status() == JobHandle.Status.CANCELLING)
+                .toList();
+      } catch (RuntimeException e) {
+        LOG.error(
+            "Failed to list jobs under metalake {}, skipping its job status pull", metalake, e);
+        continue;
+      }
+
+      for (JobEntity job : activeJobs) {
+        try {
+          updateJobStatusIfChanged(metalake, job);
+        } catch (RuntimeException e) {
+          LOG.error(
+              "Failed to pull and update the status of job {} under metalake {}",
+              job.name(),
+              metalake,
+              e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Fetches the current status of {@code job} from the job executor. If it differs from the stored
+   * status, overwrites the job entity with the new status while holding the job's write tree lock.
+   */
+  private void updateJobStatusIfChanged(String metalake, JobEntity job) {
+    JobHandle.Status newStatus = jobExecutor.getJobStatus(job.jobExecutionId());
+    if (newStatus == job.status()) {
+      return;
+    }
+
+    JobEntity newJobEntity =
+        JobEntity.builder()
+            .withId(job.id())
+            .withJobExecutionId(job.jobExecutionId())
+            .withJobTemplateName(job.jobTemplateName())
+            .withStatus(newStatus)
+            .withNamespace(job.namespace())
+            .withAuditInfo(
+                AuditInfo.builder()
+                    .withCreator(job.auditInfo().creator())
+                    .withCreateTime(job.auditInfo().createTime())
+                    .withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
+                    .withLastModifiedTime(Instant.now())
+                    .build())
+            .build();
+
+    // Update the job entity with new status.
+    TreeLockUtils.doWithTreeLock(
+        NameIdentifierUtil.ofJob(metalake, job.name()),
+        LockType.WRITE,
+        () -> {
+          try {
+            entityStore.put(newJobEntity, true /* overwrite */);
+            return null;
+          } catch (IOException e) {
+            throw new RuntimeException(
+                String.format(
+                    "Failed to update job entity %s to status %s", newJobEntity, newStatus),
+                e);
+          }
+        });
+  }
+
+  /**
+   * Removes finished jobs (CANCELLED, SUCCEEDED or FAILED) whose finish time is more than the
+   * configured keep time in the past, together with their staging directories, for every in-use
+   * metalake.
+   *
+   * <p>This method is run by a fixed-rate scheduled executor. Failures are logged rather than
+   * propagated, because a run that throws would suppress all later runs.
+   *
+   * <p>The staging directory is removed before the job entity. If removing the directory fails, the
+   * job is still listed and its cleanup is retried on the next run, instead of the directory being
+   * left behind as an orphan.
+   */
+  @VisibleForTesting
+  void cleanUpStagingDirs() {
+    List<String> metalakes;
+    try {
+      metalakes = listInUseMetalakes(entityStore);
+    } catch (RuntimeException e) {
+      LOG.error("Failed to list in-use metalakes, skipping this round of staging dir cleanup", e);
+      return;
+    }
+
+    long now = System.currentTimeMillis();
+    for (String metalake : metalakes) {
+      List<JobEntity> expiredJobs;
+      try {
+        expiredJobs =
+            listJobs(metalake, Optional.empty()).stream()
+                .filter(
+                    job ->
+                        job.status() == JobHandle.Status.CANCELLED
+                            || job.status() == JobHandle.Status.SUCCEEDED
+                            || job.status() == JobHandle.Status.FAILED)
+                .filter(
+                    job -> {
+                      // finishedAt is an optional (nullable) field; unboxing null would throw.
+                      Long finishedAt = job.finishedAt();
+                      return finishedAt != null
+                          && finishedAt > 0
+                          && finishedAt + jobStagingDirKeepTimeInMs < now;
+                    })
+                .toList();
+      } catch (RuntimeException e) {
+        LOG.error(
+            "Failed to list jobs under metalake {}, skipping its staging dir cleanup", metalake, e);
+        continue;
+      }
+
+      for (JobEntity job : expiredJobs) {
+        String jobStagingPath =
+            stagingDir.getAbsolutePath()
+                + String.format(JOB_STAGING_DIR, metalake, job.jobTemplateName(), job.id());
+        try {
+          File jobStagingDir = new File(jobStagingPath);
+          if (jobStagingDir.exists()) {
+            FileUtils.deleteDirectory(jobStagingDir);
+            LOG.info("Deleted job staging directory {} for job {}", jobStagingPath, job.name());
+          }
+
+          // Delete the entity under the job's write lock, like the status updater does.
+          NameIdentifier jobIdent = NameIdentifierUtil.ofJob(metalake, job.name());
+          TreeLockUtils.doWithTreeLock(
+              jobIdent, LockType.WRITE, () -> entityStore.delete(jobIdent, Entity.EntityType.JOB));
+        } catch (IOException | RuntimeException e) {
+          LOG.error("Failed to delete job and staging directory for job {}", job.name(), e);
+        }
+      }
+    }
}
@VisibleForTesting
@@ -469,7 +650,7 @@ public class JobManager implements JobOperationDispatcher {
return inputString; // Return as is if the input string is blank
}
- StringBuffer result = new StringBuffer();
+ StringBuilder result = new StringBuilder();
Matcher matcher = PLACEHOLDER_PATTERN.matcher(inputString);
while (matcher.find()) {
@@ -521,4 +702,22 @@ public class JobManager implements JobOperationDispatcher {
throw new RuntimeException(String.format("Failed to fetch file from URI
%s", uri), e);
}
}
+
+  /**
+   * Returns the names of all metalakes whose in-use property evaluates to {@code true}. The
+   * metalake list is read while holding the root tree read lock.
+   *
+   * @param entityStore the store to list metalakes from
+   * @return names of the in-use metalakes
+   * @throws RuntimeException wrapping the {@link IOException} if listing metalakes fails
+   */
+  private static List<String> listInUseMetalakes(EntityStore entityStore) {
+    List<BaseMetalake> allMetalakes;
+    try {
+      allMetalakes =
+          TreeLockUtils.doWithRootTreeLock(
+              LockType.READ,
+              () ->
+                  entityStore.list(
+                      Namespace.empty(), BaseMetalake.class, Entity.EntityType.METALAKE));
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to list in-use metalakes", e);
+    }
+
+    return allMetalakes.stream()
+        .filter(JobManager::isMetalakeInUse)
+        .map(BaseMetalake::name)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Reads the metalake's in-use property through its properties metadata (presumably falling back
+   * to the property's declared default when unset).
+   */
+  private static boolean isMetalakeInUse(BaseMetalake metalake) {
+    return (boolean)
+        metalake.propertiesMetadata().getOrDefault(metalake.properties(), PROPERTY_IN_USE);
+  }
}
diff --git a/core/src/main/java/org/apache/gravitino/meta/JobEntity.java
b/core/src/main/java/org/apache/gravitino/meta/JobEntity.java
index 4d2868bca2..6dff1cc84b 100644
--- a/core/src/main/java/org/apache/gravitino/meta/JobEntity.java
+++ b/core/src/main/java/org/apache/gravitino/meta/JobEntity.java
@@ -48,6 +48,8 @@ public class JobEntity implements Entity, Auditable,
HasIdentifier {
public static final Field AUDIT_INFO =
Field.required(
"audit_info", AuditInfo.class, "The audit details of the job
template entity.");
+ // Optional (nullable) field holding the time the job finished execution. NOTE(review):
+ // presumably epoch milliseconds, since JobManager compares it against System.currentTimeMillis().
+ public static final Field FINISHED_AT =
+ Field.optional("job_finished_at", Long.class, "The time when the job
finished execution.");
private Long id;
private String jobExecutionId;
@@ -55,6 +57,7 @@ public class JobEntity implements Entity, Auditable,
HasIdentifier {
private String jobTemplateName;
private Namespace namespace;
private AuditInfo auditInfo;
+ private Long finishedAt;
private JobEntity() {}
@@ -66,6 +69,7 @@ public class JobEntity implements Entity, Auditable,
HasIdentifier {
fields.put(TEMPLATE_NAME, jobTemplateName);
fields.put(STATUS, status);
fields.put(AUDIT_INFO, auditInfo);
+ fields.put(FINISHED_AT, finishedAt);
return Collections.unmodifiableMap(fields);
}
@@ -96,6 +100,10 @@ public class JobEntity implements Entity, Auditable,
HasIdentifier {
return jobTemplateName;
}
+ /**
+ * Returns the time the job finished execution, or {@code null} if it was never set.
+ * NOTE(review): presumably epoch milliseconds; JobManager compares it with
+ * System.currentTimeMillis().
+ *
+ * @return the finish time, possibly {@code null}
+ */
+ public Long finishedAt() {
+ return finishedAt;
+ }
+
@Override
public AuditInfo auditInfo() {
return auditInfo;
@@ -170,6 +178,11 @@ public class JobEntity implements Entity, Auditable,
HasIdentifier {
return this;
}
+ /**
+ * Sets the time the job finished execution. The field is optional and may be left unset.
+ *
+ * @param finishedAt the finish time, or {@code null}
+ * @return this builder
+ */
+ public Builder withFinishedAt(Long finishedAt) {
+ jobEntity.finishedAt = finishedAt;
+ return this;
+ }
+
public JobEntity build() {
jobEntity.validate();
return jobEntity;
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index d512ce29b4..7c92e88a2b 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -307,6 +307,8 @@ public class JDBCBackend implements RelationalBackend {
return PolicyMetaService.getInstance().deletePolicy(ident);
case JOB_TEMPLATE:
return JobTemplateMetaService.getInstance().deleteJobTemplate(ident);
+ case JOB:
+ return JobMetaService.getInstance().deleteJob(ident);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for delete operation", entityType);
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaMapper.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaMapper.java
index a8a45e3888..3bc359756d 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaMapper.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaMapper.java
@@ -67,4 +67,7 @@ public interface JobMetaMapper {
@DeleteProvider(type = JobMetaSQLProviderFactory.class, method =
"deleteJobMetasByLegacyTimeline")
Integer deleteJobMetasByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
+
+ /**
+ * Soft-deletes the not-yet-deleted job row with the given job run id.
+ *
+ * @param jobRunId the numeric job run id
+ * @return the number of rows updated
+ */
+ @UpdateProvider(type = JobMetaSQLProviderFactory.class, method =
"softDeleteJobMetaByRunId")
+ Integer softDeleteJobMetaByRunId(@Param("jobRunId") Long jobRunId);
}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaSQLProviderFactory.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaSQLProviderFactory.java
index 25a4c924f6..fccb116a9c 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaSQLProviderFactory.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaSQLProviderFactory.java
@@ -94,4 +94,8 @@ public class JobMetaSQLProviderFactory {
@Param("legacyTimeline") Long legacyTimeline) {
return getProvider().softDeleteJobMetasByLegacyTimeline(legacyTimeline);
}
+
+ /** Builds the soft-delete-by-run-id SQL using the provider selected by getProvider(). */
+ public static String softDeleteJobMetaByRunId(@Param("jobRunId") Long
jobRunId) {
+ return getProvider().softDeleteJobMetaByRunId(jobRunId);
+ }
}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobMetaBaseSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobMetaBaseSQLProvider.java
index 03abcfcdc5..9f013f9586 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobMetaBaseSQLProvider.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobMetaBaseSQLProvider.java
@@ -159,6 +159,14 @@ public class JobMetaBaseSQLProvider {
+ " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
}
+ /**
+ * Builds the SQL that soft-deletes the live ({@code deleted_at = 0}) job row with the given
+ * {@code job_run_id}. It sets {@code deleted_at} to the current time in milliseconds, adding the
+ * sub-second part taken from {@code CURRENT_TIMESTAMP(3)}.
+ */
+ public String softDeleteJobMetaByRunId(@Param("jobRunId") Long jobRunId) {
+ return "UPDATE "
+ + JobMetaMapper.TABLE_NAME
+ + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000.0"
+ + " WHERE job_run_id = #{jobRunId} AND deleted_at = 0";
+ }
+
public String softDeleteJobMetasByLegacyTimeline(@Param("legacyTimeline")
Long legacyTimeline) {
return "UPDATE "
+ JobMetaMapper.TABLE_NAME
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/JobMetaPostgreSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/JobMetaPostgreSQLProvider.java
index ec9f851d74..d83cd98073 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/JobMetaPostgreSQLProvider.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/JobMetaPostgreSQLProvider.java
@@ -86,6 +86,14 @@ public class JobMetaPostgreSQLProvider extends
JobMetaBaseSQLProvider {
+ " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
}
+  /**
+   * PostgreSQL variant of the soft-delete-by-run-id SQL. It marks the live ({@code deleted_at =
+   * 0}) job row with the given {@code job_run_id} as deleted, setting {@code deleted_at} to the
+   * current epoch time in milliseconds.
+   */
+  @Override
+  public String softDeleteJobMetaByRunId(@Param("jobRunId") Long jobRunId) {
+    return "UPDATE "
+        + JobMetaMapper.TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000))) "
+        + " WHERE job_run_id = #{jobRunId} AND deleted_at = 0";
+  }
+
@Override
public String softDeleteJobMetasByLegacyTimeline(@Param("legacyTimeline")
Long legacyTimeline) {
return "UPDATE "
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/po/JobPO.java
b/core/src/main/java/org/apache/gravitino/storage/relational/po/JobPO.java
index 236a1eb6c3..8cb9d2e379 100644
--- a/core/src/main/java/org/apache/gravitino/storage/relational/po/JobPO.java
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/po/JobPO.java
@@ -135,6 +135,7 @@ public class JobPO {
.withStatus(JobHandle.Status.valueOf(jobPO.jobRunStatus))
.withJobTemplateName(jobPO.jobTemplateName)
.withAuditInfo(JsonUtils.anyFieldMapper().readValue(jobPO.auditInfo,
AuditInfo.class))
+ .withFinishedAt(jobPO.jobFinishedAt())
.build();
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to deserialize job PO", e);
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
index ebc9c70760..485dba11b7 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
@@ -69,7 +69,7 @@ public class JobMetaService {
public JobEntity getJobByIdentifier(NameIdentifier ident) {
String metalakeName = ident.namespace().level(0);
String jobRunId = ident.name();
- Long jobRunIdLong;
+ long jobRunIdLong;
try {
jobRunIdLong =
Long.parseLong(jobRunId.substring(JobHandle.JOB_ID_PREFIX.length()));
} catch (NumberFormatException e) {
@@ -112,6 +112,20 @@ public class JobMetaService {
}
}
+  /**
+   * Soft-deletes the job identified by {@code jobIdent}.
+   *
+   * <p>The identifier name must be {@link JobHandle#JOB_ID_PREFIX} followed by the numeric job run
+   * id. Only that run id is used to locate the row; the namespace is not consulted.
+   *
+   * @param jobIdent the job identifier
+   * @return {@code true} if a not-yet-deleted job row was marked deleted, {@code false} otherwise
+   * @throws NoSuchEntityException if the identifier name is not a valid job run id
+   */
+  public boolean deleteJob(NameIdentifier jobIdent) {
+    String jobRunId = jobIdent.name();
+    // Without this check, a name lacking the prefix would be truncated and could resolve to (and
+    // delete) an unrelated job. A name shorter than the prefix would throw
+    // StringIndexOutOfBoundsException.
+    if (!jobRunId.startsWith(JobHandle.JOB_ID_PREFIX)) {
+      throw new NoSuchEntityException("Invalid job run ID format %s", jobRunId);
+    }
+
+    long jobRunIdLong;
+    try {
+      jobRunIdLong = Long.parseLong(jobRunId.substring(JobHandle.JOB_ID_PREFIX.length()));
+    } catch (NumberFormatException e) {
+      throw new NoSuchEntityException("Invalid job run ID format %s", jobRunId);
+    }
+
+    int result =
+        SessionUtils.doWithCommitAndFetchResult(
+            JobMetaMapper.class, mapper -> mapper.softDeleteJobMetaByRunId(jobRunIdLong));
+    return result > 0;
+  }
+
public int deleteJobsByLegacyTimeline(long legacyTimeline, int limit) {
// Mark jobs as deleted for finished jobs, so that they can be cleaned up
later
SessionUtils.doWithCommit(
diff --git a/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
b/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
index 25665d6127..1070af6bbd 100644
--- a/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
+++ b/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
@@ -24,8 +24,12 @@ import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
@@ -34,6 +38,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Random;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.gravitino.Config;
@@ -43,6 +48,7 @@ import org.apache.gravitino.EntityAlreadyExistsException;
import org.apache.gravitino.EntityStore;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
import org.apache.gravitino.SupportsRelationOperations;
import org.apache.gravitino.connector.job.JobExecutor;
import org.apache.gravitino.exceptions.InUseException;
@@ -54,13 +60,16 @@ import
org.apache.gravitino.exceptions.NoSuchJobTemplateException;
import org.apache.gravitino.exceptions.NoSuchMetalakeException;
import org.apache.gravitino.lock.LockManager;
import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.BaseMetalake;
import org.apache.gravitino.meta.JobEntity;
import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.meta.SchemaVersion;
import org.apache.gravitino.metalake.MetalakeManager;
import org.apache.gravitino.storage.IdGenerator;
import org.apache.gravitino.storage.RandomIdGenerator;
import org.apache.gravitino.utils.NameIdentifierUtil;
import org.apache.gravitino.utils.NamespaceUtil;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
@@ -87,18 +96,21 @@ public class TestJobManager {
private static JobExecutor jobExecutor;
+ private static IdGenerator idGenerator;
+
@BeforeAll
public static void setUp() throws IllegalAccessException {
config = new Config(false) {};
Random rand = new Random();
- testStagingDir = "test_staging_dir_" + rand.nextInt(1000);
+ testStagingDir = "test_staging_dir_" + rand.nextInt(100);
config.set(Configs.JOB_STAGING_DIR, testStagingDir);
+ config.set(Configs.JOB_STAGING_DIR_KEEP_TIME_IN_MS, 1000L);
FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new
LockManager(config), true);
entityStore = Mockito.mock(EntityStore.class);
jobExecutor = Mockito.mock(JobExecutor.class);
- IdGenerator idGenerator = new RandomIdGenerator();
+ idGenerator = new RandomIdGenerator();
JobManager jm = new JobManager(config, entityStore, idGenerator,
jobExecutor);
jobManager = Mockito.spy(jm);
@@ -529,6 +541,63 @@ public class TestJobManager {
RuntimeException.class, () -> jobManager.cancelJob(metalake,
job.name()));
}
+ /**
+ * Verifies that pullAndUpdateJobStatus writes the job entity back only when the job executor
+ * reports a status different from the stored one.
+ */
+ @Test
+ public void testPullJobStatus() throws IOException {
+ JobEntity job = newJobEntity("shell_job", JobHandle.Status.QUEUED);
+ // A single metalake built without explicit properties. NOTE(review): the test relies on its
+ // in-use flag defaulting to true so that the metalake is polled.
+ BaseMetalake mockMetalake =
+ BaseMetalake.builder()
+ .withName(metalake)
+ .withId(idGenerator.nextId())
+ .withVersion(SchemaVersion.V_0_1)
+ .withAuditInfo(AuditInfo.EMPTY)
+ .build();
+ when(entityStore.list(Namespace.empty(), BaseMetalake.class,
Entity.EntityType.METALAKE))
+ .thenReturn(ImmutableList.of(mockMetalake));
+ when(jobManager.listJobs(metalake,
Optional.empty())).thenReturn(ImmutableList.of(job));
+
+ // Executor reports the stored status (QUEUED): nothing should be written.
+
when(jobExecutor.getJobStatus(job.jobExecutionId())).thenReturn(JobHandle.Status.QUEUED);
+ Assertions.assertDoesNotThrow(() -> jobManager.pullAndUpdateJobStatus());
+ verify(entityStore, never()).put(any(), anyBoolean());
+
+ // Executor reports a new status (SUCCEEDED): exactly one overwrite is expected.
+
when(jobExecutor.getJobStatus(job.jobExecutionId())).thenReturn(JobHandle.Status.SUCCEEDED);
+ Assertions.assertDoesNotThrow(() -> jobManager.pullAndUpdateJobStatus());
+ verify(entityStore, times(1)).put(any(JobEntity.class), anyBoolean());
+ }
+
+ /**
+ * Verifies that cleanUpStagingDirs leaves unfinished jobs alone and deletes a finished job once
+ * its keep time has elapsed.
+ */
+ @Test
+ public void testCleanUpStagingDirs() throws IOException,
InterruptedException {
+ // A STARTED job is not finished, so the first cleanup must not delete anything.
+ JobEntity job = newJobEntity("shell_job", JobHandle.Status.STARTED);
+ BaseMetalake mockMetalake =
+ BaseMetalake.builder()
+ .withName(metalake)
+ .withId(idGenerator.nextId())
+ .withVersion(SchemaVersion.V_0_1)
+ .withAuditInfo(AuditInfo.EMPTY)
+ .build();
+ when(entityStore.list(Namespace.empty(), BaseMetalake.class,
Entity.EntityType.METALAKE))
+ .thenReturn(ImmutableList.of(mockMetalake));
+
+ when(jobManager.listJobs(metalake,
Optional.empty())).thenReturn(ImmutableList.of(job));
+ Assertions.assertDoesNotThrow(() -> jobManager.cleanUpStagingDirs());
+ verify(entityStore, never()).delete(any(), any());
+
+ // newJobEntity stamps finishedAt with the current time, and setUp sets a 1000 ms keep time.
+ // The job therefore becomes eligible about one second from now; poll until it is deleted.
+ JobEntity finishedJob = newJobEntity("shell_job",
JobHandle.Status.SUCCEEDED);
+ when(jobManager.listJobs(metalake,
Optional.empty())).thenReturn(ImmutableList.of(finishedJob));
+
+ Awaitility.await()
+ .atMost(3, TimeUnit.SECONDS)
+ .until(
+ () -> {
+ Assertions.assertDoesNotThrow(() ->
jobManager.cleanUpStagingDirs());
+ // verify(...) throws until the delete has happened; map that to "not yet".
+ try {
+ verify(entityStore, times(1)).delete(any(), any());
+ return true;
+ } catch (Throwable e) {
+ return false;
+ }
+ });
+ }
+
private static JobTemplateEntity newShellJobTemplateEntity(String name,
String comment) {
ShellJobTemplate shellJobTemplate =
ShellJobTemplate.builder()
@@ -575,6 +644,7 @@ public class TestJobManager {
.withJobExecutionId(rand.nextLong() + "")
.withNamespace(NamespaceUtil.ofJob(metalake))
.withJobTemplateName(templateName)
+ .withFinishedAt(System.currentTimeMillis())
.withStatus(status)
.withAuditInfo(
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
diff --git
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestJobMetaService.java
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestJobMetaService.java
index 6c24addb95..e47cb50b6b 100644
---
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestJobMetaService.java
+++
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestJobMetaService.java
@@ -132,6 +132,17 @@ public class TestJobMetaService extends TestJDBCBackend {
JobMetaService.getInstance()
.getJobByIdentifier(NameIdentifierUtil.ofJob(METALAKE_NAME,
jobOverwrite.name()));
Assertions.assertEquals(jobOverwrite, updatedJob);
+
+ // Test insert and get job with finishedAt
+ JobEntity finishedJob =
+ TestJobTemplateMetaService.newJobEntity(
+ jobTemplate.name(), JobHandle.Status.SUCCEEDED, METALAKE_NAME);
+ Assertions.assertDoesNotThrow(() ->
JobMetaService.getInstance().insertJob(finishedJob, false));
+
+ JobEntity retrievedFinishedJob =
+ JobMetaService.getInstance()
+ .getJobByIdentifier(NameIdentifierUtil.ofJob(METALAKE_NAME,
finishedJob.name()));
+ Assertions.assertTrue(retrievedFinishedJob.finishedAt() > 0);
}
@Test
@@ -180,4 +191,35 @@ public class TestJobMetaService extends TestJDBCBackend {
JobMetaService.getInstance().listJobsByNamespace(NamespaceUtil.ofJob(METALAKE_NAME));
Assertions.assertTrue(jobs.isEmpty(), "Jobs should be deleted by legacy
timeline");
}
+
+ /**
+ * Verifies that JobMetaService.deleteJob soft-deletes an existing job and reports {@code false}
+ * when the job has already been deleted.
+ */
+ @Test
+ public void testDeleteJobByIdentifier() throws IOException {
+ BaseMetalake metalake =
+ createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), METALAKE_NAME,
AUDIT_INFO);
+ backend.insert(metalake, false);
+
+ JobTemplateEntity jobTemplate =
+ TestJobTemplateMetaService.newShellJobTemplateEntity(
+ "test_job_template", "test_comment", METALAKE_NAME);
+ JobTemplateMetaService.getInstance().insertJobTemplate(jobTemplate, false);
+
+ JobEntity job =
+ TestJobTemplateMetaService.newJobEntity(
+ jobTemplate.name(), JobHandle.Status.QUEUED, METALAKE_NAME);
+ Assertions.assertDoesNotThrow(() ->
JobMetaService.getInstance().insertJob(job, false));
+
+ JobEntity retrievedJob =
+ JobMetaService.getInstance()
+ .getJobByIdentifier(NameIdentifierUtil.ofJob(METALAKE_NAME,
job.name()));
+ Assertions.assertEquals(job, retrievedJob);
+
+ Assertions.assertTrue(
+ JobMetaService.getInstance()
+ .deleteJob(NameIdentifierUtil.ofJob(METALAKE_NAME, job.name())));
+
+ // A second delete finds no live row (the first one soft-deleted it), so it returns false.
+ Assertions.assertFalse(
+ JobMetaService.getInstance()
+ .deleteJob(NameIdentifierUtil.ofJob(METALAKE_NAME, job.name())));
+ }
}