This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new f02fa6683d2 HADOOP-18793. S3A StagingCommitter does not clean up 
staging-uploads directory (#5818)
f02fa6683d2 is described below

commit f02fa6683d2e3599416cb4ff4cfeb6b727116a82
Author: Harunobu Daikoku <hd.11235813.revi1...@gmail.com>
AuthorDate: Sat Jul 8 18:53:54 2023 +0700

    HADOOP-18793. S3A StagingCommitter does not clean up staging-uploads 
directory (#5818)
    
    Contributed by Harunobu Daikoku
---
 .../apache/hadoop/fs/s3a/commit/staging/Paths.java | 13 ++++
 .../fs/s3a/commit/staging/StagingCommitter.java    | 32 ++++++----
 .../fs/s3a/commit/AbstractITCommitProtocol.java    | 24 ++++++++
 .../integration/ITestStagingCommitProtocol.java    | 70 ++++++++++++++++++++++
 4 files changed, 128 insertions(+), 11 deletions(-)

diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java
index 5f9e6e21363..4bf45c2446b 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java
@@ -227,6 +227,19 @@ public final class Paths {
         MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
   }
 
+  /**
+   * Build a qualified parent path for the temporary multipart upload commit
+   * directory built by {@link 
#getMultipartUploadCommitsDirectory(Configuration, String)}.
+   * @param conf configuration defining default FS.
+   * @param uuid uuid of job
+   * @return a path which can be used for temporary work
+   * @throws IOException on an IO failure.
+   */
+  public static Path getStagingUploadsParentDirectory(Configuration conf,
+      String uuid) throws IOException {
+    return getMultipartUploadCommitsDirectory(conf, uuid).getParent();
+  }
+
   /**
    * Build a qualified temporary path for the multipart upload commit
    * information in the cluster filesystem.
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
index d764055c45b..31d7693a2d9 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
@@ -501,8 +501,8 @@ public class StagingCommitter extends AbstractS3ACommitter {
 
   /**
    * Staging committer cleanup includes calling wrapped committer's
-   * cleanup method, and removing all destination paths in the final
-   * filesystem.
+   * cleanup method, and removing staging uploads path and all
+   * destination paths in the final filesystem.
    * @param commitContext commit context
    * @param suppressExceptions should exceptions be suppressed?
    * @throws IOException IO failures if exceptions are not suppressed.
@@ -515,6 +515,9 @@ public class StagingCommitter extends AbstractS3ACommitter {
     maybeIgnore(suppressExceptions, "Cleanup wrapped committer",
         () -> wrappedCommitter.cleanupJob(
             commitContext.getJobContext()));
+    maybeIgnore(suppressExceptions, "Delete staging uploads path",
+        () -> deleteStagingUploadsParentDirectory(
+            commitContext.getJobContext()));
     maybeIgnore(suppressExceptions, "Delete destination paths",
         () -> deleteDestinationPaths(
             commitContext.getJobContext()));
@@ -543,11 +546,26 @@ public class StagingCommitter extends 
AbstractS3ACommitter {
     }
   }
 
+  /**
+   * Delete the multipart upload staging directory.
+   * @param context job context
+   * @throws IOException IO failure
+   */
+  protected void deleteStagingUploadsParentDirectory(JobContext context)
+          throws IOException {
+    Path stagingUploadsPath = Paths.getStagingUploadsParentDirectory(
+            context.getConfiguration(), getUUID());
+    ignoreIOExceptions(LOG,
+        "Deleting staging uploads path", stagingUploadsPath.toString(),
+        () -> deleteWithWarning(
+            stagingUploadsPath.getFileSystem(getConf()),
+            stagingUploadsPath,
+            true));
+  }
 
   /**
    * Delete the working paths of a job.
    * <ol>
-   *   <li>The job attempt path</li>
    *   <li>{@code $dest/__temporary}</li>
    *   <li>the local working directory for staged files</li>
    * </ol>
@@ -556,14 +574,6 @@ public class StagingCommitter extends AbstractS3ACommitter 
{
    * @throws IOException IO failure
    */
   protected void deleteDestinationPaths(JobContext context) throws IOException 
{
-    Path attemptPath = getJobAttemptPath(context);
-    ignoreIOExceptions(LOG,
-        "Deleting Job attempt Path", attemptPath.toString(),
-        () -> deleteWithWarning(
-            getJobAttemptFileSystem(context),
-            attemptPath,
-            true));
-
     // delete the __temporary directory. This will cause problems
     // if there is >1 task targeting the same dest dir
     deleteWithWarning(getDestFS(),
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
index b193cca03db..e517a41629c 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
@@ -403,6 +403,30 @@ public abstract class AbstractITCommitProtocol extends 
AbstractCommitITest {
       this.committer = committer;
       conf = job.getConfiguration();
     }
+
+    public Job getJob() {
+      return job;
+    }
+
+    public JobContext getJContext() {
+      return jContext;
+    }
+
+    public TaskAttemptContext getTContext() {
+      return tContext;
+    }
+
+    public AbstractS3ACommitter getCommitter() {
+      return committer;
+    }
+
+    public Configuration getConf() {
+      return conf;
+    }
+
+    public Path getWrittenTextPath() {
+      return writtenTextPath;
+    }
   }
 
   /**
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
index bb3031b32c1..dd62064f86d 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.fs.s3a.commit.staging.integration;
 import java.io.IOException;
 import java.util.UUID;
 
+import org.junit.Test;
+
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -141,6 +143,74 @@ public class ITestStagingCommitProtocol extends 
AbstractITCommitProtocol {
     assertEquals("file", wd.toUri().getScheme());
   }
 
+  @Test
+  public void testStagingUploadsDirectoryCleanedUp() throws Exception {
+    describe("Assert that the staging uploads directory is cleaned up after 
successful commit");
+    JobData jobData = startJob(false);
+    JobContext jContext = jobData.getJContext();
+    TaskAttemptContext tContext = jobData.getTContext();
+    StagingCommitter committer = (StagingCommitter) jobData.getCommitter();
+
+    Path stagingUploadsDir = Paths.getStagingUploadsParentDirectory(
+            jContext.getConfiguration(),
+            committer.getUUID());
+
+    ContractTestUtils.assertPathExists(
+            stagingUploadsDir.getFileSystem(jContext.getConfiguration()),
+            "staging uploads path must exist after setupJob",
+            stagingUploadsDir
+    );
+
+    // write output
+    writeTextOutput(tContext);
+
+    // do commit
+    committer.commitTask(tContext);
+
+    commitJob(committer, jContext);
+
+    ContractTestUtils.assertPathDoesNotExist(
+            stagingUploadsDir.getFileSystem(jContext.getConfiguration()),
+            "staging uploads path must not exist after commitJob",
+            stagingUploadsDir
+    );
+  }
+
+  @Test
+  public void testStagingUploadsDirectoryCleanedUpWithFailure() throws 
Exception {
+    describe("Assert that the staging uploads directory is cleaned up after 
failed commit");
+    JobData jobData = startJob(new FailingCommitterFactory(), false);
+    JobContext jContext = jobData.getJContext();
+    TaskAttemptContext tContext = jobData.getTContext();
+    StagingCommitter committer = (StagingCommitter) jobData.getCommitter();
+
+    Path stagingUploadsDir = Paths.getStagingUploadsParentDirectory(
+            jContext.getConfiguration(),
+            committer.getUUID());
+
+    ContractTestUtils.assertPathExists(
+            stagingUploadsDir.getFileSystem(jContext.getConfiguration()),
+            "staging uploads path must exist after setupJob",
+            stagingUploadsDir
+    );
+
+    // do commit
+    committer.commitTask(tContext);
+
+    // now fail job
+    expectSimulatedFailureOnJobCommit(jContext, committer);
+
+    commitJob(committer, jContext);
+
+    expectJobCommitToFail(jContext, committer);
+
+    ContractTestUtils.assertPathDoesNotExist(
+            stagingUploadsDir.getFileSystem(jContext.getConfiguration()),
+            "staging uploads path must not exist after commitJob",
+            stagingUploadsDir
+    );
+  }
+
   /**
    * The class provides an overridden implementation of commitJobInternal which
    * causes the commit to fail the first time and then succeed.


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to