This is an automated email from the ASF dual-hosted git repository.

ckj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 6805fd4c36 HDDS-9194. Implement cleanup service for MultipartInfoTable (#5325)
6805fd4c36 is described below

commit 6805fd4c36b53884fb1feb252389b00b6fcef590
Author: Ivan Andika <[email protected]>
AuthorDate: Tue Oct 3 13:26:37 2023 +0800

    HDDS-9194. Implement cleanup service for MultipartInfoTable (#5325)
---
 .../common/src/main/resources/ozone-default.xml    |  49 ++
 .../main/java/org/apache/hadoop/ozone/OmUtils.java |   1 +
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |  20 +
 .../ozone/om/helpers/OmMultipartAbortInfo.java     | 116 ++++
 .../hadoop/ozone/om/helpers/OmMultipartUpload.java |  14 +
 .../src/main/proto/OmClientProtocol.proto          |  23 +
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |  24 +-
 .../org/apache/hadoop/ozone/audit/OMAction.java    |   4 +-
 .../org/apache/hadoop/ozone/om/KeyManager.java     |  26 +
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  57 +-
 .../java/org/apache/hadoop/ozone/om/OMMetrics.java |  51 ++
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |  51 ++
 .../om/ratis/utils/OzoneManagerRatisUtils.java     |   3 +
 .../S3ExpiredMultipartUploadsAbortRequest.java     | 335 +++++++++++
 .../multipart/S3MultipartUploadAbortRequest.java   |   8 +-
 .../S3MultipartUploadAbortRequestWithFSO.java      |  25 -
 .../om/request/util/OMMultipartUploadUtils.java    |  49 ++
 .../AbstractS3MultipartAbortResponse.java          | 156 +++++
 .../S3ExpiredMultipartUploadsAbortResponse.java    |  80 +++
 .../multipart/S3MultipartUploadAbortResponse.java  |  43 +-
 .../om/service/MultipartUploadCleanupService.java  | 234 ++++++++
 .../hadoop/ozone/om/TestOmMetadataManager.java     | 105 ++++
 .../ozone/om/request/OMRequestTestUtils.java       |  43 +-
 .../TestS3ExpiredMultipartUploadsAbortRequest.java | 666 +++++++++++++++++++++
 .../s3/multipart/TestS3MultipartRequest.java       |  14 +-
 ...tS3MultipartUploadCommitPartRequestWithFSO.java |   2 +-
 ...estS3MultipartUploadCompleteRequestWithFSO.java |   2 +-
 ...TestS3ExpiredMultipartUploadsAbortResponse.java | 325 ++++++++++
 .../service/TestMultipartUploadCleanupService.java | 262 ++++++++
 29 files changed, 2694 insertions(+), 94 deletions(-)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 919540870c..3dd65220d7 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1391,6 +1391,55 @@
     </description>
   </property>
 
+  <property>
+    <name>ozone.om.open.mpu.cleanup.service.interval</name>
+    <value>24h</value>
+    <tag>OZONE, OM, PERFORMANCE</tag>
+    <description>
+      A background job periodically checks for inactive (expired) multipart
+      uploads and sends multipart upload abort requests for them.
+      This entry controls the interval of this cleanup check.
+      Unit can be defined with a postfix (ns,ms,s,m,h,d)
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.om.open.mpu.cleanup.service.timeout</name>
+    <value>300s</value>
+    <tag>OZONE, OM, PERFORMANCE</tag>
+    <description>
+      A timeout value of the multipart upload cleanup service. If this is set
+      greater than 0, the service will stop waiting for the multipart upload
+      abort completion after this time. If timeouts happen to a large
+      proportion of multipart upload aborts, this value needs to be increased
+      or ozone.om.open.mpu.parts.cleanup.limit.per.task should be decreased.
+      Unit can be defined with a postfix (ns,ms,s,m,h,d)
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.om.open.mpu.expire.threshold</name>
+    <value>30d</value>
+    <tag>OZONE, OM, PERFORMANCE</tag>
+    <description>
+      Controls how long a multipart upload is considered active. Specifically,
+      if a multipart upload has been ongoing longer than the value of this
+      config entry, that multipart upload is considered expired (e.g. due to a
+      client crash). Unit can be defined with a postfix (ns,ms,s,m,h,d)
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.om.open.mpu.parts.cleanup.limit.per.task</name>
+    <value>0</value>
+    <tag>OZONE, OM, PERFORMANCE</tag>
+    <description>
+      The maximum number of expired multipart upload parts to be aborted per
+      task run, rounded up to include every part of the last expired multipart
+      upload processed. This property is used to approximately throttle the
+      number of MPU parts sent to the OM in one abort request.
+    </description>
+  </property>
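+
+  <!-- The four ozone.om.open.mpu.* properties above work together: every
+       ozone.om.open.mpu.cleanup.service.interval, the cleanup service scans
+       the multipartInfoTable for uploads older than
+       ozone.om.open.mpu.expire.threshold and submits abort requests for
+       them, throttled by ozone.om.open.mpu.parts.cleanup.limit.per.task. -->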
+
   <property>
     <name>hdds.rest.rest-csrf.enabled</name>
     <value>false</value>
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index 0e0a89a7a2..9cfff8a1e5 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -321,6 +321,7 @@ public final class OmUtils {
     case SnapshotPurge:
     case RecoverLease:
     case SetTimes:
+    case AbortExpiredMultiPartUploads:
     case UnknownCommand:
       return false;
     case EchoRPC:
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index ab110f21f3..ef40003189 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -142,6 +142,26 @@ public final class OMConfigKeys {
   public static final int OZONE_OM_OPEN_KEY_CLEANUP_LIMIT_PER_TASK_DEFAULT =
       1000;
 
+  public static final String OZONE_OM_MPU_CLEANUP_SERVICE_INTERVAL =
+      "ozone.om.open.mpu.cleanup.service.interval";
+  public static final String
+      OZONE_OM_MPU_CLEANUP_SERVICE_INTERVAL_DEFAULT = "24h";
+
+  public static final String OZONE_OM_MPU_CLEANUP_SERVICE_TIMEOUT =
+      "ozone.om.open.mpu.cleanup.service.timeout";
+  public static final String OZONE_OM_MPU_CLEANUP_SERVICE_TIMEOUT_DEFAULT
+      = "300s";
+
+  public static final String OZONE_OM_MPU_EXPIRE_THRESHOLD =
+      "ozone.om.open.mpu.expire.threshold";
+  public static final String OZONE_OM_MPU_EXPIRE_THRESHOLD_DEFAULT =
+      "30d";
+
+  public static final String OZONE_OM_MPU_PARTS_CLEANUP_LIMIT_PER_TASK =
+      "ozone.om.open.mpu.parts.cleanup.limit.per.task";
+  public static final int OZONE_OM_MPU_PARTS_CLEANUP_LIMIT_PER_TASK_DEFAULT =
+      0;
+
   public static final String OZONE_OM_METRICS_SAVE_INTERVAL =
       "ozone.om.save.metrics.interval";
   public static final String OZONE_OM_METRICS_SAVE_INTERVAL_DEFAULT = "5m";
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartAbortInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartAbortInfo.java
new file mode 100644
index 0000000000..245fc773f6
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartAbortInfo.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.ozone.om.helpers;
+
+import java.util.Objects;
+
+/**
+ * This class contains the necessary information to abort MPU keys.
+ */
+public final class OmMultipartAbortInfo {
+
+  private final String multipartKey;
+  private final String multipartOpenKey;
+  private final OmMultipartKeyInfo omMultipartKeyInfo;
+  private final BucketLayout bucketLayout;
+
+  private OmMultipartAbortInfo(String multipartKey, String multipartOpenKey,
+      OmMultipartKeyInfo omMultipartKeyInfo, BucketLayout bucketLayout) {
+    this.multipartKey = multipartKey;
+    this.multipartOpenKey = multipartOpenKey;
+    this.omMultipartKeyInfo = omMultipartKeyInfo;
+    this.bucketLayout = bucketLayout;
+  }
+
+  public String getMultipartKey() {
+    return multipartKey;
+  }
+
+  public String getMultipartOpenKey() {
+    return multipartOpenKey;
+  }
+
+  public OmMultipartKeyInfo getOmMultipartKeyInfo() {
+    return omMultipartKeyInfo;
+  }
+
+  public BucketLayout getBucketLayout() {
+    return bucketLayout;
+  }
+
+  /**
+   * Builder of OmMultipartAbortInfo.
+   */
+  public static class Builder {
+    private String multipartKey;
+    private String multipartOpenKey;
+    private OmMultipartKeyInfo omMultipartKeyInfo;
+    private BucketLayout bucketLayout;
+
+    public Builder setMultipartKey(String mpuKey) {
+      this.multipartKey = mpuKey;
+      return this;
+    }
+
+    public Builder setMultipartOpenKey(String mpuOpenKey) {
+      this.multipartOpenKey = mpuOpenKey;
+      return this;
+    }
+
+    public Builder setMultipartKeyInfo(OmMultipartKeyInfo multipartKeyInfo) {
+      this.omMultipartKeyInfo = multipartKeyInfo;
+      return this;
+    }
+
+    public Builder setBucketLayout(BucketLayout layout) {
+      this.bucketLayout = layout;
+      return this;
+    }
+
+    public OmMultipartAbortInfo build() {
+      return new OmMultipartAbortInfo(multipartKey,
+          multipartOpenKey, omMultipartKeyInfo, bucketLayout);
+    }
+  }
+
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+
+    if (other == null || getClass() != other.getClass()) {
+      return false;
+    }
+
+    final OmMultipartAbortInfo that = (OmMultipartAbortInfo) other;
+
+    return this.multipartKey.equals(that.multipartKey) &&
+        this.multipartOpenKey.equals(that.multipartOpenKey) &&
+        this.bucketLayout.equals(that.bucketLayout) &&
+        this.omMultipartKeyInfo.equals(that.omMultipartKeyInfo);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(multipartKey, multipartOpenKey,
+        bucketLayout, omMultipartKeyInfo);
+  }
+
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUpload.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUpload.java
index 9bdc90c798..70da36dad5 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUpload.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUpload.java
@@ -130,4 +130,18 @@ public class OmMultipartUpload {
   public ReplicationConfig getReplicationConfig() {
     return replicationConfig;
   }
+
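+  // Equality and hashing are based solely on the upload ID, which uniquely
+  // identifies a multipart upload.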
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    return other instanceof OmMultipartUpload && uploadId.equals(
+        ((OmMultipartUpload)other).getUploadId());
+  }
+
+  @Override
+  public int hashCode() {
+    return uploadId.hashCode();
+  }
 }
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 01d384d5c2..77d2f23715 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -142,6 +142,8 @@ enum Type {
   SetSafeMode = 124;
   PrintCompactionLogDag = 125;
   ListKeysLight = 126;
+  AbortExpiredMultiPartUploads = 127;
+
 }
 
 enum SafeMode {
@@ -273,6 +275,8 @@ message OMRequest {
   optional CancelSnapshotDiffRequest        CancelSnapshotDiffRequest      = 123;
   optional SetSafeModeRequest               SetSafeModeRequest             = 124;
   optional PrintCompactionLogDagRequest     PrintCompactionLogDagRequest   = 125;
+
+  optional MultipartUploadsExpiredAbortRequest multipartUploadsExpiredAbortRequest = 126;
 }
 
 message OMResponse {
@@ -390,6 +394,7 @@ message OMResponse {
   optional SetSafeModeResponse               SetSafeModeResponse           = 124;
   optional PrintCompactionLogDagResponse     PrintCompactionLogDagResponse = 125;
   optional ListKeysLightResponse             listKeysLightResponse         = 126;
+  optional MultipartUploadsExpiredAbortResponse multipartUploadsExpiredAbortResponse = 127;
 }
 
 enum Status {
@@ -1577,6 +1582,24 @@ message MultipartUploadAbortRequest {
 message MultipartUploadAbortResponse {
 
 }
+
+message MultipartUploadsExpiredAbortRequest {
+  repeated ExpiredMultipartUploadsBucket expiredMultipartUploadsPerBucket = 1;
+}
+
+message ExpiredMultipartUploadsBucket {
+  required string volumeName = 1;
+  required string bucketName = 2;
+  repeated ExpiredMultipartUploadInfo multipartUploads = 3;
+}
+
+message ExpiredMultipartUploadInfo {
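+  // The multipartInfoTable DB key of the expired upload; the OM parses it
+  // back into volume/bucket/key/uploadId via OmMultipartUpload.from().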
+  required string name = 1;
+}
+
+message MultipartUploadsExpiredAbortResponse {
+}
+
 message MultipartUploadListPartsRequest {
     required string volume = 1;
     required string bucket = 2;
diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index 66baf41184..4dfb135dd7 100644
--- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
 import org.apache.hadoop.ozone.storage.proto.
     OzoneManagerStorageProtos.PersistedUserVolumeInfo;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
@@ -279,6 +280,23 @@ public interface OMMetadataManager extends DBStoreHAManager {
   ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold, int count,
       BucketLayout bucketLayout) throws IOException;
 
+  /**
+   * Returns the expired multipart uploads whose age is greater than or
+   * equal to {@code expireThreshold}, limited to approximately
+   * {@code maxParts} total parts.
+   *
+   * @param expireThreshold The threshold of MPU key expiration age.
+   * @param maxParts The threshold of the number of MPU parts to return.
+   *                 This is a soft limit: if the last MPU scanned has parts
+   *                 that push the total past this limit, it is still included
+   *                 in the returned expired MPUs. This handles the situation
+   *                 where an MPU with more parts than maxParts would
+   *                 otherwise never get deleted.
+   * @return a {@link List} of {@link ExpiredMultipartUploadsBucket}, the
+   *         expired multipart uploads, grouped by volume and bucket.
+   */
+  List<ExpiredMultipartUploadsBucket> getExpiredMultipartUploads(
+      Duration expireThreshold, int maxParts) throws IOException;
+
   /**
    * Retrieve RWLock for the table.
    * @param tableName table name.
@@ -350,7 +368,8 @@ public interface OMMetadataManager extends DBStoreHAManager {
   Table<String, OmPrefixInfo> getPrefixTable();
 
   /**
-   * Returns the DB key name of a multipart upload key in OM metadata store.
+   * Returns the DB key name of a multipart upload key in OM metadata store
+   * for LEGACY/OBS buckets.
    *
    * @param volume - volume name
    * @param bucket - bucket name
@@ -512,7 +531,8 @@ public interface OMMetadataManager extends DBStoreHAManager {
   String getRenameKey(String volume, String bucket, long objectID);
 
   /**
-   * Returns the DB key name of a multipart upload key in OM metadata store.
+   * Returns the DB key name of a multipart upload key in OM metadata store
+   * for FSO-enabled buckets.
    *
    * @param volumeId       - ID of the volume
    * @param bucketId       - ID of the bucket
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
index 3b96267c97..ed37313f1c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
@@ -99,7 +99,9 @@ public enum OMAction implements AuditAction {
   LIST_SNAPSHOT,
   DELETE_SNAPSHOT,
   SNAPSHOT_MOVE_DELETED_KEYS,
-  SET_TIMES;
+  SET_TIMES,
+
+  ABORT_EXPIRED_MULTIPART_UPLOAD;
 
   @Override
   public String getAction() {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index 6ba98cbde0..4f7fa6ad2f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.utils.BackgroundService;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.service.KeyDeletingService;
 import org.apache.hadoop.ozone.om.service.SnapshotDeletingService;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -147,6 +148,25 @@ public interface KeyManager extends OzoneManagerFS, IOzoneAcl {
   ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold, int count,
       BucketLayout bucketLayout) throws IOException;
 
+  /**
+   * Returns the expired multipart uploads whose age is greater than or
+   * equal to {@code expireThreshold}, limited to approximately
+   * {@code maxParts} total parts.
+   *
+   * @param expireThreshold The threshold of MPU expiration age.
+   * @param maxParts The threshold of the number of MPU parts to return.
+   *                 This is a soft limit: if the last MPU scanned has parts
+   *                 that push the total past this limit, it is still included
+   *                 in the returned expired MPUs. This handles the situation
+   *                 where an MPU with more parts than maxParts would
+   *                 otherwise never get deleted.
+   * @return a {@link List} of {@link ExpiredMultipartUploadsBucket}, the
+   *         expired multipart uploads, grouped by volume and bucket.
+   */
+  List<ExpiredMultipartUploadsBucket> getExpiredMultipartUploads(
+      Duration expireThreshold, int maxParts) throws IOException;
+
   /**
    * Returns the metadataManager.
    * @return OMMetadataManager.
@@ -246,6 +266,12 @@ public interface KeyManager extends OzoneManagerFS, IOzoneAcl {
    */
   BackgroundService getOpenKeyCleanupService();
 
+  /**
+   * Returns the instance of Multipart Upload Cleanup Service.
+   * @return Background service.
+   */
+  BackgroundService getMultipartUploadCleanupService();
+
   /**
    * Returns the instance of Snapshot SST Filtering service.
    * @return Background service.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 33ea1f779b..a230882112 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -17,8 +17,6 @@
 package org.apache.hadoop.ozone.om;
 
 import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.security.GeneralSecurityException;
 import java.security.PrivilegedExceptionAction;
 import java.time.Duration;
@@ -82,10 +80,13 @@ import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
+import org.apache.hadoop.ozone.om.request.util.OMMultipartUploadUtils;
 import org.apache.hadoop.ozone.om.service.DirectoryDeletingService;
 import org.apache.hadoop.ozone.om.service.KeyDeletingService;
+import org.apache.hadoop.ozone.om.service.MultipartUploadCleanupService;
 import org.apache.hadoop.ozone.om.service.OpenKeyCleanupService;
 import org.apache.hadoop.ozone.om.service.SnapshotDeletingService;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
@@ -122,6 +123,10 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_SST_FILTERI
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MPU_CLEANUP_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MPU_CLEANUP_SERVICE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MPU_CLEANUP_SERVICE_TIMEOUT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MPU_CLEANUP_SERVICE_TIMEOUT_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT;
@@ -175,6 +180,7 @@ public class KeyManagerImpl implements KeyManager {
   private final OMPerformanceMetrics metrics;
 
   private BackgroundService openKeyCleanupService;
+  private BackgroundService multipartUploadCleanupService;
 
   public KeyManagerImpl(OzoneManager om, ScmClient scmClient,
       OzoneConfiguration conf, OMPerformanceMetrics metrics) {
@@ -293,6 +299,21 @@ public class KeyManagerImpl implements KeyManager {
         LOG.error("Error starting Snapshot Deleting Service", e);
       }
     }
+
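+    // Start the expired-MPU cleanup service alongside the other background
+    // services, using the interval and timeout from the new
+    // ozone.om.open.mpu.cleanup.service.* settings.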
+    if (multipartUploadCleanupService == null) {
+      long serviceInterval = configuration.getTimeDuration(
+          OZONE_OM_MPU_CLEANUP_SERVICE_INTERVAL,
+          OZONE_OM_MPU_CLEANUP_SERVICE_INTERVAL_DEFAULT,
+          TimeUnit.MILLISECONDS);
+      long serviceTimeout = configuration.getTimeDuration(
+          OZONE_OM_MPU_CLEANUP_SERVICE_TIMEOUT,
+          OZONE_OM_MPU_CLEANUP_SERVICE_TIMEOUT_DEFAULT,
+          TimeUnit.MILLISECONDS);
+      multipartUploadCleanupService = new MultipartUploadCleanupService(
+          serviceInterval, TimeUnit.MILLISECONDS, serviceTimeout,
+          ozoneManager, configuration);
+      multipartUploadCleanupService.start();
+    }
   }
 
   KeyProviderCryptoExtension getKMSProvider() {
@@ -321,6 +342,10 @@ public class KeyManagerImpl implements KeyManager {
       snapshotDeletingService.shutdown();
       snapshotDeletingService = null;
     }
+    if (multipartUploadCleanupService != null) {
+      multipartUploadCleanupService.shutdown();
+      multipartUploadCleanupService = null;
+    }
   }
 
   private OmBucketInfo getBucketInfo(String volumeName, String bucketName)
@@ -617,6 +642,14 @@ public class KeyManagerImpl implements KeyManager {
         bucketLayout);
   }
 
+  @Override
+  public List<ExpiredMultipartUploadsBucket> getExpiredMultipartUploads(
+      Duration expireThreshold, int maxParts)
+      throws IOException {
+    return metadataManager.getExpiredMultipartUploads(expireThreshold,
+        maxParts);
+  }
+
   @Override
   public OMMetadataManager getMetadataManager() {
     return metadataManager;
@@ -632,10 +665,16 @@ public class KeyManagerImpl implements KeyManager {
     return dirDeletingService;
   }
 
+  @Override
   public BackgroundService getOpenKeyCleanupService() {
     return openKeyCleanupService;
   }
 
+  @Override
+  public BackgroundService getMultipartUploadCleanupService() {
+    return multipartUploadCleanupService;
+  }
+
   public SstFilteringService getSnapshotSstFilteringService() {
     return snapshotSstFilteringService;
   }
@@ -843,18 +882,8 @@ public class KeyManagerImpl implements KeyManager {
   private String getMultipartOpenKeyFSO(String volumeName, String bucketName,
       String keyName, String uploadID) throws IOException {
     OMMetadataManager metaMgr = metadataManager;
-    String fileName = OzoneFSUtils.getFileName(keyName);
-    Iterator<Path> pathComponents = Paths.get(keyName).iterator();
-    final long volumeId = metaMgr.getVolumeId(volumeName);
-    final long bucketId = metaMgr.getBucketId(volumeName, bucketName);
-    long parentID =
-        OMFileRequest.getParentID(volumeId, bucketId, pathComponents,
-                keyName, metaMgr);
-
-    String multipartKey = metaMgr.getMultipartKey(volumeId, bucketId,
-            parentID, fileName, uploadID);
-
-    return multipartKey;
+    return OMMultipartUploadUtils.getMultipartOpenKeyFSO(
+        volumeName, bucketName, keyName, uploadID, metaMgr);
   }
 
   /**
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
index 97445c0ec2..ccb0fae6fb 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
@@ -86,6 +86,11 @@ public class OMMetrics implements OmMetadataReaderMetrics {
   private @Metric MutableCounterLong numOpenKeysCleaned;
   private @Metric MutableCounterLong numOpenKeysHSyncCleaned;
 
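+  // Metrics for the expired-MPU abort flow: abort requests received, MPUs
+  // submitted for abort, MPUs actually aborted, and parts aborted.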
+  private @Metric MutableCounterLong numExpiredMPUAbortRequests;
+  private @Metric MutableCounterLong numExpiredMPUSubmittedForAbort;
+  private @Metric MutableCounterLong numExpiredMPUAborted;
+  private @Metric MutableCounterLong numExpiredMPUPartsAborted;
+
   private @Metric MutableCounterLong numAddAcl;
   private @Metric MutableCounterLong numSetAcl;
   private @Metric MutableCounterLong numGetAcl;
@@ -124,6 +129,7 @@ public class OMMetrics implements OmMetadataReaderMetrics {
   private @Metric MutableCounterLong numListMultipartUploadParts;
   private @Metric MutableCounterLong numListMultipartUploadPartFails;
   private @Metric MutableCounterLong numOpenKeyDeleteRequestFails;
+  private @Metric MutableCounterLong numExpiredMPUAbortRequestFails;
   private @Metric MutableCounterLong numSnapshotCreateFails;
   private @Metric MutableCounterLong numSnapshotDeleteFails;
   private @Metric MutableCounterLong numSnapshotListFails;
@@ -813,6 +819,26 @@ public class OMMetrics implements OmMetadataReaderMetrics {
     numOpenKeyDeleteRequestFails.incr();
   }
 
+  public void incNumExpiredMPUAbortRequests() {
+    numExpiredMPUAbortRequests.incr();
+  }
+
+  public void incNumExpiredMPUSubmittedForAbort(long amount) {
+    numExpiredMPUSubmittedForAbort.incr(amount);
+  }
+
+  public void incNumExpiredMPUAborted() {
+    numExpiredMPUAborted.incr();
+  }
+
+  public void incNumExpiredMPUPartsAborted(long amount) {
+    numExpiredMPUPartsAborted.incr(amount);
+  }
+
+  public void incNumExpiredMpuAbortRequestFails() {
+    numExpiredMPUAbortRequestFails.incr();
+  }
+
   public void incNumAddAcl() {
     numAddAcl.incr();
   }
@@ -1130,6 +1156,31 @@ public class OMMetrics implements OmMetadataReaderMetrics {
     return numOpenKeyDeleteRequestFails.value();
   }
 
+  @VisibleForTesting
+  public long getNumExpiredMPUAbortRequests() {
+    return numExpiredMPUAbortRequests.value();
+  }
+
+  @VisibleForTesting
+  public long getNumExpiredMPUSubmittedForAbort() {
+    return numExpiredMPUSubmittedForAbort.value();
+  }
+
+  @VisibleForTesting
+  public long getNumExpiredMPUAborted() {
+    return numExpiredMPUAborted.value();
+  }
+
+  @VisibleForTesting
+  public long getNumExpiredMPUAbortRequestFails() {
+    return numExpiredMPUAbortRequestFails.value();
+  }
+
+  @VisibleForTesting
+  public long getNumExpiredMPUPartsAborted() {
+    return numExpiredMPUPartsAborted.value();
+  }
+
   public long getNumAddAcl() {
     return numAddAcl.value();
   }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 6f0fafcff2..e6b027427d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -86,6 +87,8 @@ import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTrans
 import org.apache.hadoop.ozone.om.request.util.OMMultipartUploadUtils;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
 import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
 import org.apache.hadoop.ozone.storage.proto
     .OzoneManagerStorageProtos.PersistedUserVolumeInfo;
@@ -948,6 +951,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
     String volumeId = String.valueOf(getVolumeId(
             omBucketInfo.getVolumeName()));
     String bucketId = String.valueOf(omBucketInfo.getObjectID());
+
     BucketLayout bucketLayout = omBucketInfo.getBucketLayout();
 
     // keyPrefix is different in case of fileTable and keyTable.
@@ -1813,6 +1817,53 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
     return expiredKeys;
   }
 
+  @Override
+  public List<ExpiredMultipartUploadsBucket> getExpiredMultipartUploads(
+      Duration expireThreshold, int maxParts) throws IOException {
+    Map<String, ExpiredMultipartUploadsBucket.Builder> expiredMPUs =
+        new HashMap<>();
+
+    try (TableIterator<String, ? extends KeyValue<String, OmMultipartKeyInfo>>
+             mpuInfoTableIterator = getMultipartInfoTable().iterator()) {
+
+      final long expiredCreationTimestamp =
+          Instant.now().minus(expireThreshold).toEpochMilli();
+
+      ExpiredMultipartUploadInfo.Builder builder =
+          ExpiredMultipartUploadInfo.newBuilder();
+
+      int numParts = 0;
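+      // maxParts is a soft limit: the scan stops once the accumulated part
+      // count reaches it, but the MPU that crosses the threshold is still
+      // included, so an upload with more than maxParts parts can still be
+      // aborted eventually.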
+      while (numParts < maxParts &&
+          mpuInfoTableIterator.hasNext()) {
+        KeyValue<String, OmMultipartKeyInfo> mpuInfoValue =
+            mpuInfoTableIterator.next();
+        String dbMultipartInfoKey = mpuInfoValue.getKey();
+        OmMultipartKeyInfo omMultipartKeyInfo = mpuInfoValue.getValue();
+
+        if (omMultipartKeyInfo.getCreationTime() <= expiredCreationTimestamp) {
+          OmMultipartUpload expiredMultipartUpload =
+              OmMultipartUpload.from(dbMultipartInfoKey);
+          final String volume =  expiredMultipartUpload.getVolumeName();
+          final String bucket = expiredMultipartUpload.getBucketName();
+          final String mapKey = volume + OM_KEY_PREFIX + bucket;
+          expiredMPUs.computeIfAbsent(mapKey, k ->
+              ExpiredMultipartUploadsBucket.newBuilder()
+                  .setVolumeName(volume)
+                  .setBucketName(bucket));
+          expiredMPUs.get(mapKey)
+              .addMultipartUploads(builder.setName(dbMultipartInfoKey)
+                  .build());
+          numParts += omMultipartKeyInfo.getPartKeyInfoMap().size();
+        }
+
+      }
+    }
+
+    return expiredMPUs.values().stream().map(
+            ExpiredMultipartUploadsBucket.Builder::build)
+        .collect(Collectors.toList());
+  }
+
   @Override
   public <KEY, VALUE> long countRowsInTable(Table<KEY, VALUE> table)
       throws IOException {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index c07969e6fc..d6ebb44da0 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.ozone.om.request.key.acl.OMKeySetAclRequestWithFSO;
 import org.apache.hadoop.ozone.om.request.key.acl.prefix.OMPrefixAddAclRequest;
 import org.apache.hadoop.ozone.om.request.key.acl.prefix.OMPrefixRemoveAclRequest;
 import org.apache.hadoop.ozone.om.request.key.acl.prefix.OMPrefixSetAclRequest;
+import org.apache.hadoop.ozone.om.request.s3.multipart.S3ExpiredMultipartUploadsAbortRequest;
 import org.apache.hadoop.ozone.om.request.s3.security.OMSetSecretRequest;
 import org.apache.hadoop.ozone.om.request.s3.security.S3GetSecretRequest;
 import org.apache.hadoop.ozone.om.request.s3.security.S3RevokeSecretRequest;
@@ -322,6 +323,8 @@ public final class OzoneManagerRatisUtils {
       break;
     case EchoRPC:
       return new OMEchoRPCWriteRequest(omRequest);
+    case AbortExpiredMultiPartUploads:
+      return new S3ExpiredMultipartUploadsAbortRequest(omRequest);
     default:
       throw new OMException("Unrecognized write command type request "
           + cmdType, OMException.ResultCodes.INVALID_REQUEST);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3ExpiredMultipartUploadsAbortRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3ExpiredMultipartUploadsAbortRequest.java
new file mode 100644
index 0000000000..0898089576
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3ExpiredMultipartUploadsAbortRequest.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.request.s3.multipart;
+
+import com.google.common.base.Optional;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartAbortInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUpload;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
+import org.apache.hadoop.ozone.om.request.util.OMMultipartUploadUtils;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.s3.multipart.S3ExpiredMultipartUploadsAbortResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+
+/**
+ * Handles requests to abort expired multipart uploads: removes MPU open keys
+ * from the open key/file table and moves MPU part keys to the deleted table.
+ * Modifies the open key/file table cache only, and no underlying databases.
+ * The deleted table cache does not need to be modified since it is not used
+ * for client response validation.
+ */
+public class S3ExpiredMultipartUploadsAbortRequest extends OMKeyRequest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(S3ExpiredMultipartUploadsAbortRequest.class);
+
+  public S3ExpiredMultipartUploadsAbortRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
+      long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
+
+    OMMetrics omMetrics = ozoneManager.getMetrics();
+    omMetrics.incNumExpiredMPUAbortRequests();
+
+    OzoneManagerProtocolProtos.MultipartUploadsExpiredAbortRequest
+        multipartUploadsExpiredAbortRequest = getOmRequest()
+        .getMultipartUploadsExpiredAbortRequest();
+
+    List<ExpiredMultipartUploadsBucket> submittedExpiredMPUsPerBucket =
+        multipartUploadsExpiredAbortRequest
+            .getExpiredMultipartUploadsPerBucketList();
+
+    long numSubmittedMPUs = 0;
+    for (ExpiredMultipartUploadsBucket mpuByBucket:
+        submittedExpiredMPUsPerBucket) {
+      numSubmittedMPUs += mpuByBucket.getMultipartUploadsCount();
+    }
+
+    LOG.debug("{} expired multi-uploads submitted for deletion.",
+        numSubmittedMPUs);
+    omMetrics.incNumExpiredMPUSubmittedForAbort(numSubmittedMPUs);
+
+    OzoneManagerProtocolProtos.OMResponse.Builder omResponse =
+        OmResponseUtil.getOMResponseBuilder(getOmRequest());
+
+    IOException exception = null;
+    OMClientResponse omClientResponse = null;
+    Result result = null;
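+    // MPUs successfully aborted by this request, grouped by bucket so the
+    // response and the audit log can be applied per bucket.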
+    Map<OmBucketInfo, List<OmMultipartAbortInfo>>
+        abortedMultipartUploads = new HashMap<>();
+
+    try {
+      for (ExpiredMultipartUploadsBucket mpuByBucket:
+          submittedExpiredMPUsPerBucket) {
+        // For each bucket where the MPU will be aborted from,
+        // get its bucket lock and update the cache accordingly.
+        updateTableCache(ozoneManager, trxnLogIndex, mpuByBucket,
+            abortedMultipartUploads);
+      }
+
+      omClientResponse = new S3ExpiredMultipartUploadsAbortResponse(
+          omResponse.build(), abortedMultipartUploads,
+          ozoneManager.isRatisEnabled());
+
+      result = Result.SUCCESS;
+    } catch (IOException ex) {
+      result = Result.FAILURE;
+      exception = ex;
+      omClientResponse =
+          new S3ExpiredMultipartUploadsAbortResponse(createErrorOMResponse(
+              omResponse, exception));
+    } finally {
+      addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
+          omDoubleBufferHelper);
+    }
+
+    // Only successfully aborted MPUs are included in the audit.
+    auditAbortedMPUs(ozoneManager, abortedMultipartUploads);
+
+    processResults(omMetrics, numSubmittedMPUs,
+        abortedMultipartUploads.size(),
+        multipartUploadsExpiredAbortRequest, result);
+
+    return omClientResponse;
+
+  }
+
+  private void auditAbortedMPUs(OzoneManager ozoneManager,
+      Map<OmBucketInfo, List<OmMultipartAbortInfo>> abortedMultipartUploads) {
+    for (Map.Entry<OmBucketInfo, List<OmMultipartAbortInfo>> entry :
+        abortedMultipartUploads.entrySet()) {
+      KeyArgs.Builder keyArgsAuditBuilder = KeyArgs.newBuilder()
+          .setVolumeName(entry.getKey().getVolumeName())
+          .setBucketName(entry.getKey().getBucketName());
+
+      for (OmMultipartAbortInfo abortInfo: entry.getValue()) {
+        // See RpcClient#abortMultipartUpload
+        KeyArgs keyArgsForAudit = keyArgsAuditBuilder
+            .setKeyName(abortInfo.getMultipartKey())
+            .setMultipartUploadID(abortInfo.getOmMultipartKeyInfo()
+                .getUploadID())
+            .build();
+        Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgsForAudit);
+        auditLog(ozoneManager.getAuditLogger(), buildAuditMessage(
+            OMAction.ABORT_EXPIRED_MULTIPART_UPLOAD, auditMap,
+            null, getOmRequest().getUserInfo()));
+      }
+    }
+  }
+
+  private void processResults(OMMetrics omMetrics,
+      long numSubmittedExpiredMPUs, long numAbortedMPUs,
+      OzoneManagerProtocolProtos.MultipartUploadsExpiredAbortRequest request,
+      Result result) {
+
+    switch (result) {
+    case SUCCESS:
+      LOG.debug("Aborted {} expired MPUs out of {} submitted MPus.",
+            numAbortedMPUs, numSubmittedExpiredMPUs);
+      break;
+    case FAILURE:
+      omMetrics.incNumExpiredMpuAbortRequestFails();
+      LOG.error("Failure occurred while trying to abort {} submitted " +
+          "expired MPUs.", numAbortedMPUs);
+      break;
+    default:
+      LOG.error("Unrecognized result for " +
+          "MultipartUploadsExpiredAbortRequest: {}", request);
+    }
+
+  }
+
+  private void updateTableCache(OzoneManager ozoneManager,
+        long trxnLogIndex, ExpiredMultipartUploadsBucket mpusPerBucket,
+        Map<OmBucketInfo, List<OmMultipartAbortInfo>> abortedMultipartUploads)
+      throws IOException {
+
+    boolean acquiredLock = false;
+    String volumeName = mpusPerBucket.getVolumeName();
+    String bucketName = mpusPerBucket.getBucketName();
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    OmBucketInfo omBucketInfo = null;
+    BucketLayout bucketLayout = null;
+    try {
+      acquiredLock = omMetadataManager.getLock()
+          .acquireWriteLock(BUCKET_LOCK, volumeName, bucketName);
+
+      omBucketInfo = getBucketInfo(omMetadataManager, volumeName, bucketName);
+
+      if (omBucketInfo == null) {
+        LOG.warn("Volume: {}, Bucket: {} does not exist, skipping deletion.",
+            volumeName, bucketName);
+        return;
+      }
+
+      // Do not use getBucketLayout since the expired MPUs request might
+      // contain MPUs from all kinds of buckets.
+      bucketLayout = omBucketInfo.getBucketLayout();
+
+      for (ExpiredMultipartUploadInfo expiredMPU:
+          mpusPerBucket.getMultipartUploadsList()) {
+        String expiredMPUKeyName = expiredMPU.getName();
+
+        // If the MPU key is no longer present in the table, the MPU
+        // might have been completed or already aborted, and should not
+        // be aborted again.
+        OmMultipartKeyInfo omMultipartKeyInfo =
+            omMetadataManager.getMultipartInfoTable().get(expiredMPUKeyName);
+
+        if (omMultipartKeyInfo != null) {
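+          // Staleness guard: skip this MPU if it was modified by a newer
+          // transaction than the one driving this abort.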
+          if (ozoneManager.isRatisEnabled() &&
+              trxnLogIndex < omMultipartKeyInfo.getUpdateID()) {
+            LOG.warn("Transaction log index {} is smaller than " +
+                    "the current updateID {} of MPU key {}, skipping 
deletion.",
+                trxnLogIndex, omMultipartKeyInfo.getUpdateID(),
+                expiredMPUKeyName);
+            continue;
+          }
+
+          // Set the UpdateID to current transactionLogIndex
+          omMultipartKeyInfo.setUpdateID(trxnLogIndex,
+              ozoneManager.isRatisEnabled());
+
+          // Parse the multipart upload components (e.g. volume, bucket, key)
+          // from the multipartInfoTable db key
+
+          OmMultipartUpload multipartUpload;
+          try {
+            multipartUpload =
+                OmMultipartUpload.from(expiredMPUKeyName);
+          } catch (IllegalArgumentException e) {
+            LOG.warn("Aborting expired MPU failed: MPU key: " +
+                expiredMPUKeyName + " has invalid structure, " +
+                "skipping this MPU.");
+            continue;
+          }
+
+          String multipartOpenKey;
+          try {
+            multipartOpenKey =
+                OMMultipartUploadUtils
+                    .getMultipartOpenKey(multipartUpload.getVolumeName(),
+                        multipartUpload.getBucketName(),
+                        multipartUpload.getKeyName(),
+                        multipartUpload.getUploadId(), omMetadataManager,
+                        bucketLayout);
+          } catch (OMException ome) {
+            LOG.warn("Aborting expired MPU Failed: volume: " +
+                multipartUpload.getVolumeName() + ", bucket: " +
+                multipartUpload.getBucketName() + ", key: " +
+                multipartUpload.getKeyName() + ". Cannot parse the open key " +
+                "for this MPU, skipping this MPU.");
+            continue;
+          }
+
+          // When aborting an uploaded key, we need to subtract the part key
+          // lengths from the bucket usedBytes.
+          long quotaReleased = 0;
+          int keyFactor = omMultipartKeyInfo.getReplicationConfig()
+              .getRequiredNodes();
+          for (PartKeyInfo iterPartKeyInfo : omMultipartKeyInfo.
+              getPartKeyInfoMap()) {
+            quotaReleased +=
+                iterPartKeyInfo.getPartKeyInfo().getDataSize() * keyFactor;
+          }
+          omBucketInfo.incrUsedBytes(-quotaReleased);
+
+          OmMultipartAbortInfo omMultipartAbortInfo =
+              new OmMultipartAbortInfo.Builder()
+                  .setMultipartKey(expiredMPUKeyName)
+                  .setMultipartOpenKey(multipartOpenKey)
+                  .setMultipartKeyInfo(omMultipartKeyInfo)
+                  .setBucketLayout(omBucketInfo.getBucketLayout())
+                  .build();
+
+          abortedMultipartUploads.computeIfAbsent(omBucketInfo,
+              k -> new ArrayList<>()).add(omMultipartAbortInfo);
+
+          // Update cache of openKeyTable and multipartInfo table.
+          // No need to add the cache entries to delete table, as the entries
+          // in delete table are not used by any read/write operations.
+
+          // Unlike a normal MPU abort request, the MPU open key is not
+          // required to exist here. For OpenKeyCleanupService runs prior to
+          // HDDS-9017, these MPU open keys might already be deleted,
+          // causing "orphan" MPU keys (the MPU entry exists in
+          // multipartInfoTable, but not in openKeyTable).
+          // We can skip the existence check and just delete from the
+          // multipartInfoTable. The existence check can be re-added
+          // once there are no "orphan" keys.
+          if (omMetadataManager.getOpenKeyTable(bucketLayout)
+              .isExist(multipartOpenKey)) {
+            omMetadataManager.getOpenKeyTable(bucketLayout)
+                .addCacheEntry(new CacheKey<>(multipartOpenKey),
+                    new CacheValue<>(Optional.absent(), trxnLogIndex));
+          }
+          omMetadataManager.getMultipartInfoTable()
+              .addCacheEntry(new CacheKey<>(expiredMPUKeyName),
+                  new CacheValue<>(Optional.absent(), trxnLogIndex));
+
+          long numParts = omMultipartKeyInfo.getPartKeyInfoMap().size();
+          ozoneManager.getMetrics().incNumExpiredMPUAborted();
+          ozoneManager.getMetrics().incNumExpiredMPUPartsAborted(numParts);
+          LOG.debug("Expired MPU {} aborted containing {} parts.",
+              expiredMPUKeyName, numParts);
+        } else {
+          LOG.debug("MPU key {} was not aborted, as it was not " +
+              "found in the multipart info table", expiredMPUKeyName);
+        }
+      }
+    } finally {
+      if (acquiredLock) {
+        omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
+            bucketName);
+      }
+    }
+
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java
index d8a87e24f2..df2a31825e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.QuotaUtil;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.util.OMMultipartUploadUtils;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.request.validation.RequestFeatureValidator;
 import org.apache.hadoop.ozone.om.request.validation.RequestProcessingPhase;
@@ -250,10 +251,9 @@ public class S3MultipartUploadAbortRequest extends OMKeyRequest {
   protected String getMultipartOpenKey(String multipartUploadID,
       String volumeName, String bucketName, String keyName,
       OMMetadataManager omMetadataManager) throws IOException {
-
-    String multipartKey = omMetadataManager.getMultipartKey(
-        volumeName, bucketName, keyName, multipartUploadID);
-    return multipartKey;
+    return OMMultipartUploadUtils.getMultipartOpenKey(
+        volumeName, bucketName, keyName, multipartUploadID, omMetadataManager,
+        getBucketLayout());
   }
 
   @RequestFeatureValidator(
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequestWithFSO.java
index f15b5dabbb..fbbb994bea 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequestWithFSO.java
@@ -18,13 +18,10 @@
 
 package org.apache.hadoop.ozone.om.request.s3.multipart;
 
-import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
-import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.om.response.s3.multipart.S3MultipartUploadAbortResponseWithFSO;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadAbortResponse;
@@ -32,9 +29,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMReque
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 
 import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Iterator;
 
 /**
  * Handles Abort of multipart upload request.
@@ -68,23 +62,4 @@ public class S3MultipartUploadAbortRequestWithFSO
         omBucketInfo.copyObject(), getBucketLayout());
     return omClientResp;
   }
-
-  @Override
-  protected String getMultipartOpenKey(String multipartUploadID,
-      String volumeName, String bucketName, String keyName,
-      OMMetadataManager omMetadataManager) throws IOException {
-
-    String fileName = OzoneFSUtils.getFileName(keyName);
-    Iterator<Path> pathComponents = Paths.get(keyName).iterator();
-    final long volumeId = omMetadataManager.getVolumeId(volumeName);
-    final long bucketId = omMetadataManager.getBucketId(volumeName,
-            bucketName);
-    long parentID = OMFileRequest.getParentID(volumeId, bucketId,
-            pathComponents, keyName, omMetadataManager);
-
-    String multipartKey = omMetadataManager.getMultipartKey(volumeId, bucketId,
-            parentID, fileName, multipartUploadID);
-
-    return multipartKey;
-  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/util/OMMultipartUploadUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/util/OMMultipartUploadUtils.java
index 5047949b6d..118eebd620 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/util/OMMultipartUploadUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/util/OMMultipartUploadUtils.java
@@ -20,8 +20,16 @@ package org.apache.hadoop.ozone.om.request.util;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.utils.UniqueId;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
 
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Iterator;
 import java.util.UUID;
 
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
@@ -79,6 +87,47 @@ public final class OMMultipartUploadUtils {
     return uploadId;
   }
 
+  /**
+   * Get the multipart open key based on the bucket layout.
+   * @throws IOException if the volume, bucket, or parent directory lookup
+   *                     fails for FSO buckets.
+   */
+  public static String getMultipartOpenKey(String volumeName,
+       String bucketName, String keyName, String multipartUploadId,
+       OMMetadataManager omMetadataManager, BucketLayout bucketLayout)
+      throws IOException {
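+    // FSO buckets key the open MPU by ids (volumeId, bucketId, parentId,
+    // fileName, uploadId), while OBS/LEGACY buckets use the flat
+    // /volume/bucket/key/uploadId form.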
+    if (bucketLayout == BucketLayout.FILE_SYSTEM_OPTIMIZED) {
+      return getMultipartOpenKeyFSO(volumeName, bucketName,
+          keyName, multipartUploadId, omMetadataManager);
+    } else {
+      return getMultipartOpenKey(volumeName, bucketName,
+          keyName, multipartUploadId, omMetadataManager);
+    }
+  }
+
+  public static String getMultipartOpenKey(String volumeName,
+       String bucketName, String keyName, String multipartUploadId,
+       OMMetadataManager omMetadataManager) {
+    return omMetadataManager.getMultipartKey(
+        volumeName, bucketName, keyName, multipartUploadId);
+  }
+
+  public static String getMultipartOpenKeyFSO(String volumeName,
+        String bucketName, String keyName, String uploadID,
+        OMMetadataManager metaMgr) throws IOException {
+    String fileName = OzoneFSUtils.getFileName(keyName);
+    Iterator<Path> pathComponents = Paths.get(keyName).iterator();
+    final long volumeId = metaMgr.getVolumeId(volumeName);
+    final long bucketId = metaMgr.getBucketId(volumeName, bucketName);
+    long parentID =
+        OMFileRequest.getParentID(volumeId, bucketId, pathComponents,
+            keyName, metaMgr);
+
+    String multipartKey = metaMgr.getMultipartKey(volumeId, bucketId,
+        parentID, fileName, uploadID);
+
+    return multipartKey;
+  }
+
 
   /**
    * Check whether key's isMultipartKey flag is set.
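
As a usage sketch of the new helper (the literal volume/bucket/key values and the in-scope uploadId and omMetadataManager are assumptions, not part of this patch):

    // Resolves the open-key DB key for an MPU. For FSO buckets the helper
    // resolves volumeId/bucketId/parentId internally; for LEGACY/OBS it
    // returns the flat /volume/bucket/key/uploadId form.
    String openKey = OMMultipartUploadUtils.getMultipartOpenKey(
        "vol1", "bucket1", "dir1/key1", uploadId,
        omMetadataManager, BucketLayout.FILE_SYSTEM_OPTIMIZED);
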
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/AbstractS3MultipartAbortResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/AbstractS3MultipartAbortResponse.java
new file mode 100644
index 0000000000..42267b644c
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/AbstractS3MultipartAbortResponse.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.response.s3.multipart;
+
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartAbortInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
+import org.apache.hadoop.ozone.om.response.key.OmKeyResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.BUCKET_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.MULTIPARTINFO_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_FILE_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_KEY_TABLE;
+
+/**
+ * Base class for responses that need to move multipart info part keys to the
+ * deleted table.
+ */
+@CleanupTableInfo(cleanupTables = {OPEN_KEY_TABLE, OPEN_FILE_TABLE,
+    DELETED_TABLE, MULTIPARTINFO_TABLE, BUCKET_TABLE})
+public abstract class AbstractS3MultipartAbortResponse extends OmKeyResponse {
+
+  private boolean isRatisEnabled;
+
+  public AbstractS3MultipartAbortResponse(
+      @Nonnull OMResponse omResponse, boolean isRatisEnabled) {
+    super(omResponse);
+    this.isRatisEnabled = isRatisEnabled;
+  }
+
+  public AbstractS3MultipartAbortResponse(
+      @Nonnull OMResponse omResponse, boolean isRatisEnabled,
+      BucketLayout bucketLayout) {
+    super(omResponse, bucketLayout);
+    this.isRatisEnabled =  isRatisEnabled;
+  }
+
+  /**
+   * Constructor for a request that was not successful.
+   * For a successful request, use one of the other constructors.
+   */
+  public AbstractS3MultipartAbortResponse(@Nonnull OMResponse omResponse,
+        BucketLayout bucketLayout) {
+    super(omResponse, bucketLayout);
+    checkStatusNotOK();
+  }
+
+  /**
+   * Adds the operations for aborting a list of multipart uploads under
+   * the same bucket to the batch operation.
+   * @param omMetadataManager
+   * @param batchOperation
+   * @param omBucketInfo
+   * @param multipartAbortInfo
+   * @throws IOException
+   */
+  protected void addAbortToBatch(
+      OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation,
+      OmBucketInfo omBucketInfo,
+      List<OmMultipartAbortInfo> multipartAbortInfo
+  ) throws IOException {
+    for (OmMultipartAbortInfo abortInfo: multipartAbortInfo) {
+      // Delete from openKey table and multipart info table.
+      omMetadataManager.getOpenKeyTable(abortInfo.getBucketLayout())
+          .deleteWithBatch(batchOperation, abortInfo.getMultipartOpenKey());
+      omMetadataManager.getMultipartInfoTable().deleteWithBatch(batchOperation,
+          abortInfo.getMultipartKey());
+
+      OmMultipartKeyInfo omMultipartKeyInfo = abortInfo
+          .getOmMultipartKeyInfo();
+      // Move all the parts to delete table
+      for (PartKeyInfo partKeyInfo: omMultipartKeyInfo.getPartKeyInfoMap()) {
+        OmKeyInfo currentKeyPartInfo =
+            OmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo());
+
+        // TODO: Similar to open key deletion response, we can check if the
+        //  MPU part actually contains blocks, and only move it to the
+        //  deletedTable if it does.
+
+        RepeatedOmKeyInfo repeatedOmKeyInfo = OmUtils.prepareKeyForDelete(
+            currentKeyPartInfo, omMultipartKeyInfo.getUpdateID(),
+            isRatisEnabled);
+
+        // multi-part key format is volumeName/bucketName/keyName/uploadId
+        String deleteKey = omMetadataManager.getOzoneDeletePathKey(
+            currentKeyPartInfo.getObjectID(), abortInfo.getMultipartKey());
+
+        omMetadataManager.getDeletedTable().putWithBatch(batchOperation,
+            deleteKey, repeatedOmKeyInfo);
+      }
+    }
+    // update bucket usedBytes.
+    omMetadataManager.getBucketTable().putWithBatch(batchOperation,
+        omMetadataManager.getBucketKey(omBucketInfo.getVolumeName(),
+            omBucketInfo.getBucketName()), omBucketInfo);
+  }
+
+  /**
+   * Adds the operation of aborting a multipart upload to the batch operation.
+   * Both LEGACY/OBS and FSO have similar abort logic. The only difference
+   * is the multipartOpenKey used in the openKeyTable and openFileTable.
+   */
+  protected void addAbortToBatch(
+      OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation,
+      String multipartKey,
+      String multipartOpenKey,
+      OmMultipartKeyInfo omMultipartKeyInfo,
+      OmBucketInfo omBucketInfo,
+      BucketLayout bucketLayout) throws IOException {
+    OmMultipartAbortInfo omMultipartAbortInfo =
+        new OmMultipartAbortInfo.Builder()
+            .setMultipartKey(multipartKey)
+            .setMultipartOpenKey(multipartOpenKey)
+            .setMultipartKeyInfo(omMultipartKeyInfo)
+            .setBucketLayout(bucketLayout)
+            .build();
+    addAbortToBatch(omMetadataManager, batchOperation, omBucketInfo,
+        Collections.singletonList(omMultipartAbortInfo));
+  }
+
+}
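
To illustrate the new base class, a minimal sketch of how a subclass could stage a single abort through the batched path (all variable names are assumed to be in scope):

    // Build the per-upload abort info; addAbortToBatch then deletes the
    // open key and multipart info entries and moves the committed parts
    // to the deleted table in one batch.
    OmMultipartAbortInfo info = new OmMultipartAbortInfo.Builder()
        .setMultipartKey(multipartKey)
        .setMultipartOpenKey(multipartOpenKey)
        .setMultipartKeyInfo(omMultipartKeyInfo)
        .setBucketLayout(BucketLayout.OBJECT_STORE)
        .build();
    addAbortToBatch(omMetadataManager, batchOperation, omBucketInfo,
        Collections.singletonList(info));
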
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3ExpiredMultipartUploadsAbortResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3ExpiredMultipartUploadsAbortResponse.java
new file mode 100644
index 0000000000..363074b596
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3ExpiredMultipartUploadsAbortResponse.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.response.s3.multipart;
+
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartAbortInfo;
+import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.BUCKET_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.MULTIPARTINFO_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_FILE_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_KEY_TABLE;
+
+/**
+ * Handles response to abort expired MPUs.
+ */
+@CleanupTableInfo(cleanupTables = {OPEN_KEY_TABLE, OPEN_FILE_TABLE,
+    DELETED_TABLE, MULTIPARTINFO_TABLE, BUCKET_TABLE})
+public class S3ExpiredMultipartUploadsAbortResponse extends
+    AbstractS3MultipartAbortResponse {
+
+  private Map<OmBucketInfo, List<OmMultipartAbortInfo>> mpusToDelete;
+
+  public S3ExpiredMultipartUploadsAbortResponse(
+      @Nonnull OMResponse omResponse,
+      @Nonnull Map<OmBucketInfo, List<OmMultipartAbortInfo>> mpusToDelete,
+      boolean isRatisEnabled) {
+    super(omResponse, isRatisEnabled);
+    this.mpusToDelete = mpusToDelete;
+  }
+
+  /**
+   * Constructor for a request that was not successful.
+   * For a successful request, use the other constructor.
+   */
+  public S3ExpiredMultipartUploadsAbortResponse(
+      @Nonnull OMResponse omResponse) {
+    // Set BucketLayout.DEFAULT just as a placeholder
+    // OmMultipartAbortInfo already contains the bucket layout info
+    super(omResponse, BucketLayout.DEFAULT);
+  }
+
+  @Override
+  protected void addToDBBatch(
+      OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation) throws IOException {
+    for (Map.Entry<OmBucketInfo, List<OmMultipartAbortInfo>> mpuInfoPair :
+        mpusToDelete.entrySet()) {
+      addAbortToBatch(omMetadataManager, batchOperation,
+          mpuInfoPair.getKey(), mpuInfoPair.getValue());
+    }
+  }
+}
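
On the request side, abort infos are grouped per bucket before this response is constructed; a sketch of that grouping (variable names are assumptions):

    // One entry per bucket, each carrying the expired uploads found there.
    Map<OmBucketInfo, List<OmMultipartAbortInfo>> mpusToDelete =
        new HashMap<>();
    mpusToDelete.computeIfAbsent(omBucketInfo, b -> new ArrayList<>())
        .add(abortInfo);
    OMClientResponse response = new S3ExpiredMultipartUploadsAbortResponse(
        omResponse, mpusToDelete, isRatisEnabled);
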
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadAbortResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadAbortResponse.java
index 567b28da02..f3e2054fad 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadAbortResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadAbortResponse.java
@@ -18,19 +18,13 @@
 
 package org.apache.hadoop.ozone.om.response.s3.multipart;
 
-import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
-import org.apache.hadoop.ozone.om.response.key.OmKeyResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .PartKeyInfo;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 
 import java.io.IOException;
@@ -46,23 +40,22 @@ import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_KEY_TABLE;
  */
 @CleanupTableInfo(cleanupTables = {OPEN_KEY_TABLE, DELETED_TABLE,
     MULTIPARTINFO_TABLE, BUCKET_TABLE})
-public class S3MultipartUploadAbortResponse extends OmKeyResponse {
+public class S3MultipartUploadAbortResponse extends
+    AbstractS3MultipartAbortResponse {
 
   private String multipartKey;
   private String multipartOpenKey;
   private OmMultipartKeyInfo omMultipartKeyInfo;
-  private boolean isRatisEnabled;
   private OmBucketInfo omBucketInfo;
 
   public S3MultipartUploadAbortResponse(@Nonnull OMResponse omResponse,
       String multipartKey, String multipartOpenKey,
       @Nonnull OmMultipartKeyInfo omMultipartKeyInfo, boolean isRatisEnabled,
       @Nonnull OmBucketInfo omBucketInfo, @Nonnull BucketLayout bucketLayout) {
-    super(omResponse, bucketLayout);
+    super(omResponse, isRatisEnabled, bucketLayout);
     this.multipartKey = multipartKey;
     this.multipartOpenKey = multipartOpenKey;
     this.omMultipartKeyInfo = omMultipartKeyInfo;
-    this.isRatisEnabled = isRatisEnabled;
     this.omBucketInfo = omBucketInfo;
   }
 
@@ -79,32 +72,8 @@ public class S3MultipartUploadAbortResponse extends OmKeyResponse {
   @Override
   public void addToDBBatch(OMMetadataManager omMetadataManager,
       BatchOperation batchOperation) throws IOException {
-
-    // Delete from openKey table and multipart info table.
-    omMetadataManager.getOpenKeyTable(getBucketLayout())
-        .deleteWithBatch(batchOperation, multipartOpenKey);
-    omMetadataManager.getMultipartInfoTable().deleteWithBatch(batchOperation,
-        multipartKey);
-
-    // Move all the parts to delete table
-    for (PartKeyInfo partKeyInfo: omMultipartKeyInfo.getPartKeyInfoMap()) {
-      OmKeyInfo currentKeyPartInfo =
-          OmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo());
-
-      RepeatedOmKeyInfo repeatedOmKeyInfo = OmUtils.prepareKeyForDelete(
-          currentKeyPartInfo, omMultipartKeyInfo.getUpdateID(),
-          isRatisEnabled);
-      // multi-part key format is volumeName/bucketName/keyName/uploadId
-      String deleteKey = omMetadataManager.getOzoneDeletePathKey(
-          currentKeyPartInfo.getObjectID(), multipartKey);
-
-      omMetadataManager.getDeletedTable().putWithBatch(batchOperation,
-          deleteKey, repeatedOmKeyInfo);
-    }
-
-    // update bucket usedBytes.
-    omMetadataManager.getBucketTable().putWithBatch(batchOperation,
-        omMetadataManager.getBucketKey(omBucketInfo.getVolumeName(),
-            omBucketInfo.getBucketName()), omBucketInfo);
+    addAbortToBatch(omMetadataManager, batchOperation,
+        multipartKey, multipartOpenKey, omMultipartKeyInfo, omBucketInfo,
+        getBucketLayout());
   }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/MultipartUploadCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/MultipartUploadCleanupService.java
new file mode 100644
index 0000000000..ea175ac001
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/MultipartUploadCleanupService.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.ozone.om.KeyManager;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadsExpiredAbortRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Background service that aborts incomplete multipart uploads (MPUs).
+ * It periodically scans the MultipartInfoTable for MPU keys whose
+ * creation time is older than a configured threshold and aborts them.
+ */
+public class MultipartUploadCleanupService extends BackgroundService {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MultipartUploadCleanupService.class);
+
+  // Similar to OpenKeyCleanupService, use a single thread.
+  private static final int MPU_INFO_DELETING_CORE_POOL_SIZE = 1;
+
+  private final OzoneManager ozoneManager;
+  private final KeyManager keyManager;
+  // Dummy client ID to use for response.
+  private final ClientId clientId = ClientId.randomId();
+  private final Duration expireThreshold;
+  private final int mpuPartsLimitPerTask;
+  private final AtomicLong submittedMpuInfoCount;
+  private final AtomicLong runCount;
+  private final AtomicBoolean suspended;
+
+  public MultipartUploadCleanupService(long interval, TimeUnit unit,
+        long timeout, OzoneManager ozoneManager, ConfigurationSource conf) {
+    super("MultipartUploadCleanupService", interval, unit,
+        MPU_INFO_DELETING_CORE_POOL_SIZE, timeout);
+    this.ozoneManager = ozoneManager;
+    this.keyManager = ozoneManager.getKeyManager();
+
+    long expireMillis = conf.getTimeDuration(
+        OMConfigKeys.OZONE_OM_MPU_EXPIRE_THRESHOLD,
+        OMConfigKeys.OZONE_OM_MPU_EXPIRE_THRESHOLD_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    this.expireThreshold = Duration.ofMillis(expireMillis);
+
+    this.mpuPartsLimitPerTask = conf.getInt(
+        OMConfigKeys.OZONE_OM_MPU_PARTS_CLEANUP_LIMIT_PER_TASK,
+        OMConfigKeys.OZONE_OM_MPU_PARTS_CLEANUP_LIMIT_PER_TASK_DEFAULT);
+
+    this.submittedMpuInfoCount = new AtomicLong(0);
+    this.runCount = new AtomicLong(0);
+    this.suspended = new AtomicBoolean(false);
+  }
+
+  /**
+   * Returns the number of times this Background service has run.
+   *
+   * @return Long, run count.
+   */
+  @VisibleForTesting
+  public long getRunCount() {
+    return runCount.get();
+  }
+
+  /**
+   * Suspend the service (for testing).
+   */
+  @VisibleForTesting
+  public void suspend() {
+    suspended.set(true);
+  }
+
+  /**
+   * Resume the service if suspended (for testing).
+   */
+  @VisibleForTesting
+  public void resume() {
+    suspended.set(false);
+  }
+
+  /**
+   * Returns the number of MPU infos that were submitted for abort by this
+   * service. If an MPU was completed/aborted and thus removed
+   * from the MultipartInfoTable between being submitted for abort
+   * and the actual abort operation, it will not be aborted again.
+   *
+   * @return long count.
+   */
+  @VisibleForTesting
+  public long getSubmittedMpuInfoCount() {
+    return submittedMpuInfoCount.get();
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new MultipartUploadCleanupTask());
+    return queue;
+  }
+
+  private boolean shouldRun() {
+    return !suspended.get() && ozoneManager.isLeaderReady();
+  }
+
+  private boolean isRatisEnabled() {
+    return ozoneManager.isRatisEnabled();
+  }
+
+  private class MultipartUploadCleanupTask implements BackgroundTask {
+
+    @Override
+    public int getPriority() {
+      return 0;
+    }
+
+    @Override
+    public BackgroundTaskResult call() throws Exception {
+      if (!shouldRun()) {
+        return BackgroundTaskResult.EmptyTaskResult.newResult();
+      }
+
+      runCount.incrementAndGet();
+      long startTime = Time.monotonicNow();
+      List<ExpiredMultipartUploadsBucket> expiredMultipartUploads = null;
+      try {
+        expiredMultipartUploads = keyManager.getExpiredMultipartUploads(
+            expireThreshold, mpuPartsLimitPerTask);
+      } catch (IOException e) {
+        LOG.error("Unable to get expired MPU info, retry in next interval", e);
+        return BackgroundTaskResult.EmptyTaskResult.newResult();
+      }
+
+      if (expiredMultipartUploads != null &&
+          !expiredMultipartUploads.isEmpty()) {
+        int numExpiredMultipartUploads = expiredMultipartUploads.stream()
+            .mapToInt(ExpiredMultipartUploadsBucket::getMultipartUploadsCount)
+            .sum();
+
+        OMRequest omRequest = createRequest(expiredMultipartUploads);
+        submitRequest(omRequest);
+
+        LOG.debug("Number of expired multipart info submitted for deletion: "
+                + "{}, elapsed time: {}ms", numExpiredMultipartUploads,
+            Time.monotonicNow() - startTime);
+        submittedMpuInfoCount.addAndGet(numExpiredMultipartUploads);
+      }
+      return BackgroundTaskResult.EmptyTaskResult.newResult();
+    }
+
+    private OMRequest createRequest(List<ExpiredMultipartUploadsBucket>
+                                        expiredMultipartUploadsBuckets) {
+      MultipartUploadsExpiredAbortRequest request =
+          MultipartUploadsExpiredAbortRequest.newBuilder()
+              .addAllExpiredMultipartUploadsPerBucket(
+                  expiredMultipartUploadsBuckets)
+              .build();
+
+      OMRequest omRequest = OMRequest.newBuilder()
+          .setCmdType(Type.AbortExpiredMultiPartUploads)
+          .setMultipartUploadsExpiredAbortRequest(request)
+          .setClientId(clientId.toString())
+          .build();
+
+      return omRequest;
+    }
+
+    private void submitRequest(OMRequest omRequest) {
+      try {
+        if (isRatisEnabled()) {
+          OzoneManagerRatisServer server = ozoneManager.getOmRatisServer();
+
+          RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
+              .setClientId(clientId)
+              .setServerId(server.getRaftPeerId())
+              .setGroupId(server.getRaftGroupId())
+              .setCallId(runCount.get())
+              .setMessage(Message.valueOf(
+                  OMRatisHelper.convertRequestToByteString(omRequest)))
+              .setType(RaftClientRequest.writeRequestType())
+              .build();
+
+          server.submitRequest(omRequest, raftClientRequest);
+        } else {
+          ozoneManager.getOmServerProtocol().submitRequest(null,
+              omRequest);
+        }
+      } catch (ServiceException e) {
+        LOG.error("Expired multipart info delete request failed. " +
+            "Will retry at next run.", e);
+      }
+    }
+  }
+}
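
For reference, a sketch of wiring the service by hand; the literal interval/timeout values are assumptions (in OM they come from configuration), and the timeout is assumed here to be in milliseconds:

    // Run the cleanup check daily with a 300s task timeout.
    MultipartUploadCleanupService cleanupService =
        new MultipartUploadCleanupService(24, TimeUnit.HOURS,
            TimeUnit.SECONDS.toMillis(300), ozoneManager, conf);
    cleanupService.start();
    // ... later, on OM shutdown:
    cleanupService.shutdown();
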
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
index 34bd721602..58dfeb7b83 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
@@ -33,8 +33,11 @@ import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import org.apache.hadoop.ozone.om.request.util.OMMultipartUploadUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKey;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
 import org.apache.hadoop.util.Time;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.BeforeEach;
@@ -45,6 +48,7 @@ import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.File;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
@@ -56,6 +60,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MPU_EXPIRE_THRESHOLD;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MPU_EXPIRE_THRESHOLD_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
@@ -581,6 +587,11 @@ public class TestOmMetadataManager {
     testGetExpiredOpenKeysExcludeMPUKeys(BucketLayout.FILE_SYSTEM_OPTIMIZED);
   }
 
+  @Test
+  public void testGetExpiredMultipartUploads() throws Exception {
+    testGetExpiredMPUs();
+  }
+
   private void testGetExpiredOpenKeys(BucketLayout bucketLayout)
       throws Exception {
     final String bucketName = UUID.randomUUID().toString();
@@ -747,6 +758,91 @@ public class TestOmMetadataManager {
         .isEmpty());
   }
 
+  private void testGetExpiredMPUs() throws Exception {
+    final String bucketName = UUID.randomUUID().toString();
+    final String volumeName = UUID.randomUUID().toString();
+    final int numExpiredMPUs = 4;
+    final int numUnexpiredMPUs = 1;
+    final int numPartsPerMPU = 5;
+    // To create expired MPUs, they are assigned a creation time as
+    // old as the expiration threshold.
+    final long expireThresholdMillis = ozoneConfiguration.getTimeDuration(
+        OZONE_OM_MPU_EXPIRE_THRESHOLD,
+        OZONE_OM_MPU_EXPIRE_THRESHOLD_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    final Duration expireThreshold = Duration.ofMillis(expireThresholdMillis);
+
+    final long expiredMPUCreationTime =
+        Instant.now().minus(expireThreshold).toEpochMilli();
+
+    // Add expired MPUs to multipartInfoTable.
+    // The method under test does not check for expired MPUs in the
+    // cache, since they will be picked up once the cache is flushed.
+    Set<String> expiredMPUs = new HashSet<>();
+    for (int i = 0; i < numExpiredMPUs + numUnexpiredMPUs; i++) {
+      final long creationTime = i < numExpiredMPUs ?
+          expiredMPUCreationTime : Instant.now().toEpochMilli();
+
+      String uploadId = OMMultipartUploadUtils.getMultipartUploadId();
+      final OmMultipartKeyInfo mpuKeyInfo = OMRequestTestUtils
+          .createOmMultipartKeyInfo(uploadId, creationTime,
+              HddsProtos.ReplicationType.RATIS,
+              HddsProtos.ReplicationFactor.ONE, 0L);
+
+      String keyName = "expired" + i;
+      // Key info to construct the MPU DB key
+      final OmKeyInfo keyInfo = OMRequestTestUtils.createOmKeyInfo(volumeName,
+          bucketName, keyName, HddsProtos.ReplicationType.RATIS,
+          HddsProtos.ReplicationFactor.ONE, 0L, creationTime);
+
+
+      for (int j = 1; j <= numPartsPerMPU; j++) {
+        PartKeyInfo partKeyInfo = OMRequestTestUtils
+            .createPartKeyInfo(volumeName, bucketName, keyName, uploadId, j);
+        OMRequestTestUtils.addPart(partKeyInfo, mpuKeyInfo);
+      }
+
+      final String mpuDbKey = OMRequestTestUtils.addMultipartInfoToTable(
+          false, keyInfo, mpuKeyInfo, 0L, omMetadataManager);
+
+      expiredMPUs.add(mpuDbKey);
+    }
+
+    // Test retrieving fewer expired MPU parts than actually exist (exact).
+    List<ExpiredMultipartUploadsBucket>
+        someExpiredMPUs = omMetadataManager.getExpiredMultipartUploads(
+        expireThreshold,
+        (numExpiredMPUs * numPartsPerMPU) - (numPartsPerMPU));
+    List<String> names = getMultipartKeyNames(someExpiredMPUs);
+    assertEquals(numExpiredMPUs - 1, names.size());
+    assertTrue(expiredMPUs.containsAll(names));
+
+    // Test retrieving fewer expired MPU parts than actually exist (round up).
+    someExpiredMPUs = omMetadataManager.getExpiredMultipartUploads(
+        expireThreshold,
+        (numExpiredMPUs * numPartsPerMPU) - (numPartsPerMPU + 1));
+    names = getMultipartKeyNames(someExpiredMPUs);
+    assertEquals(numExpiredMPUs - 1, names.size());
+    assertTrue(expiredMPUs.containsAll(names));
+
+    // Test attempting to retrieve more expired MPU parts than actually exist.
+    List<ExpiredMultipartUploadsBucket> allExpiredMPUs =
+        omMetadataManager.getExpiredMultipartUploads(expireThreshold,
+            (numExpiredMPUs * numPartsPerMPU) + numPartsPerMPU);
+    names = getMultipartKeyNames(allExpiredMPUs);
+    assertEquals(numExpiredMPUs, names.size());
+    assertTrue(expiredMPUs.containsAll(names));
+
+    // Test retrieving exactly as many MPU parts as actually exist.
+    allExpiredMPUs =
+        omMetadataManager.getExpiredMultipartUploads(expireThreshold,
+            (numExpiredMPUs * numPartsPerMPU));
+    names = getMultipartKeyNames(allExpiredMPUs);
+    assertEquals(numExpiredMPUs, names.size());
+    assertTrue(expiredMPUs.containsAll(names));
+  }
+
   private List<String> getOpenKeyNames(
       Collection<OpenKeyBucket.Builder> openKeyBuckets) {
     return openKeyBuckets.stream()
@@ -756,6 +852,15 @@ public class TestOmMetadataManager {
         .collect(Collectors.toList());
   }
 
+  private List<String> getMultipartKeyNames(
+      List<ExpiredMultipartUploadsBucket> expiredMultipartUploadsBuckets) {
+    return expiredMultipartUploadsBuckets.stream()
+        .map(ExpiredMultipartUploadsBucket::getMultipartUploadsList)
+        .flatMap(List::stream)
+        .map(ExpiredMultipartUploadInfo::getName)
+        .collect(Collectors.toList());
+  }
+
   private void addKeysToOM(String volumeName, String bucketName,
       String keyName, int i) throws Exception {
 
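The limit passed to getExpiredMultipartUploads above counts parts rather than uploads and is rounded up to whole uploads. With the test's numbers (4 expired uploads of 5 parts each, 20 expired parts in total), the assertions work out as:

    limit 15 (= 20 - 5) -> 3 uploads returned (exact fit)
    limit 14 (= 20 - 6) -> still 3 uploads (the partial upload rounds up)
    limit 25 (= 20 + 5) -> all 4 expired uploads, never the unexpired one
    limit 20            -> all 4 expired uploads (exact fit)
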
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
index 4c64b6b645..3e3433fab4 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUpload;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
@@ -58,6 +59,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketI
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateTenantRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteTenantRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetS3VolumeContextRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListTenantRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .MultipartUploadAbortRequest;
@@ -71,6 +73,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .MultipartInfoInitiateRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .SetVolumePropertyRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -380,7 +383,7 @@ public final class OMRequestTestUtils {
    * Add multipart info entry to the multipartInfoTable.
    * @throws Exception
    */
-  public static void addMultipartInfoToTable(boolean addToCache,
+  public static String addMultipartInfoToTable(boolean addToCache,
       OmKeyInfo omKeyInfo, OmMultipartKeyInfo omMultipartKeyInfo,
       long trxnLogIndex, OMMetadataManager omMetadataManager)
       throws IOException {
@@ -396,6 +399,33 @@ public final class OMRequestTestUtils {
 
     omMetadataManager.getMultipartInfoTable().put(ozoneDBKey,
         omMultipartKeyInfo);
+
+    return ozoneDBKey;
+  }
+
+  public static PartKeyInfo createPartKeyInfo(String volumeName,
+      String bucketName, String keyName, String uploadId, int partNumber) {
+    return PartKeyInfo.newBuilder()
+        .setPartNumber(partNumber)
+        .setPartName(OmMultipartUpload.getDbKey(
+            volumeName, bucketName, keyName, uploadId))
+        .setPartKeyInfo(KeyInfo.newBuilder()
+            .setVolumeName(volumeName)
+            .setBucketName(bucketName)
+            .setKeyName(keyName)
+            .setDataSize(100L) // Just set dummy size for testing
+            .setCreationTime(Time.now())
+            .setModificationTime(Time.now())
+            .setType(HddsProtos.ReplicationType.RATIS)
+            .setFactor(HddsProtos.ReplicationFactor.ONE).build()).build();
+  }
+
+  /**
+   * Append a {@link PartKeyInfo} to an {@link OmMultipartKeyInfo}.
+   */
+  public static void addPart(PartKeyInfo partKeyInfo,
+       OmMultipartKeyInfo omMultipartKeyInfo) {
+    omMultipartKeyInfo.addPartKeyInfo(partKeyInfo);
   }
 
   /**
@@ -684,17 +714,18 @@ public final class OMRequestTestUtils {
         BucketLayout.DEFAULT);
   }
 
-  public static void addBucketToDB(String volumeName, String bucketName,
-      OMMetadataManager omMetadataManager, BucketLayout bucketLayout)
+  public static OmBucketInfo addBucketToDB(String volumeName,
+      String bucketName, OMMetadataManager omMetadataManager,
+      BucketLayout bucketLayout)
       throws Exception {
-    addBucketToDB(omMetadataManager,
+    return addBucketToDB(omMetadataManager,
         OmBucketInfo.newBuilder().setVolumeName(volumeName)
             .setBucketName(bucketName)
             .setBucketLayout(bucketLayout)
     );
   }
 
-  public static void addBucketToDB(OMMetadataManager omMetadataManager,
+  public static OmBucketInfo addBucketToDB(OMMetadataManager omMetadataManager,
       OmBucketInfo.Builder builder) throws Exception {
 
     OmBucketInfo omBucketInfo = builder
@@ -709,6 +740,8 @@ public final class OMRequestTestUtils {
     omMetadataManager.getBucketTable().addCacheEntry(
         new CacheKey<>(omMetadataManager.getBucketKey(volumeName, bucketName)),
         CacheValue.get(1L, omBucketInfo));
+
+    return omBucketInfo;
   }
 
   /**
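
A sketch of how a test might combine the new helpers; the keyInfo/mpuKeyInfo construction is elided and the variable names are assumptions:

    // Attach a part to the multipart key info and persist both for the
    // test; the returned DB key can then be asserted against.
    PartKeyInfo part = OMRequestTestUtils.createPartKeyInfo(
        volumeName, bucketName, keyName, uploadId, 1);
    OMRequestTestUtils.addPart(part, mpuKeyInfo);
    String mpuDbKey = OMRequestTestUtils.addMultipartInfoToTable(
        false, keyInfo, mpuKeyInfo, 0L, omMetadataManager);
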
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3ExpiredMultipartUploadsAbortRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3ExpiredMultipartUploadsAbortRequest.java
new file mode 100644
index 0000000000..d23bf4b13a
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3ExpiredMultipartUploadsAbortRequest.java
@@ -0,0 +1,666 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.request.s3.multipart;
+
+import com.google.common.base.Optional;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.UniqueId;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUpload;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .Status;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.WithObjectID;
+import org.apache.hadoop.ozone.om.request.util.OMMultipartUploadUtils;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadsExpiredAbortRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Tests S3ExpiredMultipartUploadsAbortRequest.
+ */
+@RunWith(Parameterized.class)
+public class TestS3ExpiredMultipartUploadsAbortRequest
+    extends TestS3MultipartRequest {
+
+  private final BucketLayout bucketLayout;
+
+  public TestS3ExpiredMultipartUploadsAbortRequest(BucketLayout bucketLayout) {
+    this.bucketLayout = bucketLayout;
+  }
+
+  @Override
+  public BucketLayout getBucketLayout() {
+    return bucketLayout;
+  }
+
+  @Parameters
+  public static Collection<BucketLayout> bucketLayouts() {
+    return Arrays.asList(
+        BucketLayout.DEFAULT,
+        BucketLayout.FILE_SYSTEM_OPTIMIZED
+    );
+  }
+
+  /**
+   * Tests removing MPUs from the multipartInfoTable cache that never existed there.
+   * The operation should complete without errors.
+   * <p>
+   * This simulates a run of MPU cleanup service where a set
+   * of expired MPUs are identified and passed to the request,
+   * but before the request can process them, those MPUs are
+   * completed/aborted and therefore removed from the multipartInfoTable.
+   */
+  @Test
+  public void testAbortMPUsNotInTable() throws Exception {
+    final String volumeName = UUID.randomUUID().toString();
+    final String bucketName = UUID.randomUUID().toString();
+
+    OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+        omMetadataManager, getBucketLayout());
+    List<String> mpuKeys = createMPUs(volumeName, bucketName, 5, 5);
+    abortExpiredMPUsFromCache(volumeName, bucketName, mpuKeys);
+    assertNotInMultipartInfoTable(mpuKeys);
+  }
+
+  /**
+   * Tests adding multiple MPUs to the multipartInfoTable,
+   * and updating the table cache to only remove some of them.
+   * MPUs not removed should still be present in the multipartInfoTable.
+   * Mixes which MPUs will be kept and deleted among different volumes and
+   * buckets.
+   */
+  @Test
+  public void testAbortSubsetOfMPUs() throws Exception {
+    final String volume1 = UUID.randomUUID().toString();
+    final String volume2 = UUID.randomUUID().toString();
+    final String bucket1 = UUID.randomUUID().toString();
+    final String bucket2 = UUID.randomUUID().toString();
+
+    OMRequestTestUtils.addVolumeAndBucketToDB(volume1, bucket1,
+        omMetadataManager, getBucketLayout());
+    OMRequestTestUtils.addVolumeAndBucketToDB(volume1, bucket2,
+        omMetadataManager, getBucketLayout());
+    OMRequestTestUtils.addVolumeAndBucketToDB(volume2, bucket2,
+        omMetadataManager, getBucketLayout());
+
+    List<String> v1b1MPUsToAbort =
+        createMPUs(volume1, bucket1, 3, 3);
+    List<String> v1b1MPUsToKeep =
+        createMPUs(volume1, bucket1, 3, 3);
+
+    List<String> v1b2MPUsToAbort =
+        createMPUs(volume1, bucket2, 3, 3);
+    List<String> v1b2MPUsToKeep =
+        createMPUs(volume1, bucket2, 2, 2);
+
+    List<String> v2b2MPUsToAbort =
+        createMPUs(volume2, bucket2, 2, 2);
+    List<String> v2b2MPUsToKeep =
+        createMPUs(volume2, bucket2, 3, 3);
+
+    abortExpiredMPUsFromCache(volume1, bucket1, v1b1MPUsToAbort);
+    abortExpiredMPUsFromCache(volume1, bucket2, v1b2MPUsToAbort);
+    abortExpiredMPUsFromCache(volume2, bucket2, v2b2MPUsToAbort);
+
+    assertNotInMultipartInfoTable(v1b1MPUsToAbort);
+    assertNotInMultipartInfoTable(v1b2MPUsToAbort);
+    assertNotInMultipartInfoTable(v2b2MPUsToAbort);
+
+    assertInMultipartInfoTable(v1b1MPUsToKeep);
+    assertInMultipartInfoTable(v1b2MPUsToKeep);
+    assertInMultipartInfoTable(v2b2MPUsToKeep);
+  }
+
+  /**
+   * Tests removing MPUs from the multipart info table cache that have a
+   * higher updateID than the transactionID. Those MPUs should be ignored.
+   * It is OK if the updateID is equal to or less than the transactionID.
+   * See {@link WithObjectID#setUpdateID(long, boolean)}.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testAbortMPUsWithHigherUpdateID() throws Exception {
+    final String volumeName = UUID.randomUUID().toString();
+    final String bucketName = UUID.randomUUID().toString();
+
+    OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+        omMetadataManager, getBucketLayout());
+
+    final long updateId = 200L;
+    final long transactionId = 100L;
+
+    // Used only to build the MPU db key
+    OmKeyInfo.Builder keyBuilder = new OmKeyInfo.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName);
+
+    OmMultipartKeyInfo.Builder mpuBuilder = new OmMultipartKeyInfo.Builder()
+        .setReplicationConfig(ReplicationConfig.fromTypeAndFactor(
+            ReplicationType.RATIS, ReplicationFactor.THREE));
+
+    if (getBucketLayout().equals(BucketLayout.FILE_SYSTEM_OPTIMIZED)) {
+      mpuBuilder.setParentID(UniqueId.next());
+    }
+
+    OmKeyInfo keyWithHigherUpdateID = keyBuilder
+        .setKeyName("key").build();
+    OmMultipartKeyInfo mpuWithHigherUpdateID = mpuBuilder
+        .setUpdateID(updateId)
+        .setUploadID(OMMultipartUploadUtils.getMultipartUploadId())
+        .build();
+
+    OmKeyInfo keyWithSameUpdateID = keyBuilder
+        .setKeyName("key2").build();
+    OmMultipartKeyInfo mpuWithSameUpdateID = mpuBuilder
+        .setUpdateID(transactionId)
+        .setUploadID(OMMultipartUploadUtils.getMultipartUploadId())
+        .build();
+
+    String mpuDBKeyWithHigherUpdateId = OMRequestTestUtils
+        .addMultipartInfoToTable(false,
+            keyWithHigherUpdateID, mpuWithHigherUpdateID,
+            mpuWithHigherUpdateID.getUpdateID(), omMetadataManager);
+
+    String mpuDBKeyWithSameUpdateId = OMRequestTestUtils
+        .addMultipartInfoToTable(false,
+            keyWithSameUpdateID, mpuWithSameUpdateID,
+            mpuWithSameUpdateID.getUpdateID(), omMetadataManager);
+
+
+    OMRequest omRequest = doPreExecute(createAbortExpiredMPURequest(
+        volumeName, bucketName, Arrays.asList(mpuDBKeyWithHigherUpdateId,
+            mpuDBKeyWithSameUpdateId)));
+    S3ExpiredMultipartUploadsAbortRequest expiredMultipartUploadsAbortRequest =
+        new S3ExpiredMultipartUploadsAbortRequest(omRequest);
+
+    OMClientResponse omClientResponse =
+        
expiredMultipartUploadsAbortRequest.validateAndUpdateCache(ozoneManager,
+            transactionId, ozoneManagerDoubleBufferHelper);
+
+    Assert.assertEquals(Status.OK,
+        omClientResponse.getOMResponse().getStatus());
+
+    assertInMultipartInfoTable(Collections.singletonList(
+        mpuDBKeyWithHigherUpdateId));
+    assertNotInMultipartInfoTable(Collections.singletonList(
+        mpuDBKeyWithSameUpdateId));
+  }
+
+  /**
+   * Tests cleaning up MPUs whose open keys were already removed by the
+   * open key cleanup service prior to HDDS-9098. While a normal MPU
+   * complete/abort request should fail if the MPU open key doesn't
+   * exist in the open key table, aborting expired orphan MPUs should
+   * not fail.
+   */
+  @Test
+  public void testAbortOrphanMPUs() throws Exception {
+    final String volumeName = UUID.randomUUID().toString();
+    final String bucketName = UUID.randomUUID().toString();
+
+    OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+        omMetadataManager, getBucketLayout());
+    List<String> mpuKeys = createMPUs(volumeName, bucketName, 5, 5);
+
+    // Remove the open MPU keys to simulate orphan MPU
+    removeFromOpenKeyTable(mpuKeys);
+
+    abortExpiredMPUsFromCache(volumeName, bucketName, mpuKeys);
+
+    assertNotInMultipartInfoTable(mpuKeys);
+  }
+
+  /**
+   * Tests metrics set by {@link S3ExpiredMultipartUploadsAbortRequest}.
+   * Submits a set of MPUs for abort where only some of the keys actually
+   * exist in the multipart info table, and asserts that the metrics count
+   * MPUs that were submitted for deletion versus those that were actually
+   * deleted.
+   * @throws Exception
+   */
+  @Test
+  public void testMetrics() throws Exception {
+    final String volume = UUID.randomUUID().toString();
+    final String bucket = UUID.randomUUID().toString();
+    final String key = UUID.randomUUID().toString();
+
+    OMRequestTestUtils.addVolumeAndBucketToDB(volume, bucket,
+        omMetadataManager, getBucketLayout());
+
+    final int numExistentMPUs = 3;
+    final int numNonExistentMPUs = 5;
+    final int numParts = 5;
+
+    OMMetrics metrics = ozoneManager.getMetrics();
+    Assert.assertEquals(0, metrics.getNumExpiredMPUAbortRequests());
+    Assert.assertEquals(0, metrics.getNumOpenKeyDeleteRequestFails());
+    Assert.assertEquals(0, metrics.getNumExpiredMPUSubmittedForAbort());
+    Assert.assertEquals(0, metrics.getNumExpiredMPUPartsAborted());
+    Assert.assertEquals(0, metrics.getNumExpiredMPUAbortRequestFails());
+
+    List<String> existentMPUs =
+        createMPUs(volume, bucket, key, numExistentMPUs, numParts,
+            getBucketLayout());
+
+    List<String> nonExistentMPUs =
+        createMockMPUKeys(volume, bucket, key, numNonExistentMPUs);
+
+    abortExpiredMPUsFromCache(volume, bucket, existentMPUs, nonExistentMPUs);
+
+    assertNotInMultipartInfoTable(existentMPUs);
+    assertNotInMultipartInfoTable(nonExistentMPUs);
+
+    Assert.assertEquals(1, metrics.getNumExpiredMPUAbortRequests());
+    Assert.assertEquals(0,
+        metrics.getNumExpiredMPUAbortRequestFails());
+    Assert.assertEquals(numExistentMPUs + numNonExistentMPUs,
+        metrics.getNumExpiredMPUSubmittedForAbort());
+    Assert.assertEquals(numExistentMPUs,
+        metrics.getNumExpiredMPUAborted());
+    Assert.assertEquals(numExistentMPUs * numParts,
+        metrics.getNumExpiredMPUPartsAborted());
+  }
+
+  /**
+   * Constructs a new {@link S3ExpiredMultipartUploadsAbortRequest} object,
+   * and calls its {@link S3ExpiredMultipartUploadsAbortRequest#preExecute}
+   * method with {@code originalOMRequest}. It verifies that
+   * {@code originalOMRequest} is modified after the call, and returns the
+   * modified request.
+   * @throws Exception
+   */
+  private OMRequest doPreExecute(OMRequest originalOMRequest) throws Exception {
+    S3ExpiredMultipartUploadsAbortRequest expiredMultipartUploadsAbortRequest =
+        new S3ExpiredMultipartUploadsAbortRequest(originalOMRequest);
+
+    OMRequest modifiedOmRequest =
+        expiredMultipartUploadsAbortRequest.preExecute(ozoneManager);
+
+    // Will not be equal, as UserInfo will be set.
+    Assert.assertNotEquals(originalOMRequest, modifiedOmRequest);
+
+    return modifiedOmRequest;
+  }
+
+  private void abortExpiredMPUsFromCache(String volumeName, String bucketName,
+      List<String>... allMPUKeys) throws Exception {
+    abortExpiredMPUsFromCache(volumeName, bucketName,
+        Arrays.stream(allMPUKeys).flatMap(List::stream)
+            .collect(Collectors.toList()));
+  }
+
+
+  /**
+   * Runs the validate and update cache step of
+   * {@link S3ExpiredMultipartUploadsAbortRequest} to mark the MPUs
+   * as deleted in the multipartInfoTable cache.
+   * Asserts that the call's response status is {@link Status#OK}.
+   * @throws Exception
+   */
+  private void abortExpiredMPUsFromCache(String volumeName, String bucketName,
+      List<String> mpuKeys) throws Exception {
+
+    OMRequest omRequest =
+        doPreExecute(
+            createAbortExpiredMPURequest(volumeName, bucketName, mpuKeys));
+
+    S3ExpiredMultipartUploadsAbortRequest expiredMultipartUploadsAbortRequest =
+        new S3ExpiredMultipartUploadsAbortRequest(omRequest);
+
+    OMClientResponse omClientResponse =
+        expiredMultipartUploadsAbortRequest.validateAndUpdateCache(
+            ozoneManager, 100L, ozoneManagerDoubleBufferHelper);
+
+    Assert.assertEquals(Status.OK,
+        omClientResponse.getOMResponse().getStatus());
+  }
+
+  private OMRequest createAbortExpiredMPURequest(String volumeName,
+      String bucketName, List<String> mpuKeysToAbort) {
+
+    List<ExpiredMultipartUploadInfo> expiredMultipartUploads = mpuKeysToAbort
+        .stream().map(name ->
+            ExpiredMultipartUploadInfo.newBuilder().setName(name).build())
+        .collect(Collectors.toList());
+    ExpiredMultipartUploadsBucket expiredMultipartUploadsBucket =
+        ExpiredMultipartUploadsBucket.newBuilder()
+            .setVolumeName(volumeName)
+            .setBucketName(bucketName)
+            .addAllMultipartUploads(expiredMultipartUploads)
+            .build();
+
+    MultipartUploadsExpiredAbortRequest mpuExpiredAbortRequest =
+        MultipartUploadsExpiredAbortRequest.newBuilder()
+            .addExpiredMultipartUploadsPerBucket(expiredMultipartUploadsBucket)
+            .build();
+
+    return OMRequest.newBuilder()
+        .setMultipartUploadsExpiredAbortRequest(mpuExpiredAbortRequest)
+        .setCmdType(OzoneManagerProtocolProtos
+            .Type.AbortExpiredMultiPartUploads)
+        .setClientId(UUID.randomUUID().toString())
+        .build();
+  }
+
+  /**
+   * Create MPUs with randomized key names.
+   */
+  private List<String> createMPUs(String volume, String bucket, int count,
+                                  int numParts) throws Exception {
+    return createMPUs(volume, bucket, null, count, numParts,
+        getBucketLayout());
+  }
+
+  /**
+   * Make MPUs with the same key name and randomized upload IDs.
+   * If a key is specified, this simulates concurrent multipart uploads
+   * of the same key.
+   */
+  private List<String> createMPUs(String volume, String bucket,
+      String key, int count, int numParts, BucketLayout buckLayout)
+      throws Exception {
+    if (buckLayout == BucketLayout.FILE_SYSTEM_OPTIMIZED) {
+      return createMPUsWithFSO(volume, bucket, key, count, numParts);
+    } else {
+      return createMPUs(volume, bucket, key, count, numParts);
+    }
+  }
+
+  /**
+   * Make MPUs with the same key name and randomized upload IDs for an
+   * FSO-enabled bucket.
+   * If a key is specified, this simulates concurrent multipart uploads
+   * of the same key.
+   */
+  private List<String> createMPUsWithFSO(String volume, String bucket,
+      String key, int count, int numParts) throws Exception {
+    List<String> mpuKeys = new ArrayList<>();
+
+    long trxnLogIndex = 1L;
+
+    String dirName = "a/b/c/";
+
+    final long volumeId = omMetadataManager.getVolumeId(volume);
+    final long bucketId = omMetadataManager.getBucketId(volume, bucket);
+
+    for (int i = 0; i < count; i++) {
+      // Initiate MPU
+      final String keyName = dirName + (key != null ? key :
+          UUID.randomUUID().toString());
+
+      long parentID = OMRequestTestUtils.addParentsToDirTable(
+          volume, bucket, dirName, omMetadataManager);
+
+      OMRequest initiateMPURequest =
+          doPreExecuteInitiateMPUWithFSO(volume, bucket, keyName);
+
+      S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
+          new S3InitiateMultipartUploadRequestWithFSO(initiateMPURequest,
+              BucketLayout.FILE_SYSTEM_OPTIMIZED);
+
+      OMClientResponse omClientResponse = s3InitiateMultipartUploadRequest
+          .validateAndUpdateCache(ozoneManager, trxnLogIndex,
+              ozoneManagerDoubleBufferHelper);
+
+      Assert.assertTrue(omClientResponse.getOMResponse().getStatus() ==
+          OzoneManagerProtocolProtos.Status.OK);
+
+      trxnLogIndex++;
+
+      String multipartUploadID = omClientResponse.getOMResponse()
+          .getInitiateMultiPartUploadResponse().getMultipartUploadID();
+
+      String mpuKey = omMetadataManager.getMultipartKey(
+          volume, bucket, keyName, multipartUploadID);
+
+      String mpuOpenKey = OMMultipartUploadUtils
+          .getMultipartOpenKey(volume, bucket, keyName, multipartUploadID,
+              omMetadataManager, getBucketLayout());
+      Assert.assertNotNull(omMetadataManager.getOpenKeyTable(getBucketLayout())
+          .get(mpuOpenKey));
+
+      mpuKeys.add(mpuKey);
+
+      // Commit MPU parts
+      for (int j = 1; j <= numParts; j++) {
+        long clientID = UniqueId.next();
+        OMRequest commitMultipartRequest = doPreExecuteCommitMPU(
+            volume, bucket, keyName, clientID, multipartUploadID, j);
+
+        S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest =
+            new S3MultipartUploadCommitPartRequestWithFSO(
+                commitMultipartRequest, BucketLayout.FILE_SYSTEM_OPTIMIZED);
+
+        // Add key to open key table to be used in MPU commit processing
+        OmKeyInfo omKeyInfo = OMRequestTestUtils.createOmKeyInfo(volume,
+            bucket, keyName, HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.ONE, parentID + j, parentID,
+            trxnLogIndex, Time.now(), true);
+        String fileName = OzoneFSUtils.getFileName(keyName);
+        OMRequestTestUtils.addFileToKeyTable(true, false,
+            fileName, omKeyInfo, clientID, trxnLogIndex, omMetadataManager);
+
+        OMClientResponse commitResponse =
+            s3MultipartUploadCommitPartRequest.validateAndUpdateCache(
+                ozoneManager, trxnLogIndex, ozoneManagerDoubleBufferHelper);
+        trxnLogIndex++;
+
+        Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+            commitResponse.getOMResponse().getStatus());
+
+        // MPU part open key should be deleted after commit
+        String partKey = omMetadataManager.getOpenFileName(volumeId, bucketId,
+            parentID, fileName, clientID);
+        Assert.assertNull(
+            omMetadataManager.getOpenKeyTable(getBucketLayout()).get(partKey));
+      }
+    }
+
+    return mpuKeys;
+  }
+
+  /**
+   * Create MPUs with the same key name but randomized upload IDs for a
+   * LEGACY/OBS bucket.
+   * If a key is specified, this simulates concurrent multipart uploads
+   * to the same key; otherwise each MPU gets a random key name.
+   */
+  private List<String> createMPUs(String volume, String bucket,
+      String key, int count, int numParts) throws Exception {
+    List<String> mpuKeys = new ArrayList<>();
+
+    long trxnLogIndex = 1L;
+
+    for (int i = 0; i < count; i++) {
+      // Initiate MPU
+      final String keyName = key != null ? key : UUID.randomUUID().toString();
+      OMRequest initiateMPURequest =
+          doPreExecuteInitiateMPU(volume, bucket, keyName);
+
+      S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
+          getS3InitiateMultipartUploadReq(initiateMPURequest);
+
+      OMClientResponse omClientResponse = s3InitiateMultipartUploadRequest
+          .validateAndUpdateCache(ozoneManager, trxnLogIndex,
+              ozoneManagerDoubleBufferHelper);
+
+      Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+          omClientResponse.getOMResponse().getStatus());
+
+      trxnLogIndex++;
+
+      String multipartUploadID = omClientResponse.getOMResponse()
+          .getInitiateMultiPartUploadResponse().getMultipartUploadID();
+
+      String mpuKey = omMetadataManager.getMultipartKey(
+          volume, bucket, keyName, multipartUploadID);
+
+      String mpuOpenKey = OMMultipartUploadUtils
+          .getMultipartOpenKey(volume, bucket, keyName, multipartUploadID,
+              omMetadataManager, getBucketLayout());
+      Assert.assertNotNull(omMetadataManager.getOpenKeyTable(getBucketLayout())
+          .get(mpuOpenKey));
+
+      mpuKeys.add(mpuKey);
+
+      // Commit MPU parts
+      for (int j = 1; j <= numParts; j++) {
+        long clientID = UniqueId.next();
+        OMRequest commitMultipartRequest = doPreExecuteCommitMPU(
+            volume, bucket, keyName, clientID, multipartUploadID, j);
+
+        S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest =
+            getS3MultipartUploadCommitReq(commitMultipartRequest);
+
+        // Add key to open key table to be used in MPU commit processing
+        OMRequestTestUtils.addKeyToTable(
+            true, true,
+            volume, bucket, keyName, clientID,
+            HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.ONE, omMetadataManager);
+
+        OMClientResponse commitResponse =
+            s3MultipartUploadCommitPartRequest.validateAndUpdateCache(
+                ozoneManager, trxnLogIndex, ozoneManagerDoubleBufferHelper);
+        trxnLogIndex++;
+
+        Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+            commitResponse.getOMResponse().getStatus());
+
+        // MPU part open key should be deleted after commit
+        String partKey = omMetadataManager.getOpenKey(volume, bucket, keyName,
+            clientID);
+        Assert.assertNull(
+            omMetadataManager.getOpenKeyTable(getBucketLayout()).get(partKey));
+      }
+    }
+
+    return mpuKeys;
+  }
+
+  /**
+   * Create mock MPU keys that do not actually exist in the
+   * multipartInfoTable.
+   */
+  private List<String> createMockMPUKeys(String volume, String bucket,
+                                         String key, int count) {
+    List<String> mpuKeys = new ArrayList<>();
+    for (int i = 0; i < count; i++) {
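+      // Generate an MPU DB key with a fresh upload ID that is never
+      // persisted, so it does not exist in the multipartInfoTable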
+      final String keyName = key != null ? key : UUID.randomUUID().toString();
+      String multipartUploadID = OMMultipartUploadUtils.getMultipartUploadId();
+      String mpuKey = omMetadataManager.getMultipartKey(
+          volume, bucket, keyName, multipartUploadID);
+      mpuKeys.add(mpuKey);
+    }
+    return mpuKeys;
+  }
+
+  private void assertInMultipartInfoTable(List<String> mpuKeys)
+      throws Exception {
+    for (String mpuKey: mpuKeys) {
+      Assert.assertTrue(omMetadataManager.getMultipartInfoTable()
+          .isExist(mpuKey));
+    }
+  }
+
+  private void assertNotInMultipartInfoTable(List<String> mpuKeys)
+      throws Exception {
+    for (String mpuKey: mpuKeys) {
+      Assert.assertFalse(omMetadataManager.getMultipartInfoTable()
+          .isExist(mpuKey));
+    }
+  }
+
+  private void assertNotInOpenKeyTable(List<String> mpuOpenKeys)
+      throws Exception {
+    for (String mpuOpenKey: mpuOpenKeys) {
+      Assert.assertFalse(omMetadataManager.getOpenKeyTable(getBucketLayout())
+          .isExist(mpuOpenKey));
+    }
+  }
+
+  private void assertInOpenKeyTable(List<String> mpuOpenKeys)
+      throws Exception {
+    for (String mpuOpenKey: mpuOpenKeys) {
+      Assert.assertTrue(omMetadataManager.getOpenKeyTable(getBucketLayout())
+          .isExist(mpuOpenKey));
+    }
+  }
+
+  /**
+   * From the MPU DB keys, we will remove the corresponding MPU open keys
+   * from the openKeyTable. This is used to simulate orphan MPU keys.
+   */
+  private void removeFromOpenKeyTable(List<String> mpuKeys)
+      throws Exception {
+    List<OmMultipartUpload> omMultipartUploads = mpuKeys.stream()
+        .map(OmMultipartUpload::from)
+        .collect(Collectors.toList());
+
+    List<String> mpuOpenKeys = new ArrayList<>();
+
+    for (OmMultipartUpload omMultipartUpload: omMultipartUploads) {
+      mpuOpenKeys.add(OMMultipartUploadUtils
+          .getMultipartOpenKey(
+              omMultipartUpload.getVolumeName(),
+              omMultipartUpload.getBucketName(),
+              omMultipartUpload.getKeyName(),
+              omMultipartUpload.getUploadId(),
+              omMetadataManager,
+              getBucketLayout()));
+    }
+
+    assertInOpenKeyTable(mpuOpenKeys);
+    for (String mpuOpenKey: mpuOpenKeys) {
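+      // Delete from both the table cache and the backing store so the
+      // MPU open key is fully gone, leaving the MPU entry orphaned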
+      omMetadataManager.getOpenKeyTable(getBucketLayout())
+          .addCacheEntry(new CacheKey<>(mpuOpenKey),
+              new CacheValue<>(Optional.absent(), 100L));
+      omMetadataManager.getOpenKeyTable(getBucketLayout())
+          .delete(mpuOpenKey);
+    }
+    assertNotInOpenKeyTable(mpuOpenKeys);
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
index 4a2ced8e60..afe5a00add 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager;
 import org.apache.hadoop.ozone.security.acl.OzoneNativeAuthorizer;
 import org.junit.After;
 import org.junit.Assert;
@@ -55,6 +56,7 @@ import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 /**
@@ -79,7 +81,7 @@ public class TestS3MultipartRequest {
 
   @Before
   public void setup() throws Exception {
-    ozoneManager = Mockito.mock(OzoneManager.class);
+    ozoneManager = mock(OzoneManager.class);
     omMetrics = OMMetrics.create();
     OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
     ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS,
@@ -88,12 +90,12 @@ public class TestS3MultipartRequest {
         ozoneManager);
     when(ozoneManager.getMetrics()).thenReturn(omMetrics);
     when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
-    auditLogger = Mockito.mock(AuditLogger.class);
+    auditLogger = mock(AuditLogger.class);
     ReferenceCounted<IOmMetadataReader, SnapshotCache> rcOmMetadataReader =
-        Mockito.mock(ReferenceCounted.class);
+        mock(ReferenceCounted.class);
     when(ozoneManager.getOmMetadataReader()).thenReturn(rcOmMetadataReader);
     // Init OmMetadataReader to let the test pass
-    OmMetadataReader omMetadataReader = Mockito.mock(OmMetadataReader.class);
+    OmMetadataReader omMetadataReader = mock(OmMetadataReader.class);
     when(omMetadataReader.isNativeAuthorizerEnabled()).thenReturn(true);
     when(rcOmMetadataReader.get()).thenReturn(omMetadataReader);
     when(ozoneManager.getAccessAuthorizer())
@@ -110,6 +112,10 @@ public class TestS3MultipartRequest {
               Pair.of(args.getVolumeName(), args.getBucketName()),
               Pair.of(args.getVolumeName(), args.getBucketName()));
         });
+    OMLayoutVersionManager lvm = mock(OMLayoutVersionManager.class);
+    when(lvm.getMetadataLayoutVersion()).thenReturn(0);
+    when(ozoneManager.getVersionManager()).thenReturn(lvm);
+    when(ozoneManager.isRatisEnabled()).thenReturn(true);
   }
 
 
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCommitPartRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCommitPartRequestWithFSO.java
index 12f0c200dd..25cedec80f 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCommitPartRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCommitPartRequestWithFSO.java
@@ -64,7 +64,7 @@ public class TestS3MultipartUploadCommitPartRequestWithFSO
   @Override
   protected void addKeyToOpenKeyTable(String volumeName, String bucketName,
       String keyName, long clientID) throws Exception {
-    long txnLogId = 10000;
+    long txnLogId = 0L;
     OmKeyInfo omKeyInfo = OMRequestTestUtils.createOmKeyInfo(volumeName,
             bucketName, keyName, HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.ONE, parentID + 1, parentID,
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequestWithFSO.java
index 87ba9a2a6e..19ae723fcf 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequestWithFSO.java
@@ -71,7 +71,7 @@ public class TestS3MultipartUploadCompleteRequestWithFSO
 
     // add parentDir to dirTable
     long parentID = getParentID(volumeName, bucketName, keyName);
-    long txnId = 50;
+    long txnId = 2;
     long objectId = parentID + 1;
 
     OmKeyInfo omKeyInfoFSO =
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3ExpiredMultipartUploadsAbortResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3ExpiredMultipartUploadsAbortResponse.java
new file mode 100644
index 0000000000..3a7a8a92a4
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3ExpiredMultipartUploadsAbortResponse.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.response.s3.multipart;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.utils.UniqueId;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartAbortInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.hadoop.ozone.om.request.util.OMMultipartUploadUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Tests S3ExpiredMultipartUploadsAbortResponse.
+ */
+@RunWith(Parameterized.class)
+public class TestS3ExpiredMultipartUploadsAbortResponse
+    extends TestS3MultipartResponse {
+
+  private final BucketLayout bucketLayout;
+
+  public TestS3ExpiredMultipartUploadsAbortResponse(
+      BucketLayout bucketLayout) {
+    this.bucketLayout = bucketLayout;
+  }
+
+  @Override
+  public BucketLayout getBucketLayout() {
+    return bucketLayout;
+  }
+
+  @Parameters
+  public static Collection<BucketLayout> bucketLayouts() {
+    return Arrays.asList(
+        BucketLayout.DEFAULT,
+        BucketLayout.FILE_SYSTEM_OPTIMIZED
+    );
+  }
+
+  /**
+   * Tests deleting MPUs from the multipartInfoTable when the MPUs have no
+   * associated parts.
+   * @throws Exception
+   */
+  @Test
+  public void testAddToDBBatchWithEmptyParts() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+
+    Map<OmBucketInfo, List<OmMultipartAbortInfo>> mpusToAbort =
+        addMPUsToDB(volumeName, 3);
+    Map<OmBucketInfo, List<OmMultipartAbortInfo>> mpusToKeep =
+        addMPUsToDB(volumeName, 3);
+
+    createAndCommitResponse(mpusToAbort, Status.OK);
+
+    for (List<OmMultipartAbortInfo> abortInfos: mpusToAbort.values()) {
+      for (OmMultipartAbortInfo abortInfo: abortInfos) {
+        // MPUs with no associated parts should have been removed
+        // from the multipartInfoTable. Since there are no parts,
+        // nothing is added to the deletedTable.
+        Assert.assertFalse(omMetadataManager.getMultipartInfoTable().isExist(
+            abortInfo.getMultipartKey()));
+        Assert.assertFalse(omMetadataManager.getOpenKeyTable(getBucketLayout())
+            .isExist(abortInfo.getMultipartOpenKey()));
+      }
+    }
+
+    for (List<OmMultipartAbortInfo> abortInfos: mpusToKeep.values()) {
+      for (OmMultipartAbortInfo abortInfo: abortInfos) {
+        // These MPUs should not have been removed from the multipartInfoTable
+        Assert.assertTrue(omMetadataManager.getMultipartInfoTable().isExist(
+            abortInfo.getMultipartKey()));
+        Assert.assertTrue(omMetadataManager.getOpenKeyTable(getBucketLayout())
+            .isExist(abortInfo.getMultipartOpenKey()));
+      }
+    }
+  }
+
+  /**
+   * Tests deleting MPUs from the multipartInfoTable when the MPUs have
+   * associated parts.
+   * @throws Exception
+   */
+  @Test
+  public void testAddToDBBatchWithNonEmptyParts() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+
+    Map<OmBucketInfo, List<OmMultipartAbortInfo>> mpusToAbort =
+        addMPUsToDB(volumeName, 3, 3);
+    Map<OmBucketInfo, List<OmMultipartAbortInfo>> mpusToKeep =
+        addMPUsToDB(volumeName, 3, 3);
+
+    createAndCommitResponse(mpusToAbort, Status.OK);
+
+    for (List<OmMultipartAbortInfo> abortInfos: mpusToAbort.values()) {
+      for (OmMultipartAbortInfo abortInfo: abortInfos) {
+        // The MPU should have been removed from the multipartInfoTable
+        // and all of its associated parts moved to the deletedTable.
+        Assert.assertFalse(omMetadataManager.getMultipartInfoTable().isExist(
+            abortInfo.getMultipartKey()));
+        Assert.assertFalse(omMetadataManager.getOpenKeyTable(getBucketLayout())
+            .isExist(abortInfo.getMultipartOpenKey()));
+
+        for (PartKeyInfo partKeyInfo: abortInfo
+            .getOmMultipartKeyInfo().getPartKeyInfoMap()) {
+          OmKeyInfo currentPartKeyInfo =
+              OmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo());
+          String deleteKey = omMetadataManager.getOzoneDeletePathKey(
+              currentPartKeyInfo.getObjectID(), abortInfo.getMultipartKey());
+          Assert.assertTrue(omMetadataManager.getDeletedTable().isExist(
+              deleteKey));
+        }
+
+      }
+    }
+
+    for (List<OmMultipartAbortInfo> abortInfos: mpusToKeep.values()) {
+      for (OmMultipartAbortInfo abortInfo: abortInfos) {
+        // These MPUs should not have been removed from the multipartInfoTable
+        // and their parts should not be in the deletedTable.
+        Assert.assertTrue(omMetadataManager.getMultipartInfoTable().isExist(
+            abortInfo.getMultipartKey()));
+        Assert.assertTrue(omMetadataManager.getOpenKeyTable(getBucketLayout())
+            .isExist(abortInfo.getMultipartOpenKey()));
+
+        for (PartKeyInfo partKeyInfo: abortInfo
+            .getOmMultipartKeyInfo().getPartKeyInfoMap()) {
+          OmKeyInfo currentPartKeyInfo =
+              OmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo());
+          String deleteKey = omMetadataManager.getOzoneDeletePathKey(
+              currentPartKeyInfo.getObjectID(), abortInfo.getMultipartKey());
+          Assert.assertFalse(omMetadataManager.getDeletedTable().isExist(
+              deleteKey));
+        }
+      }
+    }
+  }
+
+  /**
+   * Tests attempting to abort MPUs from the multipartInfoTable when the
+   * submitted response has an error status. In this case, no changes to the
+   * DB should be made.
+   */
+  @Test
+  public void testAddToDBBatchWithErrorResponse() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+
+    Map<OmBucketInfo, List<OmMultipartAbortInfo>> mpusToAbort =
+        addMPUsToDB(volumeName, 3, 3);
+
+    createAndCommitResponse(mpusToAbort, Status.INTERNAL_ERROR);
+
+    for (List<OmMultipartAbortInfo> multipartAbortInfos:
+        mpusToAbort.values()) {
+      for (OmMultipartAbortInfo multipartAbortInfo: multipartAbortInfos) {
+        // If an error occurs in the response, the batch operation moving MPU
+        // parts from the multipartInfoTable to the deletedTable should not be
+        // committed.
+        Assert.assertTrue(
+            omMetadataManager.getMultipartInfoTable().isExist(
+                multipartAbortInfo.getMultipartKey()));
+
+        for (PartKeyInfo partKeyInfo: multipartAbortInfo
+            .getOmMultipartKeyInfo().getPartKeyInfoMap()) {
+          OmKeyInfo currentPartKeyInfo =
+              OmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo());
+          String deleteKey = omMetadataManager.getOzoneDeletePathKey(
+              currentPartKeyInfo.getObjectID(), multipartAbortInfo
+                  .getMultipartKey());
+          Assert.assertFalse(omMetadataManager.getDeletedTable()
+              .isExist(deleteKey));
+        }
+      }
+    }
+
+  }
+
+  /**
+   * Constructs an {@link S3ExpiredMultipartUploadsAbortResponse} to abort
+   * MPUs in {@code mpusToAbort}, with the completion status set to
+   * {@code status}. If {@code status} is {@link Status#OK}, the MPUs to
+   * abort will be added to a batch operation and committed to the database.
+   *
+   * @throws Exception
+   */
+  private void createAndCommitResponse(
+      Map<OmBucketInfo, List<OmMultipartAbortInfo>> mpusToAbort,
+      Status status) throws Exception {
+
+    OMResponse omResponse = OMResponse.newBuilder()
+        .setStatus(status)
+        .setCmdType(OzoneManagerProtocolProtos.Type.
+            AbortExpiredMultiPartUploads)
+        .build();
+
+    S3ExpiredMultipartUploadsAbortResponse response = new
+        S3ExpiredMultipartUploadsAbortResponse(omResponse, mpusToAbort, true);
+
+    // Operations are only added to the batch by this method when status is OK
+    response.checkAndUpdateDB(omMetadataManager, batchOperation);
+
+    // If status is not OK, this will do nothing.
+    omMetadataManager.getStore().commitBatchOperation(batchOperation);
+  }
+
+  private Map<OmBucketInfo, List<OmMultipartAbortInfo>> addMPUsToDB(
+      String volume, int numMPUs) throws Exception {
+    return addMPUsToDB(volume, numMPUs, 0);
+  }
+
+  /**
+   * Creates {@code numMPUs} multipart uploads with random names, each with
+   * {@code numParts} parts, maps each one to a new {@link OmKeyInfo} to be
+   * added to the open key table and an {@link OmMultipartKeyInfo} to be added
+   * to the multipartInfoTable. Returns a map from each unique bucket to its
+   * list of {@link OmMultipartAbortInfo}.
+   */
+  private Map<OmBucketInfo, List<OmMultipartAbortInfo>> addMPUsToDB(
+      String volume, int numMPUs, int numParts)
+      throws Exception {
+
+    Map<OmBucketInfo, List<OmMultipartAbortInfo>> newMPUs = new HashMap<>();
+
+    OMRequestTestUtils.addVolumeToDB(volume, omMetadataManager);
+
+    for (int i = 0; i < numMPUs; i++) {
+      String bucket = UUID.randomUUID().toString();
+      String keyName = UUID.randomUUID().toString();
+      long objectID = UniqueId.next();
+      String uploadId = OMMultipartUploadUtils.getMultipartUploadId();
+
+      OmBucketInfo omBucketInfo = OMRequestTestUtils.addBucketToDB(volume,
+          bucket, omMetadataManager, getBucketLayout());
+
+      final OmKeyInfo omKeyInfo = OMRequestTestUtils.createOmKeyInfo(volume,
+          bucket, keyName,
+          HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.ONE,
+          0L, Time.now(), true);
+
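+      // FSO buckets track open MPU keys in the open file table (keyed by
+      // parent object ID and file name); other layouts use the open key table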
+      if (getBucketLayout().equals(BucketLayout.FILE_SYSTEM_OPTIMIZED)) {
+        omKeyInfo.setParentObjectID(omBucketInfo.getObjectID());
+        omKeyInfo.setFileName(OzoneFSUtils.getFileName(keyName));
+        OMRequestTestUtils.addMultipartKeyToOpenFileTable(false,
+            omKeyInfo.getFileName(), omKeyInfo, uploadId, 0L,
+            omMetadataManager);
+      } else {
+        OMRequestTestUtils.addMultipartKeyToOpenKeyTable(false,
+            omKeyInfo, uploadId, 0L, omMetadataManager);
+      }
+
+      OmMultipartKeyInfo multipartKeyInfo = OMRequestTestUtils
+          .createOmMultipartKeyInfo(uploadId, Time.now(),
+              ReplicationType.RATIS, ReplicationFactor.THREE, objectID);
+
+      for (int j = 1; j <= numParts; j++) {
+        PartKeyInfo part = createPartKeyInfo(volume, bucket, keyName, j);
+        addPart(j, part, multipartKeyInfo);
+      }
+
+      String multipartKey = OMRequestTestUtils.addMultipartInfoToTable(false,
+          omKeyInfo, multipartKeyInfo, 0L, omMetadataManager);
+      String multipartOpenKey = OMMultipartUploadUtils
+          .getMultipartOpenKey(volume, bucket, keyName, uploadId,
+              omMetadataManager, getBucketLayout());
+
+      OmMultipartAbortInfo omMultipartAbortInfo = new OmMultipartAbortInfo
+          .Builder()
+          .setMultipartKey(multipartKey)
+          .setMultipartOpenKey(multipartOpenKey)
+          .setBucketLayout(getBucketLayout())
+          .setMultipartKeyInfo(multipartKeyInfo)
+          .build();
+
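+      // Group the abort info by bucket, since aborts are applied per bucket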
+      newMPUs.computeIfAbsent(omBucketInfo, k -> new ArrayList<>())
+          .add(omMultipartAbortInfo);
+    }
+
+    return newMPUs;
+  }
+
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestMultipartUploadCleanupService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestMultipartUploadCleanupService.java
new file mode 100644
index 0000000000..b65cfd0484
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestMultipartUploadCleanupService.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.service;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
+import org.apache.hadoop.ozone.om.KeyManager;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmTestManagers;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.util.ExitUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MPU_CLEANUP_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MPU_EXPIRE_THRESHOLD;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MPU_PARTS_CLEANUP_LIMIT_PER_TASK;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test Multipart Upload Cleanup Service.
+ */
+public class TestMultipartUploadCleanupService {
+  private OzoneManagerProtocol writeClient;
+  private OzoneManager om;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestMultipartUploadCleanupService.class);
+
+  private static final Duration SERVICE_INTERVAL = Duration.ofMillis(500);
+  private static final Duration EXPIRE_THRESHOLD = Duration.ofMillis(1000);
+  private KeyManager keyManager;
+  private OMMetadataManager omMetadataManager;
+
+  @BeforeAll
+  public static void setup() {
+    ExitUtils.disableSystemExit();
+  }
+
+  @BeforeEach
+  public void createConfAndInitValues(@TempDir Path tempDir) throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    System.setProperty(DBConfigFromFile.CONFIG_DIR, "/");
+    ServerUtils.setOzoneMetaDirPath(conf, tempDir.toString());
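+    // Use a short service interval and expiry threshold so MPUs expire and
+    // the cleanup service runs quickly within the test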
+    conf.setTimeDuration(OZONE_OM_MPU_CLEANUP_SERVICE_INTERVAL,
+        SERVICE_INTERVAL.toMillis(), TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_OM_MPU_EXPIRE_THRESHOLD,
+        EXPIRE_THRESHOLD.toMillis(), TimeUnit.MILLISECONDS);
+    conf.setInt(OZONE_OM_MPU_PARTS_CLEANUP_LIMIT_PER_TASK, 1000);
+    conf.setQuietMode(false);
+    OmTestManagers omTestManagers = new OmTestManagers(conf);
+    keyManager = omTestManagers.getKeyManager();
+    omMetadataManager = omTestManagers.getMetadataManager();
+    writeClient = omTestManagers.getWriteClient();
+    om = omTestManagers.getOzoneManager();
+  }
+
+  @AfterEach
+  public void cleanup() throws Exception {
+    om.stop();
+  }
+
+  /**
+   * Create a number of incomplete/in-flight multipart uploads, then start
+   * the MultipartUploadCleanupService and verify that all the expired
+   * multipart upload info is picked up and aborted by the OzoneManager.
+   * @throws Exception
+   */
+  @ParameterizedTest
+  @CsvSource({
+      "99,0",
+      "0, 88",
+      "66, 77"
+  })
+  @Timeout(300)
+  public void checkIfCleanupServiceIsDeletingExpiredMultipartUpload(
+      int numDEFKeys, int numFSOKeys) throws Exception {
+
+    MultipartUploadCleanupService multipartUploadCleanupService =
+        (MultipartUploadCleanupService)
+            keyManager.getMultipartUploadCleanupService();
+
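+    // Suspend the service while staging MPUs so none are aborted early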
+    multipartUploadCleanupService.suspend();
+    // wait for submitted tasks to complete
+    Thread.sleep(SERVICE_INTERVAL.toMillis());
+    final long oldMpuInfoCount =
+        multipartUploadCleanupService.getSubmittedMpuInfoCount();
+    final long oldRunCount =
+        multipartUploadCleanupService.getRunCount();
+
+    createIncompleteMPUKeys(numDEFKeys, BucketLayout.DEFAULT);
+    createIncompleteMPUKeys(numFSOKeys, BucketLayout.FILE_SYSTEM_OPTIMIZED);
+
+    // wait for MPU info to expire
+    Thread.sleep(EXPIRE_THRESHOLD.toMillis());
+
+    assertFalse(keyManager.getExpiredMultipartUploads(EXPIRE_THRESHOLD,
+        10000).isEmpty());
+
+    multipartUploadCleanupService.resume();
+
+    GenericTestUtils.waitFor(() -> multipartUploadCleanupService
+            .getRunCount() > oldRunCount,
+        (int) SERVICE_INTERVAL.toMillis(),
+        5 * (int) SERVICE_INTERVAL.toMillis());
+
+    // wait for requests to complete
+    Thread.sleep(10 * SERVICE_INTERVAL.toMillis());
+
+    assertTrue(multipartUploadCleanupService.getSubmittedMpuInfoCount() >=
+        oldMpuInfoCount + numDEFKeys + numFSOKeys);
+    assertTrue(keyManager.getExpiredMultipartUploads(EXPIRE_THRESHOLD,
+        10000).isEmpty());
+
+  }
+
+  private void createIncompleteMPUKeys(int mpuKeyCount,
+      BucketLayout bucketLayout) throws IOException {
+    String volume = UUID.randomUUID().toString();
+    String bucket = UUID.randomUUID().toString();
+    for (int x = 0; x < mpuKeyCount; x++) {
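+      // Randomly switch the bucket (and occasionally the volume) so the MPU
+      // keys are spread across multiple buckets and volumes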
+      if (RandomUtils.nextBoolean()) {
+        bucket = UUID.randomUUID().toString();
+        if (RandomUtils.nextBoolean()) {
+          volume = UUID.randomUUID().toString();
+        }
+      }
+      String key = UUID.randomUUID().toString();
+      createVolumeAndBucket(volume, bucket, bucketLayout);
+
+      final int numParts = RandomUtils.nextInt(0, 5);
+      // Create the MPU key
+      createIncompleteMPUKey(volume, bucket, key, numParts);
+    }
+
+  }
+
+  private void createVolumeAndBucket(String volumeName, String bucketName,
+      BucketLayout bucketLayout) throws IOException {
+    // Cheat here: just create volume and bucket entries directly in the
+    // metadata store so that we can create the MPU keys.
+    OMRequestTestUtils.addVolumeToOM(omMetadataManager,
+        OmVolumeArgs.newBuilder()
+            .setOwnerName("o")
+            .setAdminName("a")
+            .setVolume(volumeName)
+            .build());
+
+    OMRequestTestUtils.addBucketToOM(omMetadataManager,
+        OmBucketInfo.newBuilder().setVolumeName(volumeName)
+            .setBucketName(bucketName)
+            .setBucketLayout(bucketLayout)
+            .build());
+  }
+
+  /**
+   * Create an in-flight multipart upload that is not completed or
+   * aborted yet.
+   * @param volumeName volume of the MPU key
+   * @param bucketName bucket of the MPU key
+   * @param keyName name of the MPU key
+   * @param numParts number of parts to commit
+   * @throws IOException
+   */
+  private void createIncompleteMPUKey(String volumeName, String bucketName,
+      String keyName, int numParts) throws IOException {
+    // Initiate MPU
+    OmKeyArgs keyArgs =
+        new OmKeyArgs.Builder()
+            .setVolumeName(volumeName)
+            .setBucketName(bucketName)
+            .setKeyName(keyName)
+            .setAcls(Collections.emptyList())
+            .setReplicationConfig(StandaloneReplicationConfig.getInstance(ONE))
+            .setLocationInfoList(new ArrayList<>())
+            .build();
+
+    OmMultipartInfo omMultipartInfo =
+        writeClient.initiateMultipartUpload(keyArgs);
+
+    // Commit MPU parts
+    for (int i = 1; i <= numParts; i++) {
+      OmKeyArgs partKeyArgs =
+          new OmKeyArgs.Builder()
+              .setVolumeName(volumeName)
+              .setBucketName(bucketName)
+              .setKeyName(keyName)
+              .setIsMultipartKey(true)
+              .setMultipartUploadID(omMultipartInfo.getUploadID())
+              .setMultipartUploadPartNumber(i)
+              .setAcls(Collections.emptyList())
+              .setReplicationConfig(
+                  StandaloneReplicationConfig.getInstance(ONE))
+              .build();
+
+      OpenKeySession openKey = writeClient.openKey(partKeyArgs);
+
+      OmKeyArgs commitPartKeyArgs =
+          new OmKeyArgs.Builder()
+              .setVolumeName(volumeName)
+              .setBucketName(bucketName)
+              .setKeyName(keyName)
+              .setIsMultipartKey(true)
+              .setMultipartUploadID(omMultipartInfo.getUploadID())
+              .setMultipartUploadPartNumber(i)
+              .setAcls(Collections.emptyList())
+              .setReplicationConfig(
+                  StandaloneReplicationConfig.getInstance(ONE))
+              .setLocationInfoList(Collections.emptyList())
+              .build();
+
+      writeClient.commitMultipartUploadPart(commitPartKeyArgs,
+          openKey.getId());
+    }
+
+    // MPU key is not completed / aborted, so it's still in the
+    // multipartInfoTable
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
