[ https://issues.apache.org/jira/browse/GOBBLIN-2152?focusedWorklogId=934068&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-934068 ]

ASF GitHub Bot logged work on GOBBLIN-2152:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Sep/24 17:54
            Start Date: 10/Sep/24 17:54
    Worklog Time Spent: 10m 
      Work Description: Will-Lo commented on code in PR #4047:
URL: https://github.com/apache/gobblin/pull/4047#discussion_r1752428794


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -178,32 +176,31 @@ private void cleanupWorkDirs(WUProcessingSpec workSpec, EventSubmitterContext ev
     try {
       CleanupResult cleanupResult = deleteWorkDirsActivityStub.delete(workSpec, eventSubmitterContext,
           calculateWorkDirsToDelete(jobState.getJobId(), directoriesToClean));
-      if (directoriesToClean.size() != cleanupResult.getAttemptedCleanedDirectories().size()) {
-        log.warn("Expected to clean up {} directories, but only cleaned up {}", directoriesToClean.size(),
-            cleanupResult.getAttemptedCleanedDirectories().size());
-        for (String dir : directoriesToClean) {
-          if (cleanupResult.getAttemptedCleanedDirectories().get(dir)) {
-            log.error("Directory {} was not cleaned up, please clean up manually", dir);
-          }
+      for (String dir : directoriesToClean) {
+        if (!cleanupResult.getDeletionSuccessesByDirPath().get(dir)) {
+          log.error("Directory {} was not cleaned up, please clean up manually", dir);
         }
       }
     } catch (Exception e) {
       log.error("Failed to cleanup work dirs", e);
     }
   }
 
-  private Set<String> calculateWorkDirsToDelete(String jobId, Set<String> workDirsToClean) {
+  protected static Set<String> calculateWorkDirsToDelete(String jobId, Set<String> workDirsToClean) {
     // We want to delete the job-level directory once the job completes as well, which is the parent of the task staging/output dirs
-    Set<String> allDirsToClean = workDirsToClean.stream().map(workDir -> (new Path(workDir).getParent()).toString()).collect(
-        Collectors.toSet());
-    allDirsToClean.addAll(workDirsToClean);
+    Set<Path> allDirsToClean =
+        workDirsToClean.stream().map(workDir -> (new Path(workDir).getParent())).collect(Collectors.toSet());
+    allDirsToClean.addAll(workDirsToClean.stream().map(Path::new).collect(Collectors.toSet()));
 
     // Only delete directories that are associated with the current job, otherwise
-    return allDirsToClean.stream().filter(workDir -> {
-      if (!workDir.contains(jobId)) {
-        log.warn("Not deleting work dir {} as it does not contain the jobId {}", workDir, jobId);
+    Set<String> resultSet = new HashSet<>();
+    for (Path dir : allDirsToClean) {
+      if (dir.toString().contains(jobId)) {
+        resultSet.add(dir.toString());
+      } else {
+        log.warn("Skipping deletion of directory {} as it is not associated with job {}", dir, jobId);

Review Comment:
   Added an IOException here
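The standalone sketch below is meant to make the directory-selection logic in the diff easier to follow. It is not the PR's code, and it rests on several assumptions.

- `java.nio.file.Path` stands in for Hadoop's `org.apache.hadoop.fs.Path`, and a POSIX filesystem is assumed.
- A plain `Map<String, Boolean>` stands in for `CleanupResult.getDeletionSuccessesByDirPath()`.
- `dirsNeedingManualCleanup` is a hypothetical helper.

In the diff, `!cleanupResult.getDeletionSuccessesByDirPath().get(dir)` unboxes a `Boolean`. If a directory has no entry in that map, this throws a `NullPointerException`, which the surrounding `catch` would log only as "Failed to cleanup work dirs". The sketch instead treats a missing entry as a failed deletion. That is a choice made here, not necessarily the PR's behavior.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class WorkDirCleanupSketch {

  // Mirrors calculateWorkDirsToDelete from the diff: every work dir plus its parent
  // (the job-level dir), keeping only paths whose string form contains the job id.
  // Assumption: java.nio.file.Path replaces org.apache.hadoop.fs.Path; POSIX paths.
  public static Set<String> calculateWorkDirsToDelete(String jobId, Set<String> workDirsToClean) {
    Set<Path> allDirsToClean = new HashSet<>();
    for (String workDir : workDirsToClean) {
      Path dir = Paths.get(workDir);
      allDirsToClean.add(dir);
      if (dir.getParent() != null) { // a root path has no parent
        allDirsToClean.add(dir.getParent());
      }
    }
    Set<String> resultSet = new TreeSet<>(); // sorted only to make output deterministic
    for (Path dir : allDirsToClean) {
      if (dir.toString().contains(jobId)) {
        resultSet.add(dir.toString());
      } else {
        System.err.printf("Skipping deletion of directory %s as it is not associated with job %s%n", dir, jobId);
      }
    }
    return resultSet;
  }

  // Hypothetical helper: returns the dirs that still need manual cleanup. A dir with no entry
  // in the results map counts as a failure instead of unboxing a null Boolean (which would NPE).
  public static List<String> dirsNeedingManualCleanup(Set<String> dirs, Map<String, Boolean> deletionSuccessesByDirPath) {
    List<String> failed = new ArrayList<>();
    for (String dir : new TreeSet<>(dirs)) {
      if (!deletionSuccessesByDirPath.getOrDefault(dir, false)) {
        failed.add(dir);
      }
    }
    return failed;
  }

  public static void main(String[] args) {
    Set<String> toDelete = calculateWorkDirsToDelete("job_123",
        Set.of("/data/job_123/task-staging", "/data/job_123/task-output", "/shared/output"));
    System.out.println(toDelete);

    Map<String, Boolean> results = new HashMap<>();
    results.put("/data/job_123", true);
    results.put("/data/job_123/task-staging", true);
    // No entry for /data/job_123/task-output, e.g. if that deletion was never attempted.
    System.out.println(dirsNeedingManualCleanup(toDelete, results));
  }
}
```

Here `main` keeps `/data/job_123` and its two task dirs. It skips `/shared/output` and its parent `/shared`, and reports `/data/job_123/task-output` as needing manual cleanup. Note that `contains(jobId)` is a substring match, so a job id such as `job_12` would also match paths that contain `job_123`.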





Issue Time Tracking
-------------------

    Worklog Id:     (was: 934068)
    Time Spent: 0.5h  (was: 20m)

> Gobblin Temporal Jobs should be properly cleaning up writer staging and output dirs
> -----------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-2152
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2152
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: William Lo
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Gobblin HDFS writers write to a staging directory and then rename the files
> into an output directory before the data is committed to the final
> destination. This keeps commits atomic, so data is never partially committed
> (unless configured otherwise).
> Gobblin needs to clean up these directories to manage HDFS space properly, so
> that failed jobs do not leave their files behind. It is critical that
> deletion is limited to folders configured at the job level, so that folders
> shared between jobs or used concurrently are not deleted mid-execution.
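The flow in the issue runs from staging to output, then on to the final destination, followed by cleanup of the job-level directory. It can be sketched on a local filesystem with `java.nio.file`. This is only an analogy, under several assumptions.

- A local disk stands in for HDFS.
- `StandardCopyOption.ATOMIC_MOVE` stands in for an HDFS rename and only works within one filesystem.
- The `staging`/`output` directory names and every method name are hypothetical, not Gobblin's configured paths or API.

The cleanup runs in a `finally` block, so a failed commit still removes the job-scoped directory. Nothing outside that directory is touched.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StagingCommitSketch {

  // Writes a file into <jobDir>/staging, then renames it into <jobDir>/output.
  // The "staging" and "output" names are hypothetical, not Gobblin's configured paths.
  public static Path writeAndPublishToOutput(Path jobDir, String fileName, String data) throws IOException {
    Path staging = Files.createDirectories(jobDir.resolve("staging"));
    Path output = Files.createDirectories(jobDir.resolve("output"));
    Path staged = Files.write(staging.resolve(fileName), data.getBytes(StandardCharsets.UTF_8));
    // ATOMIC_MOVE is a rename on the same filesystem; it throws AtomicMoveNotSupportedException otherwise.
    return Files.move(staged, output.resolve(fileName), StandardCopyOption.ATOMIC_MOVE);
  }

  // "Commits" by renaming the output file into the final destination directory.
  public static Path commit(Path outputFile, Path finalDir) throws IOException {
    Files.createDirectories(finalDir);
    return Files.move(outputFile, finalDir.resolve(outputFile.getFileName()), StandardCopyOption.ATOMIC_MOVE);
  }

  // Deletes the job-level directory and everything under it, children before parents.
  public static void deleteJobDir(Path jobDir) throws IOException {
    if (!Files.exists(jobDir)) {
      return;
    }
    List<Path> paths;
    try (Stream<Path> walk = Files.walk(jobDir)) {
      // A child path always sorts after its parent, so reverse order deletes children first.
      paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
    }
    for (Path p : paths) {
      Files.delete(p);
    }
  }

  public static void main(String[] args) throws IOException {
    Path root = Files.createTempDirectory("gobblin-sketch");
    Path jobDir = root.resolve("job_123");
    Path finalDir = root.resolve("final");
    try {
      commit(writeAndPublishToOutput(jobDir, "part-0.txt", "hello"), finalDir);
    } finally {
      // Runs on success and failure alike, so a failed job leaves no staging/output files behind.
      deleteJobDir(jobDir);
    }
    System.out.println(Files.exists(finalDir.resolve("part-0.txt")) + " " + Files.exists(jobDir));
  }
}
```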



--
This message was sent by Atlassian Jira
(v8.20.10#820010)