This is an automated email from the ASF dual-hosted git repository.
ckj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 6805fd4c36 HDDS-9194. Implement cleanup service for MultipartInfoTable
(#5325)
6805fd4c36 is described below
commit 6805fd4c36b53884fb1feb252389b00b6fcef590
Author: Ivan Andika <[email protected]>
AuthorDate: Tue Oct 3 13:26:37 2023 +0800
HDDS-9194. Implement cleanup service for MultipartInfoTable (#5325)
---
.../common/src/main/resources/ozone-default.xml | 49 ++
.../main/java/org/apache/hadoop/ozone/OmUtils.java | 1 +
.../org/apache/hadoop/ozone/om/OMConfigKeys.java | 20 +
.../ozone/om/helpers/OmMultipartAbortInfo.java | 116 ++++
.../hadoop/ozone/om/helpers/OmMultipartUpload.java | 14 +
.../src/main/proto/OmClientProtocol.proto | 23 +
.../apache/hadoop/ozone/om/OMMetadataManager.java | 24 +-
.../org/apache/hadoop/ozone/audit/OMAction.java | 4 +-
.../org/apache/hadoop/ozone/om/KeyManager.java | 26 +
.../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 57 +-
.../java/org/apache/hadoop/ozone/om/OMMetrics.java | 51 ++
.../hadoop/ozone/om/OmMetadataManagerImpl.java | 51 ++
.../om/ratis/utils/OzoneManagerRatisUtils.java | 3 +
.../S3ExpiredMultipartUploadsAbortRequest.java | 335 +++++++++++
.../multipart/S3MultipartUploadAbortRequest.java | 8 +-
.../S3MultipartUploadAbortRequestWithFSO.java | 25 -
.../om/request/util/OMMultipartUploadUtils.java | 49 ++
.../AbstractS3MultipartAbortResponse.java | 156 +++++
.../S3ExpiredMultipartUploadsAbortResponse.java | 80 +++
.../multipart/S3MultipartUploadAbortResponse.java | 43 +-
.../om/service/MultipartUploadCleanupService.java | 234 ++++++++
.../hadoop/ozone/om/TestOmMetadataManager.java | 105 ++++
.../ozone/om/request/OMRequestTestUtils.java | 43 +-
.../TestS3ExpiredMultipartUploadsAbortRequest.java | 666 +++++++++++++++++++++
.../s3/multipart/TestS3MultipartRequest.java | 14 +-
...tS3MultipartUploadCommitPartRequestWithFSO.java | 2 +-
...estS3MultipartUploadCompleteRequestWithFSO.java | 2 +-
...TestS3ExpiredMultipartUploadsAbortResponse.java | 325 ++++++++++
.../service/TestMultipartUploadCleanupService.java | 262 ++++++++
29 files changed, 2694 insertions(+), 94 deletions(-)
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 919540870c..3dd65220d7 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1391,6 +1391,55 @@
</description>
</property>
+ <property>
+ <name>ozone.om.open.mpu.cleanup.service.interval</name>
+ <value>24h</value>
+ <tag>OZONE, OM, PERFORMANCE</tag>
+ <description>
+ A background job that periodically checks for inactive multipart info
+ and sends multipart upload abort requests for them.
+ This entry controls the interval of this
+ cleanup check. Unit could be defined with postfix (ns,ms,s,m,h,d)
+ </description>
+ </property>
+
+ <property>
+ <name>ozone.om.open.mpu.cleanup.service.timeout</name>
+ <value>300s</value>
+ <tag>OZONE, OM, PERFORMANCE</tag>
+ <description>
+ A timeout value of multipart upload cleanup service. If this is set
+ greater than 0, the service will stop waiting for the multipart info abort
+ completion after this time. If timeout happens to a large proportion of
+ multipart aborts, this value needs to be increased or
+ ozone.om.open.mpu.parts.cleanup.limit.per.task should be decreased.
+ Unit could be defined with postfix (ns,ms,s,m,h,d)
+ </description>
+ </property>
+
+ <property>
+ <name>ozone.om.open.mpu.expire.threshold</name>
+ <value>30d</value>
+ <tag>OZONE, OM, PERFORMANCE</tag>
+ <description>
+ Controls how long a multipart upload is considered active. Specifically, if a multipart info
+ has been ongoing longer than the value of this config entry, that multipart info is considered as
+ expired (e.g. due to client crash). Unit could be defined with postfix (ns,ms,s,m,h,d)
+ </description>
+ </property>
+
+ <property>
+ <name>ozone.om.open.mpu.parts.cleanup.limit.per.task</name>
+ <value>0</value>
+ <tag>OZONE, OM, PERFORMANCE</tag>
+ <description>
+ The approximate maximum number of multipart upload parts to abort per
+ cleanup task. The limit is soft: it is rounded up to a whole number of
+ expired multipart uploads. This property is used to approximately
+ throttle the number of MPU parts sent to the OM.
+ </description>
+ </property>
+
<property>
<name>hdds.rest.rest-csrf.enabled</name>
<value>false</value>
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index 0e0a89a7a2..9cfff8a1e5 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -321,6 +321,7 @@ public final class OmUtils {
case SnapshotPurge:
case RecoverLease:
case SetTimes:
+ case AbortExpiredMultiPartUploads:
case UnknownCommand:
return false;
case EchoRPC:
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index ab110f21f3..ef40003189 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -142,6 +142,26 @@ public final class OMConfigKeys {
public static final int OZONE_OM_OPEN_KEY_CLEANUP_LIMIT_PER_TASK_DEFAULT =
1000;
+ public static final String OZONE_OM_MPU_CLEANUP_SERVICE_INTERVAL =
+ "ozone.om.open.mpu.cleanup.service.interval";
+ public static final String
+ OZONE_OM_MPU_CLEANUP_SERVICE_INTERVAL_DEFAULT = "24h";
+
+ public static final String OZONE_OM_MPU_CLEANUP_SERVICE_TIMEOUT =
+ "ozone.om.open.mpu.cleanup.service.timeout";
+ public static final String OZONE_OM_MPU_CLEANUP_SERVICE_TIMEOUT_DEFAULT
+ = "300s";
+
+ public static final String OZONE_OM_MPU_EXPIRE_THRESHOLD =
+ "ozone.om.open.mpu.expire.threshold";
+ public static final String OZONE_OM_MPU_EXPIRE_THRESHOLD_DEFAULT =
+ "30d";
+
+ public static final String OZONE_OM_MPU_PARTS_CLEANUP_LIMIT_PER_TASK =
+ "ozone.om.open.mpu.parts.cleanup.limit.per.task";
+ public static final int OZONE_OM_MPU_PARTS_CLEANUP_LIMIT_PER_TASK_DEFAULT =
+ 0;
+
public static final String OZONE_OM_METRICS_SAVE_INTERVAL =
"ozone.om.save.metrics.interval";
public static final String OZONE_OM_METRICS_SAVE_INTERVAL_DEFAULT = "5m";
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartAbortInfo.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartAbortInfo.java
new file mode 100644
index 0000000000..245fc773f6
--- /dev/null
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartAbortInfo.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.helpers;
+
+import java.util.Objects;
+
+/**
+ * This class contains the necessary information to abort MPU keys.
+ */
+public final class OmMultipartAbortInfo {
+
+ private final String multipartKey;
+ private final String multipartOpenKey;
+ private final OmMultipartKeyInfo omMultipartKeyInfo;
+ private final BucketLayout bucketLayout;
+
+ private OmMultipartAbortInfo(String multipartKey, String multipartOpenKey,
+ OmMultipartKeyInfo omMultipartKeyInfo, BucketLayout bucketLayout) {
+ this.multipartKey = multipartKey;
+ this.multipartOpenKey = multipartOpenKey;
+ this.omMultipartKeyInfo = omMultipartKeyInfo;
+ this.bucketLayout = bucketLayout;
+ }
+
+ public String getMultipartKey() {
+ return multipartKey;
+ }
+
+ public String getMultipartOpenKey() {
+ return multipartOpenKey;
+ }
+
+ public OmMultipartKeyInfo getOmMultipartKeyInfo() {
+ return omMultipartKeyInfo;
+ }
+
+ public BucketLayout getBucketLayout() {
+ return bucketLayout;
+ }
+
+ /**
+ * Builder of OmMultipartAbortInfo.
+ */
+ public static class Builder {
+ private String multipartKey;
+ private String multipartOpenKey;
+ private OmMultipartKeyInfo omMultipartKeyInfo;
+ private BucketLayout bucketLayout;
+
+ public Builder setMultipartKey(String mpuKey) {
+ this.multipartKey = mpuKey;
+ return this;
+ }
+
+ public Builder setMultipartOpenKey(String mpuOpenKey) {
+ this.multipartOpenKey = mpuOpenKey;
+ return this;
+ }
+
+ public Builder setMultipartKeyInfo(OmMultipartKeyInfo multipartKeyInfo) {
+ this.omMultipartKeyInfo = multipartKeyInfo;
+ return this;
+ }
+
+ public Builder setBucketLayout(BucketLayout layout) {
+ this.bucketLayout = layout;
+ return this;
+ }
+
+ public OmMultipartAbortInfo build() {
+ return new OmMultipartAbortInfo(multipartKey,
+ multipartOpenKey, omMultipartKeyInfo, bucketLayout);
+ }
+ }
+
+
+  /** Field-by-field equality over all four fields; tolerates null fields. */
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (other == null || getClass() != other.getClass()) {
+      return false;
+    }
+    final OmMultipartAbortInfo that = (OmMultipartAbortInfo) other;
+    // Objects.equals mirrors the null-safe Objects.hash used by hashCode();
+    // the Builder does not reject fields that were never set.
+    return Objects.equals(multipartKey, that.multipartKey) &&
+        Objects.equals(multipartOpenKey, that.multipartOpenKey) &&
+        Objects.equals(bucketLayout, that.bucketLayout) &&
+        Objects.equals(omMultipartKeyInfo, that.omMultipartKeyInfo);
+  }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(multipartKey, multipartOpenKey,
+ bucketLayout, omMultipartKeyInfo);
+ }
+
+}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUpload.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUpload.java
index 9bdc90c798..70da36dad5 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUpload.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUpload.java
@@ -130,4 +130,18 @@ public class OmMultipartUpload {
public ReplicationConfig getReplicationConfig() {
return replicationConfig;
}
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ return other instanceof OmMultipartUpload && uploadId.equals(
+ ((OmMultipartUpload)other).getUploadId());
+ }
+
+ @Override
+ public int hashCode() {
+ return uploadId.hashCode();
+ }
}
diff --git
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 01d384d5c2..77d2f23715 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -142,6 +142,8 @@ enum Type {
SetSafeMode = 124;
PrintCompactionLogDag = 125;
ListKeysLight = 126;
+ AbortExpiredMultiPartUploads = 127;
+
}
enum SafeMode {
@@ -273,6 +275,8 @@ message OMRequest {
optional CancelSnapshotDiffRequest CancelSnapshotDiffRequest =
123;
optional SetSafeModeRequest SetSafeModeRequest =
124;
optional PrintCompactionLogDagRequest PrintCompactionLogDagRequest =
125;
+
+ optional MultipartUploadsExpiredAbortRequest
multipartUploadsExpiredAbortRequest = 126;
}
message OMResponse {
@@ -390,6 +394,7 @@ message OMResponse {
optional SetSafeModeResponse SetSafeModeResponse =
124;
optional PrintCompactionLogDagResponse PrintCompactionLogDagResponse =
125;
optional ListKeysLightResponse listKeysLightResponse =
126;
+ optional MultipartUploadsExpiredAbortResponse
multipartUploadsExpiredAbortResponse = 127;
}
enum Status {
@@ -1577,6 +1582,24 @@ message MultipartUploadAbortRequest {
message MultipartUploadAbortResponse {
}
+
+message MultipartUploadsExpiredAbortRequest {
+ repeated ExpiredMultipartUploadsBucket expiredMultipartUploadsPerBucket = 1;
+}
+
+message ExpiredMultipartUploadsBucket {
+ required string volumeName = 1;
+ required string bucketName = 2;
+ repeated ExpiredMultipartUploadInfo multipartUploads = 3;
+}
+
+message ExpiredMultipartUploadInfo {
+ required string name = 1;
+}
+
+message MultipartUploadsExpiredAbortResponse {
+}
+
message MultipartUploadListPartsRequest {
required string volume = 1;
required string bucket = 2;
diff --git
a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index 66baf41184..4dfb135dd7 100644
---
a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++
b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
import org.apache.hadoop.hdds.utils.TransactionInfo;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
import org.apache.hadoop.ozone.storage.proto.
OzoneManagerStorageProtos.PersistedUserVolumeInfo;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
@@ -279,6 +280,23 @@ public interface OMMetadataManager extends
DBStoreHAManager {
ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold, int count,
BucketLayout bucketLayout) throws IOException;
+ /**
+ * Returns the expired MPU keys, bounded by {@code maxParts} parts, whose
+ * age is greater than or equal to {@code expireThreshold}.
+ *
+ * @param expireThreshold The threshold of MPU key expiration age.
+ * @param maxParts The threshold of number of MPU parts to return.
+ * This is a soft limit, which means if the last MPU has
+ * parts that exceed this limit, it will be included
+ * in the returned expired MPUs. This is to handle
+ * situation where MPU that has more parts than
+ * maxParts never get deleted.
+ * @return a {@link List} of {@link ExpiredMultipartUploadsBucket}, the
+ * expired multipart uploads, grouped by volume and bucket.
+ */
+ List<ExpiredMultipartUploadsBucket> getExpiredMultipartUploads(
+ Duration expireThreshold, int maxParts) throws IOException;
+
/**
* Retrieve RWLock for the table.
* @param tableName table name.
@@ -350,7 +368,8 @@ public interface OMMetadataManager extends DBStoreHAManager
{
Table<String, OmPrefixInfo> getPrefixTable();
/**
- * Returns the DB key name of a multipart upload key in OM metadata store.
+ * Returns the DB key name of a multipart upload key in OM metadata store
+ * for LEGACY/OBS buckets.
*
* @param volume - volume name
* @param bucket - bucket name
@@ -512,7 +531,8 @@ public interface OMMetadataManager extends DBStoreHAManager
{
String getRenameKey(String volume, String bucket, long objectID);
/**
- * Returns the DB key name of a multipart upload key in OM metadata store.
+ * Returns the DB key name of a multipart upload key in OM metadata store
+ * for FSO-enabled buckets.
*
* @param volumeId - ID of the volume
* @param bucketId - ID of the bucket
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
index 3b96267c97..ed37313f1c 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
@@ -99,7 +99,9 @@ public enum OMAction implements AuditAction {
LIST_SNAPSHOT,
DELETE_SNAPSHOT,
SNAPSHOT_MOVE_DELETED_KEYS,
- SET_TIMES;
+ SET_TIMES,
+
+ ABORT_EXPIRED_MULTIPART_UPLOAD;
@Override
public String getAction() {
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index 6ba98cbde0..4f7fa6ad2f 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.service.KeyDeletingService;
import org.apache.hadoop.ozone.om.service.SnapshotDeletingService;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
import java.io.IOException;
import java.time.Duration;
@@ -147,6 +148,25 @@ public interface KeyManager extends OzoneManagerFS,
IOzoneAcl {
ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold, int count,
BucketLayout bucketLayout) throws IOException;
+ /**
+ * Returns the expired MPU infos, bounded by {@code maxParts} parts, whose
+ * age is greater than or equal to {@code expireThreshold}.
+ *
+ * @param expireThreshold The threshold of expired multipart uploads
+ * to return.
+ * @param maxParts The threshold of number of MPU parts to return.
+ * This is a soft limit, which means if the last MPU has
+ * parts that exceed this limit, it will be included
+ * in the returned expired MPUs. This is to handle
+ * situation where MPU that has more parts than
+ * maxParts never get deleted.
+ * @return a {@link List} of the expired
+ * {@link ExpiredMultipartUploadsBucket}, the expired multipart
+ * uploads, grouped by volume and bucket.
+ */
+ List<ExpiredMultipartUploadsBucket> getExpiredMultipartUploads(
+ Duration expireThreshold, int maxParts) throws IOException;
+
/**
* Returns the metadataManager.
* @return OMMetadataManager.
@@ -246,6 +266,12 @@ public interface KeyManager extends OzoneManagerFS,
IOzoneAcl {
*/
BackgroundService getOpenKeyCleanupService();
+ /**
+ * Returns the instance of Multipart Upload Cleanup Service.
+ * @return Background service.
+ */
+ BackgroundService getMultipartUploadCleanupService();
+
/**
* Returns the instance of Snapshot SST Filtering service.
* @return Background service.
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 33ea1f779b..a230882112 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -17,8 +17,6 @@
package org.apache.hadoop.ozone.om;
import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.PrivilegedExceptionAction;
import java.time.Duration;
@@ -82,10 +80,13 @@ import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.request.OMClientRequest;
import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
+import org.apache.hadoop.ozone.om.request.util.OMMultipartUploadUtils;
import org.apache.hadoop.ozone.om.service.DirectoryDeletingService;
import org.apache.hadoop.ozone.om.service.KeyDeletingService;
+import org.apache.hadoop.ozone.om.service.MultipartUploadCleanupService;
import org.apache.hadoop.ozone.om.service.OpenKeyCleanupService;
import org.apache.hadoop.ozone.om.service.SnapshotDeletingService;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
@@ -122,6 +123,10 @@ import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_SST_FILTERI
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MPU_CLEANUP_SERVICE_INTERVAL;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MPU_CLEANUP_SERVICE_INTERVAL_DEFAULT;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MPU_CLEANUP_SERVICE_TIMEOUT;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MPU_CLEANUP_SERVICE_TIMEOUT_DEFAULT;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_DEFAULT;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT;
@@ -175,6 +180,7 @@ public class KeyManagerImpl implements KeyManager {
private final OMPerformanceMetrics metrics;
private BackgroundService openKeyCleanupService;
+ private BackgroundService multipartUploadCleanupService;
public KeyManagerImpl(OzoneManager om, ScmClient scmClient,
OzoneConfiguration conf, OMPerformanceMetrics metrics) {
@@ -293,6 +299,21 @@ public class KeyManagerImpl implements KeyManager {
LOG.error("Error starting Snapshot Deleting Service", e);
}
}
+
+ if (multipartUploadCleanupService == null) {
+ long serviceInterval = configuration.getTimeDuration(
+ OZONE_OM_MPU_CLEANUP_SERVICE_INTERVAL,
+ OZONE_OM_MPU_CLEANUP_SERVICE_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ long serviceTimeout = configuration.getTimeDuration(
+ OZONE_OM_MPU_CLEANUP_SERVICE_TIMEOUT,
+ OZONE_OM_MPU_CLEANUP_SERVICE_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ multipartUploadCleanupService = new MultipartUploadCleanupService(
+ serviceInterval, TimeUnit.MILLISECONDS, serviceTimeout,
+ ozoneManager, configuration);
+ multipartUploadCleanupService.start();
+ }
}
KeyProviderCryptoExtension getKMSProvider() {
@@ -321,6 +342,10 @@ public class KeyManagerImpl implements KeyManager {
snapshotDeletingService.shutdown();
snapshotDeletingService = null;
}
+ if (multipartUploadCleanupService != null) {
+ multipartUploadCleanupService.shutdown();
+ multipartUploadCleanupService = null;
+ }
}
private OmBucketInfo getBucketInfo(String volumeName, String bucketName)
@@ -617,6 +642,14 @@ public class KeyManagerImpl implements KeyManager {
bucketLayout);
}
+ @Override
+ public List<ExpiredMultipartUploadsBucket> getExpiredMultipartUploads(
+ Duration expireThreshold, int maxParts)
+ throws IOException {
+ return metadataManager.getExpiredMultipartUploads(expireThreshold,
+ maxParts);
+ }
+
@Override
public OMMetadataManager getMetadataManager() {
return metadataManager;
@@ -632,10 +665,16 @@ public class KeyManagerImpl implements KeyManager {
return dirDeletingService;
}
+ @Override
public BackgroundService getOpenKeyCleanupService() {
return openKeyCleanupService;
}
+ @Override
+ public BackgroundService getMultipartUploadCleanupService() {
+ return multipartUploadCleanupService;
+ }
+
public SstFilteringService getSnapshotSstFilteringService() {
return snapshotSstFilteringService;
}
@@ -843,18 +882,8 @@ public class KeyManagerImpl implements KeyManager {
private String getMultipartOpenKeyFSO(String volumeName, String bucketName,
String keyName, String uploadID) throws IOException {
OMMetadataManager metaMgr = metadataManager;
- String fileName = OzoneFSUtils.getFileName(keyName);
- Iterator<Path> pathComponents = Paths.get(keyName).iterator();
- final long volumeId = metaMgr.getVolumeId(volumeName);
- final long bucketId = metaMgr.getBucketId(volumeName, bucketName);
- long parentID =
- OMFileRequest.getParentID(volumeId, bucketId, pathComponents,
- keyName, metaMgr);
-
- String multipartKey = metaMgr.getMultipartKey(volumeId, bucketId,
- parentID, fileName, uploadID);
-
- return multipartKey;
+ return OMMultipartUploadUtils.getMultipartOpenKeyFSO(
+ volumeName, bucketName, keyName, uploadID, metaMgr);
}
/**
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
index 97445c0ec2..ccb0fae6fb 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
@@ -86,6 +86,11 @@ public class OMMetrics implements OmMetadataReaderMetrics {
private @Metric MutableCounterLong numOpenKeysCleaned;
private @Metric MutableCounterLong numOpenKeysHSyncCleaned;
+ private @Metric MutableCounterLong numExpiredMPUAbortRequests;
+ private @Metric MutableCounterLong numExpiredMPUSubmittedForAbort;
+ private @Metric MutableCounterLong numExpiredMPUAborted;
+ private @Metric MutableCounterLong numExpiredMPUPartsAborted;
+
private @Metric MutableCounterLong numAddAcl;
private @Metric MutableCounterLong numSetAcl;
private @Metric MutableCounterLong numGetAcl;
@@ -124,6 +129,7 @@ public class OMMetrics implements OmMetadataReaderMetrics {
private @Metric MutableCounterLong numListMultipartUploadParts;
private @Metric MutableCounterLong numListMultipartUploadPartFails;
private @Metric MutableCounterLong numOpenKeyDeleteRequestFails;
+ private @Metric MutableCounterLong numExpiredMPUAbortRequestFails;
private @Metric MutableCounterLong numSnapshotCreateFails;
private @Metric MutableCounterLong numSnapshotDeleteFails;
private @Metric MutableCounterLong numSnapshotListFails;
@@ -813,6 +819,26 @@ public class OMMetrics implements OmMetadataReaderMetrics {
numOpenKeyDeleteRequestFails.incr();
}
+ public void incNumExpiredMPUAbortRequests() {
+ numExpiredMPUAbortRequests.incr();
+ }
+
+ public void incNumExpiredMPUSubmittedForAbort(long amount) {
+ numExpiredMPUSubmittedForAbort.incr(amount);
+ }
+
+ public void incNumExpiredMPUAborted() {
+ numExpiredMPUAborted.incr();
+ }
+
+ public void incNumExpiredMPUPartsAborted(long amount) {
+ numExpiredMPUPartsAborted.incr(amount);
+ }
+
+ public void incNumExpiredMpuAbortRequestFails() {
+ numExpiredMPUAbortRequestFails.incr();
+ }
+
public void incNumAddAcl() {
numAddAcl.incr();
}
@@ -1130,6 +1156,31 @@ public class OMMetrics implements
OmMetadataReaderMetrics {
return numOpenKeyDeleteRequestFails.value();
}
+ @VisibleForTesting
+ public long getNumExpiredMPUAbortRequests() {
+ return numExpiredMPUAbortRequests.value();
+ }
+
+ @VisibleForTesting
+ public long getNumExpiredMPUSubmittedForAbort() {
+ return numExpiredMPUSubmittedForAbort.value();
+ }
+
+ @VisibleForTesting
+ public long getNumExpiredMPUAborted() {
+ return numExpiredMPUAborted.value();
+ }
+
+ @VisibleForTesting
+ public long getNumExpiredMPUAbortRequestFails() {
+ return numExpiredMPUAbortRequestFails.value();
+ }
+
+ @VisibleForTesting
+ public long getNumExpiredMPUPartsAborted() {
+ return numExpiredMPUPartsAborted.value();
+ }
+
public long getNumAddAcl() {
return numAddAcl.value();
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 6f0fafcff2..e6b027427d 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -86,6 +87,8 @@ import
org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTrans
import org.apache.hadoop.ozone.om.request.util.OMMultipartUploadUtils;
import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadInfo;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
import org.apache.hadoop.ozone.storage.proto
.OzoneManagerStorageProtos.PersistedUserVolumeInfo;
@@ -948,6 +951,7 @@ public class OmMetadataManagerImpl implements
OMMetadataManager,
String volumeId = String.valueOf(getVolumeId(
omBucketInfo.getVolumeName()));
String bucketId = String.valueOf(omBucketInfo.getObjectID());
+
BucketLayout bucketLayout = omBucketInfo.getBucketLayout();
// keyPrefix is different in case of fileTable and keyTable.
@@ -1813,6 +1817,53 @@ public class OmMetadataManagerImpl implements
OMMetadataManager,
return expiredKeys;
}
+  /**
+   * Scans the multipart info table for uploads created at or before
+   * (now - expireThreshold), grouped per volume/bucket. Scanning stops once
+   * the collected uploads account for at least maxParts parts (soft limit),
+   * so a non-positive maxParts yields an empty list.
+   */
+  @Override
+  public List<ExpiredMultipartUploadsBucket> getExpiredMultipartUploads(
+      Duration expireThreshold, int maxParts) throws IOException {
+    final Map<String, ExpiredMultipartUploadsBucket.Builder> perBucket =
+        new HashMap<>();
+    try (TableIterator<String, ? extends KeyValue<String, OmMultipartKeyInfo>>
+        iter = getMultipartInfoTable().iterator()) {
+      // Uploads created at or before this instant are treated as expired.
+      final long cutoffMillis =
+          Instant.now().minus(expireThreshold).toEpochMilli();
+      // One builder instance is reused for every upload entry.
+      final ExpiredMultipartUploadInfo.Builder uploadInfo =
+          ExpiredMultipartUploadInfo.newBuilder();
+      int collectedParts = 0;
+      while (collectedParts < maxParts && iter.hasNext()) {
+        final KeyValue<String, OmMultipartKeyInfo> entry = iter.next();
+        final String dbKey = entry.getKey();
+        final OmMultipartKeyInfo keyInfo = entry.getValue();
+        if (keyInfo.getCreationTime() > cutoffMillis) {
+          continue;
+        }
+        final OmMultipartUpload upload = OmMultipartUpload.from(dbKey);
+        final String volume = upload.getVolumeName();
+        final String bucket = upload.getBucketName();
+        final ExpiredMultipartUploadsBucket.Builder bucketBuilder =
+            perBucket.computeIfAbsent(volume + OM_KEY_PREFIX + bucket,
+                k -> ExpiredMultipartUploadsBucket.newBuilder()
+                    .setVolumeName(volume)
+                    .setBucketName(bucket));
+        bucketBuilder.addMultipartUploads(
+            uploadInfo.setName(dbKey).build());
+        // Count parts only for expired uploads; the limit is checked before
+        // the next read, so the last upload may push the total past maxParts.
+        collectedParts += keyInfo.getPartKeyInfoMap().size();
+      }
+    }
+    return perBucket.values().stream()
+        .map(ExpiredMultipartUploadsBucket.Builder::build)
+        .collect(Collectors.toList());
+  }
+
@Override
public <KEY, VALUE> long countRowsInTable(Table<KEY, VALUE> table)
throws IOException {
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index c07969e6fc..d6ebb44da0 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -60,6 +60,7 @@ import
org.apache.hadoop.ozone.om.request.key.acl.OMKeySetAclRequestWithFSO;
import org.apache.hadoop.ozone.om.request.key.acl.prefix.OMPrefixAddAclRequest;
import
org.apache.hadoop.ozone.om.request.key.acl.prefix.OMPrefixRemoveAclRequest;
import org.apache.hadoop.ozone.om.request.key.acl.prefix.OMPrefixSetAclRequest;
+import
org.apache.hadoop.ozone.om.request.s3.multipart.S3ExpiredMultipartUploadsAbortRequest;
import org.apache.hadoop.ozone.om.request.s3.security.OMSetSecretRequest;
import org.apache.hadoop.ozone.om.request.s3.security.S3GetSecretRequest;
import org.apache.hadoop.ozone.om.request.s3.security.S3RevokeSecretRequest;
@@ -322,6 +323,8 @@ public final class OzoneManagerRatisUtils {
break;
case EchoRPC:
return new OMEchoRPCWriteRequest(omRequest);
+ case AbortExpiredMultiPartUploads:
+ return new S3ExpiredMultipartUploadsAbortRequest(omRequest);
default:
throw new OMException("Unrecognized write command type request "
+ cmdType, OMException.ResultCodes.INVALID_REQUEST);
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3ExpiredMultipartUploadsAbortRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3ExpiredMultipartUploadsAbortRequest.java
new file mode 100644
index 0000000000..0898089576
--- /dev/null
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3ExpiredMultipartUploadsAbortRequest.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.request.s3.multipart;
+
+import com.google.common.base.Optional;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartAbortInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUpload;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
+import org.apache.hadoop.ozone.om.request.util.OMMultipartUploadUtils;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import
org.apache.hadoop.ozone.om.response.s3.multipart.S3ExpiredMultipartUploadsAbortResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadInfo;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+
+/**
+ * Handles requests to abort expired multipart uploads (MPUs).
+ * <p>
+ * For every submitted MPU that is still present in the multipartInfoTable,
+ * the request invalidates the multipartInfoTable cache entry and, when the
+ * open key exists, the open key/file table cache entry, and releases the
+ * bucket quota held by the MPU parts. Only table caches are modified here,
+ * and no underlying databases; the collected abort information is handed to
+ * {@link S3ExpiredMultipartUploadsAbortResponse}.
+ * The delete table cache does not need to be modified since it is not used
+ * for client response validation.
+ */
+public class S3ExpiredMultipartUploadsAbortRequest extends OMKeyRequest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(S3ExpiredMultipartUploadsAbortRequest.class);
+
+  public S3ExpiredMultipartUploadsAbortRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  /**
+   * Updates the table caches for every bucket in the request, adds the
+   * response to the double buffer, audits the collected MPUs and records
+   * the outcome in the metrics and the log.
+   *
+   * @param ozoneManager OM instance providing metadata manager and metrics
+   * @param trxnLogIndex transaction log index of this request
+   * @param omDoubleBufferHelper helper used to queue the response
+   * @return the success response, or an error response when an
+   *         IOException was raised while updating the caches
+   */
+  @Override
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
+      long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
+
+    OMMetrics omMetrics = ozoneManager.getMetrics();
+    omMetrics.incNumExpiredMPUAbortRequests();
+
+    OzoneManagerProtocolProtos.MultipartUploadsExpiredAbortRequest
+        multipartUploadsExpiredAbortRequest = getOmRequest()
+        .getMultipartUploadsExpiredAbortRequest();
+
+    List<ExpiredMultipartUploadsBucket> submittedExpiredMPUsPerBucket =
+        multipartUploadsExpiredAbortRequest
+            .getExpiredMultipartUploadsPerBucketList();
+
+    long numSubmittedMPUs = 0;
+    for (ExpiredMultipartUploadsBucket mpuByBucket:
+        submittedExpiredMPUsPerBucket) {
+      numSubmittedMPUs += mpuByBucket.getMultipartUploadsCount();
+    }
+
+    LOG.debug("{} expired multi-uploads submitted for deletion.",
+        numSubmittedMPUs);
+    omMetrics.incNumExpiredMPUSubmittedForAbort(numSubmittedMPUs);
+
+    OzoneManagerProtocolProtos.OMResponse.Builder omResponse =
+        OmResponseUtil.getOMResponseBuilder(getOmRequest());
+
+    OMClientResponse omClientResponse = null;
+    Result result = null;
+    Map<OmBucketInfo, List<OmMultipartAbortInfo>>
+        abortedMultipartUploads = new HashMap<>();
+
+    try {
+      for (ExpiredMultipartUploadsBucket mpuByBucket:
+          submittedExpiredMPUsPerBucket) {
+        // For each bucket where the MPU will be aborted from,
+        // get its bucket lock and update the cache accordingly.
+        updateTableCache(ozoneManager, trxnLogIndex, mpuByBucket,
+            abortedMultipartUploads);
+      }
+
+      omClientResponse = new S3ExpiredMultipartUploadsAbortResponse(
+          omResponse.build(), abortedMultipartUploads,
+          ozoneManager.isRatisEnabled());
+
+      result = Result.SUCCESS;
+    } catch (IOException ex) {
+      result = Result.FAILURE;
+      omClientResponse =
+          new S3ExpiredMultipartUploadsAbortResponse(createErrorOMResponse(
+              omResponse, ex));
+    } finally {
+      addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
+          omDoubleBufferHelper);
+    }
+
+    // Only successfully aborted MPUs are included in the audit.
+    auditAbortedMPUs(ozoneManager, abortedMultipartUploads);
+
+    // The map is keyed by bucket, so its size is the number of buckets.
+    // Sum the per-bucket lists to get the number of aborted MPUs.
+    long numAbortedMPUs = 0;
+    for (List<OmMultipartAbortInfo> abortInfos:
+        abortedMultipartUploads.values()) {
+      numAbortedMPUs += abortInfos.size();
+    }
+
+    processResults(omMetrics, numSubmittedMPUs, numAbortedMPUs,
+        multipartUploadsExpiredAbortRequest, result);
+
+    return omClientResponse;
+
+  }
+
+  /**
+   * Writes one ABORT_EXPIRED_MULTIPART_UPLOAD audit entry per collected MPU.
+   *
+   * @param ozoneManager OM instance providing the audit logger
+   * @param abortedMultipartUploads collected abort infos grouped by bucket
+   */
+  private void auditAbortedMPUs(OzoneManager ozoneManager,
+      Map<OmBucketInfo, List<OmMultipartAbortInfo>> abortedMultipartUploads) {
+    for (Map.Entry<OmBucketInfo, List<OmMultipartAbortInfo>> entry :
+        abortedMultipartUploads.entrySet()) {
+      // Volume and bucket are shared by all MPUs of this entry; the key
+      // name and upload ID are overwritten on the builder for each MPU.
+      KeyArgs.Builder keyArgsAuditBuilder = KeyArgs.newBuilder()
+          .setVolumeName(entry.getKey().getVolumeName())
+          .setBucketName(entry.getKey().getBucketName());
+
+      for (OmMultipartAbortInfo abortInfo: entry.getValue()) {
+        // See RpcClient#abortMultipartUpload
+        KeyArgs keyArgsForAudit = keyArgsAuditBuilder
+            .setKeyName(abortInfo.getMultipartKey())
+            .setMultipartUploadID(abortInfo.getOmMultipartKeyInfo()
+                .getUploadID())
+            .build();
+        Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgsForAudit);
+        auditLog(ozoneManager.getAuditLogger(), buildAuditMessage(
+            OMAction.ABORT_EXPIRED_MULTIPART_UPLOAD, auditMap,
+            null, getOmRequest().getUserInfo()));
+      }
+    }
+  }
+
+  /**
+   * Logs the outcome of the request and bumps the failure metric on failure.
+   *
+   * @param omMetrics metrics to update
+   * @param numSubmittedExpiredMPUs number of MPUs contained in the request
+   * @param numAbortedMPUs number of MPUs collected for abort
+   * @param request the processed request, logged for unrecognized results
+   * @param result outcome of the cache update
+   */
+  private void processResults(OMMetrics omMetrics,
+      long numSubmittedExpiredMPUs, long numAbortedMPUs,
+      OzoneManagerProtocolProtos.MultipartUploadsExpiredAbortRequest request,
+      Result result) {
+
+    switch (result) {
+    case SUCCESS:
+      LOG.debug("Aborted {} expired MPUs out of {} submitted MPus.",
+          numAbortedMPUs, numSubmittedExpiredMPUs);
+      break;
+    case FAILURE:
+      omMetrics.incNumExpiredMpuAbortRequestFails();
+      // The message reports the submitted MPUs, so log the submitted count.
+      LOG.error("Failure occurred while trying to abort {} submitted " +
+          "expired MPUs.", numSubmittedExpiredMPUs);
+      break;
+    default:
+      LOG.error("Unrecognized result for " +
+          "MultipartUploadsExpiredAbortRequest: {}", request);
+    }
+
+  }
+
+  /**
+   * Aborts the expired MPUs of a single bucket while holding the bucket
+   * write lock. MPUs that are missing from the multipartInfoTable, that have
+   * a newer updateID than this transaction, or whose keys cannot be resolved
+   * are skipped.
+   *
+   * @param ozoneManager OM instance providing metadata manager and metrics
+   * @param trxnLogIndex transaction log index of this request
+   * @param mpusPerBucket expired MPUs of one bucket
+   * @param abortedMultipartUploads output map receiving the abort info of
+   *                                every MPU aborted by this call
+   * @throws IOException if a table lookup or key resolution fails with an
+   *                     exception that is not handled by skipping the MPU
+   */
+  private void updateTableCache(OzoneManager ozoneManager,
+      long trxnLogIndex, ExpiredMultipartUploadsBucket mpusPerBucket,
+      Map<OmBucketInfo, List<OmMultipartAbortInfo>> abortedMultipartUploads)
+      throws IOException {
+
+    boolean acquiredLock = false;
+    String volumeName = mpusPerBucket.getVolumeName();
+    String bucketName = mpusPerBucket.getBucketName();
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    try {
+      acquiredLock = omMetadataManager.getLock()
+          .acquireWriteLock(BUCKET_LOCK, volumeName, bucketName);
+
+      OmBucketInfo omBucketInfo =
+          getBucketInfo(omMetadataManager, volumeName, bucketName);
+
+      if (omBucketInfo == null) {
+        LOG.warn("Volume: {}, Bucket: {} does not exist, skipping deletion.",
+            volumeName, bucketName);
+        return;
+      }
+
+      // Do not use getBucketLayout since the expired MPUs request might
+      // contains MPUs from all kind of buckets
+      BucketLayout bucketLayout = omBucketInfo.getBucketLayout();
+
+      for (ExpiredMultipartUploadInfo expiredMPU:
+          mpusPerBucket.getMultipartUploadsList()) {
+        String expiredMPUKeyName = expiredMPU.getName();
+
+        // If the MPU key is no longer present in the table, MPU
+        // might have been completed / aborted, and should not be
+        // aborted.
+        OmMultipartKeyInfo omMultipartKeyInfo =
+            omMetadataManager.getMultipartInfoTable().get(expiredMPUKeyName);
+
+        if (omMultipartKeyInfo == null) {
+          LOG.debug("MPU key {} was not aborted, as it was not " +
+              "found in the multipart info table", expiredMPUKeyName);
+          continue;
+        }
+
+        if (ozoneManager.isRatisEnabled() &&
+            trxnLogIndex < omMultipartKeyInfo.getUpdateID()) {
+          LOG.warn("Transaction log index {} is smaller than " +
+              "the current updateID {} of MPU key {}, skipping deletion.",
+              trxnLogIndex, omMultipartKeyInfo.getUpdateID(),
+              expiredMPUKeyName);
+          continue;
+        }
+
+        // Parse the multipart upload components (e.g. volume, bucket, key)
+        // from the multipartInfoTable db key
+        OmMultipartUpload multipartUpload;
+        try {
+          multipartUpload = OmMultipartUpload.from(expiredMPUKeyName);
+        } catch (IllegalArgumentException e) {
+          LOG.warn("Aborting expired MPU failed: MPU key: {} has invalid " +
+              "structure, skipping this MPU.", expiredMPUKeyName, e);
+          continue;
+        }
+
+        String multipartOpenKey;
+        try {
+          multipartOpenKey = OMMultipartUploadUtils.getMultipartOpenKey(
+              multipartUpload.getVolumeName(),
+              multipartUpload.getBucketName(),
+              multipartUpload.getKeyName(),
+              multipartUpload.getUploadId(), omMetadataManager,
+              bucketLayout);
+        } catch (OMException ome) {
+          LOG.warn("Aborting expired MPU Failed: volume: {}, bucket: {}, " +
+              "key: {}. Cannot parse the open key for this MPU, " +
+              "skipping this MPU.", multipartUpload.getVolumeName(),
+              multipartUpload.getBucketName(), multipartUpload.getKeyName(),
+              ome);
+          continue;
+        }
+
+        // Set the UpdateID to current transactionLogIndex. This is done
+        // only after every skip condition has passed, so that an MPU which
+        // is skipped is left untouched.
+        omMultipartKeyInfo.setUpdateID(trxnLogIndex,
+            ozoneManager.isRatisEnabled());
+
+        // When aborting an MPU, the size of its uploaded parts is
+        // subtracted from the bucket usedBytes.
+        omBucketInfo.incrUsedBytes(-getReleasedQuota(omMultipartKeyInfo));
+
+        OmMultipartAbortInfo omMultipartAbortInfo =
+            new OmMultipartAbortInfo.Builder()
+                .setMultipartKey(expiredMPUKeyName)
+                .setMultipartOpenKey(multipartOpenKey)
+                .setMultipartKeyInfo(omMultipartKeyInfo)
+                .setBucketLayout(bucketLayout)
+                .build();
+
+        abortedMultipartUploads.computeIfAbsent(omBucketInfo,
+            k -> new ArrayList<>()).add(omMultipartAbortInfo);
+
+        // Update cache of openKeyTable and multipartInfo table.
+        // No need to add the cache entries to delete table, as the entries
+        // in delete table are not used by any read/write operations.
+
+        // Unlike normal MPU abort request where the MPU open keys needs
+        // to exist. For OpenKeyCleanupService run prior to
+        // HDDS-9017, these MPU open keys might already be deleted,
+        // causing "orphan" MPU keys (MPU entry exist in
+        // multipartInfoTable, but not in openKeyTable).
+        // We can skip this existence check and just delete the
+        // multipartInfoTable. The existence check can be re-added
+        // once there are no "orphan" keys
+        if (omMetadataManager.getOpenKeyTable(bucketLayout)
+            .isExist(multipartOpenKey)) {
+          omMetadataManager.getOpenKeyTable(bucketLayout)
+              .addCacheEntry(new CacheKey<>(multipartOpenKey),
+                  new CacheValue<>(Optional.absent(), trxnLogIndex));
+        }
+        omMetadataManager.getMultipartInfoTable()
+            .addCacheEntry(new CacheKey<>(expiredMPUKeyName),
+                new CacheValue<>(Optional.absent(), trxnLogIndex));
+
+        long numParts = omMultipartKeyInfo.getPartKeyInfoMap().size();
+        ozoneManager.getMetrics().incNumExpiredMPUAborted();
+        ozoneManager.getMetrics().incNumExpiredMPUPartsAborted(numParts);
+        LOG.debug("Expired MPU {} aborted containing {} parts.",
+            expiredMPUKeyName, numParts);
+      }
+    } finally {
+      if (acquiredLock) {
+        omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
+            bucketName);
+      }
+    }
+
+  }
+
+  /**
+   * Sums the data size of all parts of the MPU, each multiplied by the
+   * number of required nodes of the MPU replication config.
+   *
+   * @param omMultipartKeyInfo the MPU whose parts are being released
+   * @return the number of bytes to subtract from the bucket usedBytes
+   */
+  private static long getReleasedQuota(
+      OmMultipartKeyInfo omMultipartKeyInfo) {
+    int keyFactor = omMultipartKeyInfo.getReplicationConfig()
+        .getRequiredNodes();
+    long quotaReleased = 0;
+    for (PartKeyInfo partKeyInfo: omMultipartKeyInfo.getPartKeyInfoMap()) {
+      quotaReleased += partKeyInfo.getPartKeyInfo().getDataSize() * keyFactor;
+    }
+    return quotaReleased;
+  }
+}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java
index d8a87e24f2..df2a31825e 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.QuotaUtil;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.util.OMMultipartUploadUtils;
import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
import org.apache.hadoop.ozone.om.request.validation.RequestFeatureValidator;
import org.apache.hadoop.ozone.om.request.validation.RequestProcessingPhase;
@@ -250,10 +251,9 @@ public class S3MultipartUploadAbortRequest extends
OMKeyRequest {
protected String getMultipartOpenKey(String multipartUploadID,
String volumeName, String bucketName, String keyName,
OMMetadataManager omMetadataManager) throws IOException {
-
- String multipartKey = omMetadataManager.getMultipartKey(
- volumeName, bucketName, keyName, multipartUploadID);
- return multipartKey;
+ return OMMultipartUploadUtils.getMultipartOpenKey(
+ volumeName, bucketName, keyName, multipartUploadID, omMetadataManager,
+ getBucketLayout());
}
@RequestFeatureValidator(
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequestWithFSO.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequestWithFSO.java
index f15b5dabbb..fbbb994bea 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequestWithFSO.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequestWithFSO.java
@@ -18,13 +18,10 @@
package org.apache.hadoop.ozone.om.request.s3.multipart;
-import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
-import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import
org.apache.hadoop.ozone.om.response.s3.multipart.S3MultipartUploadAbortResponseWithFSO;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadAbortResponse;
@@ -32,9 +29,6 @@ import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMReque
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Iterator;
/**
* Handles Abort of multipart upload request.
@@ -68,23 +62,4 @@ public class S3MultipartUploadAbortRequestWithFSO
omBucketInfo.copyObject(), getBucketLayout());
return omClientResp;
}
-
- @Override
- protected String getMultipartOpenKey(String multipartUploadID,
- String volumeName, String bucketName, String keyName,
- OMMetadataManager omMetadataManager) throws IOException {
-
- String fileName = OzoneFSUtils.getFileName(keyName);
- Iterator<Path> pathComponents = Paths.get(keyName).iterator();
- final long volumeId = omMetadataManager.getVolumeId(volumeName);
- final long bucketId = omMetadataManager.getBucketId(volumeName,
- bucketName);
- long parentID = OMFileRequest.getParentID(volumeId, bucketId,
- pathComponents, keyName, omMetadataManager);
-
- String multipartKey = omMetadataManager.getMultipartKey(volumeId, bucketId,
- parentID, fileName, multipartUploadID);
-
- return multipartKey;
- }
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/util/OMMultipartUploadUtils.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/util/OMMultipartUploadUtils.java
index 5047949b6d..118eebd620 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/util/OMMultipartUploadUtils.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/util/OMMultipartUploadUtils.java
@@ -20,8 +20,16 @@ package org.apache.hadoop.ozone.om.request.util;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.utils.UniqueId;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Iterator;
import java.util.UUID;
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
@@ -79,6 +87,47 @@ public final class OMMultipartUploadUtils {
return uploadId;
}
+ /**
+  * Resolves the multipart open key in the format that matches the given
+  * bucket layout.
+  * FILE_SYSTEM_OPTIMIZED buckets are delegated to
+  * {@link #getMultipartOpenKeyFSO}; every other layout uses the name-based
+  * overload of this method.
+  *
+  * @throws IOException propagated from the FILE_SYSTEM_OPTIMIZED lookup
+  */
+ public static String getMultipartOpenKey(String volumeName,
+     String bucketName, String keyName, String multipartUploadId,
+     OMMetadataManager omMetadataManager, BucketLayout bucketLayout)
+     throws IOException {
+   if (bucketLayout != BucketLayout.FILE_SYSTEM_OPTIMIZED) {
+     return getMultipartOpenKey(volumeName, bucketName,
+         keyName, multipartUploadId, omMetadataManager);
+   }
+   return getMultipartOpenKeyFSO(volumeName, bucketName,
+       keyName, multipartUploadId, omMetadataManager);
+ }
+
+ /**
+  * Builds the multipart open key from the volume, bucket and key names
+  * and the upload ID, as produced by the metadata manager.
+  */
+ public static String getMultipartOpenKey(String volumeName,
+     String bucketName, String keyName, String multipartUploadId,
+     OMMetadataManager omMetadataManager) {
+   final String openKey = omMetadataManager.getMultipartKey(
+       volumeName, bucketName, keyName, multipartUploadId);
+   return openKey;
+ }
+
+ /**
+  * Builds the multipart open key for a FILE_SYSTEM_OPTIMIZED bucket from
+  * the volume ID, bucket ID, parent ID, file name and upload ID.
+  *
+  * @throws IOException propagated from the volume, bucket or parent ID
+  *                     lookups
+  */
+ public static String getMultipartOpenKeyFSO(String volumeName,
+     String bucketName, String keyName, String uploadID,
+     OMMetadataManager metaMgr) throws IOException {
+   final long volId = metaMgr.getVolumeId(volumeName);
+   final long buckId = metaMgr.getBucketId(volumeName, bucketName);
+
+   // The parent ID is resolved by walking the path components of the key.
+   Iterator<Path> components = Paths.get(keyName).iterator();
+   final long parentId = OMFileRequest.getParentID(volId, buckId,
+       components, keyName, metaMgr);
+
+   String leafName = OzoneFSUtils.getFileName(keyName);
+   return metaMgr.getMultipartKey(volId, buckId, parentId, leafName,
+       uploadID);
+ }
+
/**
* Check whether key's isMultipartKey flag is set.
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/AbstractS3MultipartAbortResponse.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/AbstractS3MultipartAbortResponse.java
new file mode 100644
index 0000000000..42267b644c
--- /dev/null
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/AbstractS3MultipartAbortResponse.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.response.s3.multipart;
+
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartAbortInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
+import org.apache.hadoop.ozone.om.response.key.OmKeyResponse;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.BUCKET_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_TABLE;
+import static
org.apache.hadoop.ozone.om.OmMetadataManagerImpl.MULTIPARTINFO_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_FILE_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_KEY_TABLE;
+
+/**
+ * Shared parent of the multipart abort responses. It adds to a batch the
+ * removal of MPU entries from the open key/file table and the multipart
+ * info table, the move of the MPU part keys to the deleted table, and the
+ * write of the bucket info to the bucket table.
+ */
+@CleanupTableInfo(cleanupTables = {OPEN_KEY_TABLE, OPEN_FILE_TABLE,
+    DELETED_TABLE, MULTIPARTINFO_TABLE, BUCKET_TABLE})
+public abstract class AbstractS3MultipartAbortResponse extends OmKeyResponse {
+
+  private final boolean isRatisEnabled;
+
+  public AbstractS3MultipartAbortResponse(
+      @Nonnull OMResponse omResponse, boolean isRatisEnabled) {
+    super(omResponse);
+    this.isRatisEnabled = isRatisEnabled;
+  }
+
+  public AbstractS3MultipartAbortResponse(
+      @Nonnull OMResponse omResponse, boolean isRatisEnabled,
+      BucketLayout bucketLayout) {
+    super(omResponse, bucketLayout);
+    this.isRatisEnabled = isRatisEnabled;
+  }
+
+  /**
+   * Constructor for an unsuccessful request.
+   * Successful requests must use one of the other constructors.
+   */
+  public AbstractS3MultipartAbortResponse(@Nonnull OMResponse omResponse,
+      BucketLayout bucketLayout) {
+    super(omResponse, bucketLayout);
+    this.isRatisEnabled = false;
+    checkStatusNotOK();
+  }
+
+  /**
+   * Adds to the batch the abort of several multipart uploads that belong
+   * to the same bucket, followed by the write of the bucket info.
+   * @param omMetadataManager metadata manager providing the tables
+   * @param batchOperation batch that collects the operations
+   * @param omBucketInfo bucket info written to the bucket table
+   * @param multipartAbortInfo the multipart uploads to abort
+   * @throws IOException
+   */
+  protected void addAbortToBatch(
+      OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation,
+      OmBucketInfo omBucketInfo,
+      List<OmMultipartAbortInfo> multipartAbortInfo
+  ) throws IOException {
+    for (OmMultipartAbortInfo mpu: multipartAbortInfo) {
+      // Remove the MPU from the open key/file table of its bucket layout
+      // and from the multipart info table.
+      omMetadataManager.getOpenKeyTable(mpu.getBucketLayout())
+          .deleteWithBatch(batchOperation, mpu.getMultipartOpenKey());
+      omMetadataManager.getMultipartInfoTable()
+          .deleteWithBatch(batchOperation, mpu.getMultipartKey());
+
+      movePartsToDeletedTable(omMetadataManager, batchOperation, mpu);
+    }
+
+    // update bucket usedBytes.
+    String bucketKey = omMetadataManager.getBucketKey(
+        omBucketInfo.getVolumeName(), omBucketInfo.getBucketName());
+    omMetadataManager.getBucketTable().putWithBatch(batchOperation,
+        bucketKey, omBucketInfo);
+  }
+
+  /**
+   * Adds to the batch one deleted table entry per part of the given MPU.
+   */
+  private void movePartsToDeletedTable(
+      OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation,
+      OmMultipartAbortInfo mpu) throws IOException {
+    OmMultipartKeyInfo keyInfo = mpu.getOmMultipartKeyInfo();
+    for (PartKeyInfo part: keyInfo.getPartKeyInfoMap()) {
+      OmKeyInfo partKey = OmKeyInfo.getFromProtobuf(part.getPartKeyInfo());
+
+      // TODO: Similar to open key deletion response, we can check if the
+      //  MPU part actually contains blocks, and only move them to the
+      //  deletedTable if it does.
+
+      RepeatedOmKeyInfo toDelete = OmUtils.prepareKeyForDelete(
+          partKey, keyInfo.getUpdateID(), isRatisEnabled);
+
+      // multi-part key format is volumeName/bucketName/keyName/uploadId
+      String deletedTableKey = omMetadataManager.getOzoneDeletePathKey(
+          partKey.getObjectID(), mpu.getMultipartKey());
+
+      omMetadataManager.getDeletedTable().putWithBatch(batchOperation,
+          deletedTableKey, toDelete);
+    }
+  }
+
+  /**
+   * Adds to the batch the abort of a single multipart upload.
+   * LEGACY/OBS and FSO share the same abort logic; they differ only in the
+   * multipartOpenKey used in the openKeyTable and openFileTable.
+   */
+  protected void addAbortToBatch(
+      OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation,
+      String multipartKey,
+      String multipartOpenKey,
+      OmMultipartKeyInfo omMultipartKeyInfo,
+      OmBucketInfo omBucketInfo,
+      BucketLayout bucketLayout) throws IOException {
+    OmMultipartAbortInfo.Builder abortInfoBuilder =
+        new OmMultipartAbortInfo.Builder()
+            .setMultipartKey(multipartKey)
+            .setMultipartOpenKey(multipartOpenKey)
+            .setMultipartKeyInfo(omMultipartKeyInfo)
+            .setBucketLayout(bucketLayout);
+    List<OmMultipartAbortInfo> singleAbort =
+        Collections.singletonList(abortInfoBuilder.build());
+    addAbortToBatch(omMetadataManager, batchOperation, omBucketInfo,
+        singleAbort);
+  }
+
+}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3ExpiredMultipartUploadsAbortResponse.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3ExpiredMultipartUploadsAbortResponse.java
new file mode 100644
index 0000000000..363074b596
--- /dev/null
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3ExpiredMultipartUploadsAbortResponse.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.response.s3.multipart;
+
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartAbortInfo;
+import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.BUCKET_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_TABLE;
+import static
org.apache.hadoop.ozone.om.OmMetadataManagerImpl.MULTIPARTINFO_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_FILE_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_KEY_TABLE;
+
+/**
+ * Response of the expired MPU abort request. It adds the abort of every
+ * collected MPU to the DB batch, one bucket at a time.
+ */
+@CleanupTableInfo(cleanupTables = {OPEN_KEY_TABLE, OPEN_FILE_TABLE,
+    DELETED_TABLE, MULTIPARTINFO_TABLE, BUCKET_TABLE})
+public class S3ExpiredMultipartUploadsAbortResponse extends
+    AbstractS3MultipartAbortResponse {
+
+  // MPUs to abort, grouped by the bucket they belong to. Left unset by the
+  // error constructor.
+  private Map<OmBucketInfo, List<OmMultipartAbortInfo>> mpusToDelete;
+
+  public S3ExpiredMultipartUploadsAbortResponse(
+      @Nonnull OMResponse omResponse,
+      @Nonnull Map<OmBucketInfo, List<OmMultipartAbortInfo>> mpusToDelete,
+      boolean isRatisEnabled) {
+    super(omResponse, isRatisEnabled);
+    this.mpusToDelete = mpusToDelete;
+  }
+
+  /**
+   * Constructor for an unsuccessful request.
+   * Successful requests must use the other constructor.
+   */
+  public S3ExpiredMultipartUploadsAbortResponse(
+      @Nonnull OMResponse omResponse) {
+    // Set BucketLayout.DEFAULT just as a placeholder
+    // OmMultipartAbortInfo already contains the bucket layout info
+    super(omResponse, BucketLayout.DEFAULT);
+  }
+
+  /**
+   * Adds the abort operations of every bucket to the batch.
+   */
+  @Override
+  protected void addToDBBatch(
+      OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation) throws IOException {
+    for (Map.Entry<OmBucketInfo, List<OmMultipartAbortInfo>> perBucket :
+        mpusToDelete.entrySet()) {
+      OmBucketInfo bucket = perBucket.getKey();
+      List<OmMultipartAbortInfo> expiredUploads = perBucket.getValue();
+      addAbortToBatch(omMetadataManager, batchOperation, bucket,
+          expiredUploads);
+    }
+  }
+}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadAbortResponse.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadAbortResponse.java
index 567b28da02..f3e2054fad 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadAbortResponse.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadAbortResponse.java
@@ -18,19 +18,13 @@
package org.apache.hadoop.ozone.om.response.s3.multipart;
-import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
-import org.apache.hadoop.ozone.om.response.key.OmKeyResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
- .PartKeyInfo;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import java.io.IOException;
@@ -46,23 +40,22 @@ import static
org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_KEY_TABLE;
*/
@CleanupTableInfo(cleanupTables = {OPEN_KEY_TABLE, DELETED_TABLE,
MULTIPARTINFO_TABLE, BUCKET_TABLE})
-public class S3MultipartUploadAbortResponse extends OmKeyResponse {
+public class S3MultipartUploadAbortResponse extends
+ AbstractS3MultipartAbortResponse {
private String multipartKey;
private String multipartOpenKey;
private OmMultipartKeyInfo omMultipartKeyInfo;
- private boolean isRatisEnabled;
private OmBucketInfo omBucketInfo;
public S3MultipartUploadAbortResponse(@Nonnull OMResponse omResponse,
String multipartKey, String multipartOpenKey,
@Nonnull OmMultipartKeyInfo omMultipartKeyInfo, boolean isRatisEnabled,
@Nonnull OmBucketInfo omBucketInfo, @Nonnull BucketLayout bucketLayout) {
- super(omResponse, bucketLayout);
+ super(omResponse, isRatisEnabled, bucketLayout);
this.multipartKey = multipartKey;
this.multipartOpenKey = multipartOpenKey;
this.omMultipartKeyInfo = omMultipartKeyInfo;
- this.isRatisEnabled = isRatisEnabled;
this.omBucketInfo = omBucketInfo;
}
@@ -79,32 +72,8 @@ public class S3MultipartUploadAbortResponse extends
OmKeyResponse {
@Override
public void addToDBBatch(OMMetadataManager omMetadataManager,
BatchOperation batchOperation) throws IOException {
-
- // Delete from openKey table and multipart info table.
- omMetadataManager.getOpenKeyTable(getBucketLayout())
- .deleteWithBatch(batchOperation, multipartOpenKey);
- omMetadataManager.getMultipartInfoTable().deleteWithBatch(batchOperation,
- multipartKey);
-
- // Move all the parts to delete table
- for (PartKeyInfo partKeyInfo: omMultipartKeyInfo.getPartKeyInfoMap()) {
- OmKeyInfo currentKeyPartInfo =
- OmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo());
-
- RepeatedOmKeyInfo repeatedOmKeyInfo = OmUtils.prepareKeyForDelete(
- currentKeyPartInfo, omMultipartKeyInfo.getUpdateID(),
- isRatisEnabled);
- // multi-part key format is volumeName/bucketName/keyName/uploadId
- String deleteKey = omMetadataManager.getOzoneDeletePathKey(
- currentKeyPartInfo.getObjectID(), multipartKey);
-
- omMetadataManager.getDeletedTable().putWithBatch(batchOperation,
- deleteKey, repeatedOmKeyInfo);
- }
-
- // update bucket usedBytes.
- omMetadataManager.getBucketTable().putWithBatch(batchOperation,
- omMetadataManager.getBucketKey(omBucketInfo.getVolumeName(),
- omBucketInfo.getBucketName()), omBucketInfo);
+ addAbortToBatch(omMetadataManager, batchOperation,
+ multipartKey, multipartOpenKey, omMultipartKeyInfo, omBucketInfo,
+ getBucketLayout());
}
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/MultipartUploadCleanupService.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/MultipartUploadCleanupService.java
new file mode 100644
index 0000000000..ea175ac001
--- /dev/null
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/MultipartUploadCleanupService.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.ozone.om.KeyManager;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadsExpiredAbortRequest;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Background service that aborts incomplete Multipart Uploads (MPUs).
+ * <p>
+ * Each run asks the {@link KeyManager} for MPUs whose age exceeds the
+ * configured expiry threshold (subject to the configured per-task parts
+ * limit) and submits one {@code AbortExpiredMultiPartUploads} request for
+ * them. A run is skipped while the service is suspended or while this OM
+ * does not report itself as a ready leader.
+ */
+public class MultipartUploadCleanupService extends BackgroundService {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MultipartUploadCleanupService.class);
+
+  // Similar to OpenKeyCleanupService, use a single thread.
+  private static final int MPU_INFO_DELETING_CORE_POOL_SIZE = 1;
+
+  private final OzoneManager ozoneManager;
+  private final KeyManager keyManager;
+  // Dummy client ID to use for response.
+  private final ClientId clientId = ClientId.randomId();
+  private final Duration expireThreshold;
+  private final int mpuPartsLimitPerTask;
+  private final AtomicLong submittedMpuInfoCount;
+  private final AtomicLong runCount;
+  private final AtomicBoolean suspended;
+
+  /**
+   * Creates the service and reads its settings from {@code conf}.
+   *
+   * @param interval run interval, forwarded to {@link BackgroundService}
+   * @param unit time unit of {@code interval}
+   * @param timeout task timeout, forwarded to {@link BackgroundService}
+   * @param ozoneManager OM used to obtain the key manager, the leader
+   *                     state and the request submission path
+   * @param conf source of
+   *     {@link OMConfigKeys#OZONE_OM_MPU_EXPIRE_THRESHOLD} and
+   *     {@link OMConfigKeys#OZONE_OM_MPU_PARTS_CLEANUP_LIMIT_PER_TASK}
+   */
+  public MultipartUploadCleanupService(long interval, TimeUnit unit,
+      long timeout, OzoneManager ozoneManager, ConfigurationSource conf) {
+    super("MultipartUploadCleanupService", interval, unit,
+        MPU_INFO_DELETING_CORE_POOL_SIZE, timeout);
+    this.ozoneManager = ozoneManager;
+    this.keyManager = ozoneManager.getKeyManager();
+
+    long expireMillis = conf.getTimeDuration(
+        OMConfigKeys.OZONE_OM_MPU_EXPIRE_THRESHOLD,
+        OMConfigKeys.OZONE_OM_MPU_EXPIRE_THRESHOLD_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    this.expireThreshold = Duration.ofMillis(expireMillis);
+
+    this.mpuPartsLimitPerTask = conf.getInt(
+        OMConfigKeys.OZONE_OM_MPU_PARTS_CLEANUP_LIMIT_PER_TASK,
+        OMConfigKeys.OZONE_OM_MPU_PARTS_CLEANUP_LIMIT_PER_TASK_DEFAULT);
+
+    this.submittedMpuInfoCount = new AtomicLong(0);
+    this.runCount = new AtomicLong(0);
+    this.suspended = new AtomicBoolean(false);
+  }
+
+  /**
+   * Returns the number of times this Background service has run.
+   * Runs skipped because the service was suspended or the OM was not a
+   * ready leader are not counted.
+   *
+   * @return Long, run count.
+   */
+  @VisibleForTesting
+  public long getRunCount() {
+    return runCount.get();
+  }
+
+  /**
+   * Suspend the service (for testing).
+   */
+  @VisibleForTesting
+  public void suspend() {
+    suspended.set(true);
+  }
+
+  /**
+   * Resume the service if suspended (for testing).
+   */
+  @VisibleForTesting
+  public void resume() {
+    suspended.set(false);
+  }
+
+  /**
+   * Returns the number of MPUs whose abort request was submitted by this
+   * service without a submission error. If an MPU is completed/aborted
+   * (and thus removed from the multipartInfoTable) between being submitted
+   * for deletion and the actual delete operation, it will not be deleted
+   * by the request, but it is still counted here.
+   *
+   * @return long count.
+   */
+  @VisibleForTesting
+  public long getSubmittedMpuInfoCount() {
+    return submittedMpuInfoCount.get();
+  }
+
+  /**
+   * Returns a queue holding a single cleanup task.
+   */
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new MultipartUploadCleanupTask());
+    return queue;
+  }
+
+  private boolean shouldRun() {
+    return !suspended.get() && ozoneManager.isLeaderReady();
+  }
+
+  private boolean isRatisEnabled() {
+    return ozoneManager.isRatisEnabled();
+  }
+
+  /**
+   * Task that collects the expired MPUs and submits one abort request.
+   */
+  private class MultipartUploadCleanupTask implements BackgroundTask {
+
+    @Override
+    public int getPriority() {
+      return 0;
+    }
+
+    @Override
+    public BackgroundTaskResult call() throws Exception {
+      if (!shouldRun()) {
+        return BackgroundTaskResult.EmptyTaskResult.newResult();
+      }
+
+      runCount.incrementAndGet();
+      long startTime = Time.monotonicNow();
+      List<ExpiredMultipartUploadsBucket> expiredMultipartUploads;
+      try {
+        expiredMultipartUploads = keyManager.getExpiredMultipartUploads(
+            expireThreshold, mpuPartsLimitPerTask);
+      } catch (IOException e) {
+        LOG.error("Unable to get expired MPU info, retry in next interval", e);
+        return BackgroundTaskResult.EmptyTaskResult.newResult();
+      }
+
+      if (expiredMultipartUploads == null
+          || expiredMultipartUploads.isEmpty()) {
+        return BackgroundTaskResult.EmptyTaskResult.newResult();
+      }
+
+      int numExpiredMultipartUploads = expiredMultipartUploads.stream()
+          .mapToInt(ExpiredMultipartUploadsBucket::getMultipartUploadsCount)
+          .sum();
+
+      OMRequest omRequest = createRequest(expiredMultipartUploads);
+      // Only report and count the MPUs as submitted when the submission
+      // did not fail; a failed submission is retried at the next run.
+      if (submitRequest(omRequest)) {
+        LOG.debug("Number of expired multipart info submitted for deletion: "
+                + "{}, elapsed time: {}ms", numExpiredMultipartUploads,
+            Time.monotonicNow() - startTime);
+        submittedMpuInfoCount.addAndGet(numExpiredMultipartUploads);
+      }
+      return BackgroundTaskResult.EmptyTaskResult.newResult();
+    }
+
+    /**
+     * Wraps the expired MPUs of all buckets into a single OM request.
+     */
+    private OMRequest createRequest(List<ExpiredMultipartUploadsBucket>
+        expiredMultipartUploadsBuckets) {
+      MultipartUploadsExpiredAbortRequest request =
+          MultipartUploadsExpiredAbortRequest.newBuilder()
+              .addAllExpiredMultipartUploadsPerBucket(
+                  expiredMultipartUploadsBuckets)
+              .build();
+
+      return OMRequest.newBuilder()
+          .setCmdType(Type.AbortExpiredMultiPartUploads)
+          .setMultipartUploadsExpiredAbortRequest(request)
+          .setClientId(clientId.toString())
+          .build();
+    }
+
+    /**
+     * Submits the request through Ratis when enabled, otherwise directly
+     * through the OM server protocol.
+     *
+     * @return {@code true} if the submission completed without a
+     *         {@link ServiceException}; {@code false} if it failed (the
+     *         failure is logged and the work is retried at the next run)
+     */
+    private boolean submitRequest(OMRequest omRequest) {
+      try {
+        if (isRatisEnabled()) {
+          OzoneManagerRatisServer server = ozoneManager.getOmRatisServer();
+
+          RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
+              .setClientId(clientId)
+              .setServerId(server.getRaftPeerId())
+              .setGroupId(server.getRaftGroupId())
+              .setCallId(runCount.get())
+              .setMessage(Message.valueOf(
+                  OMRatisHelper.convertRequestToByteString(omRequest)))
+              .setType(RaftClientRequest.writeRequestType())
+              .build();
+
+          server.submitRequest(omRequest, raftClientRequest);
+        } else {
+          ozoneManager.getOmServerProtocol().submitRequest(null,
+              omRequest);
+        }
+        return true;
+      } catch (ServiceException e) {
+        LOG.error("Expired multipart info delete request failed. " +
+            "Will retry at next run.", e);
+        return false;
+      }
+    }
+  }
+}
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
index 34bd721602..58dfeb7b83 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
@@ -33,8 +33,11 @@ import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
import org.apache.hadoop.ozone.om.request.util.OMMultipartUploadUtils;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadInfo;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKey;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
import org.apache.hadoop.util.Time;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.BeforeEach;
@@ -45,6 +48,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.io.File;
import java.time.Duration;
+import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
@@ -56,6 +60,8 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MPU_EXPIRE_THRESHOLD;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MPU_EXPIRE_THRESHOLD_DEFAULT;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
@@ -581,6 +587,11 @@ public class TestOmMetadataManager {
testGetExpiredOpenKeysExcludeMPUKeys(BucketLayout.FILE_SYSTEM_OPTIMIZED);
}
+ @Test
+ public void testGetExpiredMultipartUploads() throws Exception {
+ testGetExpiredMPUs(); // the scenario helper takes no BucketLayout argument
+ }
+
private void testGetExpiredOpenKeys(BucketLayout bucketLayout)
throws Exception {
final String bucketName = UUID.randomUUID().toString();
@@ -747,6 +758,91 @@ public class TestOmMetadataManager {
.isEmpty());
}
+ /**
+  * Adds expired and unexpired MPUs (each with several parts) to the
+  * multipartInfoTable and checks which of them
+  * {@code getExpiredMultipartUploads} returns for different part limits.
+  * Only the expired MPUs are tracked, so an unexpired MPU showing up in a
+  * result makes the {@code containsAll} assertions fail.
+  */
+ private void testGetExpiredMPUs() throws Exception {
+   final String bucketName = UUID.randomUUID().toString();
+   final String volumeName = UUID.randomUUID().toString();
+   final int numExpiredMPUs = 4;
+   final int numUnexpiredMPUs = 1;
+   final int numPartsPerMPU = 5;
+   // To create expired MPUs, they will be assigned a creation time as
+   // old as the minimum expiration time.
+   final long expireThresholdMillis = ozoneConfiguration.getTimeDuration(
+       OZONE_OM_MPU_EXPIRE_THRESHOLD,
+       OZONE_OM_MPU_EXPIRE_THRESHOLD_DEFAULT,
+       TimeUnit.MILLISECONDS);
+
+   final Duration expireThreshold = Duration.ofMillis(expireThresholdMillis);
+
+   final long expiredMPUCreationTime =
+       Instant.now().minus(expireThreshold).toEpochMilli();
+
+   // Add MPUs to multipartInfoTable.
+   // The method under test does not check for expired MPUs in the
+   // cache, since they will be picked up once the cache is flushed.
+   Set<String> expiredMPUs = new HashSet<>();
+   for (int i = 0; i < numExpiredMPUs + numUnexpiredMPUs; i++) {
+     final boolean expired = i < numExpiredMPUs;
+     final long creationTime = expired ?
+         expiredMPUCreationTime : Instant.now().toEpochMilli();
+
+     String uploadId = OMMultipartUploadUtils.getMultipartUploadId();
+     final OmMultipartKeyInfo mpuKeyInfo = OMRequestTestUtils
+         .createOmMultipartKeyInfo(uploadId, creationTime,
+             HddsProtos.ReplicationType.RATIS,
+             HddsProtos.ReplicationFactor.ONE, 0L);
+
+     String keyName = (expired ? "expired" : "unexpired") + i;
+     // Key info to construct the MPU DB key
+     final OmKeyInfo keyInfo = OMRequestTestUtils.createOmKeyInfo(volumeName,
+         bucketName, keyName, HddsProtos.ReplicationType.RATIS,
+         HddsProtos.ReplicationFactor.ONE, 0L, creationTime);
+
+     for (int j = 1; j <= numPartsPerMPU; j++) {
+       PartKeyInfo partKeyInfo = OMRequestTestUtils
+           .createPartKeyInfo(volumeName, bucketName, keyName, uploadId, j);
+       OMRequestTestUtils.addPart(partKeyInfo, mpuKeyInfo);
+     }
+
+     final String mpuDbKey = OMRequestTestUtils.addMultipartInfoToTable(
+         false, keyInfo, mpuKeyInfo, 0L, omMetadataManager);
+
+     // Track only the expired MPUs; the unexpired one must never be
+     // returned by the method under test.
+     if (expired) {
+       expiredMPUs.add(mpuDbKey);
+     }
+   }
+
+   // Test retrieving fewer expired MPU parts than actually exist (exact):
+   // the limit is a whole number of MPUs.
+   List<ExpiredMultipartUploadsBucket>
+       someExpiredMPUs = omMetadataManager.getExpiredMultipartUploads(
+           expireThreshold,
+           (numExpiredMPUs * numPartsPerMPU) - (numPartsPerMPU));
+   List<String> names = getMultipartKeyNames(someExpiredMPUs);
+   assertEquals(numExpiredMPUs - 1, names.size());
+   assertTrue(expiredMPUs.containsAll(names));
+
+   // Test retrieving fewer expired MPU parts than actually exist (round
+   // up): the limit falls inside an MPU, which is still expected in full.
+   someExpiredMPUs = omMetadataManager.getExpiredMultipartUploads(
+       expireThreshold,
+       (numExpiredMPUs * numPartsPerMPU) - (numPartsPerMPU + 1));
+   names = getMultipartKeyNames(someExpiredMPUs);
+   assertEquals(numExpiredMPUs - 1, names.size());
+   assertTrue(expiredMPUs.containsAll(names));
+
+   // Test attempting to retrieve more expired MPU parts than actually exist.
+   List<ExpiredMultipartUploadsBucket> allExpiredMPUs =
+       omMetadataManager.getExpiredMultipartUploads(expireThreshold,
+           (numExpiredMPUs * numPartsPerMPU) + numPartsPerMPU);
+   names = getMultipartKeyNames(allExpiredMPUs);
+   assertEquals(numExpiredMPUs, names.size());
+   assertTrue(expiredMPUs.containsAll(names));
+
+   // Test retrieving exactly as many expired MPU parts as actually exist.
+   allExpiredMPUs =
+       omMetadataManager.getExpiredMultipartUploads(expireThreshold,
+           (numExpiredMPUs * numPartsPerMPU));
+   names = getMultipartKeyNames(allExpiredMPUs);
+   assertEquals(numExpiredMPUs, names.size());
+   assertTrue(expiredMPUs.containsAll(names));
+ }
+
private List<String> getOpenKeyNames(
Collection<OpenKeyBucket.Builder> openKeyBuckets) {
return openKeyBuckets.stream()
@@ -756,6 +852,15 @@ public class TestOmMetadataManager {
.collect(Collectors.toList());
}
+ /**
+  * Flattens the per-bucket results into one list holding the name of
+  * every expired multipart upload.
+  */
+ private List<String> getMultipartKeyNames(
+     List<ExpiredMultipartUploadsBucket> expiredMultipartUploadsBuckets) {
+   Stream<ExpiredMultipartUploadInfo> uploads =
+       expiredMultipartUploadsBuckets.stream()
+           .flatMap(bucket -> bucket.getMultipartUploadsList().stream());
+   return uploads
+       .map(upload -> upload.getName())
+       .collect(Collectors.toList());
+ }
+
private void addKeysToOM(String volumeName, String bucketName,
String keyName, int i) throws Exception {
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
index 4c64b6b645..3e3433fab4 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUpload;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
@@ -58,6 +59,7 @@ import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketI
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateTenantRequest;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteTenantRequest;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetS3VolumeContextRequest;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListTenantRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartUploadAbortRequest;
@@ -71,6 +73,7 @@ import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartInfoInitiateRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.SetVolumePropertyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -380,7 +383,7 @@ public final class OMRequestTestUtils {
* Add multipart info entry to the multipartInfoTable.
* @throws Exception
*/
- public static void addMultipartInfoToTable(boolean addToCache,
+ public static String addMultipartInfoToTable(boolean addToCache,
OmKeyInfo omKeyInfo, OmMultipartKeyInfo omMultipartKeyInfo,
long trxnLogIndex, OMMetadataManager omMetadataManager)
throws IOException {
@@ -396,6 +399,33 @@ public final class OMRequestTestUtils {
omMetadataManager.getMultipartInfoTable().put(ozoneDBKey,
omMultipartKeyInfo);
+
+ return ozoneDBKey;
+ }
+
+ /**
+  * Builds a {@link PartKeyInfo} for one part of a multipart upload. The
+  * part name is the MPU DB key of the upload; the embedded key info uses
+  * a dummy data size, the current time and RATIS/ONE replication.
+  */
+ public static PartKeyInfo createPartKeyInfo(String volumeName,
+     String bucketName, String keyName, String uploadId, int partNumber) {
+   final String partName = OmMultipartUpload.getDbKey(
+       volumeName, bucketName, keyName, uploadId);
+
+   final KeyInfo partKey = KeyInfo.newBuilder()
+       .setVolumeName(volumeName)
+       .setBucketName(bucketName)
+       .setKeyName(keyName)
+       .setDataSize(100L) // Dummy size, only meaningful for tests
+       .setCreationTime(Time.now())
+       .setModificationTime(Time.now())
+       .setType(HddsProtos.ReplicationType.RATIS)
+       .setFactor(HddsProtos.ReplicationFactor.ONE)
+       .build();
+
+   return PartKeyInfo.newBuilder()
+       .setPartNumber(partNumber)
+       .setPartName(partName)
+       .setPartKeyInfo(partKey)
+       .build();
+ }
+
+ /**
+ * Append a {@link PartKeyInfo} to an {@link OmMultipartKeyInfo}.
+ *
+ * @param partKeyInfo the part to append
+ * @param omMultipartKeyInfo the multipart key info that receives the part
+ * through its {@code addPartKeyInfo} method
+ */
+ public static void addPart(PartKeyInfo partKeyInfo,
+ OmMultipartKeyInfo omMultipartKeyInfo) {
+ omMultipartKeyInfo.addPartKeyInfo(partKeyInfo);
}
/**
@@ -684,17 +714,18 @@ public final class OMRequestTestUtils {
BucketLayout.DEFAULT);
}
- public static void addBucketToDB(String volumeName, String bucketName,
- OMMetadataManager omMetadataManager, BucketLayout bucketLayout)
+ public static OmBucketInfo addBucketToDB(String volumeName,
+ String bucketName, OMMetadataManager omMetadataManager,
+ BucketLayout bucketLayout)
throws Exception {
- addBucketToDB(omMetadataManager,
+ return addBucketToDB(omMetadataManager,
OmBucketInfo.newBuilder().setVolumeName(volumeName)
.setBucketName(bucketName)
.setBucketLayout(bucketLayout)
);
}
- public static void addBucketToDB(OMMetadataManager omMetadataManager,
+ public static OmBucketInfo addBucketToDB(OMMetadataManager omMetadataManager,
OmBucketInfo.Builder builder) throws Exception {
OmBucketInfo omBucketInfo = builder
@@ -709,6 +740,8 @@ public final class OMRequestTestUtils {
omMetadataManager.getBucketTable().addCacheEntry(
new CacheKey<>(omMetadataManager.getBucketKey(volumeName, bucketName)),
CacheValue.get(1L, omBucketInfo));
+
+ return omBucketInfo;
}
/**
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3ExpiredMultipartUploadsAbortRequest.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3ExpiredMultipartUploadsAbortRequest.java
new file mode 100644
index 0000000000..d23bf4b13a
--- /dev/null
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3ExpiredMultipartUploadsAbortRequest.java
@@ -0,0 +1,666 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.request.s3.multipart;
+
+import com.google.common.base.Optional;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.UniqueId;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUpload;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .Status;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.WithObjectID;
+import org.apache.hadoop.ozone.om.request.util.OMMultipartUploadUtils;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadInfo;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadsExpiredAbortRequest;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Tests S3ExpiredMultipartUploadsAbortRequest.
+ */
+@RunWith(Parameterized.class)
+public class TestS3ExpiredMultipartUploadsAbortRequest
+ extends TestS3MultipartRequest {
+
+ private final BucketLayout bucketLayout;
+
+ public TestS3ExpiredMultipartUploadsAbortRequest(BucketLayout bucketLayout) {
+ this.bucketLayout = bucketLayout; // one value from bucketLayouts() per run
+ }
+
+ @Override
+ public BucketLayout getBucketLayout() {
+ return bucketLayout; // the layout this test instance was constructed with
+ }
+
+ /**
+  * Supplies the bucket layouts this parameterized test is run with.
+  */
+ @Parameters
+ public static Collection<BucketLayout> bucketLayouts() {
+   final BucketLayout[] layouts = {
+       BucketLayout.DEFAULT,
+       BucketLayout.FILE_SYSTEM_OPTIMIZED
+   };
+   return Arrays.asList(layouts);
+ }
+
+ /**
+  * Tests removing MPUs from the multipartInfoTable cache that never
+  * existed there. The operation should complete without errors.
+  * <p>
+  * This simulates a run of the MPU cleanup service where a set
+  * of expired MPUs are identified and passed to the request,
+  * but before the request can process them, those MPUs are
+  * completed/aborted and therefore removed from the multipartInfoTable.
+  * <p>
+  * Only MPU DB keys are generated here; no MPU is actually added to the
+  * table before the abort request is applied.
+  */
+ @Test
+ public void testAbortMPUsNotInTable() throws Exception {
+   final String volumeName = UUID.randomUUID().toString();
+   final String bucketName = UUID.randomUUID().toString();
+   final String keyName = UUID.randomUUID().toString();
+
+   OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+       omMetadataManager, getBucketLayout());
+   // Mock keys only: nothing is inserted into the multipartInfoTable.
+   List<String> mpuKeys =
+       createMockMPUKeys(volumeName, bucketName, keyName, 5);
+   abortExpiredMPUsFromCache(volumeName, bucketName, mpuKeys);
+   assertNotInMultipartInfoTable(mpuKeys);
+ }
+
+ /**
+  * Creates MPUs in several volume/bucket combinations and aborts only a
+  * subset of them in each. The aborted MPUs must be gone from the
+  * multipartInfoTable, while the MPUs that were not part of any abort
+  * request must still be present.
+  */
+ @Test
+ public void testAbortSubsetOfMPUs() throws Exception {
+   final String volA = UUID.randomUUID().toString();
+   final String volB = UUID.randomUUID().toString();
+   final String bucketA = UUID.randomUUID().toString();
+   final String bucketB = UUID.randomUUID().toString();
+
+   // Three combinations: volA/bucketA, volA/bucketB and volB/bucketB.
+   OMRequestTestUtils.addVolumeAndBucketToDB(volA, bucketA,
+       omMetadataManager, getBucketLayout());
+   OMRequestTestUtils.addVolumeAndBucketToDB(volA, bucketB,
+       omMetadataManager, getBucketLayout());
+   OMRequestTestUtils.addVolumeAndBucketToDB(volB, bucketB,
+       omMetadataManager, getBucketLayout());
+
+   // For each combination, one group to abort and one group to keep.
+   List<String> abortedInVolABucketA = createMPUs(volA, bucketA, 3, 3);
+   List<String> keptInVolABucketA = createMPUs(volA, bucketA, 3, 3);
+
+   List<String> abortedInVolABucketB = createMPUs(volA, bucketB, 3, 3);
+   List<String> keptInVolABucketB = createMPUs(volA, bucketB, 2, 2);
+
+   List<String> abortedInVolBBucketB = createMPUs(volB, bucketB, 2, 2);
+   List<String> keptInVolBBucketB = createMPUs(volB, bucketB, 3, 3);
+
+   abortExpiredMPUsFromCache(volA, bucketA, abortedInVolABucketA);
+   abortExpiredMPUsFromCache(volA, bucketB, abortedInVolABucketB);
+   abortExpiredMPUsFromCache(volB, bucketB, abortedInVolBBucketB);
+
+   // Aborted MPUs are removed ...
+   assertNotInMultipartInfoTable(abortedInVolABucketA);
+   assertNotInMultipartInfoTable(abortedInVolABucketB);
+   assertNotInMultipartInfoTable(abortedInVolBBucketB);
+
+   // ... and the remaining ones are untouched.
+   assertInMultipartInfoTable(keptInVolABucketA);
+   assertInMultipartInfoTable(keptInVolABucketB);
+   assertInMultipartInfoTable(keptInVolBBucketB);
+ }
+
+ /**
+ * Tests removing MPUs from the multipart info table cache that have higher
+ * updateID than the transactionID. Those MPUs should be ignored.
+ * It is OK if updateID is equal to or less than transactionID.
+ * See {@link WithObjectID#setUpdateID(long, boolean)}.
+ * <p>
+ * Two MPUs are added directly to the multipartInfoTable: one with
+ * updateID 200 and one with updateID 100. A single abort request naming
+ * both is then applied with transactionID 100. The MPU with the higher
+ * updateID must remain in the table, the other one must be removed.
+ * <p>
+ * Note: the key builder and the MPU builder are each reused for both
+ * entries, so the order of the setter and build calls below matters.
+ *
+ * @throws Exception if the test setup or the request execution fails
+ */
+ @Test
+ public void testAbortMPUsWithHigherUpdateID() throws Exception {
+ final String volumeName = UUID.randomUUID().toString();
+ final String bucketName = UUID.randomUUID().toString();
+
+ OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+ omMetadataManager, getBucketLayout());
+
+ final long updateId = 200L;
+ final long transactionId = 100L;
+
+ // Used only to build the MPU db keys; reused for both keys below
+ OmKeyInfo.Builder keyBuilder = new OmKeyInfo.Builder()
+ .setVolumeName(volumeName)
+ .setBucketName(bucketName);
+
+ OmMultipartKeyInfo.Builder mpuBuilder = new OmMultipartKeyInfo.Builder()
+ .setReplicationConfig(ReplicationConfig.fromTypeAndFactor(
+ ReplicationType.RATIS, ReplicationFactor.THREE));
+
+ if (getBucketLayout().equals(BucketLayout.FILE_SYSTEM_OPTIMIZED)) {
+ mpuBuilder.setParentID(UniqueId.next());
+ }
+
+ OmKeyInfo keyWithHigherUpdateID = keyBuilder
+ .setKeyName("key").build();
+ OmMultipartKeyInfo mpuWithHigherUpdateID = mpuBuilder
+ .setUpdateID(updateId)
+ .setUploadID(OMMultipartUploadUtils.getMultipartUploadId())
+ .build();
+
+ OmKeyInfo keyWithSameUpdateID = keyBuilder
+ .setKeyName("key2").build();
+ OmMultipartKeyInfo mpuWithSameUpdateID = mpuBuilder
+ .setUpdateID(transactionId)
+ .setUploadID(OMMultipartUploadUtils.getMultipartUploadId())
+ .build();
+
+ String mpuDBKeyWithHigherUpdateId = OMRequestTestUtils
+ .addMultipartInfoToTable(false,
+ keyWithHigherUpdateID, mpuWithHigherUpdateID,
+ mpuWithHigherUpdateID.getUpdateID(), omMetadataManager);
+
+ String mpuDBKeyWithSameUpdateId = OMRequestTestUtils
+ .addMultipartInfoToTable(false,
+ keyWithSameUpdateID, mpuWithSameUpdateID,
+ mpuWithSameUpdateID.getUpdateID(), omMetadataManager);
+
+
+ OMRequest omRequest = doPreExecute(createAbortExpiredMPURequest(
+ volumeName, bucketName, Arrays.asList(mpuDBKeyWithHigherUpdateId,
+ mpuDBKeyWithSameUpdateId)));
+ S3ExpiredMultipartUploadsAbortRequest expiredMultipartUploadsAbortRequest =
+ new S3ExpiredMultipartUploadsAbortRequest(omRequest);
+
+ OMClientResponse omClientResponse =
+
expiredMultipartUploadsAbortRequest.validateAndUpdateCache(ozoneManager,
+ transactionId, ozoneManagerDoubleBufferHelper);
+
+ Assert.assertEquals(Status.OK,
+ omClientResponse.getOMResponse().getStatus());
+
+ assertInMultipartInfoTable(Collections.singletonList(
+ mpuDBKeyWithHigherUpdateId));
+ assertNotInMultipartInfoTable(Collections.singletonList(
+ mpuDBKeyWithSameUpdateId));
+ }
+
+ /**
+  * Tests cleaning up orphan MPUs: MPUs whose open keys were already
+  * removed (for example by the open key cleanup service prior to
+  * HDDS-9098) while their multipartInfoTable entries remain.
+  * Whereas a normal MPU complete/abort request is expected to fail when
+  * the MPU open key is missing, aborting expired orphan MPUs should not
+  * fail and should still remove them from the multipartInfoTable.
+  */
+ @Test
+ public void testAbortOrphanMPUs() throws Exception {
+   final String volume = UUID.randomUUID().toString();
+   final String bucket = UUID.randomUUID().toString();
+
+   OMRequestTestUtils.addVolumeAndBucketToDB(volume, bucket,
+       omMetadataManager, getBucketLayout());
+   final List<String> orphanMpuKeys = createMPUs(volume, bucket, 5, 5);
+
+   // Drop the open MPU keys so that only the multipart info entries remain.
+   removeFromOpenKeyTable(orphanMpuKeys);
+
+   abortExpiredMPUsFromCache(volume, bucket, orphanMpuKeys);
+
+   assertNotInMultipartInfoTable(orphanMpuKeys);
+ }
+
+ /**
+ * Tests metrics set by {@link S3ExpiredMultipartUploadsAbortRequest}.
+ * Submits a set of MPUs for abort where only some of the keys actually
+ * exist in the multipart info table, and asserts that the metrics count
+ * MPUs that were submitted for deletion versus those that were actually
+ * deleted.
+ * @throws Exception
+ */
+ @Test
+ public void testMetrics() throws Exception {
+ final String volume = UUID.randomUUID().toString();
+ final String bucket = UUID.randomUUID().toString();
+ final String key = UUID.randomUUID().toString();
+
+ OMRequestTestUtils.addVolumeAndBucketToDB(volume, bucket,
+ omMetadataManager, getBucketLayout());
+
+ final int numExistentMPUs = 3;
+ final int numNonExistentMPUs = 5;
+ final int numParts = 5;
+
+ OMMetrics metrics = ozoneManager.getMetrics();
+ // Every expired-MPU abort counter that is asserted at the end of this
+ // test must start from zero, so the baseline checks mirror the final
+ // assertions one-to-one.
+ Assert.assertEquals(0, metrics.getNumExpiredMPUAbortRequests());
+ Assert.assertEquals(0, metrics.getNumExpiredMPUAbortRequestFails());
+ Assert.assertEquals(0, metrics.getNumExpiredMPUSubmittedForAbort());
+ Assert.assertEquals(0, metrics.getNumExpiredMPUAborted());
+ Assert.assertEquals(0, metrics.getNumExpiredMPUPartsAborted());
+
+ List<String> existentMPUs =
+ createMPUs(volume, bucket, key, numExistentMPUs, numParts,
+ getBucketLayout());
+
+ List<String> nonExistentMPUs =
+ createMockMPUKeys(volume, bucket, key, numNonExistentMPUs);
+
+ abortExpiredMPUsFromCache(volume, bucket, existentMPUs, nonExistentMPUs);
+
+ assertNotInMultipartInfoTable(existentMPUs);
+ assertNotInMultipartInfoTable(nonExistentMPUs);
+
+ Assert.assertEquals(1, metrics.getNumExpiredMPUAbortRequests());
+ Assert.assertEquals(0,
+ metrics.getNumExpiredMPUAbortRequestFails());
+ Assert.assertEquals(numExistentMPUs + numNonExistentMPUs,
+ metrics.getNumExpiredMPUSubmittedForAbort());
+ Assert.assertEquals(numExistentMPUs,
+ metrics.getNumExpiredMPUAborted());
+ Assert.assertEquals(numExistentMPUs * numParts,
+ metrics.getNumExpiredMPUPartsAborted());
+ }
+
+ /**
+ * Runs {@link S3ExpiredMultipartUploadsAbortRequest#preExecute} on a
+ * {@link S3ExpiredMultipartUploadsAbortRequest} built from
+ * {@code originalOMRequest}, asserts that the returned request differs
+ * from {@code originalOMRequest}, and returns it.
+ * @throws Exception
+ */
+ private OMRequest doPreExecute(OMRequest originalOMRequest)
+ throws Exception {
+ OMRequest preExecutedRequest =
+ new S3ExpiredMultipartUploadsAbortRequest(originalOMRequest)
+ .preExecute(ozoneManager);
+
+ // Will not be equal, as UserInfo will be set.
+ Assert.assertNotEquals(originalOMRequest, preExecutedRequest);
+ return preExecutedRequest;
+ }
+
+ /**
+ * Varargs convenience overload: concatenates all given key lists, in
+ * order, and aborts them as a single request.
+ * @throws Exception
+ */
+ private void abortExpiredMPUsFromCache(String volumeName, String bucketName,
+ List<String>... allMPUKeys) throws Exception {
+ List<String> flattenedKeys = new ArrayList<>();
+ for (List<String> keyGroup : allMPUKeys) {
+ flattenedKeys.addAll(keyGroup);
+ }
+ abortExpiredMPUsFromCache(volumeName, bucketName, flattenedKeys);
+ }
+
+
+ /**
+ * Pre-executes an abort-expired-MPUs request for {@code mpuKeys} and runs
+ * the validate and update cache step of
+ * {@link S3ExpiredMultipartUploadsAbortRequest} with transaction index 100.
+ * Asserts that the call's response status is {@link Status#OK}.
+ * @throws Exception
+ */
+ private void abortExpiredMPUsFromCache(String volumeName, String bucketName,
+ List<String> mpuKeys) throws Exception {
+ OMRequest rawRequest =
+ createAbortExpiredMPURequest(volumeName, bucketName, mpuKeys);
+ OMRequest preExecuted = doPreExecute(rawRequest);
+
+ OMClientResponse response =
+ new S3ExpiredMultipartUploadsAbortRequest(preExecuted)
+ .validateAndUpdateCache(ozoneManager, 100L,
+ ozoneManagerDoubleBufferHelper);
+
+ Status responseStatus = response.getOMResponse().getStatus();
+ Assert.assertEquals(Status.OK, responseStatus);
+ }
+
+ /**
+ * Builds an {@code AbortExpiredMultiPartUploads} {@link OMRequest} that
+ * lists {@code mpuKeysToAbort} under a single volume/bucket entry and
+ * carries a random client ID.
+ */
+ private OMRequest createAbortExpiredMPURequest(String volumeName,
+ String bucketName, List<String> mpuKeysToAbort) {
+
+ List<ExpiredMultipartUploadInfo> expiredUploads = new ArrayList<>();
+ for (String mpuKey : mpuKeysToAbort) {
+ expiredUploads.add(
+ ExpiredMultipartUploadInfo.newBuilder().setName(mpuKey).build());
+ }
+
+ ExpiredMultipartUploadsBucket uploadsForBucket =
+ ExpiredMultipartUploadsBucket.newBuilder()
+ .setVolumeName(volumeName)
+ .setBucketName(bucketName)
+ .addAllMultipartUploads(expiredUploads)
+ .build();
+
+ MultipartUploadsExpiredAbortRequest abortRequest =
+ MultipartUploadsExpiredAbortRequest.newBuilder()
+ .addExpiredMultipartUploadsPerBucket(uploadsForBucket)
+ .build();
+
+ OMRequest.Builder requestBuilder = OMRequest.newBuilder()
+ .setMultipartUploadsExpiredAbortRequest(abortRequest)
+ .setCmdType(OzoneManagerProtocolProtos
+ .Type.AbortExpiredMultiPartUploads)
+ .setClientId(UUID.randomUUID().toString());
+ return requestBuilder.build();
+ }
+
+ /**
+ * Create MPUs with randomized key name.
+ * Delegates to the layout-aware overload with a {@code null} key and the
+ * layout returned by {@code getBucketLayout()}.
+ */
+ private List<String> createMPUs(String volume, String bucket, int count,
+ int numParts) throws Exception {
+ return createMPUs(volume, bucket, null, count, numParts,
+ getBucketLayout());
+ }
+
+ /**
+ * Make MPUs with same key name and randomized upload ID, dispatching on
+ * {@code buckLayout}: FILE_SYSTEM_OPTIMIZED buckets use the FSO variant,
+ * every other layout uses the LEGACY/OBS variant.
+ * If key is specified, simulate scenarios where there are
+ * concurrent multipart uploads happening at the same time.
+ */
+ private List<String> createMPUs(String volume, String bucket,
+ String key, int count, int numParts, BucketLayout buckLayout)
+ throws Exception {
+ return buckLayout == BucketLayout.FILE_SYSTEM_OPTIMIZED
+ ? createMPUsWithFSO(volume, bucket, key, count, numParts)
+ : createMPUs(volume, bucket, key, count, numParts);
+ }
+
+ /**
+ * Make MPUs with same key name and randomized upload ID for FSO-enabled
+ * bucket.
+ * If key is specified, simulate scenarios where there are
+ * concurrent multipart uploads happening at the same time.
+ * Each MPU is initiated and then {@code numParts} parts are committed to
+ * it; every request is expected to succeed.
+ *
+ * @return the multipartInfoTable DB keys of the created MPUs
+ */
+ private List<String> createMPUsWithFSO(String volume, String bucket,
+ String key, int count, int numParts) throws Exception {
+ List<String> mpuKeys = new ArrayList<>();
+
+ long trxnLogIndex = 1L;
+
+ String dirName = "a/b/c/";
+
+ final long volumeId = omMetadataManager.getVolumeId(volume);
+ final long bucketId = omMetadataManager.getBucketId(volume, bucket);
+
+ for (int i = 0; i < count; i++) {
+ // Initiate MPU
+ final String keyName = dirName + (key != null ? key :
+ UUID.randomUUID().toString());
+
+ long parentID = OMRequestTestUtils.addParentsToDirTable(
+ volume, bucket, dirName, omMetadataManager);
+
+ OMRequest initiateMPURequest =
+ doPreExecuteInitiateMPUWithFSO(volume, bucket, keyName);
+
+ S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
+ new S3InitiateMultipartUploadRequestWithFSO(initiateMPURequest,
+ BucketLayout.FILE_SYSTEM_OPTIMIZED);
+
+ OMClientResponse omClientResponse = s3InitiateMultipartUploadRequest
+ .validateAndUpdateCache(ozoneManager, trxnLogIndex,
+ ozoneManagerDoubleBufferHelper);
+
+ // assertEquals reports the actual status when the check fails
+ Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+ omClientResponse.getOMResponse().getStatus());
+
+ trxnLogIndex++;
+
+ String multipartUploadID = omClientResponse.getOMResponse()
+ .getInitiateMultiPartUploadResponse().getMultipartUploadID();
+
+ String mpuKey = omMetadataManager.getMultipartKey(
+ volume, bucket, keyName, multipartUploadID);
+
+ String mpuOpenKey = OMMultipartUploadUtils
+ .getMultipartOpenKey(volume, bucket, keyName, multipartUploadID,
+ omMetadataManager, getBucketLayout());
+ Assert.assertNotNull(omMetadataManager.getOpenKeyTable(getBucketLayout())
+ .get(mpuOpenKey));
+
+ mpuKeys.add(mpuKey);
+
+ // Commit MPU parts
+ for (int j = 1; j <= numParts; j++) {
+ long clientID = UniqueId.next();
+ OMRequest commitMultipartRequest = doPreExecuteCommitMPU(
+ volume, bucket, keyName, clientID, multipartUploadID, j);
+
+ S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest =
+ new S3MultipartUploadCommitPartRequestWithFSO(
+ commitMultipartRequest, BucketLayout.FILE_SYSTEM_OPTIMIZED);
+
+ // Add key to open key table to be used in MPU commit processing
+ OmKeyInfo omKeyInfo = OMRequestTestUtils.createOmKeyInfo(volume,
+ bucket, keyName, HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.ONE, parentID + j, parentID,
+ trxnLogIndex, Time.now(), true);
+ String fileName = OzoneFSUtils.getFileName(keyName);
+ OMRequestTestUtils.addFileToKeyTable(true, false,
+ fileName, omKeyInfo, clientID, trxnLogIndex, omMetadataManager);
+
+ OMClientResponse commitResponse =
+ s3MultipartUploadCommitPartRequest.validateAndUpdateCache(
+ ozoneManager, trxnLogIndex, ozoneManagerDoubleBufferHelper);
+ trxnLogIndex++;
+
+ Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+ commitResponse.getOMResponse().getStatus());
+
+ // MPU part open key should be deleted after commit
+ String partKey = omMetadataManager.getOpenFileName(volumeId, bucketId,
+ parentID, fileName, clientID);
+ Assert.assertNull(
+ omMetadataManager.getOpenKeyTable(getBucketLayout()).get(partKey));
+ }
+ }
+
+ return mpuKeys;
+ }
+
+
+ /**
+ * Make MPUs with same key name and randomized upload ID for LEGACY/OBS
+ * bucket.
+ * If key is specified, simulate scenarios where there are
+ * concurrent multipart uploads happening at the same time.
+ * Each MPU is initiated and then {@code numParts} parts are committed to
+ * it; every request is expected to succeed.
+ *
+ * @return the multipartInfoTable DB keys of the created MPUs
+ */
+ private List<String> createMPUs(String volume, String bucket,
+ String key, int count, int numParts) throws Exception {
+ List<String> mpuKeys = new ArrayList<>();
+
+ long trxnLogIndex = 1L;
+
+ for (int i = 0; i < count; i++) {
+ // Initiate MPU
+ final String keyName = key != null ? key : UUID.randomUUID().toString();
+ OMRequest initiateMPURequest =
+ doPreExecuteInitiateMPU(volume, bucket, keyName);
+
+ S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
+ getS3InitiateMultipartUploadReq(initiateMPURequest);
+
+ OMClientResponse omClientResponse = s3InitiateMultipartUploadRequest
+ .validateAndUpdateCache(ozoneManager, trxnLogIndex,
+ ozoneManagerDoubleBufferHelper);
+
+ // assertEquals reports the actual status when the check fails
+ Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+ omClientResponse.getOMResponse().getStatus());
+
+ trxnLogIndex++;
+
+ String multipartUploadID = omClientResponse.getOMResponse()
+ .getInitiateMultiPartUploadResponse().getMultipartUploadID();
+
+ String mpuKey = omMetadataManager.getMultipartKey(
+ volume, bucket, keyName, multipartUploadID);
+
+ String mpuOpenKey = OMMultipartUploadUtils
+ .getMultipartOpenKey(volume, bucket, keyName, multipartUploadID,
+ omMetadataManager, getBucketLayout());
+ Assert.assertNotNull(omMetadataManager.getOpenKeyTable(getBucketLayout())
+ .get(mpuOpenKey));
+
+ mpuKeys.add(mpuKey);
+
+ // Commit MPU parts
+ for (int j = 1; j <= numParts; j++) {
+ long clientID = UniqueId.next();
+ OMRequest commitMultipartRequest = doPreExecuteCommitMPU(
+ volume, bucket, keyName, clientID, multipartUploadID, j);
+
+ S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest =
+ getS3MultipartUploadCommitReq(commitMultipartRequest);
+
+ // Add key to open key table to be used in MPU commit processing
+ OMRequestTestUtils.addKeyToTable(
+ true, true,
+ volume, bucket, keyName, clientID,
+ HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.ONE, omMetadataManager);
+
+ OMClientResponse commitResponse =
+ s3MultipartUploadCommitPartRequest.validateAndUpdateCache(
+ ozoneManager, trxnLogIndex, ozoneManagerDoubleBufferHelper);
+ trxnLogIndex++;
+
+ Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+ commitResponse.getOMResponse().getStatus());
+
+ // MPU part open key should be deleted after commit
+ String partKey = omMetadataManager.getOpenKey(volume, bucket, keyName,
+ clientID);
+ Assert.assertNull(
+ omMetadataManager.getOpenKeyTable(getBucketLayout()).get(partKey));
+ }
+ }
+
+ return mpuKeys;
+ }
+
+ /**
+ * Builds {@code count} multipart DB keys with fresh upload IDs without
+ * writing anything to the multipartInfoTable. When {@code key} is null a
+ * random key name is generated for each entry.
+ */
+ private List<String> createMockMPUKeys(String volume, String bucket,
+ String key, int count) {
+ List<String> mockKeys = new ArrayList<>();
+ for (int idx = 0; idx < count; idx++) {
+ final String keyName;
+ if (key == null) {
+ keyName = UUID.randomUUID().toString();
+ } else {
+ keyName = key;
+ }
+ mockKeys.add(omMetadataManager.getMultipartKey(volume, bucket, keyName,
+ OMMultipartUploadUtils.getMultipartUploadId()));
+ }
+ return mockKeys;
+ }
+
+ /**
+ * Asserts that every key in {@code mpuKeys} exists in the
+ * multipartInfoTable.
+ */
+ private void assertInMultipartInfoTable(List<String> mpuKeys)
+ throws Exception {
+ for (String dbKey : mpuKeys) {
+ boolean present =
+ omMetadataManager.getMultipartInfoTable().isExist(dbKey);
+ Assert.assertTrue(present);
+ }
+ }
+
+ /**
+ * Asserts that no key in {@code mpuKeys} exists in the
+ * multipartInfoTable.
+ */
+ private void assertNotInMultipartInfoTable(List<String> mpuKeys)
+ throws Exception {
+ for (String dbKey : mpuKeys) {
+ boolean present =
+ omMetadataManager.getMultipartInfoTable().isExist(dbKey);
+ Assert.assertFalse(present);
+ }
+ }
+
+ /**
+ * Asserts that no key in {@code mpuOpenKeys} exists in the open key table
+ * of the current bucket layout.
+ */
+ private void assertNotInOpenKeyTable(List<String> mpuOpenKeys)
+ throws Exception {
+ for (String openKey : mpuOpenKeys) {
+ boolean present = omMetadataManager
+ .getOpenKeyTable(getBucketLayout()).isExist(openKey);
+ Assert.assertFalse(present);
+ }
+ }
+
+ /**
+ * Asserts that every key in {@code mpuOpenKeys} exists in the open key
+ * table of the current bucket layout.
+ */
+ private void assertInOpenKeyTable(List<String> mpuOpenKeys)
+ throws Exception {
+ for (String openKey : mpuOpenKeys) {
+ boolean present = omMetadataManager
+ .getOpenKeyTable(getBucketLayout()).isExist(openKey);
+ Assert.assertTrue(present);
+ }
+ }
+
+ /**
+ * Derives the MPU open key for each of the given MPU DB keys and removes
+ * those open keys from the openKeyTable, leaving the MPU entries
+ * themselves untouched. This is used to simulate orphan MPU keys.
+ */
+ private void removeFromOpenKeyTable(List<String> mpuKeys)
+ throws Exception {
+ List<String> openKeysToRemove = new ArrayList<>();
+ for (String mpuKey : mpuKeys) {
+ OmMultipartUpload upload = OmMultipartUpload.from(mpuKey);
+ openKeysToRemove.add(OMMultipartUploadUtils.getMultipartOpenKey(
+ upload.getVolumeName(),
+ upload.getBucketName(),
+ upload.getKeyName(),
+ upload.getUploadId(),
+ omMetadataManager,
+ getBucketLayout()));
+ }
+
+ // The open keys must be present before they are removed
+ assertInOpenKeyTable(openKeysToRemove);
+
+ for (String openKey : openKeysToRemove) {
+ // Put an absent value in the table cache, then delete the entry itself
+ omMetadataManager.getOpenKeyTable(getBucketLayout())
+ .addCacheEntry(new CacheKey<>(openKey),
+ new CacheValue<>(Optional.absent(), 100L));
+ omMetadataManager.getOpenKeyTable(getBucketLayout()).delete(openKey);
+ }
+
+ assertNotInOpenKeyTable(openKeysToRemove);
+ }
+}
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
index 4a2ced8e60..afe5a00add 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager;
import org.apache.hadoop.ozone.security.acl.OzoneNativeAuthorizer;
import org.junit.After;
import org.junit.Assert;
@@ -55,6 +56,7 @@ import
org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
@@ -79,7 +81,7 @@ public class TestS3MultipartRequest {
@Before
public void setup() throws Exception {
- ozoneManager = Mockito.mock(OzoneManager.class);
+ ozoneManager = mock(OzoneManager.class);
omMetrics = OMMetrics.create();
OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS,
@@ -88,12 +90,12 @@ public class TestS3MultipartRequest {
ozoneManager);
when(ozoneManager.getMetrics()).thenReturn(omMetrics);
when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
- auditLogger = Mockito.mock(AuditLogger.class);
+ auditLogger = mock(AuditLogger.class);
ReferenceCounted<IOmMetadataReader, SnapshotCache> rcOmMetadataReader =
- Mockito.mock(ReferenceCounted.class);
+ mock(ReferenceCounted.class);
when(ozoneManager.getOmMetadataReader()).thenReturn(rcOmMetadataReader);
// Init OmMetadataReader to let the test pass
- OmMetadataReader omMetadataReader = Mockito.mock(OmMetadataReader.class);
+ OmMetadataReader omMetadataReader = mock(OmMetadataReader.class);
when(omMetadataReader.isNativeAuthorizerEnabled()).thenReturn(true);
when(rcOmMetadataReader.get()).thenReturn(omMetadataReader);
when(ozoneManager.getAccessAuthorizer())
@@ -110,6 +112,10 @@ public class TestS3MultipartRequest {
Pair.of(args.getVolumeName(), args.getBucketName()),
Pair.of(args.getVolumeName(), args.getBucketName()));
});
+ OMLayoutVersionManager lvm = mock(OMLayoutVersionManager.class);
+ when(lvm.getMetadataLayoutVersion()).thenReturn(0);
+ when(ozoneManager.getVersionManager()).thenReturn(lvm);
+ when(ozoneManager.isRatisEnabled()).thenReturn(true);
}
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCommitPartRequestWithFSO.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCommitPartRequestWithFSO.java
index 12f0c200dd..25cedec80f 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCommitPartRequestWithFSO.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCommitPartRequestWithFSO.java
@@ -64,7 +64,7 @@ public class TestS3MultipartUploadCommitPartRequestWithFSO
@Override
protected void addKeyToOpenKeyTable(String volumeName, String bucketName,
String keyName, long clientID) throws Exception {
- long txnLogId = 10000;
+ long txnLogId = 0L;
OmKeyInfo omKeyInfo = OMRequestTestUtils.createOmKeyInfo(volumeName,
bucketName, keyName, HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.ONE, parentID + 1, parentID,
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequestWithFSO.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequestWithFSO.java
index 87ba9a2a6e..19ae723fcf 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequestWithFSO.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCompleteRequestWithFSO.java
@@ -71,7 +71,7 @@ public class TestS3MultipartUploadCompleteRequestWithFSO
// add parentDir to dirTable
long parentID = getParentID(volumeName, bucketName, keyName);
- long txnId = 50;
+ long txnId = 2;
long objectId = parentID + 1;
OmKeyInfo omKeyInfoFSO =
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3ExpiredMultipartUploadsAbortResponse.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3ExpiredMultipartUploadsAbortResponse.java
new file mode 100644
index 0000000000..3a7a8a92a4
--- /dev/null
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3ExpiredMultipartUploadsAbortResponse.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.response.s3.multipart;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.utils.UniqueId;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartAbortInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.hadoop.ozone.om.request.util.OMMultipartUploadUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Tests S3ExpiredMultipartUploadsAbortResponse.
+ */
+@RunWith(Parameterized.class)
+public class TestS3ExpiredMultipartUploadsAbortResponse
+ extends TestS3MultipartResponse {
+
+ private final BucketLayout bucketLayout;
+
+ public TestS3ExpiredMultipartUploadsAbortResponse(
+ BucketLayout bucketLayout) {
+ this.bucketLayout = bucketLayout;
+ }
+
+ @Override
+ public BucketLayout getBucketLayout() {
+ return bucketLayout;
+ }
+
+ @Parameters
+ public static Collection<BucketLayout> bucketLayouts() {
+ return Arrays.asList(
+ BucketLayout.DEFAULT,
+ BucketLayout.FILE_SYSTEM_OPTIMIZED
+ );
+ }
+
+ /**
+ * Tests deleting MPUs from multipartInfoTable when the MPUs have no
+ * associated parts.
+ * @throws Exception
+ */
+ @Test
+ public void testAddToDBBatchWithEmptyParts() throws Exception {
+ String volumeName = UUID.randomUUID().toString();
+
+ Map<OmBucketInfo, List<OmMultipartAbortInfo>> mpusToAbort =
+ addMPUsToDB(volumeName, 3);
+ Map<OmBucketInfo, List<OmMultipartAbortInfo>> mpusToKeep =
+ addMPUsToDB(volumeName, 3);
+
+ createAndCommitResponse(mpusToAbort, Status.OK);
+
+ for (List<OmMultipartAbortInfo> abortInfos: mpusToAbort.values()) {
+ for (OmMultipartAbortInfo abortInfo: abortInfos) {
+ // MPUs with no associated parts should have been removed
+ // from the multipartInfoTable. Since there are no parts,
+ // nothing will be added to the deletedTable
+ Assert.assertFalse(omMetadataManager.getMultipartInfoTable().isExist(
+ abortInfo.getMultipartKey()));
+ Assert.assertFalse(omMetadataManager.getOpenKeyTable(getBucketLayout())
+ .isExist(abortInfo.getMultipartOpenKey()));
+ }
+ }
+
+ for (List<OmMultipartAbortInfo> abortInfos: mpusToKeep.values()) {
+ for (OmMultipartAbortInfo abortInfo: abortInfos) {
+ // These MPUs should not have been removed from the multipartInfoTable
+ Assert.assertTrue(omMetadataManager.getMultipartInfoTable().isExist(
+ abortInfo.getMultipartKey()));
+ Assert.assertTrue(omMetadataManager.getOpenKeyTable(getBucketLayout())
+ .isExist(abortInfo.getMultipartOpenKey()));
+ }
+ }
+ }
+
+
+ /**
+ * Tests deleting MPUs from multipartInfoTable when the MPUs have some
+ * parts.
+ * @throws Exception
+ */
+ @Test
+ public void testAddToDBBatchWithNonEmptyParts() throws Exception {
+ String volumeName = UUID.randomUUID().toString();
+
+ Map<OmBucketInfo, List<OmMultipartAbortInfo>> mpusToAbort =
+ addMPUsToDB(volumeName, 3, 3);
+ Map<OmBucketInfo, List<OmMultipartAbortInfo>> mpusToKeep =
+ addMPUsToDB(volumeName, 3, 3);
+
+ createAndCommitResponse(mpusToAbort, Status.OK);
+
+ for (List<OmMultipartAbortInfo> abortInfos: mpusToAbort.values()) {
+ for (OmMultipartAbortInfo abortInfo: abortInfos) {
+ // All the associated parts of the MPU should have been moved from
+ // the multipartInfoTable to the deleted table.
+ Assert.assertFalse(omMetadataManager.getMultipartInfoTable().isExist(
+ abortInfo.getMultipartKey()));
+ Assert.assertFalse(omMetadataManager.getOpenKeyTable(getBucketLayout())
+ .isExist(abortInfo.getMultipartOpenKey()));
+
+ for (PartKeyInfo partKeyInfo: abortInfo
+ .getOmMultipartKeyInfo().getPartKeyInfoMap()) {
+ OmKeyInfo currentPartKeyInfo =
+ OmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo());
+ String deleteKey = omMetadataManager.getOzoneDeletePathKey(
+ currentPartKeyInfo.getObjectID(), abortInfo.getMultipartKey());
+ Assert.assertTrue(omMetadataManager.getDeletedTable().isExist(
+ deleteKey));
+ }
+
+ }
+ }
+
+ for (List<OmMultipartAbortInfo> abortInfos: mpusToKeep.values()) {
+ for (OmMultipartAbortInfo abortInfo: abortInfos) {
+ // These MPUs should not have been removed from the multipartInfoTable
+ // and its parts should not be in the deletedTable.
+ Assert.assertTrue(omMetadataManager.getMultipartInfoTable().isExist(
+ abortInfo.getMultipartKey()));
+ Assert.assertTrue(omMetadataManager.getOpenKeyTable(getBucketLayout())
+ .isExist(abortInfo.getMultipartOpenKey()));
+
+ for (PartKeyInfo partKeyInfo: abortInfo
+ .getOmMultipartKeyInfo().getPartKeyInfoMap()) {
+ OmKeyInfo currentPartKeyInfo =
+ OmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo());
+ String deleteKey = omMetadataManager.getOzoneDeletePathKey(
+ currentPartKeyInfo.getObjectID(), abortInfo.getMultipartKey());
+ Assert.assertFalse(omMetadataManager.getDeletedTable().isExist(
+ deleteKey));
+ }
+ }
+ }
+ }
+
+ /**
+ * Tests attempting to abort MPUs from the multipartInfoTable when the
+ * submitted response has an error status. In this case, no changes to the
+ * DB should be made.
+ */
+ @Test
+ public void testAddToDBBatchWithErrorResponse() throws Exception {
+ String volumeName = UUID.randomUUID().toString();
+
+ Map<OmBucketInfo, List<OmMultipartAbortInfo>> mpusToAbort =
+ addMPUsToDB(volumeName, 3, 3);
+
+ createAndCommitResponse(mpusToAbort, Status.INTERNAL_ERROR);
+
+ for (List<OmMultipartAbortInfo> multipartAbortInfos:
+ mpusToAbort.values()) {
+ for (OmMultipartAbortInfo multipartAbortInfo: multipartAbortInfos) {
+ // if an error occurs in the response, the batch operation moving MPUs
+ // parts from the multipartInfoTable to the deleted table should not be
+ // committed
+ Assert.assertTrue(
+ omMetadataManager.getMultipartInfoTable().isExist(
+ multipartAbortInfo.getMultipartKey()));
+
+ for (PartKeyInfo partKeyInfo: multipartAbortInfo
+ .getOmMultipartKeyInfo().getPartKeyInfoMap()) {
+ OmKeyInfo currentPartKeyInfo =
+ OmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo());
+ String deleteKey = omMetadataManager.getOzoneDeletePathKey(
+ currentPartKeyInfo.getObjectID(), multipartAbortInfo
+ .getMultipartKey());
+ Assert.assertFalse(omMetadataManager.getDeletedTable()
+ .isExist(deleteKey));
+ }
+ }
+ }
+
+ }
+
+
+ /**
+ * Constructs an {@link S3ExpiredMultipartUploadsAbortResponse} to abort
+ * MPUs in {@code mpusToAbort}, with the completion status set to
+ * {@code status}. If {@code status} is {@link Status#OK}, the MPUs to
+ * abort will be added to a batch operation and committed to the database.
+ *
+ * @throws Exception
+ */
+ private void createAndCommitResponse(
+ Map<OmBucketInfo, List<OmMultipartAbortInfo>> mpusToAbort,
+ Status status) throws Exception {
+
+ OMResponse omResponse = OMResponse.newBuilder()
+ .setStatus(status)
+ .setCmdType(OzoneManagerProtocolProtos.Type.
+ AbortExpiredMultiPartUploads)
+ .build();
+
+ S3ExpiredMultipartUploadsAbortResponse response = new
+ S3ExpiredMultipartUploadsAbortResponse(omResponse, mpusToAbort, true);
+
+ // Operations are only added to the batch by this method when status is OK
+ response.checkAndUpdateDB(omMetadataManager, batchOperation);
+
+ // If status is not OK, this will do nothing.
+ omMetadataManager.getStore().commitBatchOperation(batchOperation);
+ }
+
+ private Map<OmBucketInfo, List<OmMultipartAbortInfo>> addMPUsToDB(
+ String volume, int numMPUs) throws Exception {
+ return addMPUsToDB(volume, numMPUs, 0);
+ }
+
+ /**
+ *
+ * Creates {@code numMPUs} multipart uploads with random names, each with
+ * {@code numParts} parts, maps each one to a new {@link OmKeyInfo} to be
+ * added to the open key table and {@link OmMultipartKeyInfo} to be added
+ * to the multipartInfoTable. Returns a map from each bucket to the list
+ * of {@link OmMultipartAbortInfo} created for that bucket.
+ *
+ */
+ private Map<OmBucketInfo, List<OmMultipartAbortInfo>> addMPUsToDB(
+ String volume, int numMPUs, int numParts)
+ throws Exception {
+
+ Map<OmBucketInfo, List<OmMultipartAbortInfo>> newMPUs = new HashMap<>();
+
+ OMRequestTestUtils.addVolumeToDB(volume, omMetadataManager);
+
+ for (int i = 0; i < numMPUs; i++) {
+ String bucket = UUID.randomUUID().toString();
+ String keyName = UUID.randomUUID().toString();
+ long objectID = UniqueId.next();
+ String uploadId = OMMultipartUploadUtils.getMultipartUploadId();
+
+ OmBucketInfo omBucketInfo = OMRequestTestUtils.addBucketToDB(volume,
+ bucket, omMetadataManager, getBucketLayout());
+
+ final OmKeyInfo omKeyInfo = OMRequestTestUtils.createOmKeyInfo(volume,
+ bucket, keyName,
+ HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.ONE,
+ 0L, Time.now(), true);
+
+ if (getBucketLayout().equals(BucketLayout.FILE_SYSTEM_OPTIMIZED)) {
+ omKeyInfo.setParentObjectID(omBucketInfo.getObjectID());
+ omKeyInfo.setFileName(OzoneFSUtils.getFileName(keyName));
+ OMRequestTestUtils.addMultipartKeyToOpenFileTable(false,
+ omKeyInfo.getFileName(), omKeyInfo, uploadId, 0L,
+ omMetadataManager);
+ } else {
+ OMRequestTestUtils.addMultipartKeyToOpenKeyTable(false,
+ omKeyInfo, uploadId, 0L, omMetadataManager);
+ }
+
+ OmMultipartKeyInfo multipartKeyInfo = OMRequestTestUtils
+ .createOmMultipartKeyInfo(uploadId, Time.now(),
+ ReplicationType.RATIS, ReplicationFactor.THREE, objectID);
+
+ for (int j = 1; j <= numParts; j++) {
+ PartKeyInfo part = createPartKeyInfo(volume, bucket, keyName, j);
+ addPart(j, part, multipartKeyInfo);
+ }
+
+ String multipartKey = OMRequestTestUtils.addMultipartInfoToTable(false,
+ omKeyInfo, multipartKeyInfo, 0L, omMetadataManager);
+ String multipartOpenKey = OMMultipartUploadUtils
+ .getMultipartOpenKey(volume, bucket, keyName, uploadId,
+ omMetadataManager, getBucketLayout());
+
+ OmMultipartAbortInfo omMultipartAbortInfo = new OmMultipartAbortInfo
+ .Builder()
+ .setMultipartKey(multipartKey)
+ .setMultipartOpenKey(multipartOpenKey)
+ .setBucketLayout(getBucketLayout())
+ .setMultipartKeyInfo(multipartKeyInfo)
+ .build();
+
+ newMPUs.computeIfAbsent(omBucketInfo, k -> new ArrayList<>())
+ .add(omMultipartAbortInfo);
+ }
+
+ return newMPUs;
+ }
+
+}
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestMultipartUploadCleanupService.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestMultipartUploadCleanupService.java
new file mode 100644
index 0000000000..b65cfd0484
--- /dev/null
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestMultipartUploadCleanupService.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.service;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
+import org.apache.hadoop.ozone.om.KeyManager;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmTestManagers;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.util.ExitUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MPU_CLEANUP_SERVICE_INTERVAL;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MPU_EXPIRE_THRESHOLD;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MPU_PARTS_CLEANUP_LIMIT_PER_TASK;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test Multipart Upload Cleanup Service.
+ */
+public class TestMultipartUploadCleanupService {
+ private OzoneManagerProtocol writeClient;
+ private OzoneManager om;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestMultipartUploadCleanupService.class);
+
+ private static final Duration SERVICE_INTERVAL = Duration.ofMillis(500);
+ private static final Duration EXPIRE_THRESHOLD = Duration.ofMillis(1000);
+ private KeyManager keyManager;
+ private OMMetadataManager omMetadataManager;
+
+ /**
+ * Calls {@code ExitUtils.disableSystemExit()} once before any test in this
+ * class runs.
+ * NOTE(review): presumably so that an exit requested by the OM during a
+ * test does not terminate the test JVM - confirm against Ratis ExitUtils.
+ */
+ @BeforeAll
+ public static void setup() {
+ ExitUtils.disableSystemExit();
+ }
+
+  /**
+   * Builds a fresh OM test environment before every test case.
+   *
+   * <p>The Ozone metadata directory is pointed at the JUnit-provided
+   * temporary directory, and the multipart upload cleanup service is
+   * configured with {@link #SERVICE_INTERVAL} as its run interval,
+   * {@link #EXPIRE_THRESHOLD} as its expiry threshold and a per-task parts
+   * cleanup limit of 1000. The managers created from that configuration are
+   * stored in the fields used by the tests.
+   *
+   * @param tempDir temporary directory used as the Ozone metadata directory
+   * @throws Exception if the test environment cannot be set up
+   */
+  @BeforeEach
+  public void createConfAndInitValues(@TempDir Path tempDir) throws Exception {
+    final long intervalMillis = SERVICE_INTERVAL.toMillis();
+    final long expiryMillis = EXPIRE_THRESHOLD.toMillis();
+
+    final OzoneConfiguration configuration = new OzoneConfiguration();
+    System.setProperty(DBConfigFromFile.CONFIG_DIR, "/");
+    ServerUtils.setOzoneMetaDirPath(configuration, tempDir.toString());
+
+    // Short interval and threshold keep the test run time low.
+    configuration.setTimeDuration(OZONE_OM_MPU_CLEANUP_SERVICE_INTERVAL,
+        intervalMillis, TimeUnit.MILLISECONDS);
+    configuration.setTimeDuration(OZONE_OM_MPU_EXPIRE_THRESHOLD,
+        expiryMillis, TimeUnit.MILLISECONDS);
+    configuration.setInt(OZONE_OM_MPU_PARTS_CLEANUP_LIMIT_PER_TASK, 1000);
+    configuration.setQuietMode(false);
+
+    final OmTestManagers managers = new OmTestManagers(configuration);
+    om = managers.getOzoneManager();
+    writeClient = managers.getWriteClient();
+    keyManager = managers.getKeyManager();
+    omMetadataManager = managers.getMetadataManager();
+  }
+
+ /**
+ * Stops the OzoneManager obtained in {@code createConfAndInitValues} after
+ * each test case.
+ *
+ * @throws Exception declared because the method may propagate a failure
+ * from {@code om.stop()}
+ */
+ @AfterEach
+ public void cleanup() throws Exception {
+ om.stop();
+ }
+
+  /**
+   * Verifies that expired multipart uploads are picked up by the
+   * MultipartUploadCleanupService.
+   *
+   * <p>The service is suspended, a number of incomplete (in-flight)
+   * multipart uploads are created in both bucket layouts, and the test
+   * waits until they are older than {@link #EXPIRE_THRESHOLD}. The service
+   * is then resumed, and after it has run at least once more the test
+   * checks that it submitted at least as many multipart upload infos as
+   * were created and that no expired multipart uploads are left.
+   *
+   * @param numDEFKeys number of uploads to create with
+   *                   {@link BucketLayout#DEFAULT}
+   * @param numFSOKeys number of uploads to create with
+   *                   {@link BucketLayout#FILE_SYSTEM_OPTIMIZED}
+   * @throws Exception if creating the uploads fails or a wait times out
+   */
+  @ParameterizedTest
+  @CsvSource({
+      "99,0",
+      "0, 88",
+      "66, 77"
+  })
+  @Timeout(300)
+  public void checkIfCleanupServiceIsDeletingExpiredMultipartUpload(
+      int numDEFKeys, int numFSOKeys) throws Exception {
+    final long intervalMillis = SERVICE_INTERVAL.toMillis();
+    final MultipartUploadCleanupService cleanupService =
+        (MultipartUploadCleanupService)
+            keyManager.getMultipartUploadCleanupService();
+
+    // Pause the service and give any already submitted task one interval
+    // to finish before the baseline counters are read.
+    cleanupService.suspend();
+    Thread.sleep(intervalMillis);
+    final long mpuInfoCountBefore =
+        cleanupService.getSubmittedMpuInfoCount();
+    final long runCountBefore = cleanupService.getRunCount();
+
+    createIncompleteMPUKeys(numDEFKeys, BucketLayout.DEFAULT);
+    createIncompleteMPUKeys(numFSOKeys, BucketLayout.FILE_SYSTEM_OPTIMIZED);
+
+    // Let the freshly created uploads age past the expiry threshold.
+    Thread.sleep(EXPIRE_THRESHOLD.toMillis());
+
+    assertFalse(
+        keyManager.getExpiredMultipartUploads(EXPIRE_THRESHOLD, 10000)
+            .isEmpty());
+
+    cleanupService.resume();
+
+    // Block until the service has completed at least one more run.
+    GenericTestUtils.waitFor(
+        () -> cleanupService.getRunCount() > runCountBefore,
+        (int) intervalMillis,
+        5 * (int) intervalMillis);
+
+    // Give the submitted abort requests time to be applied.
+    Thread.sleep(10 * intervalMillis);
+
+    final long expectedMinimum =
+        mpuInfoCountBefore + numDEFKeys + numFSOKeys;
+    assertTrue(
+        cleanupService.getSubmittedMpuInfoCount() >= expectedMinimum);
+    assertTrue(
+        keyManager.getExpiredMultipartUploads(EXPIRE_THRESHOLD, 10000)
+            .isEmpty());
+  }
+
+  /**
+   * Creates the requested number of incomplete multipart uploads, spread
+   * over randomly chosen volumes and buckets of the given layout.
+   *
+   * <p>Before each upload there is an even chance of switching to a new
+   * random bucket name and, when that happens, a further even chance of
+   * also switching to a new random volume name. Every upload gets a random
+   * key name and between 0 and 4 committed parts.
+   *
+   * @param mpuKeyCount number of multipart uploads to create
+   * @param bucketLayout layout of the buckets that hold the uploads
+   * @throws IOException if creating a volume, bucket or upload fails
+   */
+  private void createIncompleteMPUKeys(int mpuKeyCount,
+      BucketLayout bucketLayout) throws IOException {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+
+    for (int remaining = mpuKeyCount; remaining > 0; remaining--) {
+      final boolean switchBucket = RandomUtils.nextBoolean();
+      if (switchBucket) {
+        bucketName = UUID.randomUUID().toString();
+        final boolean switchVolume = RandomUtils.nextBoolean();
+        if (switchVolume) {
+          volumeName = UUID.randomUUID().toString();
+        }
+      }
+
+      final String keyName = UUID.randomUUID().toString();
+      // Invoked on every iteration, also when volume and bucket did not
+      // change since the previous one.
+      createVolumeAndBucket(volumeName, bucketName, bucketLayout);
+
+      // Upper bound is exclusive: 0 to 4 parts.
+      final int partCount = RandomUtils.nextInt(0, 5);
+      createIncompleteMPUKey(volumeName, bucketName, keyName, partCount);
+    }
+  }
+
+ private void createVolumeAndBucket(String volumeName, String bucketName,
+ BucketLayout bucketLayout) throws IOException {
+ // cheat here, just create a volume and bucket entry so that we can
+ // create the keys, we put the same data for key and value since the
+ // system does not decode the object
+ OMRequestTestUtils.addVolumeToOM(omMetadataManager,
+ OmVolumeArgs.newBuilder()
+ .setOwnerName("o")
+ .setAdminName("a")
+ .setVolume(volumeName)
+ .build());
+
+ OMRequestTestUtils.addBucketToOM(omMetadataManager,
+ OmBucketInfo.newBuilder().setVolumeName(volumeName)
+ .setBucketName(bucketName)
+ .setBucketLayout(bucketLayout)
+ .build());
+ }
+
+ /**
+ * Create inflight multipart upload that are not completed / aborted yet.
+ * @param volumeName
+ * @param bucketName
+ * @param keyName
+ * @throws IOException
+ */
+ private void createIncompleteMPUKey(String volumeName, String bucketName,
+ String keyName, int numParts) throws IOException {
+ // Initiate MPU
+ OmKeyArgs keyArgs =
+ new OmKeyArgs.Builder()
+ .setVolumeName(volumeName)
+ .setBucketName(bucketName)
+ .setKeyName(keyName)
+ .setAcls(Collections.emptyList())
+ .setReplicationConfig(StandaloneReplicationConfig.getInstance(ONE))
+ .setLocationInfoList(new ArrayList<>())
+ .build();
+
+ OmMultipartInfo omMultipartInfo = writeClient.
+ initiateMultipartUpload(keyArgs);
+
+ // Commit MPU parts
+ for (int i = 1; i <= numParts; i++) {
+ OmKeyArgs partKeyArgs =
+ new OmKeyArgs.Builder()
+ .setVolumeName(volumeName)
+ .setBucketName(bucketName)
+ .setKeyName(keyName)
+ .setIsMultipartKey(true)
+ .setMultipartUploadID(omMultipartInfo.getUploadID())
+ .setMultipartUploadPartNumber(i)
+ .setAcls(Collections.emptyList())
+ .setReplicationConfig(
+ StandaloneReplicationConfig.getInstance(ONE))
+ .build();
+
+ OpenKeySession openKey = writeClient.openKey(partKeyArgs);
+
+ OmKeyArgs commitPartKeyArgs =
+ new OmKeyArgs.Builder()
+ .setVolumeName(volumeName)
+ .setBucketName(bucketName)
+ .setKeyName(keyName)
+ .setIsMultipartKey(true)
+ .setMultipartUploadID(omMultipartInfo.getUploadID())
+ .setMultipartUploadPartNumber(i)
+ .setAcls(Collections.emptyList())
+ .setReplicationConfig(
+ StandaloneReplicationConfig.getInstance(ONE))
+ .setLocationInfoList(Collections.emptyList())
+ .build();
+
+ writeClient.commitMultipartUploadPart(commitPartKeyArgs,
openKey.getId());
+ }
+
+ // MPU key is not completed / aborted, so it's still in the
+ // multipartInfoTable
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]