This is an automated email from the ASF dual-hosted git repository.

ckj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b43f158b8 HDDS-9285. Revert set isMultipartKey for Open MPU part keys 
(#5295)
4b43f158b8 is described below

commit 4b43f158b8b5f9e0d1a58ffe81d3411729337c66
Author: Ivan Andika <[email protected]>
AuthorDate: Mon Sep 18 21:06:21 2023 +0800

    HDDS-9285. Revert set isMultipartKey for Open MPU part keys (#5295)
---
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  |   3 +-
 .../om/service/TestOpenKeyCleanupService.java      | 119 +++++++++++++++++----
 2 files changed, 99 insertions(+), 23 deletions(-)

diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index fc1e84c6fc..749b0f3fe6 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -750,8 +750,7 @@ public abstract class OMKeyRequest extends OMClientRequest {
             .setBucketName(keyArgs.getBucketName())
             .setKeyName(keyArgs.getKeyName())
             .setOmKeyLocationInfos(Collections.singletonList(
-                    new OmKeyLocationInfoGroup(0, locations,
-                        keyArgs.getIsMultipartKey())))
+                    new OmKeyLocationInfoGroup(0, locations)))
             .setCreationTime(keyArgs.getModificationTime())
             .setModificationTime(keyArgs.getModificationTime())
             .setDataSize(size)
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestOpenKeyCleanupService.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestOpenKeyCleanupService.java
index 5db26711bd..c421e10f94 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestOpenKeyCleanupService.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestOpenKeyCleanupService.java
@@ -83,6 +83,7 @@ public class TestOpenKeyCleanupService {
 
   private static final Duration SERVICE_INTERVAL = Duration.ofMillis(100);
   private static final Duration EXPIRE_THRESHOLD = Duration.ofMillis(200);
+  private static final int NUM_MPU_PARTS = 5;
   private KeyManager keyManager;
   private OMMetadataManager omMetadataManager;
 
@@ -223,8 +224,10 @@ public class TestOpenKeyCleanupService {
     assertEquals(0, metrics.getNumKeyHSyncs());
     assertEquals(0, metrics.getNumOpenKeysCleaned());
     assertEquals(0, metrics.getNumOpenKeysHSyncCleaned());
-    createIncompleteMPUKeys(numDEFKeys, BucketLayout.DEFAULT);
-    createIncompleteMPUKeys(numFSOKeys, BucketLayout.FILE_SYSTEM_OPTIMIZED);
+    createIncompleteMPUKeys(numDEFKeys, BucketLayout.DEFAULT, NUM_MPU_PARTS,
+        true);
+    createIncompleteMPUKeys(numFSOKeys, BucketLayout.FILE_SYSTEM_OPTIMIZED,
+        NUM_MPU_PARTS, true);
 
     // wait for open keys to expire
     Thread.sleep(EXPIRE_THRESHOLD.toMillis());
@@ -253,6 +256,76 @@ public class TestOpenKeyCleanupService {
     assertEquals(0, metrics.getNumOpenKeysCleaned());
   }
 
+  /**
+   * In this test, we create a bunch of MPU keys with uncommitted parts, then
+   * we will start the OpenKeyCleanupService. The OpenKeyCleanupService
+   * should only delete the open MPU part keys (not the open MPU key).
+   *
+   * @throws IOException - on Failure.
+   */
+  @ParameterizedTest
+  @CsvSource({
+      "9, 0",
+      "0, 8",
+      "6, 7",
+  })
+  public void testCleanupExpiredOpenMPUPartKeys(
+      int numDEFKeys, int numFSOKeys) throws Exception {
+    LOG.info("numDEFMpuKeys={}, numFSOMpuKeys={}",
+        numDEFKeys, numFSOKeys);
+
+    OpenKeyCleanupService openKeyCleanupService =
+        (OpenKeyCleanupService) keyManager.getOpenKeyCleanupService();
+
+    openKeyCleanupService.suspend();
+    // wait for submitted tasks to complete
+    Thread.sleep(SERVICE_INTERVAL.toMillis());
+    final long oldkeyCount = openKeyCleanupService.getSubmittedOpenKeyCount();
+    final long oldrunCount = openKeyCleanupService.getRunCount();
+    LOG.info("oldMpuKeyCount={}, oldMpuRunCount={}", oldkeyCount, oldrunCount);
+    assertEquals(0, oldkeyCount);
+
+    final OMMetrics metrics = om.getMetrics();
+    assertEquals(0, metrics.getNumKeyHSyncs());
+    assertEquals(0, metrics.getNumOpenKeysCleaned());
+    assertEquals(0, metrics.getNumOpenKeysHSyncCleaned());
+    final int keyCount = numDEFKeys + numFSOKeys;
+    createIncompleteMPUKeys(numDEFKeys, BucketLayout.DEFAULT, NUM_MPU_PARTS,
+        false);
+    createIncompleteMPUKeys(numFSOKeys, BucketLayout.FILE_SYSTEM_OPTIMIZED,
+        NUM_MPU_PARTS, false);
+
+    Thread.sleep(EXPIRE_THRESHOLD.toMillis());
+
+    // Each MPU keys create 1 MPU open key and some MPU open part keys
+    // only the MPU open part keys will be deleted
+    assertExpiredOpenKeys((numDEFKeys * NUM_MPU_PARTS) == 0, false,
+        BucketLayout.DEFAULT);
+    assertExpiredOpenKeys((numFSOKeys * NUM_MPU_PARTS) == 0, false,
+        BucketLayout.FILE_SYSTEM_OPTIMIZED);
+
+    openKeyCleanupService.resume();
+
+    GenericTestUtils.waitFor(() -> openKeyCleanupService
+            .getRunCount() > oldrunCount,
+        (int) SERVICE_INTERVAL.toMillis(),
+        5 * (int) SERVICE_INTERVAL.toMillis());
+
+    // wait for requests to complete
+    Thread.sleep(SERVICE_INTERVAL.toMillis());
+
+    // No expired MPU parts fetched
+    int numExpiredParts = NUM_MPU_PARTS * keyCount;
+    assertTrue(openKeyCleanupService.getSubmittedOpenKeyCount() >=
+        (oldkeyCount + numExpiredParts));
+    assertExpiredOpenKeys(true, false, BucketLayout.DEFAULT);
+    assertExpiredOpenKeys(true, false,
+        BucketLayout.FILE_SYSTEM_OPTIMIZED);
+
+    assertEquals(numExpiredParts,
+        metrics.getNumOpenKeysCleaned());
+  }
+
   void assertExpiredOpenKeys(boolean expectedToEmpty, boolean hsync,
       BucketLayout layout) throws IOException {
     final ExpiredOpenKeys expired = keyManager.getExpiredOpenKeys(
@@ -327,7 +400,8 @@ public class TestOpenKeyCleanupService {
   }
 
   private void createIncompleteMPUKeys(int mpuKeyCount,
-       BucketLayout bucketLayout) throws IOException {
+       BucketLayout bucketLayout, int numParts, boolean arePartsCommitted)
+      throws IOException {
     String volume = UUID.randomUUID().toString();
     String bucket = UUID.randomUUID().toString();
     for (int x = 0; x < mpuKeyCount; x++) {
@@ -340,9 +414,8 @@ public class TestOpenKeyCleanupService {
       String key = UUID.randomUUID().toString();
       createVolumeAndBucket(volume, bucket, bucketLayout);
 
-      final int numParts = RandomUtils.nextInt(0, 5);
       // Create the MPU key
-      createIncompleteMPUKey(volume, bucket, key, numParts);
+      createIncompleteMPUKey(volume, bucket, key, numParts, arePartsCommitted);
     }
   }
 
@@ -354,7 +427,8 @@ public class TestOpenKeyCleanupService {
    * @throws IOException
    */
   private void createIncompleteMPUKey(String volumeName, String bucketName,
-      String keyName, int numParts) throws IOException {
+      String keyName, int numParts, boolean arePartsCommitted)
+      throws IOException {
     // Initiate MPU
     OmKeyArgs keyArgs =
         new OmKeyArgs.Builder()
@@ -387,21 +461,24 @@ public class TestOpenKeyCleanupService {
 
       OpenKeySession openKey = writeClient.openKey(partKeyArgs);
 
-      OmKeyArgs commitPartKeyArgs =
-          new OmKeyArgs.Builder()
-              .setVolumeName(volumeName)
-              .setBucketName(bucketName)
-              .setKeyName(keyName)
-              .setIsMultipartKey(true)
-              .setMultipartUploadID(omMultipartInfo.getUploadID())
-              .setMultipartUploadPartNumber(i)
-              .setAcls(Collections.emptyList())
-              .setReplicationConfig(StandaloneReplicationConfig.getInstance(
-                  HddsProtos.ReplicationFactor.ONE))
-              .setLocationInfoList(Collections.emptyList())
-              .build();
-
-      writeClient.commitMultipartUploadPart(commitPartKeyArgs, 
openKey.getId());
+      if (arePartsCommitted) {
+        OmKeyArgs commitPartKeyArgs =
+            new OmKeyArgs.Builder()
+                .setVolumeName(volumeName)
+                .setBucketName(bucketName)
+                .setKeyName(keyName)
+                .setIsMultipartKey(true)
+                .setMultipartUploadID(omMultipartInfo.getUploadID())
+                .setMultipartUploadPartNumber(i)
+                .setAcls(Collections.emptyList())
+                .setReplicationConfig(StandaloneReplicationConfig.getInstance(
+                    HddsProtos.ReplicationFactor.ONE))
+                .setLocationInfoList(Collections.emptyList())
+                .build();
+
+        writeClient.commitMultipartUploadPart(commitPartKeyArgs,
+            openKey.getId());
+      }
     }
 
     // MPU key is not completed / aborted, so it's still in the


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to