This is an automated email from the ASF dual-hosted git repository.
ckj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 4b43f158b8 HDDS-9285. Revert set isMultipartKey for Open MPU part keys
(#5295)
4b43f158b8 is described below
commit 4b43f158b8b5f9e0d1a58ffe81d3411729337c66
Author: Ivan Andika <[email protected]>
AuthorDate: Mon Sep 18 21:06:21 2023 +0800
HDDS-9285. Revert set isMultipartKey for Open MPU part keys (#5295)
---
.../hadoop/ozone/om/request/key/OMKeyRequest.java | 3 +-
.../om/service/TestOpenKeyCleanupService.java | 119 +++++++++++++++++----
2 files changed, 99 insertions(+), 23 deletions(-)
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index fc1e84c6fc..749b0f3fe6 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -750,8 +750,7 @@ public abstract class OMKeyRequest extends OMClientRequest {
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.setOmKeyLocationInfos(Collections.singletonList(
- new OmKeyLocationInfoGroup(0, locations,
- keyArgs.getIsMultipartKey())))
+ new OmKeyLocationInfoGroup(0, locations)))
.setCreationTime(keyArgs.getModificationTime())
.setModificationTime(keyArgs.getModificationTime())
.setDataSize(size)
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestOpenKeyCleanupService.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestOpenKeyCleanupService.java
index 5db26711bd..c421e10f94 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestOpenKeyCleanupService.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestOpenKeyCleanupService.java
@@ -83,6 +83,7 @@ public class TestOpenKeyCleanupService {
private static final Duration SERVICE_INTERVAL = Duration.ofMillis(100);
private static final Duration EXPIRE_THRESHOLD = Duration.ofMillis(200);
+ private static final int NUM_MPU_PARTS = 5;
private KeyManager keyManager;
private OMMetadataManager omMetadataManager;
@@ -223,8 +224,10 @@ public class TestOpenKeyCleanupService {
assertEquals(0, metrics.getNumKeyHSyncs());
assertEquals(0, metrics.getNumOpenKeysCleaned());
assertEquals(0, metrics.getNumOpenKeysHSyncCleaned());
- createIncompleteMPUKeys(numDEFKeys, BucketLayout.DEFAULT);
- createIncompleteMPUKeys(numFSOKeys, BucketLayout.FILE_SYSTEM_OPTIMIZED);
+ createIncompleteMPUKeys(numDEFKeys, BucketLayout.DEFAULT, NUM_MPU_PARTS,
+ true);
+ createIncompleteMPUKeys(numFSOKeys, BucketLayout.FILE_SYSTEM_OPTIMIZED,
+ NUM_MPU_PARTS, true);
// wait for open keys to expire
Thread.sleep(EXPIRE_THRESHOLD.toMillis());
@@ -253,6 +256,76 @@ public class TestOpenKeyCleanupService {
assertEquals(0, metrics.getNumOpenKeysCleaned());
}
+ /**
+ * In this test, we create a bunch of MPU keys with uncommitted parts, then
+ * we will start the OpenKeyCleanupService. The OpenKeyCleanupService
+ * should only delete the open MPU part keys (not the open MPU key).
+ *
+ * @throws Exception - on Failure.
+ */
+ @ParameterizedTest
+ @CsvSource({
+ "9, 0",
+ "0, 8",
+ "6, 7",
+ })
+ public void testCleanupExpiredOpenMPUPartKeys(
+ int numDEFKeys, int numFSOKeys) throws Exception {
+ LOG.info("numDEFMpuKeys={}, numFSOMpuKeys={}",
+ numDEFKeys, numFSOKeys);
+
+ OpenKeyCleanupService openKeyCleanupService =
+ (OpenKeyCleanupService) keyManager.getOpenKeyCleanupService();
+
+ openKeyCleanupService.suspend();
+ // wait for submitted tasks to complete
+ Thread.sleep(SERVICE_INTERVAL.toMillis());
+ final long oldkeyCount = openKeyCleanupService.getSubmittedOpenKeyCount();
+ final long oldrunCount = openKeyCleanupService.getRunCount();
+ LOG.info("oldMpuKeyCount={}, oldMpuRunCount={}", oldkeyCount, oldrunCount);
+ assertEquals(0, oldkeyCount);
+
+ final OMMetrics metrics = om.getMetrics();
+ assertEquals(0, metrics.getNumKeyHSyncs());
+ assertEquals(0, metrics.getNumOpenKeysCleaned());
+ assertEquals(0, metrics.getNumOpenKeysHSyncCleaned());
+ final int keyCount = numDEFKeys + numFSOKeys;
+ createIncompleteMPUKeys(numDEFKeys, BucketLayout.DEFAULT, NUM_MPU_PARTS,
+ false);
+ createIncompleteMPUKeys(numFSOKeys, BucketLayout.FILE_SYSTEM_OPTIMIZED,
+ NUM_MPU_PARTS, false);
+
+ Thread.sleep(EXPIRE_THRESHOLD.toMillis());
+
+ // Each MPU key creates 1 MPU open key and some MPU open part keys;
+ // only the MPU open part keys will be deleted.
+ assertExpiredOpenKeys((numDEFKeys * NUM_MPU_PARTS) == 0, false,
+ BucketLayout.DEFAULT);
+ assertExpiredOpenKeys((numFSOKeys * NUM_MPU_PARTS) == 0, false,
+ BucketLayout.FILE_SYSTEM_OPTIMIZED);
+
+ openKeyCleanupService.resume();
+
+ GenericTestUtils.waitFor(() -> openKeyCleanupService
+ .getRunCount() > oldrunCount,
+ (int) SERVICE_INTERVAL.toMillis(),
+ 5 * (int) SERVICE_INTERVAL.toMillis());
+
+ // wait for requests to complete
+ Thread.sleep(SERVICE_INTERVAL.toMillis());
+
+ // All expired MPU parts were submitted; none remain to be fetched
+ int numExpiredParts = NUM_MPU_PARTS * keyCount;
+ assertTrue(openKeyCleanupService.getSubmittedOpenKeyCount() >=
+ (oldkeyCount + numExpiredParts));
+ assertExpiredOpenKeys(true, false, BucketLayout.DEFAULT);
+ assertExpiredOpenKeys(true, false,
+ BucketLayout.FILE_SYSTEM_OPTIMIZED);
+
+ assertEquals(numExpiredParts,
+ metrics.getNumOpenKeysCleaned());
+ }
+
void assertExpiredOpenKeys(boolean expectedToEmpty, boolean hsync,
BucketLayout layout) throws IOException {
final ExpiredOpenKeys expired = keyManager.getExpiredOpenKeys(
@@ -327,7 +400,8 @@ public class TestOpenKeyCleanupService {
}
private void createIncompleteMPUKeys(int mpuKeyCount,
- BucketLayout bucketLayout) throws IOException {
+ BucketLayout bucketLayout, int numParts, boolean arePartsCommitted)
+ throws IOException {
String volume = UUID.randomUUID().toString();
String bucket = UUID.randomUUID().toString();
for (int x = 0; x < mpuKeyCount; x++) {
@@ -340,9 +414,8 @@ public class TestOpenKeyCleanupService {
String key = UUID.randomUUID().toString();
createVolumeAndBucket(volume, bucket, bucketLayout);
- final int numParts = RandomUtils.nextInt(0, 5);
// Create the MPU key
- createIncompleteMPUKey(volume, bucket, key, numParts);
+ createIncompleteMPUKey(volume, bucket, key, numParts, arePartsCommitted);
}
}
@@ -354,7 +427,8 @@ public class TestOpenKeyCleanupService {
* @throws IOException
*/
private void createIncompleteMPUKey(String volumeName, String bucketName,
- String keyName, int numParts) throws IOException {
+ String keyName, int numParts, boolean arePartsCommitted)
+ throws IOException {
// Initiate MPU
OmKeyArgs keyArgs =
new OmKeyArgs.Builder()
@@ -387,21 +461,24 @@ public class TestOpenKeyCleanupService {
OpenKeySession openKey = writeClient.openKey(partKeyArgs);
- OmKeyArgs commitPartKeyArgs =
- new OmKeyArgs.Builder()
- .setVolumeName(volumeName)
- .setBucketName(bucketName)
- .setKeyName(keyName)
- .setIsMultipartKey(true)
- .setMultipartUploadID(omMultipartInfo.getUploadID())
- .setMultipartUploadPartNumber(i)
- .setAcls(Collections.emptyList())
- .setReplicationConfig(StandaloneReplicationConfig.getInstance(
- HddsProtos.ReplicationFactor.ONE))
- .setLocationInfoList(Collections.emptyList())
- .build();
-
- writeClient.commitMultipartUploadPart(commitPartKeyArgs,
openKey.getId());
+ if (arePartsCommitted) {
+ OmKeyArgs commitPartKeyArgs =
+ new OmKeyArgs.Builder()
+ .setVolumeName(volumeName)
+ .setBucketName(bucketName)
+ .setKeyName(keyName)
+ .setIsMultipartKey(true)
+ .setMultipartUploadID(omMultipartInfo.getUploadID())
+ .setMultipartUploadPartNumber(i)
+ .setAcls(Collections.emptyList())
+ .setReplicationConfig(StandaloneReplicationConfig.getInstance(
+ HddsProtos.ReplicationFactor.ONE))
+ .setLocationInfoList(Collections.emptyList())
+ .build();
+
+ writeClient.commitMultipartUploadPart(commitPartKeyArgs,
+ openKey.getId());
+ }
}
// MPU key is not completed / aborted, so it's still in the
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]