[
https://issues.apache.org/jira/browse/GOBBLIN-2152?focusedWorklogId=934068&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-934068
]
ASF GitHub Bot logged work on GOBBLIN-2152:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 10/Sep/24 17:54
Start Date: 10/Sep/24 17:54
Worklog Time Spent: 10m
Work Description: Will-Lo commented on code in PR #4047:
URL: https://github.com/apache/gobblin/pull/4047#discussion_r1752428794
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -178,32 +176,31 @@ private void cleanupWorkDirs(WUProcessingSpec workSpec, EventSubmitterContext ev
try {
CleanupResult cleanupResult =
deleteWorkDirsActivityStub.delete(workSpec, eventSubmitterContext,
calculateWorkDirsToDelete(jobState.getJobId(), directoriesToClean));
- if (directoriesToClean.size() != cleanupResult.getAttemptedCleanedDirectories().size()) {
- log.warn("Expected to clean up {} directories, but only cleaned up {}", directoriesToClean.size(),
- cleanupResult.getAttemptedCleanedDirectories().size());
- for (String dir : directoriesToClean) {
- if (cleanupResult.getAttemptedCleanedDirectories().get(dir)) {
log.error("Directory {} was not cleaned up, please clean up manually", dir);
- }
+ for (String dir : directoriesToClean) {
+ if (!cleanupResult.getDeletionSuccessesByDirPath().get(dir)) {
+ log.error("Directory {} was not cleaned up, please clean up manually", dir);
}
}
} catch (Exception e) {
log.error("Failed to cleanup work dirs", e);
}
}
- private Set<String> calculateWorkDirsToDelete(String jobId, Set<String> workDirsToClean) {
+ protected static Set<String> calculateWorkDirsToDelete(String jobId, Set<String> workDirsToClean) {
// We want to delete the job-level directory once the job completes as well, which is the parent of the task staging/output dirs
- Set<String> allDirsToClean = workDirsToClean.stream().map(workDir -> (new Path(workDir).getParent()).toString()).collect(
- Collectors.toSet());
- allDirsToClean.addAll(workDirsToClean);
+ Set<Path> allDirsToClean =
+ workDirsToClean.stream().map(workDir -> (new Path(workDir).getParent())).collect(Collectors.toSet());
+ allDirsToClean.addAll(workDirsToClean.stream().map(Path::new).collect(Collectors.toSet()));
// Only delete directories that are associated with the current job, otherwise
- return allDirsToClean.stream().filter(workDir -> {
- if (!workDir.contains(jobId)) {
- log.warn("Not deleting work dir {} as it does not contain the jobId {}", workDir, jobId);
+ Set<String> resultSet = new HashSet<>();
+ for (Path dir : allDirsToClean) {
+ if (dir.toString().contains(jobId)) {
+ resultSet.add(dir.toString());
+ } else {
+ log.warn("Skipping deletion of directory {} as it is not associated with job {}", dir, jobId);
Review Comment:
Added an IOException here
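The new directory-selection and failure-reporting logic in this hunk can be sketched as standalone Java. This is a minimal sketch under several assumptions: `java.nio.file.Path` stands in for Hadoop's `org.apache.hadoop.fs.Path`, `System.err` stands in for the logger, the deletion results are assumed to be a `Map<String, Boolean>`, and the class `WorkDirCleanupSketch` and helper `findUncleanedDirs` are hypothetical. Like the diff, it adds each work dir's parent (the job-level directory) and keeps only paths whose string form contains the job ID.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Hypothetical standalone sketch: java.nio.file.Path stands in for org.apache.hadoop.fs.Path,
// and System.err stands in for the logger used in Gobblin.
public class WorkDirCleanupSketch {

  static Set<String> calculateWorkDirsToDelete(String jobId, Set<String> workDirsToClean) {
    // Add each work dir's parent (the job-level directory); drop null parents of root-level paths.
    Set<Path> allDirsToClean = workDirsToClean.stream()
        .map(workDir -> Paths.get(workDir).getParent())
        .filter(Objects::nonNull)
        .collect(Collectors.toCollection(HashSet::new));
    // Add the work dirs themselves.
    allDirsToClean.addAll(workDirsToClean.stream().map(Paths::get).collect(Collectors.toSet()));

    // Keep only paths whose string form contains the job ID (the same substring check as the diff).
    // TreeSet is used only to make the output order deterministic.
    Set<String> resultSet = new TreeSet<>();
    for (Path dir : allDirsToClean) {
      if (dir.toString().contains(jobId)) {
        resultSet.add(dir.toString());
      } else {
        System.err.printf("Skipping deletion of directory %s as it is not associated with job %s%n", dir, jobId);
      }
    }
    return resultSet;
  }

  // Hypothetical helper: returns the requested dirs not confirmed as deleted, treating a missing
  // or null entry as a failure instead of unboxing it (which would throw a NullPointerException).
  static List<String> findUncleanedDirs(Set<String> requestedDirs, Map<String, Boolean> deletionSuccessesByDirPath) {
    return requestedDirs.stream()
        .filter(dir -> !Boolean.TRUE.equals(deletionSuccessesByDirPath.get(dir)))
        .sorted()
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Set<String> workDirs = new HashSet<>(Arrays.asList(
        "/tmp/gobblin/job_123/task-staging",
        "/tmp/gobblin/job_123/task-output",
        "/tmp/gobblin/shared/task-staging"));
    Set<String> dirsToDelete = calculateWorkDirsToDelete("job_123", workDirs);
    // Prints [/tmp/gobblin/job_123, /tmp/gobblin/job_123/task-output, /tmp/gobblin/job_123/task-staging]
    System.out.println(dirsToDelete);

    Map<String, Boolean> results = new HashMap<>();
    results.put("/tmp/gobblin/job_123", true);
    results.put("/tmp/gobblin/job_123/task-output", false);
    // Prints [/tmp/gobblin/job_123/task-output, /tmp/gobblin/job_123/task-staging]
    System.out.println(findUncleanedDirs(dirsToDelete, results));
  }
}
```

Because the filter is a substring match, a job ID such as `job_1` would also match paths under `job_10`; matching on a whole path component would be stricter. Reading the result with `Boolean.TRUE.equals(...)` avoids the `NullPointerException` that `!map.get(dir)` throws when a directory has no entry in the map.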
Issue Time Tracking
-------------------
Worklog Id: (was: 934068)
Time Spent: 0.5h (was: 20m)
> Gobblin Temporal Jobs should be properly cleaning up writer staging and output dirs
> -----------------------------------------------------------------------------------
>
> Key: GOBBLIN-2152
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2152
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: William Lo
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> Gobblin HDFS writers write to a staging directory, whose contents are then
> moved (renamed) to an output directory before the data is committed to the
> final destination. This keeps commits atomic rather than partial (unless
> configured otherwise).
> Gobblin needs to clean up these directories to manage HDFS space properly, so
> that failed jobs do not leave their files behind. It is critical that
> deletion applies only to folders configured at the job level, so that folders
> shared between jobs, or between concurrent executions, are not deleted
> mid-execution.
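The staging-to-output flow and the job-scoped deletion rule described above can be illustrated with a local-filesystem analogue. This minimal sketch assumes `java.nio.file` in place of the Hadoop `FileSystem` API that Gobblin's HDFS writers use; the class `StagingCommitSketch`, its `<jobDir>/staging` and `<jobDir>/output` layout, and both method names are hypothetical, not Gobblin's API.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical local-filesystem analogue; Gobblin's HDFS writers use the Hadoop FileSystem API instead.
public class StagingCommitSketch {

  // Write a file into <jobDir>/staging, then rename it into <jobDir>/output in a single move.
  static Path writeAndPublish(Path jobDir, String fileName, String content) throws IOException {
    Path staging = Files.createDirectories(jobDir.resolve("staging"));
    Path output = Files.createDirectories(jobDir.resolve("output"));
    Path staged = Files.write(staging.resolve(fileName), content.getBytes(StandardCharsets.UTF_8));
    // ATOMIC_MOVE performs one atomic rename or throws, so the file appears in output complete or not at all.
    return Files.move(staged, output.resolve(fileName), StandardCopyOption.ATOMIC_MOVE);
  }

  // Recursively delete a job-level directory, refusing any directory not named exactly after the job.
  static void cleanupJobDir(Path jobDir, String jobId) throws IOException {
    Path name = jobDir.getFileName();
    if (name == null || !name.toString().equals(jobId)) {
      throw new IOException("Refusing to delete " + jobDir + ": not the directory for job " + jobId);
    }
    List<Path> paths;
    try (Stream<Path> walk = Files.walk(jobDir)) {
      // Reverse lexicographic order puts every child before its parent directory.
      paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
    }
    for (Path p : paths) {
      Files.delete(p);
    }
  }

  public static void main(String[] args) throws IOException {
    Path root = Files.createTempDirectory("gobblin-sketch");
    Path jobDir = root.resolve("job_123");
    Path published = writeAndPublish(jobDir, "part-0.txt", "hello");
    System.out.println(Files.exists(published)); // true
    System.out.println(Files.exists(jobDir.resolve("staging").resolve("part-0.txt"))); // false
    cleanupJobDir(jobDir, "job_123");
    System.out.println(Files.exists(jobDir)); // false
    Files.delete(root);
  }
}
```

The exact directory-name check in `cleanupJobDir` is a stricter choice made for this sketch than the substring check in the diff; for example, it refuses to delete the shared temporary root that holds several job directories.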
--
This message was sent by Atlassian Jira
(v8.20.10#820010)