This is an automated email from the ASF dual-hosted git repository. stevel pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new f02fa6683d2 HADOOP-18793. S3A StagingCommitter does not clean up staging-uploads directory (#5818) f02fa6683d2 is described below commit f02fa6683d2e3599416cb4ff4cfeb6b727116a82 Author: Harunobu Daikoku <hd.11235813.revi1...@gmail.com> AuthorDate: Sat Jul 8 18:53:54 2023 +0700 HADOOP-18793. S3A StagingCommitter does not clean up staging-uploads directory (#5818) Contributed by Harunobu Daikoku --- .../apache/hadoop/fs/s3a/commit/staging/Paths.java | 13 ++++ .../fs/s3a/commit/staging/StagingCommitter.java | 32 ++++++---- .../fs/s3a/commit/AbstractITCommitProtocol.java | 24 ++++++++ .../integration/ITestStagingCommitProtocol.java | 70 ++++++++++++++++++++++ 4 files changed, 128 insertions(+), 11 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java index 5f9e6e21363..4bf45c2446b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java @@ -227,6 +227,19 @@ public final class Paths { MRJobConfig.APPLICATION_ATTEMPT_ID, 0); } + /** + * Build a qualified parent path for the temporary multipart upload commit + * directory built by {@link #getMultipartUploadCommitsDirectory(Configuration, String)}. + * @param conf configuration defining default FS. + * @param uuid uuid of job + * @return a path which can be used for temporary work + * @throws IOException on an IO failure. + */ + public static Path getStagingUploadsParentDirectory(Configuration conf, + String uuid) throws IOException { + return getMultipartUploadCommitsDirectory(conf, uuid).getParent(); + } + /** * Build a qualified temporary path for the multipart upload commit * information in the cluster filesystem. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java index d764055c45b..31d7693a2d9 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java @@ -501,8 +501,8 @@ public class StagingCommitter extends AbstractS3ACommitter { /** * Staging committer cleanup includes calling wrapped committer's - * cleanup method, and removing all destination paths in the final - * filesystem. + * cleanup method, and removing staging uploads path and all + * destination paths in the final filesystem. * @param commitContext commit context * @param suppressExceptions should exceptions be suppressed? * @throws IOException IO failures if exceptions are not suppressed. @@ -515,6 +515,9 @@ public class StagingCommitter extends AbstractS3ACommitter { maybeIgnore(suppressExceptions, "Cleanup wrapped committer", () -> wrappedCommitter.cleanupJob( commitContext.getJobContext())); + maybeIgnore(suppressExceptions, "Delete staging uploads path", + () -> deleteStagingUploadsParentDirectory( + commitContext.getJobContext())); maybeIgnore(suppressExceptions, "Delete destination paths", () -> deleteDestinationPaths( commitContext.getJobContext())); @@ -543,11 +546,26 @@ public class StagingCommitter extends AbstractS3ACommitter { } } + /** + * Delete the multipart upload staging directory. + * @param context job context + * @throws IOException IO failure + */ + protected void deleteStagingUploadsParentDirectory(JobContext context) + throws IOException { + Path stagingUploadsPath = Paths.getStagingUploadsParentDirectory( + context.getConfiguration(), getUUID()); + ignoreIOExceptions(LOG, + "Deleting staging uploads path", stagingUploadsPath.toString(), + () -> deleteWithWarning( + stagingUploadsPath.getFileSystem(getConf()), + stagingUploadsPath, + true)); + } /** * Delete the working paths of a job. * <ol> - * <li>The job attempt path</li> * <li>{@code $dest/__temporary}</li> * <li>the local working directory for staged files</li> * </ol> @@ -556,14 +574,6 @@ public class StagingCommitter extends AbstractS3ACommitter { * @throws IOException IO failure */ protected void deleteDestinationPaths(JobContext context) throws IOException { - Path attemptPath = getJobAttemptPath(context); - ignoreIOExceptions(LOG, - "Deleting Job attempt Path", attemptPath.toString(), - () -> deleteWithWarning( - getJobAttemptFileSystem(context), - attemptPath, - true)); - // delete the __temporary directory. This will cause problems // if there is >1 task targeting the same dest dir deleteWithWarning(getDestFS(), diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java index b193cca03db..e517a41629c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java @@ -403,6 +403,30 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest { this.committer = committer; conf = job.getConfiguration(); } + + public Job getJob() { + return job; + } + + public JobContext getJContext() { + return jContext; + } + + public TaskAttemptContext getTContext() { + return tContext; + } + + public AbstractS3ACommitter getCommitter() { + return committer; + } + + public Configuration getConf() { + return conf; + } + + public Path getWrittenTextPath() { + return writtenTextPath; + } } /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java index bb3031b32c1..dd62064f86d 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java @@ -21,6 +21,8 @@ package org.apache.hadoop.fs.s3a.commit.staging.integration; import java.io.IOException; import java.util.UUID; +import org.junit.Test; + import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -141,6 +143,74 @@ public class ITestStagingCommitProtocol extends AbstractITCommitProtocol { assertEquals("file", wd.toUri().getScheme()); } + @Test + public void testStagingUploadsDirectoryCleanedUp() throws Exception { + describe("Assert that the staging uploads directory is cleaned up after successful commit"); + JobData jobData = startJob(false); + JobContext jContext = jobData.getJContext(); + TaskAttemptContext tContext = jobData.getTContext(); + StagingCommitter committer = (StagingCommitter) jobData.getCommitter(); + + Path stagingUploadsDir = Paths.getStagingUploadsParentDirectory( + jContext.getConfiguration(), + committer.getUUID()); + + ContractTestUtils.assertPathExists( + stagingUploadsDir.getFileSystem(jContext.getConfiguration()), + "staging uploads path must exist after setupJob", + stagingUploadsDir + ); + + // write output + writeTextOutput(tContext); + + // do commit + committer.commitTask(tContext); + + commitJob(committer, jContext); + + ContractTestUtils.assertPathDoesNotExist( + stagingUploadsDir.getFileSystem(jContext.getConfiguration()), + "staging uploads path must not exist after commitJob", + stagingUploadsDir + ); + } + + @Test + public void testStagingUploadsDirectoryCleanedUpWithFailure() throws Exception { + describe("Assert that the staging uploads directory is cleaned up after failed commit"); + JobData jobData = startJob(new FailingCommitterFactory(), false); + JobContext jContext = jobData.getJContext(); + TaskAttemptContext tContext = jobData.getTContext(); + StagingCommitter committer = (StagingCommitter) jobData.getCommitter(); + + Path stagingUploadsDir = Paths.getStagingUploadsParentDirectory( + jContext.getConfiguration(), + committer.getUUID()); + + ContractTestUtils.assertPathExists( + stagingUploadsDir.getFileSystem(jContext.getConfiguration()), + "staging uploads path must exist after setupJob", + stagingUploadsDir + ); + + // do commit + committer.commitTask(tContext); + + // now fail job + expectSimulatedFailureOnJobCommit(jContext, committer); + + commitJob(committer, jContext); + + expectJobCommitToFail(jContext, committer); + + ContractTestUtils.assertPathDoesNotExist( + stagingUploadsDir.getFileSystem(jContext.getConfiguration()), + "staging uploads path must not exist after commitJob", + stagingUploadsDir + ); + } + /** * The class provides a overridden implementation of commitJobInternal which * causes the commit failed for the first time then succeed. --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org