phet commented on code in PR #4047:
URL: https://github.com/apache/gobblin/pull/4047#discussion_r1750548456
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java:
##########
@@ -56,6 +58,8 @@
@Slf4j
public class GenerateWorkUnitsImpl implements GenerateWorkUnits {
+ Set<String> resourcesToCleanUp = new HashSet<>();
Review Comment:
let's avoid an instance member, and just use a local var. I'm concerned
about thread-safety, esp. given this initialization in `WorkFulfillmentWorker`:
```
protected Object[] getActivityImplInstances() {
  return new Object[] { new CommitActivityImpl(), new GenerateWorkUnitsImpl(),
      new ProcessWorkUnitImpl(), new SubmitGTEActivityImpl() };
}
```
(where we pass in an *instance* of the activity impl, rather than the class,
so every invocation of the activity shares that one instance and its fields)
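As a minimal sketch of the local-var approach (the class and method names here are hypothetical stand-ins, not the PR's actual code), each invocation builds and returns its own set, so nothing accumulates on the shared instance:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for an activity impl whose single instance is shared across invocations.
final class LocalStateActivitySketch {
  // Deliberately no mutable instance fields: each call builds and returns its own set.
  Set<String> collectDirs(List<String> dirsForThisInvocation) {
    Set<String> resourcesToCleanUp = new HashSet<>(dirsForThisInvocation);
    return resourcesToCleanUp;
  }

  public static void main(String[] args) {
    LocalStateActivitySketch shared = new LocalStateActivitySketch();
    Set<String> first = shared.collectDirs(Arrays.asList("/tmp/jobA/task-staging"));
    Set<String> second = shared.collectDirs(Arrays.asList("/tmp/jobB/task-staging"));
    // Neither result contains the other's directory, because nothing is kept on `shared`.
    System.out.println(first + " " + second);
  }
}
```

With an instance-level `Set`, the second call would also see the first call's directory, and concurrent calls would mutate a non-thread-safe `HashSet`.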
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -178,32 +176,31 @@ private void cleanupWorkDirs(WUProcessingSpec workSpec,
EventSubmitterContext ev
try {
CleanupResult cleanupResult =
deleteWorkDirsActivityStub.delete(workSpec, eventSubmitterContext,
calculateWorkDirsToDelete(jobState.getJobId(), directoriesToClean));
- if (directoriesToClean.size() !=
cleanupResult.getAttemptedCleanedDirectories().size()) {
- log.warn("Expected to clean up {} directories, but only cleaned up
{}", directoriesToClean.size(),
- cleanupResult.getAttemptedCleanedDirectories().size());
- for (String dir : directoriesToClean) {
- if (cleanupResult.getAttemptedCleanedDirectories().get(dir)) {
- log.error("Directory {} was not cleaned up, please clean up
manually", dir);
- }
+ for (String dir : directoriesToClean) {
+ if (!cleanupResult.getDeletionSuccessesByDirPath().get(dir)) {
+ log.error("Directory {} was not cleaned up, please clean up
manually", dir);
}
}
} catch (Exception e) {
log.error("Failed to cleanup work dirs", e);
}
}
- private Set<String> calculateWorkDirsToDelete(String jobId, Set<String>
workDirsToClean) {
+ protected static Set<String> calculateWorkDirsToDelete(String jobId,
Set<String> workDirsToClean) {
// We want to delete the job-level directory once the job completes as
well, which is the parent of the task staging/output dirs
- Set<String> allDirsToClean = workDirsToClean.stream().map(workDir -> (new
Path(workDir).getParent()).toString()).collect(
- Collectors.toSet());
- allDirsToClean.addAll(workDirsToClean);
+ Set<Path> allDirsToClean =
+ workDirsToClean.stream().map(workDir -> (new
Path(workDir).getParent())).collect(Collectors.toSet());
+
allDirsToClean.addAll(workDirsToClean.stream().map(Path::new).collect(Collectors.toSet()));
// Only delete directories that are associated with the current job,
otherwise
- return allDirsToClean.stream().filter(workDir -> {
- if (!workDir.contains(jobId)) {
- log.warn("Not deleting work dir {} as it does not contain the jobId
{}", workDir, jobId);
+ Set<String> resultSet = new HashSet<>();
+ for (Path dir : allDirsToClean) {
+ if (dir.toString().contains(jobId)) {
+ resultSet.add(dir.toString());
+ } else {
+ log.warn("Skipping deletion of directory {} as it is not associated
with job {}", dir, jobId);
Review Comment:
this also feels like it should be an exception. to get here, something went
wrong between the impl and the input. it should fail loudly, not merely log a
message that's easily missed
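A minimal sketch of the fail-loudly variant, under stated assumptions: plain string handling stands in for Hadoop's `Path.getParent()`, `IllegalArgumentException` is an illustrative choice of exception type, and `StrictWorkDirsSketch` is a hypothetical name:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified take on calculateWorkDirsToDelete that throws instead of logging.
final class StrictWorkDirsSketch {
  static Set<String> calculateWorkDirsToDelete(String jobId, Set<String> workDirsToClean) {
    Set<String> allDirsToClean = new HashSet<>();
    for (String workDir : workDirsToClean) {
      allDirsToClean.add(workDir);
      // Also delete the job-level parent of each task staging/output dir.
      int lastSlash = workDir.lastIndexOf('/');
      if (lastSlash > 0) {
        allDirsToClean.add(workDir.substring(0, lastSlash));
      }
    }
    for (String dir : allDirsToClean) {
      if (!dir.contains(jobId)) {
        // A dir unrelated to this job means the input is wrong: surface it, don't skip it.
        throw new IllegalArgumentException(
            "Directory " + dir + " is not associated with job " + jobId);
      }
    }
    return allDirsToClean;
  }
}
```

Note that the parent directory is subject to the same check, so an input whose parent does not contain the job ID is rejected as well.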
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java:
##########
@@ -136,7 +127,19 @@ protected static List<WorkUnit>
generateWorkUnitsForJobState(JobState jobState,
DestinationDatasetHandlerService datasetHandlerService = closer.register(
new DestinationDatasetHandlerService(jobState, canCleanUpTempDirs,
eventSubmitterContext.create()));
WorkUnitStream handledWorkUnitStream =
datasetHandlerService.executeHandlers(workUnitStream);
-
+ // Validate every workunit if they have the temp dir props since some
workunits may be commit steps
+ Iterator<WorkUnit> workUnitIterator = handledWorkUnitStream.getWorkUnits();
+ while (workUnitIterator.hasNext()) {
+ WorkUnit workUnit = workUnitIterator.next();
+ if (workUnit.isMultiWorkUnit()) {
+ List<WorkUnit> workUnitList = ((MultiWorkUnit)
workUnit).getWorkUnits();
+ for (WorkUnit wu : workUnitList) {
+ collectTaskStagingAndOutputDirsFromWorkUnit(wu,
this.resourcesToCleanUp);
+ }
+ } else {
+ collectTaskStagingAndOutputDirsFromWorkUnit(workUnit,
this.resourcesToCleanUp);
+ }
+ }
Review Comment:
I still believe this deserves its own abstraction:
```
@VisibleForTesting static Set<String> calcWorkDirPathsToDelete(Iterator<WorkUnit> wus)
```
that makes it isolated and hence more unit-testable.
(... but if there's a counter-argument against abstraction, please just fill
me in, so I stop requesting this :) )
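One possible shape for such a helper, as a sketch only: `Wu` is a simplified, hypothetical stand-in for Gobblin's `WorkUnit`/`MultiWorkUnit`, and the two property keys are placeholders rather than the real `ConfigurationKeys` values:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class WorkDirPathsSketch {
  // Placeholder keys, NOT the actual ConfigurationKeys constants.
  static final String STAGING_DIR_KEY = "staging.dir";
  static final String OUTPUT_DIR_KEY = "output.dir";

  // Hypothetical stand-in: a work unit is "multi" when it wraps child work units.
  static final class Wu {
    final Map<String, String> props = new HashMap<>();
    final List<Wu> children = new ArrayList<>();
    boolean isMulti() { return !children.isEmpty(); }
  }

  // Pure function of its input: no instance state, so it can be unit-tested in isolation.
  static Set<String> calcWorkDirPathsToDelete(Iterator<Wu> wus) {
    Set<String> paths = new HashSet<>();
    while (wus.hasNext()) {
      Wu wu = wus.next();
      List<Wu> leaves = wu.isMulti() ? wu.children : Collections.singletonList(wu);
      for (Wu leaf : leaves) {
        collect(leaf, STAGING_DIR_KEY, paths);
        collect(leaf, OUTPUT_DIR_KEY, paths);
      }
    }
    return paths;
  }

  // Adds the property value only when present, since some work units (e.g. commit steps) lack it.
  private static void collect(Wu wu, String key, Set<String> paths) {
    String value = wu.props.get(key);
    if (value != null) {
      paths.add(value);
    }
  }
}
```

Because the helper returns a fresh set, the caller can hold the result in a local variable instead of an instance field.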
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java:
##########
@@ -158,4 +161,16 @@ protected static List<WorkUnit>
generateWorkUnitsForJobState(JobState jobState,
return AbstractJobLauncher.materializeWorkUnitList(trackedWorkUnitStream);
}
+
+ private static void collectTaskStagingAndOutputDirsFromWorkUnit(WorkUnit
workUnit, Set<String> resourcesToCleanUp) {
+ if (workUnit.contains(ConfigurationKeys.WRITER_STAGING_DIR)) {
+
resourcesToCleanUp.add(workUnit.getProp(ConfigurationKeys.WRITER_STAGING_DIR));
+ }
+ if (workUnit.contains(ConfigurationKeys.WRITER_OUTPUT_DIR)) {
+
resourcesToCleanUp.add(workUnit.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR));
+ }
+ if (workUnit.getPropAsBoolean(ConfigurationKeys.CLEAN_ERR_DIR,
ConfigurationKeys.DEFAULT_CLEAN_ERR_DIR)) {
+
resourcesToCleanUp.add(workUnit.getProp(ConfigurationKeys.ROW_LEVEL_ERR_FILE));
Review Comment:
don't you also want to verify that the property `ROW_LEVEL_ERR_FILE` exists?
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CleanupResult.java:
##########
@@ -33,5 +33,5 @@
@RequiredArgsConstructor
public class CleanupResult {
- @NonNull private Map<String, Boolean> attemptedCleanedDirectories;
+ @NonNull private Map<String, Boolean> deletionSuccessesByDirPath;
Review Comment:
if we change the class name to clarify that cleanup means deletion, the
method might be:
```
DirDeletionResult::successesByDirPath
```
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -178,32 +176,31 @@ private void cleanupWorkDirs(WUProcessingSpec workSpec,
EventSubmitterContext ev
try {
CleanupResult cleanupResult =
deleteWorkDirsActivityStub.delete(workSpec, eventSubmitterContext,
calculateWorkDirsToDelete(jobState.getJobId(), directoriesToClean));
- if (directoriesToClean.size() !=
cleanupResult.getAttemptedCleanedDirectories().size()) {
- log.warn("Expected to clean up {} directories, but only cleaned up
{}", directoriesToClean.size(),
- cleanupResult.getAttemptedCleanedDirectories().size());
- for (String dir : directoriesToClean) {
- if (cleanupResult.getAttemptedCleanedDirectories().get(dir)) {
- log.error("Directory {} was not cleaned up, please clean up
manually", dir);
- }
+ for (String dir : directoriesToClean) {
+ if (!cleanupResult.getDeletionSuccessesByDirPath().get(dir)) {
+ log.error("Directory {} was not cleaned up, please clean up
manually", dir);
Review Comment:
shouldn't this be an exception?
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -126,8 +126,11 @@ public ExecGobblinStats execute(Properties jobProps,
EventSubmitterContext event
} finally {
// TODO: Cleanup WorkUnit/Taskstate Directory for jobs cancelled mid
flight
try {
+ log.info("Cleaning up work dirs for job {}",
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
if (generateWorkUnitResultsOpt.isPresent()) {
cleanupWorkDirs(wuSpec, eventSubmitterContext,
generateWorkUnitResultsOpt.get().getWorkDirPathsToCleanup());
+ } else {
+ log.warn("Skipping cleanup of work dirs for job due to no output
from GenerateWorkUnits");
}
} catch (IOException e) {
Review Comment:
rather than swallowing and merely logging, this seems like an
`ApplicationFailure`.
the only problem is that there may already be an exception in progress. for
that I suggest a boolean:
```
boolean wasSuccessful = false;
try { // (L104)
  ....
  wasSuccessful = true;
  return new ExecGobblinStats(...);
} catch (Exception e) {
  ...
} finally {
  try {
    ...
  } catch (IOException e) {
    log.error(...);
    // fail only when otherwise successful: DO NOT subvert a prior failure already in-progress
    if (wasSuccessful) {
      throw ApplicationFailure.newNonRetryableFailureWithCause(...);
    }
  }
}
```
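The control flow can be exercised in isolation with a small sketch. Everything here is a hypothetical stand-in: `IoAction`, `execute`, and the `"stats"` return value are illustrative, and `IllegalStateException` takes the place of Temporal's `ApplicationFailure` so the example runs without the SDK:

```java
import java.io.IOException;

final class CleanupFailureSketch {
  // Hypothetical cleanup callback that may fail with an IOException.
  interface IoAction {
    void run() throws IOException;
  }

  static String execute(Runnable work, IoAction cleanup) {
    boolean wasSuccessful = false;
    try {
      work.run();
      wasSuccessful = true;
      return "stats";
    } finally {
      try {
        cleanup.run();
      } catch (IOException e) {
        System.err.println("Failed to clean up: " + e.getMessage());
        // Escalate only when the main work succeeded; otherwise the original
        // exception keeps propagating and is not replaced by the cleanup failure.
        if (wasSuccessful) {
          throw new IllegalStateException("Cleanup failed after successful run", e);
        }
      }
    }
  }
}
```

When `work` throws, the cleanup failure is only logged and the caller still sees the original exception; when `work` succeeds, the cleanup failure replaces the normal return.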
##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImplTest.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.workflow.impl;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class ExecuteGobblinWorkflowImplTest {
+
+ @Test
+ public void testCalculateWorkDirsDeletion() {
Review Comment:
great test! ... just thinking this should be an exception
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/DeleteWorkDirsActivityImpl.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Maps;
+
+import io.temporal.failure.ApplicationFailure;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.temporal.ddm.activity.DeleteWorkDirsActivity;
+import org.apache.gobblin.temporal.ddm.work.CleanupResult;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.JobLauncherUtils;
+import org.apache.gobblin.util.WriterUtils;
+
+
+@Slf4j
+public class DeleteWorkDirsActivityImpl implements DeleteWorkDirsActivity {
+ static String UNDEFINED_JOB_NAME = "<job_name_stub>";
+
+ @Override
+ public CleanupResult delete(WUProcessingSpec workSpec, EventSubmitterContext
eventSubmitterContext, Set<String> workDirPaths) {
+ //TODO: Emit timers to measure length of cleanup step
+ Optional<String> optJobName = Optional.empty();
+ try {
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, fs);
+ optJobName = Optional.ofNullable(jobState.getJobName());
+
+ Map<String, Boolean> attemptedCleanedDirectories =
jobState.getPropAsBoolean(ConfigurationKeys.CLEANUP_STAGING_DATA_PER_TASK,
ConfigurationKeys.DEFAULT_CLEANUP_STAGING_DATA_PER_TASK) ?
+ cleanupStagingDataPerTask(jobState, workDirPaths) :
cleanupStagingDataForEntireJob(jobState, workDirPaths);
+
+ return new CleanupResult(attemptedCleanedDirectories);
+ } catch (Exception e) {
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ String.format("Failed to cleanup temporary folders for job %s",
optJobName.orElse(UNDEFINED_JOB_NAME)),
+ IOException.class.toString(),
+ new IOException(e)
+ );
+ }
+ }
+
+ private static Map<String, Boolean> cleanupStagingDataPerTask(JobState
jobState, Set<String> resourcesToClean) throws IOException {
+ log.error("Clean up staging data by task is not supported, will clean up
job level data instead");
+
+ return cleanupStagingDataForEntireJob(jobState, resourcesToClean);
+ }
+
+ private static Map<String, Boolean> cleanupStagingDataForEntireJob(JobState
state, Set<String> resourcesToClean) throws IOException {
+ if (!state.contains(ConfigurationKeys.WRITER_STAGING_DIR) ||
!state.contains(ConfigurationKeys.WRITER_OUTPUT_DIR)) {
+ return Maps.newHashMap();
+ }
+ String writerFsUri =
state.getProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI,
ConfigurationKeys.LOCAL_FS_URI);
+ FileSystem fs = JobLauncherUtils.getFsWithProxy(state, writerFsUri,
WriterUtils.getFsConfiguration(state));
+ Map<String, Boolean> attemptedCleanedDirectories = new HashMap<>();
+
+ for (String resource : resourcesToClean) {
+ Path pathToClean = new Path(resource);
+ log.info("Cleaning up resource directory " + pathToClean);
+ try {
+ HadoopUtils.deletePath(fs, pathToClean, true);
+ attemptedCleanedDirectories.put(resource, true);
+ } catch (IOException e) {
+ log.error("Failed to clean up resource directory " + pathToClean, e);
+ attemptedCleanedDirectories.put(resource, false);
+ }
Review Comment:
to allow for activity retry, wherein a prior attempt partially succeeded and
deleted some but not all of the dirs, there could be cases where the dir
doesn't exist. in that case, the "deletion" should still be considered a
success. it's a failure only when the deletion failed and the dir still
remains. do you agree?
the nuance, to avoid the race condition between checking for existence and
attempting the delete, is to do the existence check in the exception handler:
```
} catch (IOException e) {
  // `Path` has no exists() method; ask the FileSystem instead
  boolean doesExist = fs.exists(pathToClean);
  if (doesExist) {
    log.error(...);
  }
  attemptedCleanedDirectories.put(resource, !doesExist);
}
```
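The same retry-tolerant idea, sketched with `java.nio.file` standing in for the Hadoop `FileSystem` so it runs without Hadoop on the classpath (`IdempotentDeleteSketch` and `deleteAll` are hypothetical names, and `Files.delete` is not recursive, unlike the delete in the PR):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class IdempotentDeleteSketch {
  static Map<String, Boolean> deleteAll(Set<String> dirs) {
    Map<String, Boolean> successesByDirPath = new HashMap<>();
    for (String dir : dirs) {
      Path path = Paths.get(dir);
      try {
        // Throws if the path is missing or is a non-empty directory.
        Files.delete(path);
        successesByDirPath.put(dir, true);
      } catch (IOException e) {
        // Check existence only after the failed delete: a dir that is already gone
        // (e.g. removed by an earlier attempt) still counts as successfully deleted.
        boolean stillExists = Files.exists(path);
        if (stillExists) {
          System.err.println("Failed to delete " + path + ": " + e);
        }
        successesByDirPath.put(dir, !stillExists);
      }
    }
    return successesByDirPath;
  }
}
```

A missing directory maps to `true`, while a directory that could not be removed and is still present maps to `false`.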
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/DeleteWorkDirsActivityImpl.java:
##########
@@ -96,129 +68,26 @@ public CleanupResult cleanup(WUProcessingSpec workSpec,
EventSubmitterContext ev
}
}
- private boolean canCleanStagingData(JobContext jobContext, JobState jobState)
- throws IOException {
- return jobContext.getSemantics() != DeliverySemantics.EXACTLY_ONCE ||
!jobContext.getCommitSequenceStore()
- .get().exists(jobState.getJobName());
+ private static Map<String, Long> cleanupStagingDataPerTask(JobState
jobState, Set<String> resourcesToClean) throws IOException {
+ log.warn("Clean up staging data by task is not supported, will clean up
job level data instead");
+ return cleanupStagingDataForEntireJob(jobState, resourcesToClean);
Review Comment:
actually, yes, no need to introduce another config, given the equivalent
choice already exists!