phet commented on code in PR #4047:
URL: https://github.com/apache/gobblin/pull/4047#discussion_r1750548456


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java:
##########
@@ -56,6 +58,8 @@
 @Slf4j
 public class GenerateWorkUnitsImpl implements GenerateWorkUnits {
 
+  Set<String> resourcesToCleanUp = new HashSet<>();

Review Comment:
   let's avoid an instance member and just use a local var.  I'm concerned about
   thread-safety, especially given this initialization in `WorkFulfillmentWorker`:
   ```
   protected Object[] getActivityImplInstances() {
       return new Object[] { new CommitActivityImpl(), new GenerateWorkUnitsImpl(),
           new ProcessWorkUnitImpl(), new SubmitGTEActivityImpl() };
   }
   ```
   (where we pass in an *instance* of the activity impl, rather than the class)
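
To illustrate the concern, here is a minimal sketch, assuming a single activity instance is reused across invocations as the snippet above suggests. `SharedStateSketch` and its method names are hypothetical stand-ins, not Gobblin classes. The instance-member variant carries entries over from one call to the next; the local-variable variant does not.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for an activity impl; not the actual Gobblin class.
final class SharedStateSketch {
  // Anti-pattern: the one registered instance serves every invocation, so this
  // set accumulates entries across calls and is not safe for concurrent mutation.
  private final Set<String> resourcesToCleanUp = new HashSet<>();

  Set<String> generateWithInstanceMember(List<String> dirs) {
    resourcesToCleanUp.addAll(dirs);
    return new HashSet<>(resourcesToCleanUp);
  }

  // Preferred: each invocation owns its own set, so nothing leaks between calls.
  Set<String> generateWithLocalVar(List<String> dirs) {
    Set<String> localResources = new HashSet<>(dirs);
    return localResources;
  }
}
```

With the instance member, a second call on the same object also returns the first call's directories, which is the cross-invocation leak to avoid even before considering concurrent access.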



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -178,32 +176,31 @@ private void cleanupWorkDirs(WUProcessingSpec workSpec, EventSubmitterContext ev
     try {
       CleanupResult cleanupResult = deleteWorkDirsActivityStub.delete(workSpec, eventSubmitterContext,
           calculateWorkDirsToDelete(jobState.getJobId(), directoriesToClean));
-      if (directoriesToClean.size() != cleanupResult.getAttemptedCleanedDirectories().size()) {
-        log.warn("Expected to clean up {} directories, but only cleaned up {}", directoriesToClean.size(),
-            cleanupResult.getAttemptedCleanedDirectories().size());
-        for (String dir : directoriesToClean) {
-          if (cleanupResult.getAttemptedCleanedDirectories().get(dir)) {
-            log.error("Directory {} was not cleaned up, please clean up manually", dir);
-          }
+      for (String dir : directoriesToClean) {
+        if (!cleanupResult.getDeletionSuccessesByDirPath().get(dir)) {
+          log.error("Directory {} was not cleaned up, please clean up manually", dir);
         }
       }
     } catch (Exception e) {
       log.error("Failed to cleanup work dirs", e);
     }
   }
 
-  private Set<String> calculateWorkDirsToDelete(String jobId, Set<String> workDirsToClean) {
+  protected static Set<String> calculateWorkDirsToDelete(String jobId, Set<String> workDirsToClean) {
     // We want to delete the job-level directory once the job completes as well, which is the parent of the task staging/output dirs
-    Set<String> allDirsToClean = workDirsToClean.stream().map(workDir -> (new Path(workDir).getParent()).toString()).collect(
-        Collectors.toSet());
-    allDirsToClean.addAll(workDirsToClean);
+    Set<Path> allDirsToClean =
+        workDirsToClean.stream().map(workDir -> (new Path(workDir).getParent())).collect(Collectors.toSet());
+    allDirsToClean.addAll(workDirsToClean.stream().map(Path::new).collect(Collectors.toSet()));
 
     // Only delete directories that are associated with the current job, otherwise
-    return allDirsToClean.stream().filter(workDir -> {
-      if (!workDir.contains(jobId)) {
-        log.warn("Not deleting work dir {} as it does not contain the jobId {}", workDir, jobId);
+    Set<String> resultSet = new HashSet<>();
+    for (Path dir : allDirsToClean) {
+      if (dir.toString().contains(jobId)) {
+        resultSet.add(dir.toString());
+      } else {
+        log.warn("Skipping deletion of directory {} as it is not associated with job {}", dir, jobId);

Review Comment:
   this also feels like it should be an exception.  to get here, something went
   wrong between the impl and the input.  it should fail loudly, not merely log a
   message that's easily missed
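
A rough sketch of the fail-loudly variant, under stated assumptions: plain string handling stands in for Hadoop's `Path`, `WorkDirFilterSketch` is a hypothetical class name, and `IllegalArgumentException` is only one possible choice of exception type.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch; string splitting on '/' stands in for Hadoop's Path.getParent().
final class WorkDirFilterSketch {
  static Set<String> calculateWorkDirsToDelete(String jobId, Set<String> workDirsToClean) {
    // Include each work dir plus its parent (the job-level dir).
    Set<String> allDirsToClean = new HashSet<>(workDirsToClean);
    for (String workDir : workDirsToClean) {
      int lastSlash = workDir.lastIndexOf('/');
      if (lastSlash > 0) {
        allDirsToClean.add(workDir.substring(0, lastSlash));
      }
    }
    // Fail loudly rather than silently skipping a dir not tied to this job.
    for (String dir : allDirsToClean) {
      if (!dir.contains(jobId)) {
        throw new IllegalArgumentException(
            String.format("Refusing to delete directory %s: not associated with job %s", dir, jobId));
      }
    }
    return allDirsToClean;
  }
}
```

One caveat with this variant: a work dir whose parent does not contain the job ID would now fail the whole call, so whether every parent is expected to carry the job ID is something the real implementation would need to confirm.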



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java:
##########
@@ -136,7 +127,19 @@ protected static List<WorkUnit> generateWorkUnitsForJobState(JobState jobState,
     DestinationDatasetHandlerService datasetHandlerService = closer.register(
         new DestinationDatasetHandlerService(jobState, canCleanUpTempDirs, eventSubmitterContext.create()));
     WorkUnitStream handledWorkUnitStream = datasetHandlerService.executeHandlers(workUnitStream);
-
+    // Validate every workunit if they have the temp dir props since some workunits may be commit steps
+    Iterator<WorkUnit> workUnitIterator = handledWorkUnitStream.getWorkUnits();
+    while (workUnitIterator.hasNext()) {
+      WorkUnit workUnit = workUnitIterator.next();
+      if (workUnit.isMultiWorkUnit()) {
+        List<WorkUnit> workUnitList = ((MultiWorkUnit) workUnit).getWorkUnits();
+        for (WorkUnit wu : workUnitList) {
+          collectTaskStagingAndOutputDirsFromWorkUnit(wu, this.resourcesToCleanUp);
+        }
+      } else {
+        collectTaskStagingAndOutputDirsFromWorkUnit(workUnit, this.resourcesToCleanUp);
+      }
+    }

Review Comment:
   I still believe this deserves its own abstraction:
   ```
   @VisibleForTesting static Set<String> calcWorkDirPathsToDelete(Iterator<WorkUnit> wus)
   ```
   that makes it isolated and hence more unit-testable.
   
   (... but if there's a counter-argument against abstraction, please just fill
   me in, so I stop requesting this :) )
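
For concreteness, a sketch of what such an isolated helper could look like. `SimpleWorkUnit` is a hypothetical, simplified stand-in for Gobblin's `WorkUnit`/`MultiWorkUnit`, and the two property keys are placeholders rather than the real `ConfigurationKeys` values.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class WorkDirCollectorSketch {
  /** Hypothetical stand-in: a work unit with props and, for a "multi" unit, children. */
  static final class SimpleWorkUnit {
    final Map<String, String> props;
    final List<SimpleWorkUnit> children; // empty unless this is a multi work unit

    SimpleWorkUnit(Map<String, String> props, List<SimpleWorkUnit> children) {
      this.props = props;
      this.children = children;
    }
  }

  // Placeholder keys, assumed for illustration only.
  static final List<String> DIR_KEYS = Arrays.asList("staging.dir", "output.dir");

  /** Pure function of its input: no instance state, so it can be tested in isolation. */
  static Set<String> calcWorkDirPathsToDelete(Iterator<SimpleWorkUnit> wus) {
    Set<String> paths = new HashSet<>();
    while (wus.hasNext()) {
      SimpleWorkUnit wu = wus.next();
      // Flatten one level: a multi work unit contributes its children instead of itself.
      List<SimpleWorkUnit> leaves = wu.children.isEmpty() ? Collections.singletonList(wu) : wu.children;
      for (SimpleWorkUnit leaf : leaves) {
        for (String key : DIR_KEYS) {
          String dir = leaf.props.get(key);
          if (dir != null) {
            paths.add(dir);
          }
        }
      }
    }
    return paths;
  }
}
```

Because the helper takes an iterator and returns a set, a unit test only needs a handful of in-memory work units and no job state or file system.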



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java:
##########
@@ -158,4 +161,16 @@ protected static List<WorkUnit> generateWorkUnitsForJobState(JobState jobState,
 
     return AbstractJobLauncher.materializeWorkUnitList(trackedWorkUnitStream);
   }
+
+  private static void collectTaskStagingAndOutputDirsFromWorkUnit(WorkUnit workUnit, Set<String> resourcesToCleanUp) {
+    if (workUnit.contains(ConfigurationKeys.WRITER_STAGING_DIR)) {
+      resourcesToCleanUp.add(workUnit.getProp(ConfigurationKeys.WRITER_STAGING_DIR));
+    }
+    if (workUnit.contains(ConfigurationKeys.WRITER_OUTPUT_DIR)) {
+      resourcesToCleanUp.add(workUnit.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR));
+    }
+    if (workUnit.getPropAsBoolean(ConfigurationKeys.CLEAN_ERR_DIR, ConfigurationKeys.DEFAULT_CLEAN_ERR_DIR)) {
+      resourcesToCleanUp.add(workUnit.getProp(ConfigurationKeys.ROW_LEVEL_ERR_FILE));

Review Comment:
   don't you also want to verify that the property `ROW_LEVEL_ERR_FILE` exists?
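
A small sketch of the guard, assuming a missing property comes back as `null` (as it does with `java.util.Properties`, used here as a stand-in for the work unit). `ErrFileGuardSketch` and both key strings are placeholders, not the real `ConfigurationKeys` values. Since `HashSet` accepts `null`, an unguarded `add` would quietly put a `null` path into the set.

```java
import java.util.Properties;
import java.util.Set;

// Hypothetical sketch; Properties stands in for a WorkUnit's property bag.
final class ErrFileGuardSketch {
  static final String CLEAN_ERR_DIR = "clean.err.dir";           // placeholder key
  static final String ROW_LEVEL_ERR_FILE = "row.level.err.file"; // placeholder key

  static void collectErrFile(Properties props, Set<String> resourcesToCleanUp) {
    boolean cleanErrDir = Boolean.parseBoolean(props.getProperty(CLEAN_ERR_DIR, "false"));
    String errFile = props.getProperty(ROW_LEVEL_ERR_FILE);
    // Only record the path when cleanup is enabled AND the path is actually set.
    if (cleanErrDir && errFile != null) {
      resourcesToCleanUp.add(errFile);
    }
  }
}
```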



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CleanupResult.java:
##########
@@ -33,5 +33,5 @@
 @RequiredArgsConstructor
 public class CleanupResult {
 
-  @NonNull private Map<String, Boolean> attemptedCleanedDirectories;
+  @NonNull private Map<String, Boolean> deletionSuccessesByDirPath;

Review Comment:
   if we change the class name to clarify that cleanup means deletion, the
   method might be:
   ```
   DirDeletionResult::successesByDirPath
   ```
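
As a rough picture of the rename, here is a plain-Java sketch. The real class uses Lombok annotations and presumably has serialization requirements that this sketch ignores; the defensive copy and the hand-written getter are assumptions for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical plain-Java rendering of the suggested name; not the actual Gobblin class.
final class DirDeletionResult {
  private final Map<String, Boolean> successesByDirPath;

  DirDeletionResult(Map<String, Boolean> successesByDirPath) {
    // Copy defensively so later changes to the caller's map do not alter the result.
    this.successesByDirPath =
        Collections.unmodifiableMap(new HashMap<>(Objects.requireNonNull(successesByDirPath)));
  }

  /** Maps each directory path to whether its deletion succeeded. */
  Map<String, Boolean> getSuccessesByDirPath() {
    return successesByDirPath;
  }
}
```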



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -178,32 +176,31 @@ private void cleanupWorkDirs(WUProcessingSpec workSpec, EventSubmitterContext ev
     try {
       CleanupResult cleanupResult = deleteWorkDirsActivityStub.delete(workSpec, eventSubmitterContext,
           calculateWorkDirsToDelete(jobState.getJobId(), directoriesToClean));
-      if (directoriesToClean.size() != cleanupResult.getAttemptedCleanedDirectories().size()) {
-        log.warn("Expected to clean up {} directories, but only cleaned up {}", directoriesToClean.size(),
-            cleanupResult.getAttemptedCleanedDirectories().size());
-        for (String dir : directoriesToClean) {
-          if (cleanupResult.getAttemptedCleanedDirectories().get(dir)) {
-            log.error("Directory {} was not cleaned up, please clean up manually", dir);
-          }
+      for (String dir : directoriesToClean) {
+        if (!cleanupResult.getDeletionSuccessesByDirPath().get(dir)) {
+          log.error("Directory {} was not cleaned up, please clean up manually", dir);

Review Comment:
   shouldn't this be an exception?
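
One way this could look, as a sketch only: collect every directory that was not confirmed deleted and raise once at the end. `CleanupVerifierSketch` is a hypothetical name and `IllegalStateException` is a placeholder for whatever failure type the workflow should surface. The sketch also treats a directory with no entry in the map as not deleted, since `Map.get` returns `null` for a missing key and unboxing that would throw a `NullPointerException`.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of failing loudly when any directory survives cleanup.
final class CleanupVerifierSketch {
  static void verifyAllDeleted(Set<String> dirsToClean, Map<String, Boolean> successesByDirPath) {
    Set<String> notDeleted = new TreeSet<>();
    for (String dir : dirsToClean) {
      // Boolean.TRUE.equals(...) is null-safe: a missing entry counts as "not deleted".
      if (!Boolean.TRUE.equals(successesByDirPath.get(dir))) {
        notDeleted.add(dir);
      }
    }
    if (!notDeleted.isEmpty()) {
      throw new IllegalStateException(
          "Directories were not cleaned up, please clean up manually: " + notDeleted);
    }
  }
}
```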



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -126,8 +126,11 @@ public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext event
     } finally {
       // TODO: Cleanup WorkUnit/Taskstate Directory for jobs cancelled mid flight
       try {
+        log.info("Cleaning up work dirs for job {}", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
         if (generateWorkUnitResultsOpt.isPresent()) {
           cleanupWorkDirs(wuSpec, eventSubmitterContext, generateWorkUnitResultsOpt.get().getWorkDirPathsToCleanup());
+        } else {
+          log.warn("Skipping cleanup of work dirs for job due to no output from GenerateWorkUnits");
         }
       } catch (IOException e) {

Review Comment:
   rather than swallowing and merely logging, this seems like an
   `ApplicationFailure`.
   
   the only problem is that there may already be an exception in progress.  for
   that I suggest a boolean:
   ```
   boolean wasSuccessful = false;
   try { // (L104)
     ....
     wasSuccessful = true;
     return new ExecGobblinStats(...);
   } catch (Exception e) {
     ...
   } finally {
     try {
       ...
     } catch (IOException e) {
       log.error(...);
       // fail only when otherwise successful: DO NOT subvert a prior failure already in-progress
       if (wasSuccessful) {
         throw ApplicationFailure.newNonRetryableFailureWithCause(...);
       }
     }
   }
   ```
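
A runnable sketch of the same pattern, with assumptions called out: `CleanupFailureSketch`, its `Cleanup` interface, and the use of `RuntimeException`/`IllegalStateException` are hypothetical stand-ins for the workflow, the cleanup step, and `ApplicationFailure` respectively.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical sketch of "fail on cleanup error only if the main work succeeded".
final class CleanupFailureSketch {
  /** Stand-in for the work-dir cleanup step. */
  interface Cleanup {
    void run() throws IOException;
  }

  static String execute(Callable<String> work, Cleanup cleanup) {
    boolean wasSuccessful = false;
    try {
      String stats = work.call();
      wasSuccessful = true;
      return stats;
    } catch (Exception e) {
      // Stand-in for the workflow's own failure handling.
      throw new RuntimeException("workflow failed", e);
    } finally {
      try {
        cleanup.run();
      } catch (IOException e) {
        System.err.println("Failed to clean up work dirs: " + e.getMessage());
        // Only surface the cleanup failure when nothing else failed first;
        // otherwise the original exception keeps propagating.
        if (wasSuccessful) {
          throw new IllegalStateException("cleanup failed", e);
        }
      }
    }
  }
}
```

Note that an exception thrown from `finally` replaces the pending `return`, which is the intent here: a run that otherwise succeeded is reported as failed when cleanup fails.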



##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImplTest.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.workflow.impl;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class ExecuteGobblinWorkflowImplTest {
+
+  @Test
+  public void testCalculateWorkDirsDeletion() {

Review Comment:
   great test!  ... just thinking this should be an exception



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/DeleteWorkDirsActivityImpl.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Maps;
+
+import io.temporal.failure.ApplicationFailure;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.temporal.ddm.activity.DeleteWorkDirsActivity;
+import org.apache.gobblin.temporal.ddm.work.CleanupResult;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.JobLauncherUtils;
+import org.apache.gobblin.util.WriterUtils;
+
+
+@Slf4j
+public class DeleteWorkDirsActivityImpl implements DeleteWorkDirsActivity {
+  static String UNDEFINED_JOB_NAME = "<job_name_stub>";
+
+  @Override
+  public CleanupResult delete(WUProcessingSpec workSpec, EventSubmitterContext eventSubmitterContext, Set<String> workDirPaths) {
+    //TODO: Emit timers to measure length of cleanup step
+    Optional<String> optJobName = Optional.empty();
+    try {
+      FileSystem fs = Help.loadFileSystem(workSpec);
+      JobState jobState = Help.loadJobState(workSpec, fs);
+      optJobName = Optional.ofNullable(jobState.getJobName());
+
+      Map<String, Boolean> attemptedCleanedDirectories = jobState.getPropAsBoolean(ConfigurationKeys.CLEANUP_STAGING_DATA_PER_TASK, ConfigurationKeys.DEFAULT_CLEANUP_STAGING_DATA_PER_TASK) ?
+          cleanupStagingDataPerTask(jobState, workDirPaths) : cleanupStagingDataForEntireJob(jobState, workDirPaths);
+
+      return new CleanupResult(attemptedCleanedDirectories);
+    } catch (Exception e) {
+      throw ApplicationFailure.newNonRetryableFailureWithCause(
+          String.format("Failed to cleanup temporary folders for job %s", optJobName.orElse(UNDEFINED_JOB_NAME)),
+          IOException.class.toString(),
+          new IOException(e)
+      );
+    }
+  }
+
+  private static Map<String, Boolean> cleanupStagingDataPerTask(JobState jobState, Set<String> resourcesToClean) throws IOException {
+    log.error("Clean up staging data by task is not supported, will clean up job level data instead");
+
+    return cleanupStagingDataForEntireJob(jobState, resourcesToClean);
+  }
+
+  private static Map<String, Boolean> cleanupStagingDataForEntireJob(JobState state, Set<String> resourcesToClean) throws IOException {
+    if (!state.contains(ConfigurationKeys.WRITER_STAGING_DIR) || !state.contains(ConfigurationKeys.WRITER_OUTPUT_DIR)) {
+      return Maps.newHashMap();
+    }
+    String writerFsUri = state.getProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, ConfigurationKeys.LOCAL_FS_URI);
+    FileSystem fs = JobLauncherUtils.getFsWithProxy(state, writerFsUri, WriterUtils.getFsConfiguration(state));
+    Map<String, Boolean> attemptedCleanedDirectories = new HashMap<>();
+
+    for (String resource : resourcesToClean) {
+      Path pathToClean = new Path(resource);
+      log.info("Cleaning up resource directory " + pathToClean);
+      try {
+        HadoopUtils.deletePath(fs, pathToClean, true);
+        attemptedCleanedDirectories.put(resource, true);
+      } catch (IOException e) {
+        log.error("Failed to clean up resource directory " + pathToClean, e);
+        attemptedCleanedDirectories.put(resource, false);
+      }

Review Comment:
   to allow for activity retry, wherein a prior attempt partially succeeded and
   deleted some but not all of the dirs, there could be cases where the dir
   doesn't exist.  in that case, the "deletion" should still be considered a
   success.  it's a failure only when the deletion failed and the dir still
   remains.  do you agree?
   
   to avoid a race condition between checking for existence and attempting the
   delete, do the existence check in the exception handler:
   ```
   } catch (IOException e) {
     boolean doesExist = fs.exists(pathToClean);
     if (doesExist) {
       log.error(...);
     }
     attemptedCleanedDirectories.put(resource, !doesExist);
   }
   ```
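
The same idea as a self-contained sketch, using `java.nio.file.Files` as a stand-in for the Hadoop `FileSystem` and a non-recursive delete for brevity (the real code deletes recursively). `IdempotentDeleteSketch` is a hypothetical name.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: a retry-friendly delete where "already gone" counts as success.
final class IdempotentDeleteSketch {
  static Map<String, Boolean> deleteAll(Set<String> dirs) {
    Map<String, Boolean> successesByDirPath = new HashMap<>();
    for (String dir : dirs) {
      Path path = Paths.get(dir);
      try {
        Files.delete(path); // throws if the path is missing or is a non-empty directory
        successesByDirPath.put(dir, true);
      } catch (IOException e) {
        // Check existence only after the failed attempt, so there is no
        // check-then-delete window for another attempt to race with.
        boolean stillExists = Files.exists(path);
        if (stillExists) {
          System.err.println("Failed to clean up " + path + ": " + e);
        }
        successesByDirPath.put(dir, !stillExists);
      }
    }
    return successesByDirPath;
  }
}
```

In this sketch, a directory removed by an earlier attempt maps to `true`, while one that could not be deleted and is still present maps to `false`.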



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/DeleteWorkDirsActivityImpl.java:
##########
@@ -96,129 +68,26 @@ public CleanupResult cleanup(WUProcessingSpec workSpec, EventSubmitterContext ev
     }
   }
 
-  private boolean canCleanStagingData(JobContext jobContext, JobState jobState)
-      throws IOException {
-    return jobContext.getSemantics() != DeliverySemantics.EXACTLY_ONCE || !jobContext.getCommitSequenceStore()
-        .get().exists(jobState.getJobName());
+  private static Map<String, Long> cleanupStagingDataPerTask(JobState jobState, Set<String> resourcesToClean) throws IOException {
+    log.warn("Clean up staging data by task is not supported, will clean up job level data instead");
+    return cleanupStagingDataForEntireJob(jobState, resourcesToClean);

Review Comment:
   actually, yes, no need to introduce another config, given the equivalent 
choice already exists!


