This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new f2ea733732f1 HADOOP-19047: S3A: Support in-memory tracking of Magic Commit data (#6468)
f2ea733732f1 is described below

commit f2ea733732f1aa50e81df8ab013aa21ab36053b8
Author: Syed Shameerur Rahman <rhma...@amazon.com>
AuthorDate: Tue Mar 26 20:59:35 2024 +0530

    HADOOP-19047: S3A: Support in-memory tracking of Magic Commit data (#6468)
    
    If the option fs.s3a.committer.magic.track.commits.in.memory.enabled
    is set to true, then rather than save data about in-progress uploads
    to S3, this information is cached in memory.
    
    If the number of files being committed is low, this will save network IO
    in both the generation of .pending and marker files, and in the scanning
    of task attempt directory trees during task commit.
    
    Contributed by Syed Shameerur Rahman
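    
    As a quick orientation, a minimal sketch of switching the feature on
    from job setup code; it assumes only the property name introduced by
    this patch (the class name below is made up for illustration):
    
        import org.apache.hadoop.conf.Configuration;
    
        public class EnableInMemoryTracking {
          public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Cache magic-commit data in memory instead of writing
            // .pending/marker objects to S3 (default: false).
            conf.setBoolean(
                "fs.s3a.committer.magic.track.commits.in.memory.enabled", true);
          }
        }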
---
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |  18 +++
 .../hadoop/fs/s3a/commit/CommitConstants.java      |  16 +++
 .../fs/s3a/commit/MagicCommitIntegration.java      |  20 +--
 .../commit/magic/InMemoryMagicCommitTracker.java   | 146 +++++++++++++++++++++
 .../fs/s3a/commit/magic/MagicCommitTracker.java    | 106 ++++-----------
 .../s3a/commit/magic/MagicCommitTrackerUtils.java  |  64 +++++++++
 .../fs/s3a/commit/magic/MagicS3GuardCommitter.java | 108 ++++++++++++---
 ...ommitTracker.java => S3MagicCommitTracker.java} | 109 +++------------
 .../site/markdown/tools/hadoop-aws/committers.md   |   7 +
 .../fs/s3a/commit/AbstractITCommitProtocol.java    |  15 ++-
 .../fs/s3a/commit/TestMagicCommitTrackerUtils.java |  64 +++++++++
 .../s3a/commit/magic/ITestMagicCommitProtocol.java |  30 +++++
 .../fs/s3a/commit/terasort/ITestTerasortOnS3A.java |  17 ++-
 13 files changed, 516 insertions(+), 204 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 3aec03766dac..755f1fffbdb1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -117,6 +117,7 @@ import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
 import org.apache.hadoop.fs.s3a.auth.SignerManager;
 import org.apache.hadoop.fs.s3a.auth.delegation.DelegationOperations;
 import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider;
+import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;
 import org.apache.hadoop.fs.s3a.impl.AWSCannedACL;
 import org.apache.hadoop.fs.s3a.impl.AWSHeaders;
 import org.apache.hadoop.fs.s3a.impl.BulkDeleteRetryHandler;
@@ -231,6 +232,8 @@ import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.Token
 import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_PENDING_OBJECT_ETAG_NAME;
+import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled;
 import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
 import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_NO_OVERWRITE;
 import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_OVERWRITE;
@@ -3894,6 +3897,21 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   @Retries.RetryTranslated
   public FileStatus getFileStatus(final Path f) throws IOException {
     Path path = qualify(f);
+    if (isTrackMagicCommitsInMemoryEnabled(getConf()) && isMagicCommitPath(path)) {
+      // Some downstream apps might call getFileStatus for a magic path to get the file size.
+      // When commit data is stored in memory, construct a dummy S3AFileStatus with the correct
+      // file size fetched from memory.
+      if (InMemoryMagicCommitTracker.getPathToBytesWritten().containsKey(path)) {
+        long len = InMemoryMagicCommitTracker.getPathToBytesWritten().get(path);
+        return new S3AFileStatus(len,
+            0L,
+            path,
+            getDefaultBlockSize(path),
+            username,
+            MAGIC_COMMITTER_PENDING_OBJECT_ETAG_NAME,
+            null);
+      }
+    }
     return trackDurationAndSpan(
         INVOCATION_GET_FILE_STATUS, path, () ->
             innerGetFileStatus(path, false, StatusProbeEnum.ALL));
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
index 52df58d6a4b4..4f0005509937 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
@@ -58,6 +58,10 @@ public final class CommitConstants {
    */
   public static final String PENDINGSET_SUFFIX = ".pendingset";
 
+  /**
+   * Etag name to be returned on non-committed S3 object: {@value}.
+   */
+  public static final String MAGIC_COMMITTER_PENDING_OBJECT_ETAG_NAME = "pending";
 
   /**
    * Prefix to use for config options: {@value}.
@@ -242,6 +246,18 @@ public final class CommitConstants {
    */
   public static final int DEFAULT_COMMITTER_THREADS = 32;
 
+  /**
+   * Should Magic committer track all the pending commits in memory?
+   */
+  public static final String FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED =
+      "fs.s3a.committer.magic.track.commits.in.memory.enabled";
+
+  /**
+   * Default value for {@link #FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED}: {@value}.
+   */
+  public static final boolean FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED_DEFAULT =
+      false;
+
   /**
    * Path  in the cluster filesystem for temporary data: {@value}.
    * This is for HDFS, not the local filesystem.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java
index e6524c91961d..ba1dd400f6d7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java
@@ -26,11 +26,13 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.Statistic;
-import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTracker;
+import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;
+import org.apache.hadoop.fs.s3a.commit.magic.S3MagicCommitTracker;
 import org.apache.hadoop.fs.s3a.impl.AbstractStoreOperation;
 import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
 
 import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*;
+import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled;
 
 /**
  * Adds the code needed for S3A to support magic committers.
@@ -105,13 +107,15 @@ public class MagicCommitIntegration extends AbstractStoreOperation {
         String pendingsetPath = key + CommitConstants.PENDING_SUFFIX;
         getStoreContext().incrementStatistic(
             Statistic.COMMITTER_MAGIC_FILES_CREATED);
-        tracker = new MagicCommitTracker(path,
-            getStoreContext().getBucket(),
-            key,
-            destKey,
-            pendingsetPath,
-            owner.getWriteOperationHelper(),
-            trackerStatistics);
+        if (isTrackMagicCommitsInMemoryEnabled(getStoreContext().getConfiguration())) {
+          tracker = new InMemoryMagicCommitTracker(path, getStoreContext().getBucket(),
+              key, destKey, pendingsetPath, owner.getWriteOperationHelper(),
+              trackerStatistics);
+        } else {
+          tracker = new S3MagicCommitTracker(path, getStoreContext().getBucket(),
+              key, destKey, pendingsetPath, owner.getWriteOperationHelper(),
+              trackerStatistics);
+        }
         LOG.debug("Created {}", tracker);
       } else {
         LOG.warn("File being created has a \"magic\" path, but the filesystem"
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java
new file mode 100644
index 000000000000..8e36b1e485ef
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.magic;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.WriteOperationHelper;
+import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import org.apache.hadoop.util.Preconditions;
+
+import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.extractTaskAttemptIdFromPath;
+
+/**
+ * InMemoryMagicCommitTracker stores the commit data in memory.
+ * The commit data and related bookkeeping are cleared from
+ * memory when the task is committed or aborted.
+ */
+public class InMemoryMagicCommitTracker extends MagicCommitTracker {
+
+  /**
+   * Map to store taskAttemptId, and its corresponding list of pending commit data.
+   * The entries in the Map get removed when a task commits or aborts.
+   */
+  private final static Map<String, List<SinglePendingCommit>> TASK_ATTEMPT_ID_TO_MPU_METADATA = new ConcurrentHashMap<>();
+
+  /**
+   * Map to store path of the file, and its corresponding size.
+   * The entries in the Map get removed when a task commits or aborts.
+   */
+  private final static Map<Path, Long> PATH_TO_BYTES_WRITTEN = new ConcurrentHashMap<>();
+
+  /**
+   * Map to store taskAttemptId, and list of paths to files written by it.
+   * The entries in the Map get removed when a task commits or aborts.
+   */
+  private final static Map<String, List<Path>> TASK_ATTEMPT_ID_TO_PATH = new ConcurrentHashMap<>();
+
+  public InMemoryMagicCommitTracker(Path path,
+      String bucket,
+      String originalDestKey,
+      String destKey,
+      String pendingsetKey,
+      WriteOperationHelper writer,
+      PutTrackerStatistics trackerStatistics) {
+    super(path, bucket, originalDestKey, destKey, pendingsetKey, writer, trackerStatistics);
+  }
+
+  @Override
+  public boolean aboutToComplete(String uploadId,
+      List<CompletedPart> parts,
+      long bytesWritten,
+      final IOStatistics iostatistics)
+      throws IOException {
+    Preconditions.checkArgument(StringUtils.isNotEmpty(uploadId),
+        "empty/null upload ID: " + uploadId);
+    Preconditions.checkArgument(parts != null, "No uploaded parts list");
+    Preconditions.checkArgument(!parts.isEmpty(), "No uploaded parts to save");
+
+    // build the commit summary
+    SinglePendingCommit commitData = new SinglePendingCommit();
+    commitData.touch(System.currentTimeMillis());
+    commitData.setDestinationKey(getDestKey());
+    commitData.setBucket(getBucket());
+    commitData.setUri(getPath().toUri().toString());
+    commitData.setUploadId(uploadId);
+    commitData.setText("");
+    commitData.setLength(bytesWritten);
+    commitData.bindCommitData(parts);
+    commitData.setIOStatistics(new IOStatisticsSnapshot(iostatistics));
+
+    // extract the taskAttemptId from the path
+    String taskAttemptId = extractTaskAttemptIdFromPath(getPath());
+
+    // store the commit data with taskAttemptId as the key
+    TASK_ATTEMPT_ID_TO_MPU_METADATA.computeIfAbsent(taskAttemptId,
+        k -> Collections.synchronizedList(new ArrayList<>())).add(commitData);
+
+    // store the bytesWritten (length) for the corresponding file
+    PATH_TO_BYTES_WRITTEN.put(getPath(), bytesWritten);
+
+    // store the mapping between taskAttemptId and path
+    // This information is used for removing entries from
+    // the map once the taskAttempt is completed/committed.
+    TASK_ATTEMPT_ID_TO_PATH.computeIfAbsent(taskAttemptId,
+        k -> Collections.synchronizedList(new ArrayList<>())).add(getPath());
+
+    LOG.info("commit metadata for {} parts in {}. size: {} byte(s) "
+            + "for the taskAttemptId: {} is stored in memory",
+        parts.size(), getPendingPartKey(), bytesWritten, taskAttemptId);
+    LOG.debug("Closed MPU to {}, saved commit information to {}; data=:\n{}",
+        getPath(), getPendingPartKey(), commitData);
+
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "InMemoryMagicCommitTracker{");
+    sb.append(", Number of taskAttempts=").append(TASK_ATTEMPT_ID_TO_MPU_METADATA.size());
+    sb.append(", Number of files=").append(PATH_TO_BYTES_WRITTEN.size());
+    sb.append('}');
+    return sb.toString();
+  }
+
+
+  public static Map<String, List<SinglePendingCommit>> getTaskAttemptIdToMpuMetadata() {
+    return TASK_ATTEMPT_ID_TO_MPU_METADATA;
+  }
+
+  public static Map<Path, Long> getPathToBytesWritten() {
+    return PATH_TO_BYTES_WRITTEN;
+  }
+
+  public static Map<String, List<Path>> getTaskAttemptIdToPath() {
+    return TASK_ATTEMPT_ID_TO_PATH;
+  }
+}
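
The three static maps above are the whole persistence layer when in-memory
tracking is enabled: entries are added per file in aboutToComplete() and
drained when the task commits or aborts. For readers unfamiliar with the
computeIfAbsent-plus-synchronizedList idiom the tracker relies on, a
self-contained sketch (names here are illustrative, not the tracker's API):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class PerTaskListDemo {
      // Same shape as TASK_ATTEMPT_ID_TO_PATH: a concurrent map of
      // task attempt id -> synchronized list of values.
      private static final Map<String, List<String>> TASK_TO_FILES =
          new ConcurrentHashMap<>();

      public static void main(String[] args) {
        // computeIfAbsent is atomic per key, and the synchronized list
        // makes concurrent add() calls from writer threads safe.
        TASK_TO_FILES.computeIfAbsent("attempt_1_m_000000_0",
            k -> Collections.synchronizedList(new ArrayList<>())).add("part-00000");
        TASK_TO_FILES.computeIfAbsent("attempt_1_m_000000_0",
            k -> Collections.synchronizedList(new ArrayList<>())).add("part-00001");
        // Task commit drains the entry in one step, avoiding a leak.
        List<String> drained = TASK_TO_FILES.remove("attempt_1_m_000000_0");
        System.out.println(drained); // [part-00000, part-00001]
      }
    }
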
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java
index b2e703e1b088..62151658b5aa 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java
@@ -18,37 +18,22 @@
 
 package org.apache.hadoop.fs.s3a.commit.magic;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import software.amazon.awssdk.services.s3.model.CompletedPart;
-import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.s3a.Retries;
-import org.apache.hadoop.fs.s3a.S3ADataBlocks;
 import org.apache.hadoop.fs.s3a.WriteOperationHelper;
 import org.apache.hadoop.fs.s3a.commit.PutTracker;
-import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
-import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
 import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
 import org.apache.hadoop.fs.statistics.IOStatistics;
-import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
-import org.apache.hadoop.util.Preconditions;
 
 import static java.util.Objects.requireNonNull;
-import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT;
-import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
-import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
 
 /**
  * Put tracker for Magic commits.
@@ -56,7 +41,7 @@ import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDura
  * uses any datatype in hadoop-mapreduce.
  */
 @InterfaceAudience.Private
-public class MagicCommitTracker extends PutTracker {
+public abstract class MagicCommitTracker extends PutTracker {
   public static final Logger LOG = LoggerFactory.getLogger(
       MagicCommitTracker.class);
 
@@ -65,7 +50,7 @@ public class MagicCommitTracker extends PutTracker {
   private final Path path;
   private final WriteOperationHelper writer;
   private final String bucket;
-  private static final byte[] EMPTY = new byte[0];
+  protected static final byte[] EMPTY = new byte[0];
   private final PutTrackerStatistics trackerStatistics;
 
   /**
@@ -127,68 +112,11 @@ public class MagicCommitTracker extends PutTracker {
    * @throws IllegalArgumentException bad argument
    */
   @Override
-  public boolean aboutToComplete(String uploadId,
+  public abstract boolean aboutToComplete(String uploadId,
       List<CompletedPart> parts,
       long bytesWritten,
-      final IOStatistics iostatistics)
-      throws IOException {
-    Preconditions.checkArgument(StringUtils.isNotEmpty(uploadId),
-        "empty/null upload ID: "+ uploadId);
-    Preconditions.checkArgument(parts != null,
-        "No uploaded parts list");
-    Preconditions.checkArgument(!parts.isEmpty(),
-        "No uploaded parts to save");
-
-    // put a 0-byte file with the name of the original under-magic path
-    // Add the final file length as a header
-    // this is done before the task commit, so its duration can be
-    // included in the statistics
-    Map<String, String> headers = new HashMap<>();
-    headers.put(X_HEADER_MAGIC_MARKER, Long.toString(bytesWritten));
-    PutObjectRequest originalDestPut = writer.createPutObjectRequest(
-        originalDestKey,
-        0,
-        new PutObjectOptions(true, null, headers), false);
-    upload(originalDestPut, new ByteArrayInputStream(EMPTY));
-
-    // build the commit summary
-    SinglePendingCommit commitData = new SinglePendingCommit();
-    commitData.touch(System.currentTimeMillis());
-    commitData.setDestinationKey(getDestKey());
-    commitData.setBucket(bucket);
-    commitData.setUri(path.toUri().toString());
-    commitData.setUploadId(uploadId);
-    commitData.setText("");
-    commitData.setLength(bytesWritten);
-    commitData.bindCommitData(parts);
-    commitData.setIOStatistics(
-        new IOStatisticsSnapshot(iostatistics));
-
-    byte[] bytes = commitData.toBytes(SinglePendingCommit.serializer());
-    LOG.info("Uncommitted data pending to file {};"
-            + " commit metadata for {} parts in {}. size: {} byte(s)",
-        path.toUri(), parts.size(), pendingPartKey, bytesWritten);
-    LOG.debug("Closed MPU to {}, saved commit information to {}; data=:\n{}",
-        path, pendingPartKey, commitData);
-    PutObjectRequest put = writer.createPutObjectRequest(
-        pendingPartKey,
-        bytes.length, null, false);
-    upload(put, new ByteArrayInputStream(bytes));
-    return false;
-
-  }
-  /**
-   * PUT an object.
-   * @param request the request
-   * @param inputStream input stream of data to be uploaded
-   * @throws IOException on problems
-   */
-  @Retries.RetryTranslated
-  private void upload(PutObjectRequest request, InputStream inputStream) throws IOException {
-    trackDurationOfInvocation(trackerStatistics, COMMITTER_MAGIC_MARKER_PUT.getSymbol(),
-        () -> writer.putObject(request, PutObjectOptions.keepingDirs(),
-            new S3ADataBlocks.BlockUploadData(inputStream), false, null));
-  }
+      IOStatistics iostatistics)
+      throws IOException;
 
   @Override
   public String toString() {
@@ -201,4 +129,28 @@ public class MagicCommitTracker extends PutTracker {
     sb.append('}');
     return sb.toString();
   }
+
+  public String getOriginalDestKey() {
+    return originalDestKey;
+  }
+
+  public String getPendingPartKey() {
+    return pendingPartKey;
+  }
+
+  public Path getPath() {
+    return path;
+  }
+
+  public String getBucket() {
+    return bucket;
+  }
+
+  public WriteOperationHelper getWriter() {
+    return writer;
+  }
+
+  public PutTrackerStatistics getTrackerStatistics() {
+    return trackerStatistics;
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java
new file mode 100644
index 000000000000..2ceac1c8e03d
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.magic;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.commit.MagicCommitPaths;
+
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+
+/**
+ * Utility class for the class {@link MagicCommitTracker} and its subclasses.
+ */
+public final class MagicCommitTrackerUtils {
+
+  private MagicCommitTrackerUtils() {
+  }
+
+  /**
+   * The magic path is of the following format.
+   * "s3://bucket-name/table-path/__magic_jobId/job-id/taskAttempt/id/tasks/taskAttemptId"
+   * So the third child from the "__magic" path will give the task attempt id.
+   * @param path Path
+   * @return taskAttemptId
+   */
+  public static String extractTaskAttemptIdFromPath(Path path) {
+    List<String> elementsInPath = MagicCommitPaths.splitPathToElements(path);
+    List<String> childrenOfMagicPath = MagicCommitPaths.magicPathChildren(elementsInPath);
+
+    checkArgument(childrenOfMagicPath.size() >= 3, "Magic Path is invalid");
+    // 3rd child of the magic path is the taskAttemptId
+    return childrenOfMagicPath.get(3);
+  }
+
+  /**
+   * Is in-memory tracking of magic commit data enabled?
+   * @param conf Configuration
+   * @return true if in memory tracking of commit data is enabled.
+   */
+  public static boolean isTrackMagicCommitsInMemoryEnabled(Configuration conf) {
+    return conf.getBoolean(
+        CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED,
+        CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED_DEFAULT);
+  }
+}
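
As a quick check of the new utility, a hedged sketch exercising
isTrackMagicCommitsInMemoryEnabled() with the constants this patch adds
(the class name is made up; Configuration(false) skips default resources):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.s3a.commit.CommitConstants;
    import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils;

    public class TrackerSwitchCheck {
      public static void main(String[] args) {
        Configuration conf = new Configuration(false);
        // Default is false: commit data is written to S3 as before.
        System.out.println(MagicCommitTrackerUtils
            .isTrackMagicCommitsInMemoryEnabled(conf)); // false
        conf.setBoolean(
            CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED,
            true);
        System.out.println(MagicCommitTrackerUtils
            .isTrackMagicCommitsInMemoryEnabled(conf)); // true
      }
    }
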
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
index 518831b7d433..5ed1a3abd464 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.s3a.commit.magic;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -48,8 +49,8 @@ import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TASK_ATTEMPT_ID;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TEMP_DATA;
 import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
-import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*;
 import static org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR.*;
+import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
 
 /**
@@ -192,23 +193,9 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
    */
   private PendingSet innerCommitTask(
       TaskAttemptContext context) throws IOException {
-    Path taskAttemptPath = getTaskAttemptPath(context);
     // load in all pending commits.
-    CommitOperations actions = getCommitOperations();
-    PendingSet pendingSet;
+    PendingSet pendingSet = loadPendingCommits(context);
     try (CommitContext commitContext = initiateTaskOperation(context)) {
-      Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>>
-          loaded = actions.loadSinglePendingCommits(
-              taskAttemptPath, true, commitContext);
-      pendingSet = loaded.getKey();
-      List<Pair<LocatedFileStatus, IOException>> failures = loaded.getValue();
-      if (!failures.isEmpty()) {
-        // At least one file failed to load
-        // revert all which did; report failure with first exception
-        LOG.error("At least one commit file could not be read: failing");
-        abortPendingUploads(commitContext, pendingSet.getCommits(), true);
-        throw failures.get(0).getValue();
-      }
       // patch in IDs
       String jobId = getUUID();
       String taskId = String.valueOf(context.getTaskAttemptID());
@@ -248,6 +235,84 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
     return pendingSet;
   }
 
+  /**
+   * Loads pending commits from either memory or from the remote store (S3) based on the config.
+   * @param context TaskAttemptContext
+   * @return All pending commit data for the given TaskAttemptContext
+   * @throws IOException
+   *           if there is an error trying to read the commit data
+   */
+  protected PendingSet loadPendingCommits(TaskAttemptContext context) throws IOException {
+    PendingSet pendingSet = new PendingSet();
+    if (isTrackMagicCommitsInMemoryEnabled(context.getConfiguration())) {
+      // load from memory
+      List<SinglePendingCommit> pendingCommits = loadPendingCommitsFromMemory(context);
+
+      for (SinglePendingCommit singleCommit : pendingCommits) {
+        // aggregate stats
+        pendingSet.getIOStatistics()
+            .aggregate(singleCommit.getIOStatistics());
+        // then clear so they aren't marshalled again.
+        singleCommit.getIOStatistics().clear();
+      }
+      pendingSet.setCommits(pendingCommits);
+    } else {
+      // Load from remote store
+      CommitOperations actions = getCommitOperations();
+      Path taskAttemptPath = getTaskAttemptPath(context);
+      try (CommitContext commitContext = initiateTaskOperation(context)) {
+        Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>> loaded =
+            actions.loadSinglePendingCommits(taskAttemptPath, true, commitContext);
+        pendingSet = loaded.getKey();
+        List<Pair<LocatedFileStatus, IOException>> failures = loaded.getValue();
+        if (!failures.isEmpty()) {
+          // At least one file failed to load
+          // revert all which did; report failure with first exception
+          LOG.error("At least one commit file could not be read: failing");
+          abortPendingUploads(commitContext, pendingSet.getCommits(), true);
+          throw failures.get(0).getValue();
+        }
+      }
+    }
+    return pendingSet;
+  }
+
+  /**
+   * Loads the pending commits from the memory data structure for a given taskAttemptId.
+   * @param context TaskContext
+   * @return list of pending commits
+   */
+  private List<SinglePendingCommit> loadPendingCommitsFromMemory(TaskAttemptContext context) {
+    String taskAttemptId = String.valueOf(context.getTaskAttemptID());
+    // get all the pending commit metadata associated with the taskAttemptId.
+    // This will also remove the entry from the map.
+    List<SinglePendingCommit> pendingCommits =
+        InMemoryMagicCommitTracker.getTaskAttemptIdToMpuMetadata().remove(taskAttemptId);
+    // get all the path/files associated with the taskAttemptId.
+    // This will also remove the entry from the map.
+    List<Path> pathsAssociatedWithTaskAttemptId =
+        InMemoryMagicCommitTracker.getTaskAttemptIdToPath().remove(taskAttemptId);
+
+    // for each of the paths, remove the entry from the map.
+    // This is done so that there is no memory leak.
+    if (pathsAssociatedWithTaskAttemptId != null) {
+      for (Path path : pathsAssociatedWithTaskAttemptId) {
+        boolean cleared =
+            InMemoryMagicCommitTracker.getPathToBytesWritten().remove(path) != null;
+        LOG.debug("Removing path: {} from the memory isSuccess: {}", path, cleared);
+      }
+    } else {
+      LOG.debug("No paths to remove for taskAttemptId: {}", taskAttemptId);
+    }
+
+    if (pendingCommits == null || pendingCommits.isEmpty()) {
+      LOG.info("No commit data present for the taskAttemptId: {} in the memory", taskAttemptId);
+      return new ArrayList<>();
+    }
+
+    return pendingCommits;
+  }
+
   /**
    * Abort a task. Attempt load then abort all pending files,
    * then try to delete the task attempt path.
@@ -264,9 +329,14 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
     try (DurationInfo d = new DurationInfo(LOG,
         "Abort task %s", context.getTaskAttemptID());
         CommitContext commitContext = initiateTaskOperation(context)) {
-      getCommitOperations().abortAllSinglePendingCommits(attemptPath,
-          commitContext,
-          true);
+      if (isTrackMagicCommitsInMemoryEnabled(context.getConfiguration())) {
+        List<SinglePendingCommit> pendingCommits = loadPendingCommitsFromMemory(context);
+        for (SinglePendingCommit singleCommit : pendingCommits) {
+          commitContext.abortSingleCommit(singleCommit);
+        }
+      } else {
+        getCommitOperations().abortAllSinglePendingCommits(attemptPath, commitContext, true);
+      }
     } finally {
       deleteQuietly(
           attemptPath.getFileSystem(context.getConfiguration()),
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java
similarity index 56%
copy from hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java
copy to hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java
index b2e703e1b088..0ab3cee5201e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java
@@ -27,17 +27,12 @@ import java.util.Map;
 
 import software.amazon.awssdk.services.s3.model.CompletedPart;
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.Retries;
 import org.apache.hadoop.fs.s3a.S3ADataBlocks;
 import org.apache.hadoop.fs.s3a.WriteOperationHelper;
-import org.apache.hadoop.fs.s3a.commit.PutTracker;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
 import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
@@ -45,87 +40,25 @@ import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
 import org.apache.hadoop.util.Preconditions;
 
-import static java.util.Objects.requireNonNull;
 import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
 import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
 
 /**
- * Put tracker for Magic commits.
- * <p>Important</p>: must not directly or indirectly import a class which
- * uses any datatype in hadoop-mapreduce.
+ * Stores the commit data under the magic path.
  */
-@InterfaceAudience.Private
-public class MagicCommitTracker extends PutTracker {
-  public static final Logger LOG = LoggerFactory.getLogger(
-      MagicCommitTracker.class);
-
-  private final String originalDestKey;
-  private final String pendingPartKey;
-  private final Path path;
-  private final WriteOperationHelper writer;
-  private final String bucket;
-  private static final byte[] EMPTY = new byte[0];
-  private final PutTrackerStatistics trackerStatistics;
+public class S3MagicCommitTracker extends MagicCommitTracker {
 
-  /**
-   * Magic commit tracker.
-   * @param path path nominally being written to
-   * @param bucket dest bucket
-   * @param originalDestKey the original key, in the magic directory.
-   * @param destKey key for the destination
-   * @param pendingsetKey key of the pendingset file
-   * @param writer writer instance to use for operations; includes audit span
-   * @param trackerStatistics tracker statistics
-   */
-  public MagicCommitTracker(Path path,
+  public S3MagicCommitTracker(Path path,
       String bucket,
       String originalDestKey,
       String destKey,
       String pendingsetKey,
       WriteOperationHelper writer,
       PutTrackerStatistics trackerStatistics) {
-    super(destKey);
-    this.bucket = bucket;
-    this.path = path;
-    this.originalDestKey = originalDestKey;
-    this.pendingPartKey = pendingsetKey;
-    this.writer = writer;
-    this.trackerStatistics = requireNonNull(trackerStatistics);
-    LOG.info("File {} is written as magic file to path {}",
-        path, destKey);
-  }
-
-  /**
-   * Initialize the tracker.
-   * @return true, indicating that the multipart commit must start.
-   * @throws IOException any IO problem.
-   */
-  @Override
-  public boolean initialize() throws IOException {
-    return true;
-  }
-
-  /**
-   * Flag to indicate that output is not visible after the stream
-   * is closed.
-   * @return true
-   */
-  @Override
-  public boolean outputImmediatelyVisible() {
-    return false;
+    super(path, bucket, originalDestKey, destKey, pendingsetKey, writer, trackerStatistics);
   }
 
-  /**
-   * Complete operation: generate the final commit data, put it.
-   * @param uploadId Upload ID
-   * @param parts list of parts
-   * @param bytesWritten bytes written
-   * @param iostatistics nullable IO statistics
-   * @return false, indicating that the commit must fail.
-   * @throws IOException any IO problem.
-   * @throws IllegalArgumentException bad argument
-   */
   @Override
   public boolean aboutToComplete(String uploadId,
       List<CompletedPart> parts,
@@ -145,8 +78,8 @@ public class MagicCommitTracker extends PutTracker {
     // included in the statistics
     Map<String, String> headers = new HashMap<>();
     headers.put(X_HEADER_MAGIC_MARKER, Long.toString(bytesWritten));
-    PutObjectRequest originalDestPut = writer.createPutObjectRequest(
-        originalDestKey,
+    PutObjectRequest originalDestPut = getWriter().createPutObjectRequest(
+        getOriginalDestKey(),
         0,
         new PutObjectOptions(true, null, headers), false);
     upload(originalDestPut, new ByteArrayInputStream(EMPTY));
@@ -155,8 +88,8 @@ public class MagicCommitTracker extends PutTracker {
     SinglePendingCommit commitData = new SinglePendingCommit();
     commitData.touch(System.currentTimeMillis());
     commitData.setDestinationKey(getDestKey());
-    commitData.setBucket(bucket);
-    commitData.setUri(path.toUri().toString());
+    commitData.setBucket(getBucket());
+    commitData.setUri(getPath().toUri().toString());
     commitData.setUploadId(uploadId);
     commitData.setText("");
     commitData.setLength(bytesWritten);
@@ -167,16 +100,16 @@ public class MagicCommitTracker extends PutTracker {
     byte[] bytes = commitData.toBytes(SinglePendingCommit.serializer());
     LOG.info("Uncommitted data pending to file {};"
             + " commit metadata for {} parts in {}. size: {} byte(s)",
-        path.toUri(), parts.size(), pendingPartKey, bytesWritten);
+        getPath().toUri(), parts.size(), getPendingPartKey(), bytesWritten);
     LOG.debug("Closed MPU to {}, saved commit information to {}; data=:\n{}",
-        path, pendingPartKey, commitData);
-    PutObjectRequest put = writer.createPutObjectRequest(
-        pendingPartKey,
+        getPath(), getPendingPartKey(), commitData);
+    PutObjectRequest put = getWriter().createPutObjectRequest(
+        getPendingPartKey(),
         bytes.length, null, false);
     upload(put, new ByteArrayInputStream(bytes));
     return false;
-
   }
+
   /**
    * PUT an object.
    * @param request the request
@@ -185,20 +118,8 @@ public class MagicCommitTracker extends PutTracker {
    */
   @Retries.RetryTranslated
   private void upload(PutObjectRequest request, InputStream inputStream) throws IOException {
-    trackDurationOfInvocation(trackerStatistics, COMMITTER_MAGIC_MARKER_PUT.getSymbol(),
-        () -> writer.putObject(request, PutObjectOptions.keepingDirs(),
+    trackDurationOfInvocation(getTrackerStatistics(), COMMITTER_MAGIC_MARKER_PUT.getSymbol(),
+        () -> getWriter().putObject(request, PutObjectOptions.keepingDirs(),
             new S3ADataBlocks.BlockUploadData(inputStream), false, null));
   }
-
-  @Override
-  public String toString() {
-    final StringBuilder sb = new StringBuilder(
-        "MagicCommitTracker{");
-    sb.append(", destKey=").append(getDestKey());
-    sb.append(", pendingPartKey='").append(pendingPartKey).append('\'');
-    sb.append(", path=").append(path);
-    sb.append(", writer=").append(writer);
-    sb.append('}');
-    return sb.toString();
-  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
index fb42d507b2d6..895815444932 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
@@ -362,6 +362,13 @@ the magic directory path rewriting is enabled by default.
 The Magic Committer has not been field tested to the extent of Netflix's committer;
 consider it the least mature of the committers.
 
+When there are fewer files to be written, the Magic committer has an option to store the commit data in memory, which can speed up the task commit operation as well as save S3 costs. This can be enabled by the following property:
+```xml
+<property>
+  <name>fs.s3a.committer.magic.track.commits.in.memory.enabled</name>
+  <value>true</value>
+</property>
+```
 
 ### Which Committer to Use?
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
index 67c88039aad1..3a7cceb2369e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
@@ -82,6 +82,7 @@ import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_
 import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID_SOURCE;
 import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.SPARK_WRITE_UUID;
 import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_TASKS_SUCCEEDED;
+import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@@ -906,7 +907,14 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
     assertNoMultipartUploadsPending(outDir);
 
     // commit task to fail on retry
-    expectFNFEonTaskCommit(committer, tContext);
+    // FNFE is not thrown in the case of the Magic committer when
+    // in-memory commit data is enabled, hence skip the check.
+    boolean skipExpectFNFE = committer instanceof MagicS3GuardCommitter &&
+        isTrackMagicCommitsInMemoryEnabled(tContext.getConfiguration());
+
+    if (!skipExpectFNFE) {
+      expectFNFEonTaskCommit(committer, tContext);
+    }
   }
 
   /**
@@ -1422,7 +1430,10 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
     validateTaskAttemptPathDuringWrite(dest, expectedLength, jobData.getCommitter().getUUID());
     recordWriter.close(tContext);
     // at this point
-    validateTaskAttemptPathAfterWrite(dest, expectedLength);
+    // Skip validation when commit data is stored in memory
+    if (!isTrackMagicCommitsInMemoryEnabled(conf)) {
+      validateTaskAttemptPathAfterWrite(dest, expectedLength);
+    }
     assertTrue("Committer does not have data to commit " + committer,
         committer.needsTaskCommit(tContext));
     commitTask(committer, tContext);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestMagicCommitTrackerUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestMagicCommitTrackerUtils.java
new file mode 100644
index 000000000000..a08f8d2d34b7
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestMagicCommitTrackerUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR;
+import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.junit.Before;
+import org.junit.Test;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.apache.hadoop.fs.s3a.commit.AbstractCommitITest.randomJobId;
+
+/**
+ * Class to test {@link MagicCommitTrackerUtils}.
+ */
+public final class TestMagicCommitTrackerUtils {
+
+  private String jobId;
+  private String attemptId;
+  private TaskAttemptID taskAttemptId;
+  private static final Path DEST_PATH = new Path("s3://dummyBucket/dummyTable");
+
+
+  @Before
+  public void setup() throws Exception {
+    jobId = randomJobId();
+    attemptId = "attempt_" + jobId + "_m_000000_0";
+    taskAttemptId = TaskAttemptID.forName(attemptId);
+  }
+
+  @Test
+  public void testExtractTaskAttemptIdFromPath() {
+    TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(
+        new Configuration(),
+        taskAttemptId);
+    Path path = CommitUtilsWithMR
+        .getBaseMagicTaskAttemptPath(taskAttemptContext, "00001", DEST_PATH);
+    assertEquals("TaskAttemptId didn't match", attemptId,
+        MagicCommitTrackerUtils.extractTaskAttemptIdFromPath(path));
+
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
index fa963a4b9706..cbfc23a2a29b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
@@ -20,8 +20,11 @@ package org.apache.hadoop.fs.s3a.commit.magic;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
 
@@ -39,7 +42,10 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.S3AUtils.listAndFilter;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
 import static org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR.getMagicJobPath;
@@ -48,8 +54,11 @@ import static org.apache.hadoop.util.functional.RemoteIterators.toList;
 /**
  * Test the magic committer's commit protocol.
  */
+@RunWith(Parameterized.class)
 public class ITestMagicCommitProtocol extends AbstractITCommitProtocol {
 
+  private final boolean trackCommitsInMemory;
+
   @Override
   protected String suitename() {
     return "ITestMagicCommitProtocol";
@@ -71,6 +80,27 @@ public class ITestMagicCommitProtocol extends AbstractITCommitProtocol {
     CommitUtils.verifyIsMagicCommitFS(getFileSystem());
   }
 
+  @Parameterized.Parameters(name = "track-commit-in-memory-{0}")
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+        {false},
+        {true}
+    });
+  }
+
+  public ITestMagicCommitProtocol(boolean trackCommitsInMemory) {
+    this.trackCommitsInMemory = trackCommitsInMemory;
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    removeBaseAndBucketOverrides(conf, FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED);
+    conf.setBoolean(FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED, trackCommitsInMemory);
+
+    return conf;
+  }
+
   @Override
   public void assertJobAbortCleanedUp(JobData jobData)
       throws Exception {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java
index d28ee5172b63..be5222083378 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.examples.terasort.TeraSortConfigKeys;
 import org.apache.hadoop.examples.terasort.TeraValidate;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.commit.AbstractYarnClusterITest;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
 import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
 import org.apache.hadoop.mapred.JobConf;
@@ -97,6 +98,9 @@ public class ITestTerasortOnS3A extends AbstractYarnClusterITest {
   /** Name of the committer for this run. */
   private final String committerName;
 
+  /** Should Magic committer track pending commits in-memory. */
+  private final boolean trackCommitsInMemory;
+
   /** Base path for all the terasort input and output paths. */
   private Path terasortPath;
 
@@ -117,12 +121,14 @@ public class ITestTerasortOnS3A extends AbstractYarnClusterITest {
   @Parameterized.Parameters(name = "{0}")
   public static Collection<Object[]> params() {
     return Arrays.asList(new Object[][]{
-        {DirectoryStagingCommitter.NAME},
-        {MagicS3GuardCommitter.NAME}});
+        {DirectoryStagingCommitter.NAME, false},
+        {MagicS3GuardCommitter.NAME, false},
+        {MagicS3GuardCommitter.NAME, true}});
   }
 
-  public ITestTerasortOnS3A(final String committerName) {
+  public ITestTerasortOnS3A(final String committerName, final boolean trackCommitsInMemory) {
     this.committerName = committerName;
+    this.trackCommitsInMemory = trackCommitsInMemory;
   }
 
   @Override
@@ -152,6 +158,9 @@ public class ITestTerasortOnS3A extends AbstractYarnClusterITest {
     conf.setBoolean(
         TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(),
         false);
+    conf.setBoolean(
+        CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED,
+        trackCommitsInMemory);
   }
 
   private int getExpectedPartitionCount() {
@@ -173,7 +182,7 @@ public class ITestTerasortOnS3A extends AbstractYarnClusterITest {
    */
   private void prepareToTerasort() {
     // small sample size for faster runs
-    terasortPath = new Path("/terasort-" + committerName)
+    terasortPath = new Path("/terasort-" + committerName + "-" + trackCommitsInMemory)
         .makeQualified(getFileSystem());
     sortInput = new Path(terasortPath, "sortin");
     sortOutput = new Path(terasortPath, "sortout");

