anmolanmol1234 commented on code in PR #6716:
URL: https://github.com/apache/hadoop/pull/6716#discussion_r1567132937
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConstants.java:
##########
@@ -143,6 +145,20 @@ public final class ManifestCommitterConstants {
*/
public static final boolean OPT_CLEANUP_PARALLEL_DELETE_DIRS_DEFAULT = true;
+ /**
+ * Should parallel cleanup try to delete teh base first?
Review Comment:
typo: the
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CleanupJobStage.java:
##########
@@ -142,64 +154,93 @@ protected Result executeStage(
}
Outcome outcome = null;
- IOException exception;
+ IOException exception = null;
+ boolean baseDirDeleted = false;
// to delete.
LOG.info("{}: Deleting job directory {}", getName(), baseDir);
if (args.deleteTaskAttemptDirsInParallel) {
- // Attempt to do a parallel delete of task attempt dirs;
- // don't overreact if a delete fails, but stop trying
- // to delete the others, and fall back to deleting the
- // job dir.
- Path taskSubDir
- = getStageConfig().getJobAttemptTaskSubDir();
- try (DurationInfo info = new DurationInfo(LOG,
- "parallel deletion of task attempts in %s",
- taskSubDir)) {
- RemoteIterator<FileStatus> dirs =
- RemoteIterators.filteringRemoteIterator(
- listStatusIterator(taskSubDir),
- FileStatus::isDirectory);
- TaskPool.foreach(dirs)
- .executeWith(getIOProcessors())
- .stopOnFailure()
- .suppressExceptions(false)
- .run(this::rmTaskAttemptDir);
- getIOStatistics().aggregate((retrieveIOStatistics(dirs)));
-
- if (getLastDeleteException() != null) {
- // one of the task attempts failed.
- throw getLastDeleteException();
+
+ // parallel delete of task attempt dirs.
+
+ if (args.parallelDeleteAttemptBaseDeleteFirst) {
+ // attempt to delete the base dir first.
+ // This can reduce ABFS delete load but may time out
+ // (which the fallback to parallel delete will handle).
+ // on GCS it is slow.
+ try (DurationInfo info = new DurationInfo(LOG, true,
+ "Initial delete of %s", baseDir)) {
+ exception = deleteOneDir(baseDir);
+ if (exception == null) {
+ // success: record this as the outcome, which
+ // will skip the parallel delete.
+ outcome = Outcome.DELETED;
+ baseDirDeleted = true;
+ } else {
+ // failure: log and continue
+ LOG.warn("{}: Exception on initial attempt at deleting base dir {}\n"
+ + "attempting parallel delete",
+ getName(), baseDir, exception);
+ }
+ }
+ }
+ if (!baseDirDeleted) {
Review Comment:
Do we need to change the state of this variable here? If not, it will fall through to the next if block.
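A minimal standalone sketch of the control flow being questioned (plain Java, not the Hadoop classes; method and flag names are illustrative). The point is that `baseDirDeleted` starts false and is only set true when the initial base-directory delete succeeds, so the `if (!baseDirDeleted)` fallback block runs exactly when the fast path was skipped or failed:

```java
// Sketch, assuming the flow in the diff above: the flag is only flipped
// on a successful base-dir delete, so no extra state change is needed
// before the fallback check.
public class CleanupFlowSketch {
    static String cleanup(boolean tryBaseFirst, boolean baseDeleteSucceeds) {
        boolean baseDirDeleted = false;
        if (tryBaseFirst) {
            if (baseDeleteSucceeds) {
                baseDirDeleted = true;    // fast path worked; skip fallback
            }
            // on failure: log and fall through, flag stays false
        }
        if (!baseDirDeleted) {
            return "parallel-delete";     // fallback path
        }
        return "base-deleted";
    }

    public static void main(String[] args) {
        System.out.println(cleanup(true, true));    // base-deleted
        System.out.println(cleanup(true, false));   // parallel-delete
        System.out.println(cleanup(false, false));  // parallel-delete
    }
}
```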
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CleanupJobStage.java:
##########
@@ -142,64 +154,93 @@ protected Result executeStage(
}
Outcome outcome = null;
- IOException exception;
+ IOException exception = null;
+ boolean baseDirDeleted = false;
// to delete.
LOG.info("{}: Deleting job directory {}", getName(), baseDir);
if (args.deleteTaskAttemptDirsInParallel) {
- // Attempt to do a parallel delete of task attempt dirs;
- // don't overreact if a delete fails, but stop trying
- // to delete the others, and fall back to deleting the
- // job dir.
- Path taskSubDir
- = getStageConfig().getJobAttemptTaskSubDir();
- try (DurationInfo info = new DurationInfo(LOG,
- "parallel deletion of task attempts in %s",
- taskSubDir)) {
- RemoteIterator<FileStatus> dirs =
- RemoteIterators.filteringRemoteIterator(
- listStatusIterator(taskSubDir),
- FileStatus::isDirectory);
- TaskPool.foreach(dirs)
- .executeWith(getIOProcessors())
- .stopOnFailure()
- .suppressExceptions(false)
- .run(this::rmTaskAttemptDir);
- getIOStatistics().aggregate((retrieveIOStatistics(dirs)));
-
- if (getLastDeleteException() != null) {
- // one of the task attempts failed.
- throw getLastDeleteException();
+
+ // parallel delete of task attempt dirs.
+
+ if (args.parallelDeleteAttemptBaseDeleteFirst) {
+ // attempt to delete the base dir first.
+ // This can reduce ABFS delete load but may time out
+ // (which the fallback to parallel delete will handle).
+ // on GCS it is slow.
+ try (DurationInfo info = new DurationInfo(LOG, true,
+ "Initial delete of %s", baseDir)) {
+ exception = deleteOneDir(baseDir);
+ if (exception == null) {
+ // success: record this as the outcome, which
+ // will skip the parallel delete.
+ outcome = Outcome.DELETED;
+ baseDirDeleted = true;
+ } else {
+ // failure: log and continue
+ LOG.warn("{}: Exception on initial attempt at deleting base dir {}\n"
+ + "attempting parallel delete",
+ getName(), baseDir, exception);
+ }
+ }
+ }
+ if (!baseDirDeleted) {
+ // no base delete attempted or it failed.
+ // Attempt to do a parallel delete of task attempt dirs;
+ // don't overreact if a delete fails, but stop trying
+ // to delete the others, and fall back to deleting the
+ // job dir.
+ Path taskSubDir
+ = getStageConfig().getJobAttemptTaskSubDir();
+ try (DurationInfo info = new DurationInfo(LOG, true,
+ "parallel deletion of task attempts in %s",
+ taskSubDir)) {
+ RemoteIterator<FileStatus> dirs =
+ RemoteIterators.filteringRemoteIterator(
+ listStatusIterator(taskSubDir),
+ FileStatus::isDirectory);
+ TaskPool.foreach(dirs)
+ .executeWith(getIOProcessors())
+ .stopOnFailure()
+ .suppressExceptions(false)
+ .run(this::rmTaskAttemptDir);
+ getIOStatistics().aggregate((retrieveIOStatistics(dirs)));
+
+ if (getLastDeleteException() != null) {
+ // one of the task attempts failed.
+ throw getLastDeleteException();
Review Comment:
What is the recovery mechanism here? If one of the task attempt deletes fails with an exception, do we intend to retry that delete operation or return failure?
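One plausible reading, based on the comment in the code being replaced ("don't overreact if a delete fails, but stop trying to delete the others, and fall back to deleting the job dir"): there is no per-delete retry; the first failure aborts the parallel loop and a single delete of the whole job directory is attempted instead. A simplified sketch in plain Java (not the Hadoop `TaskPool` API; the `Deleter` interface and return strings are illustrative):

```java
import java.io.IOException;
import java.util.List;

public class ParallelDeleteSketch {
    /** Illustrative stand-in for the per-directory delete operation. */
    interface Deleter { void delete(String dir) throws IOException; }

    /** Returns which strategy completed the cleanup. */
    static String cleanup(List<String> taskDirs, String jobDir, Deleter d) {
        try {
            // stopOnFailure(): the first error aborts the loop
            for (String dir : taskDirs) {
                d.delete(dir);
            }
            return "parallel-delete";
        } catch (IOException e) {
            // recovery: no retry of the failed delete; fall back to
            // one delete of the entire job directory
            try {
                d.delete(jobDir);
                return "fallback-job-dir-delete";
            } catch (IOException e2) {
                return "failed";
            }
        }
    }
}
```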
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbstractJobOrTaskStage.java:
##########
@@ -582,19 +607,64 @@ protected final Path directoryMustExist(
* Save a task manifest or summary. This will be done by
* writing to a temp path and then renaming.
* If the destination path exists: Delete it.
+ * This will retry so that a rename failure from abfs load or IO errors
+ * will not fail the task.
* @param manifestData the manifest/success file
* @param tempPath temp path for the initial save
* @param finalPath final path for rename.
- * @throws IOException failure to load/parse
+ * @throws IOException failure to rename after retries.
*/
@SuppressWarnings("unchecked")
protected final <T extends AbstractManifestData> void save(T manifestData,
final Path tempPath,
final Path finalPath) throws IOException {
- LOG.trace("{}: save('{}, {}, {}')", getName(), manifestData, tempPath, finalPath);
- trackDurationOfInvocation(getIOStatistics(), OP_SAVE_TASK_MANIFEST, () ->
- operations.save(manifestData, tempPath, true));
- renameFile(tempPath, finalPath);
+
+ int retryCount = 0;
+ RetryPolicy retryPolicy = retryUpToMaximumCountWithProportionalSleep(
+ getStageConfig().getManifestSaveAttempts(),
+ SAVE_SLEEP_INTERVAL,
+ TimeUnit.MILLISECONDS);
+ boolean success = false;
+ while (!success) {
+ try {
+ LOG.info("{}: save manifest to {} then rename as {}'); retry count={}",
+ getName(), tempPath, finalPath, retryCount);
+
+ trackDurationOfInvocation(getIOStatistics(), OP_SAVE_TASK_MANIFEST, () ->
+ operations.save(manifestData, tempPath, true));
Review Comment:
Also: if the rename failed on the first attempt but succeeded in the backend, will the save operation on tempPath fail with an error, and if so, how do we recover from that?
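A hypothetical sketch of how such a retry loop can stay safe against that case, using `java.nio.file` in place of the Hadoop `FileSystem` API (names and structure are illustrative, not the PR's code). Each attempt rewrites the temp file from scratch and deletes any existing final path before renaming, per the javadoc above ("If the destination path exists: Delete it"), so a rename that actually succeeded in the backend on an earlier attempt does not break the next one:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class SaveRetrySketch {
    static void save(byte[] data, Path tempPath, Path finalPath, int maxAttempts)
            throws IOException {
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                Files.write(tempPath, data);      // overwrite temp each attempt
                Files.deleteIfExists(finalPath);  // dest may exist from a prior attempt
                Files.move(tempPath, finalPath);  // the rename
                return;                           // success
            } catch (IOException e) {
                last = e;                         // log/sleep, then retry
            }
        }
        throw last;                               // failure after all retries
    }
}
```

Because the destination is deleted before each rename, the retry is idempotent even when the backend completed a rename that the client saw as a failure.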
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]