This is an automated email from the ASF dual-hosted git repository.
swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 23a62e48843 HDDS-13729. Acquire Bulk Bucket locks in order to prevent deadlock in OmDirectoryPurgeRequest (#9084)
23a62e48843 is described below
commit 23a62e488433543bbb592a014f79ca6a81f2f44e
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Sat Oct 4 05:36:02 2025 -0400
HDDS-13729. Acquire Bulk Bucket locks in order to prevent deadlock in OmDirectoryPurgeRequest (#9084)
---
.../TestDirectoryDeletingServiceWithFSO.java | 3 +-
.../src/main/proto/OmClientProtocol.proto | 8 +
.../apache/hadoop/ozone/om/OMMetadataManager.java | 49 ++++++
.../hadoop/ozone/om/OmMetadataManagerImpl.java | 13 ++
.../key/OMDirectoriesPurgeRequestWithFSO.java | 50 ++++--
.../ozone/om/service/DirectoryDeletingService.java | 43 +++--
.../TestOMDirectoriesPurgeRequestAndResponse.java | 181 ++++++++++++++++++---
.../ozone/om/request/key/TestOMKeyRequest.java | 5 +-
8 files changed, 299 insertions(+), 53 deletions(-)
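
The deadlock this commit prevents is the classic lock-ordering cycle: the old code took per-bucket write locks one at a time, in whatever order buckets appeared in the purge batch, so two batches touching the same buckets in opposite orders could each hold one lock while waiting for the other. Taking all bucket locks up front through a single bulk call with a canonical ordering makes the cycle impossible. A minimal sketch of the idea, using plain java.util.concurrent locks rather than Ozone's OzoneManagerLock API (whether acquireWriteLocks sorts keys internally is not visible in this diff, so the sorting below is an assumption about the intent, matching the commit title):

    // Standalone sketch; not Ozone code.
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.stream.Collectors;

    public class OrderedBucketLocks {
      private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();

      // Sorting the "volume/bucket" keys imposes one global acquisition order,
      // so no two threads can wait on each other in a cycle.
      public List<ReentrantLock> acquireAll(List<String> bucketKeys) {
        List<ReentrantLock> ordered = bucketKeys.stream()
            .distinct()
            .sorted()
            .map(k -> locks.computeIfAbsent(k, x -> new ReentrantLock()))
            .collect(Collectors.toList());
        ordered.forEach(ReentrantLock::lock);
        return ordered;
      }

      public void releaseAll(List<ReentrantLock> held) {
        for (int i = held.size() - 1; i >= 0; i--) {  // release in reverse order
          held.get(i).unlock();
        }
      }
    }
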
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java
index c8ebcb083f1..81e1dd3b444 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java
@@ -30,6 +30,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
@@ -623,7 +624,7 @@ public void testAOSKeyDeletingWithSnapshotCreateParallelExecution()
return null;
}).when(service).optimizeDirDeletesAndSubmitRequest(anyLong(), anyLong(),
anyLong(), anyList(), anyList(), eq(null), anyLong(), anyLong(), any(),
- any(ReclaimableDirFilter.class), any(ReclaimableKeyFilter.class), any(),
+ any(ReclaimableDirFilter.class), any(ReclaimableKeyFilter.class), anyMap(), any(),
anyLong());
Mockito.doAnswer(i -> {
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index d6fc15a45a4..01f4699a88a 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -1438,6 +1438,14 @@ message PurgeDirectoriesRequest {
optional string snapshotTableKey = 2;
// previous snapshotID can also be null & this field would be absent in older requests.
optional NullableUUID expectedPreviousSnapshotID = 3;
+ repeated BucketNameInfo bucketNameInfos = 4;
+}
+
+message BucketNameInfo {
+ optional uint64 volumeId = 1;
+ optional uint64 bucketId = 2;
+ optional string volumeName = 3;
+ optional string bucketName = 4;
}
message NullableUUID {
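
For reference, the new message can be built through the generated protobuf builder the same way DirectoryDeletingService does further down in this diff; the values below are made-up samples:

    // Assumes the generated OzoneManagerProtocolProtos classes are on the classpath.
    import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketNameInfo;

    public class BucketNameInfoSample {
      public static BucketNameInfo sample() {
        return BucketNameInfo.newBuilder()
            .setVolumeId(1L)           // hypothetical object IDs
            .setBucketId(2L)
            .setVolumeName("vol1")     // hypothetical names
            .setBucketName("bucket1")
            .build();
      }
    }
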
diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index a8e5b2dab41..b52c738b568 100644
--- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -25,6 +25,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.DBStoreHAManager;
@@ -135,6 +136,15 @@ public interface OMMetadataManager extends DBStoreHAManager, AutoCloseable {
*/
String getBucketKeyPrefixFSO(String volume, String bucket) throws IOException;
+
+ /**
+ * Retrieves the volume ID and bucket ID associated with the provided FSO (file system optimized) key.
+ *
+ * @param fsoKey the FSO key, used to identify the corresponding volume and bucket.
+ * @return a VolumeBucketId holding the volume ID and the bucket ID.
+ */
+ VolumeBucketId getVolumeBucketIdPairFSO(String fsoKey) throws IOException;
+
/**
* Given a volume, bucket and a key, return the corresponding DB key.
*
@@ -675,4 +685,43 @@ String getMultipartKey(long volumeId, long bucketId,
*/
boolean containsIncompleteMPUs(String volume, String bucket)
throws IOException;
+
+ /**
+ * Represents a unique identifier for a specific bucket within a volume.
+ *
+ * This class combines a volume identifier and a bucket identifier
+ * to uniquely identify a bucket within a storage system.
+ */
+ class VolumeBucketId {
+ private final long volumeId;
+ private final long bucketId;
+
+ public VolumeBucketId(long volumeId, long bucketId) {
+ this.volumeId = volumeId;
+ this.bucketId = bucketId;
+ }
+
+ public long getBucketId() {
+ return bucketId;
+ }
+
+ public long getVolumeId() {
+ return volumeId;
+ }
+
+ @Override
+ public final boolean equals(Object o) {
+ if (!(o instanceof VolumeBucketId)) {
+ return false;
+ }
+
+ VolumeBucketId that = (VolumeBucketId) o;
+ return volumeId == that.volumeId && bucketId == that.bucketId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(volumeId, bucketId);
+ }
+ }
}
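
VolumeBucketId overrides equals and hashCode so it can act as a HashMap key and survive stream distinct(), which is exactly how the rest of this commit uses it. A quick sanity check of that contract:

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.hadoop.ozone.om.OMMetadataManager.VolumeBucketId;

    public class VolumeBucketIdDedup {
      public static void main(String[] args) {
        Set<VolumeBucketId> seen = new HashSet<>();
        seen.add(new VolumeBucketId(1L, 2L));
        seen.add(new VolumeBucketId(1L, 2L));  // equal by value, deduplicated
        seen.add(new VolumeBucketId(1L, 3L));
        System.out.println(seen.size());       // prints 2
      }
    }
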
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 417241c9e4f..27dcb6a24e0 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -544,6 +544,19 @@ public String getBucketKeyPrefixFSO(String volume, String bucket) throws IOExcep
return getOzoneKeyFSO(volume, bucket, OM_KEY_PREFIX);
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public VolumeBucketId getVolumeBucketIdPairFSO(String fsoKey) throws IOException {
+ String[] keySplit = fsoKey.split(OM_KEY_PREFIX);
+ try {
+ return new VolumeBucketId(Long.parseLong(keySplit[1]), Long.parseLong(keySplit[2]));
+ } catch (NumberFormatException e) {
+ throw new IOException("Invalid format for FSO Key: " + fsoKey, e);
+ }
+ }
+
@Override
public String getOzoneKey(String volume, String bucket, String key) {
StringBuilder builder = new StringBuilder()
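
The split above relies on the FSO key layout: assuming OM_KEY_PREFIX is "/", a key of the form /<volumeId>/<bucketId>/<parentId>/<name> splits into an empty element at index 0, the volume ID at index 1 and the bucket ID at index 2. A standalone illustration with a made-up key:

    public class FsoKeySplit {
      public static void main(String[] args) {
        String fsoKey = "/101/202/303/file1";    // hypothetical FSO DB key
        String[] keySplit = fsoKey.split("/");   // ["", "101", "202", "303", "file1"]
        long volumeId = Long.parseLong(keySplit[1]);
        long bucketId = Long.parseLong(keySplit[2]);
        System.out.println(volumeId + " " + bucketId);  // prints 101 202
      }
    }
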
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
index 3e817b9c86c..e30a66aa124 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
@@ -30,6 +30,8 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
@@ -42,6 +44,7 @@
import org.apache.hadoop.ozone.om.OMMetrics;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -54,6 +57,7 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeDirectoriesRequest;
/**
* Handles purging of keys from OM DB.
@@ -75,14 +79,13 @@ public OMDirectoriesPurgeRequestWithFSO(OMRequest omRequest) {
@Override
@SuppressWarnings("methodlength")
public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) {
- OzoneManagerProtocolProtos.PurgeDirectoriesRequest purgeDirsRequest =
+ PurgeDirectoriesRequest purgeDirsRequest =
getOmRequest().getPurgeDirectoriesRequest();
String fromSnapshot = purgeDirsRequest.hasSnapshotTableKey() ?
purgeDirsRequest.getSnapshotTableKey() : null;
List<OzoneManagerProtocolProtos.PurgePathRequest> purgeRequests =
purgeDirsRequest.getDeletedPathList();
- Set<Pair<String, String>> lockSet = new HashSet<>();
Map<Pair<String, String>, OmBucketInfo> volBucketInfoMap = new HashMap<>();
OmMetadataManagerImpl omMetadataManager = (OmMetadataManagerImpl)
ozoneManager.getMetadataManager();
Map<String, OmKeyInfo> openKeyInfoMap = new HashMap<>();
@@ -116,6 +119,15 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
AUDIT.logWriteFailure(ozoneManager.buildAuditMessageForFailure(OMSystemAction.DIRECTORY_DELETION,
null, e));
return new OMDirectoriesPurgeResponseWithFSO(createErrorOMResponse(omResponse, e));
}
+ List<String[]> bucketLockKeys = getBucketLockKeySet(purgeDirsRequest);
+ boolean lockAcquired = omMetadataManager.getLock().acquireWriteLocks(BUCKET_LOCK, bucketLockKeys).isLockAcquired();
+ if (!lockAcquired && !purgeDirsRequest.getBucketNameInfosList().isEmpty()) {
+ OMException oe = new OMException("Unable to acquire write locks on buckets while performing DirectoryPurge",
+ OMException.ResultCodes.KEY_DELETION_ERROR);
+ LOG.error("Error occurred while performing OMDirectoriesPurge. ", oe);
+ AUDIT.logWriteFailure(ozoneManager.buildAuditMessageForFailure(OMSystemAction.DIRECTORY_DELETION, null, oe));
+ return new OMDirectoriesPurgeResponseWithFSO(createErrorOMResponse(omResponse, oe));
+ }
try {
int numSubDirMoved = 0, numSubFilesMoved = 0, numDirsDeleted = 0;
for (OzoneManagerProtocolProtos.PurgePathRequest path : purgeRequests) {
@@ -133,11 +145,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
String volumeName = keyInfo.getVolumeName();
String bucketName = keyInfo.getBucketName();
Pair<String, String> volBucketPair = Pair.of(volumeName, bucketName);
- if (!lockSet.contains(volBucketPair)) {
- omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
- volumeName, bucketName);
- lockSet.add(volBucketPair);
- }
+
omMetrics.decNumKeys();
OmBucketInfo omBucketInfo = getBucketInfo(omMetadataManager,
volumeName, bucketName);
@@ -167,11 +175,6 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
String volumeName = keyInfo.getVolumeName();
String bucketName = keyInfo.getBucketName();
Pair<String, String> volBucketPair = Pair.of(volumeName, bucketName);
- if (!lockSet.contains(volBucketPair)) {
- omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
- volumeName, bucketName);
- lockSet.add(volBucketPair);
- }
// If omKeyInfo has hsync metadata, delete its corresponding open key as well
String dbOpenKey;
@@ -239,17 +242,34 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
AUDIT.logWriteFailure(ozoneManager.buildAuditMessageForFailure(OMSystemAction.DIRECTORY_DELETION,
null, ex));
throw new IllegalStateException(ex);
} finally {
- lockSet.stream().forEach(e -> omMetadataManager.getLock()
- .releaseWriteLock(BUCKET_LOCK, e.getKey(),
- e.getValue()));
for (Map.Entry<Pair<String, String>, OmBucketInfo> entry :
volBucketInfoMap.entrySet()) {
entry.setValue(entry.getValue().copyObject());
}
+ omMetadataManager.getLock().releaseWriteLocks(BUCKET_LOCK, bucketLockKeys);
}
return new OMDirectoriesPurgeResponseWithFSO(
omResponse.build(), purgeRequests,
getBucketLayout(), volBucketInfoMap, fromSnapshotInfo, openKeyInfoMap);
}
+
+ private List<String[]> getBucketLockKeySet(PurgeDirectoriesRequest purgeDirsRequest) {
+ if (!purgeDirsRequest.getBucketNameInfosList().isEmpty()) {
+ return purgeDirsRequest.getBucketNameInfosList().stream()
+ .map(keyInfo -> Pair.of(keyInfo.getVolumeName(), keyInfo.getBucketName()))
+ .distinct()
+ .map(pair -> new String[]{pair.getLeft(), pair.getRight()})
+ .collect(Collectors.toList());
+ }
+
+ return purgeDirsRequest.getDeletedPathList().stream()
+ .flatMap(purgePathRequest -> Stream.concat(purgePathRequest.getDeletedSubFilesList().stream(),
+ purgePathRequest.getMarkDeletedSubDirsList().stream()))
+ .map(keyInfo -> Pair.of(keyInfo.getVolumeName(), keyInfo.getBucketName()))
+ .distinct()
+ .map(pair -> new String[]{pair.getLeft(), pair.getRight()})
+ .collect(Collectors.toList());
+ }
+
}
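
The isLockAcquired() check above implies that bulk acquisition can fail as a unit instead of blocking forever, with the request then aborted cleanly. An all-or-nothing acquisition sketch with plain ReentrantLocks (Ozone's OzoneManagerLock internals may differ):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.locks.ReentrantLock;

    public final class TryAcquireAll {
      private TryAcquireAll() { }

      // Locks must arrive in the canonical (sorted) order; on any failure,
      // everything acquired so far is rolled back in reverse.
      public static boolean tryAcquireAll(List<ReentrantLock> sortedLocks) {
        List<ReentrantLock> held = new ArrayList<>();
        for (ReentrantLock lock : sortedLocks) {
          if (!lock.tryLock()) {
            for (int i = held.size() - 1; i >= 0; i--) {
              held.get(i).unlock();
            }
            return false;  // caller fails the request, as above
          }
          held.add(lock);
        }
        return true;
      }
    }
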
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
index 90ad878c640..7abf1406d96 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
@@ -17,7 +17,6 @@
package org.apache.hadoop.ozone.om.service;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
@@ -30,9 +29,11 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -64,6 +65,7 @@
import org.apache.hadoop.ozone.om.KeyManager;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetadataManager.VolumeBucketId;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OmSnapshot;
import org.apache.hadoop.ozone.om.OmSnapshotManager;
@@ -76,6 +78,7 @@
import org.apache.hadoop.ozone.om.snapshot.filter.ReclaimableDirFilter;
import org.apache.hadoop.ozone.om.snapshot.filter.ReclaimableKeyFilter;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketNameInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgePathRequest;
import org.apache.hadoop.util.Time;
import org.apache.ratis.util.function.CheckedFunction;
@@ -262,6 +265,7 @@ void optimizeDirDeletesAndSubmitRequest(
long remainingBufLimit, KeyManager keyManager,
CheckedFunction<KeyValue<String, OmKeyInfo>, Boolean, IOException> reclaimableDirChecker,
CheckedFunction<KeyValue<String, OmKeyInfo>, Boolean, IOException> reclaimableFileChecker,
+ Map<VolumeBucketId, BucketNameInfo> bucketNameInfoMap,
UUID expectedPreviousSnapshotId, long rnCnt) throws InterruptedException {
// Optimization to handle delete sub-dir and keys to remove quickly
@@ -297,7 +301,7 @@ void optimizeDirDeletesAndSubmitRequest(
}
}
if (!purgePathRequestList.isEmpty()) {
- submitPurgePaths(purgePathRequestList, snapTableKey, expectedPreviousSnapshotId);
+ submitPurgePaths(purgePathRequestList, snapTableKey, expectedPreviousSnapshotId, bucketNameInfoMap);
}
if (dirNum != 0 || subDirNum != 0 || subFileNum != 0) {
@@ -382,21 +386,20 @@ private Optional<PurgePathRequest> prepareDeleteDirRequest(
pendingDeletedDirInfo.getKeyName());
}
- final String[] keys = delDirName.split(OM_KEY_PREFIX);
- final long volumeId = Long.parseLong(keys[1]);
- final long bucketId = Long.parseLong(keys[2]);
+ VolumeBucketId volumeBucketId = keyManager.getMetadataManager()
+ .getVolumeBucketIdPairFSO(delDirName);
// step-1: get all sub directories under the deletedDir
DeleteKeysResult subDirDeleteResult =
- keyManager.getPendingDeletionSubDirs(volumeId, bucketId,
+ keyManager.getPendingDeletionSubDirs(volumeBucketId.getVolumeId(), volumeBucketId.getBucketId(),
pendingDeletedDirInfo, keyInfo -> true, remainingBufLimit);
List<OmKeyInfo> subDirs = subDirDeleteResult.getKeysToDelete();
remainingBufLimit -= subDirDeleteResult.getConsumedSize();
OMMetadataManager omMetadataManager = keyManager.getMetadataManager();
for (OmKeyInfo dirInfo : subDirs) {
- String ozoneDbKey = omMetadataManager.getOzonePathKey(volumeId,
- bucketId, dirInfo.getParentObjectID(), dirInfo.getFileName());
+ String ozoneDbKey = omMetadataManager.getOzonePathKey(volumeBucketId.getVolumeId(),
+ volumeBucketId.getBucketId(), dirInfo.getParentObjectID(), dirInfo.getFileName());
String ozoneDeleteKey = omMetadataManager.getOzoneDeletePathKey(
dirInfo.getObjectID(), ozoneDbKey);
subDirList.add(Pair.of(ozoneDeleteKey, dirInfo));
@@ -406,7 +409,7 @@ private Optional<PurgePathRequest> prepareDeleteDirRequest(
// step-2: get all sub files under the deletedDir
// Only remove sub files if the parent directory is going to be deleted or can be reclaimed.
DeleteKeysResult subFileDeleteResult =
- keyManager.getPendingDeletionSubFiles(volumeId, bucketId,
+ keyManager.getPendingDeletionSubFiles(volumeBucketId.getVolumeId(), volumeBucketId.getBucketId(),
pendingDeletedDirInfo, keyInfo -> purgeDir || reclaimableFileFilter.apply(keyInfo), remainingBufLimit);
List<OmKeyInfo> subFiles = subFileDeleteResult.getKeysToDelete();
@@ -423,7 +426,7 @@ private Optional<PurgePathRequest> prepareDeleteDirRequest(
if (purgeDeletedDir == null && subFiles.isEmpty() && subDirs.isEmpty()) {
return Optional.empty();
}
- return Optional.of(wrapPurgeRequest(volumeId, bucketId,
+ return Optional.of(wrapPurgeRequest(volumeBucketId.getVolumeId(), volumeBucketId.getBucketId(),
purgeDeletedDir, subFiles, subDirs));
}
@@ -458,7 +461,8 @@ private OzoneManagerProtocolProtos.PurgePathRequest wrapPurgeRequest(
}
private OzoneManagerProtocolProtos.OMResponse submitPurgePaths(List<PurgePathRequest> requests,
- String snapTableKey, UUID expectedPreviousSnapshotId) throws InterruptedException {
+ String snapTableKey, UUID expectedPreviousSnapshotId, Map<VolumeBucketId, BucketNameInfo> bucketNameInfoMap)
+ throws InterruptedException {
OzoneManagerProtocolProtos.PurgeDirectoriesRequest.Builder purgeDirRequest =
OzoneManagerProtocolProtos.PurgeDirectoriesRequest.newBuilder();
@@ -473,6 +477,9 @@ private OzoneManagerProtocolProtos.OMResponse submitPurgePaths(List<PurgePathReq
purgeDirRequest.setExpectedPreviousSnapshotID(expectedPreviousSnapshotNullableUUID.build());
purgeDirRequest.addAllDeletedPath(requests);
+ purgeDirRequest.addAllBucketNameInfos(requests.stream().map(purgePathRequest ->
+ new VolumeBucketId(purgePathRequest.getVolumeId(), purgePathRequest.getBucketId())).distinct()
+ .map(bucketNameInfoMap::get).filter(Objects::nonNull).collect(Collectors.toList()));
OzoneManagerProtocolProtos.OMRequest omRequest =
OzoneManagerProtocolProtos.OMRequest.newBuilder()
@@ -613,12 +620,24 @@ private boolean processDeletedDirectories(SnapshotInfo currentSnapshotInfo, KeyM
long subFileNum = 0L;
int consumedSize = 0;
List<PurgePathRequest> purgePathRequestList = new ArrayList<>();
+ Map<VolumeBucketId, BucketNameInfo> bucketNameInfos = new HashMap<>();
+
List<Pair<String, OmKeyInfo>> allSubDirList = new ArrayList<>();
while (remainingBufLimit > 0) {
KeyValue<String, OmKeyInfo> pendingDeletedDirInfo =
dirSupplier.get();
if (pendingDeletedDirInfo == null) {
break;
}
+ OmKeyInfo deletedDirInfo = pendingDeletedDirInfo.getValue();
+ VolumeBucketId volumeBucketId =
+ keyManager.getMetadataManager().getVolumeBucketIdPairFSO(pendingDeletedDirInfo.getKey());
+ bucketNameInfos.computeIfAbsent(volumeBucketId,
+ (k) -> BucketNameInfo.newBuilder().setVolumeId(volumeBucketId.getVolumeId())
+ .setBucketId(volumeBucketId.getBucketId())
+ .setVolumeName(deletedDirInfo.getVolumeName())
+ .setBucketName(deletedDirInfo.getBucketName())
+ .build());
+
boolean isDirReclaimable =
reclaimableDirFilter.apply(pendingDeletedDirInfo);
Optional<PurgePathRequest> request = prepareDeleteDirRequest(
pendingDeletedDirInfo.getValue(),
@@ -642,7 +661,7 @@ private boolean processDeletedDirectories(SnapshotInfo currentSnapshotInfo, KeyM
optimizeDirDeletesAndSubmitRequest(dirNum, subDirNum,
subFileNum, allSubDirList, purgePathRequestList, snapshotTableKey,
startTime, remainingBufLimit, getOzoneManager().getKeyManager(),
- reclaimableDirFilter, reclaimableFileFilter, expectedPreviousSnapshotId,
+ reclaimableDirFilter, reclaimableFileFilter, bucketNameInfos, expectedPreviousSnapshotId,
runCount);
Map<UUID, Long> exclusiveReplicatedSizeMap =
reclaimableFileFilter.getExclusiveReplicatedSizeMap();
Map<UUID, Long> exclusiveSizeMap =
reclaimableFileFilter.getExclusiveSizeMap();
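
The addAllBucketNameInfos stream in submitPurgePaths reduces the purge requests to one entry per distinct bucket and silently drops buckets with no recorded BucketNameInfo. The same shape with plain Java stand-ins for the proto types:

    import java.util.List;
    import java.util.Map;
    import java.util.Objects;
    import java.util.stream.Collectors;

    public final class DistinctBucketLookup {
      private DistinctBucketLookup() { }

      public static List<String> lookup(List<Long> bucketIdPerRequest,
          Map<Long, String> descriptorByBucket) {
        return bucketIdPerRequest.stream()
            .distinct()                      // one entry per bucket
            .map(descriptorByBucket::get)    // null when no descriptor was recorded
            .filter(Objects::nonNull)        // mirrors filter(Objects::nonNull)
            .collect(Collectors.toList());
      }
    }
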
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMDirectoriesPurgeRequestAndResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMDirectoriesPurgeRequestAndResponse.java
index dff5a74173b..96f0fd63da0 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMDirectoriesPurgeRequestAndResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMDirectoriesPurgeRequestAndResponse.java
@@ -18,20 +18,33 @@
package org.apache.hadoop.ozone.om.request.key;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.LeveledResource.BUCKET_LOCK;
import static org.apache.hadoop.ozone.om.request.file.OMFileRequest.getOmKeyInfo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import jakarta.annotation.Nonnull;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Random;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -49,12 +62,16 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
+import org.apache.hadoop.ozone.om.lock.OMLockDetails;
import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
import org.apache.hadoop.ozone.om.response.key.OMDirectoriesPurgeResponseWithFSO;
import org.apache.hadoop.ozone.om.response.key.OMKeyPurgeResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgePathRequest;
import org.apache.hadoop.util.Time;
+import org.apache.ozone.test.GenericTestUtils;
import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -127,45 +144,48 @@ private OMRequest createPurgeKeysRequest(String fromSnapshot, String purgeDelete
return createPurgeKeysRequest(fromSnapshot, purgeDeletedDir,
Collections.emptyList(), keyList, bucketInfo);
}
+ private OMRequest createPurgeKeysRequest(String fromSnapshot,
+ List<PurgePathRequest> purgePathRequestList) {
+ OzoneManagerProtocolProtos.PurgeDirectoriesRequest.Builder purgeDirRequest =
+ OzoneManagerProtocolProtos.PurgeDirectoriesRequest.newBuilder();
+ purgeDirRequest.addAllDeletedPath(purgePathRequestList);
+ if (fromSnapshot != null) {
+ purgeDirRequest.setSnapshotTableKey(fromSnapshot);
+ }
+ OzoneManagerProtocolProtos.OMRequest omRequest =
+ OzoneManagerProtocolProtos.OMRequest.newBuilder()
+ .setCmdType(OzoneManagerProtocolProtos.Type.PurgeDirectories)
+ .setPurgeDirectoriesRequest(purgeDirRequest)
+ .setClientId(UUID.randomUUID().toString())
+ .build();
+ return omRequest;
+ }
+
/**
* Create OMRequest which encapsulates DeleteKeyRequest.
* @return OMRequest
*/
private OMRequest createPurgeKeysRequest(String fromSnapshot, String purgeDeletedDir,
List<OmKeyInfo> subDirs, List<OmKeyInfo> keyList, OmBucketInfo bucketInfo) throws IOException {
- List<OzoneManagerProtocolProtos.PurgePathRequest> purgePathRequestList
- = new ArrayList<>();
+ List<PurgePathRequest> purgePathRequestList = new ArrayList<>();
List<OmKeyInfo> subFiles = new ArrayList<>();
for (OmKeyInfo key : keyList) {
subFiles.add(key);
}
Long volumeId = omMetadataManager.getVolumeId(bucketInfo.getVolumeName());
Long bucketId = bucketInfo.getObjectID();
- OzoneManagerProtocolProtos.PurgePathRequest request = wrapPurgeRequest(
+ PurgePathRequest request = wrapPurgeRequest(
volumeId, bucketId, purgeDeletedDir, subFiles, subDirs);
purgePathRequestList.add(request);
-
- OzoneManagerProtocolProtos.PurgeDirectoriesRequest.Builder purgeDirRequest =
- OzoneManagerProtocolProtos.PurgeDirectoriesRequest.newBuilder();
- purgeDirRequest.addAllDeletedPath(purgePathRequestList);
- if (fromSnapshot != null) {
- purgeDirRequest.setSnapshotTableKey(fromSnapshot);
- }
- OzoneManagerProtocolProtos.OMRequest omRequest =
- OzoneManagerProtocolProtos.OMRequest.newBuilder()
- .setCmdType(OzoneManagerProtocolProtos.Type.PurgeDirectories)
- .setPurgeDirectoriesRequest(purgeDirRequest)
- .setClientId(UUID.randomUUID().toString())
- .build();
- return omRequest;
+ return createPurgeKeysRequest(fromSnapshot, purgePathRequestList);
}
- private OzoneManagerProtocolProtos.PurgePathRequest wrapPurgeRequest(
+ private PurgePathRequest wrapPurgeRequest(
final long volumeId, final long bucketId, final String purgeDeletedDir,
final List<OmKeyInfo> purgeDeletedFiles, final List<OmKeyInfo> markDirsAsDeleted) {
// Put all keys to be purged in a list
- OzoneManagerProtocolProtos.PurgePathRequest.Builder purgePathsRequest
- = OzoneManagerProtocolProtos.PurgePathRequest.newBuilder();
+ PurgePathRequest.Builder purgePathsRequest
+ = PurgePathRequest.newBuilder();
purgePathsRequest.setVolumeId(volumeId);
purgePathsRequest.setBucketId(bucketId);
@@ -200,13 +220,128 @@ private OMRequest preExecute(OMRequest originalOmRequest) throws IOException {
return modifiedOmRequest;
}
+ private PurgePathRequest createBucketDataAndGetPurgePathRequest(OmBucketInfo bucketInfo) throws Exception {
+ OmDirectoryInfo dir1 = new OmDirectoryInfo.Builder()
+ .setName("dir1")
+ .setCreationTime(Time.now())
+ .setModificationTime(Time.now())
+ .setObjectID(1)
+ .setParentObjectID(bucketInfo.getObjectID())
+ .setUpdateID(0)
+ .build();
+ String dirKey = OMRequestTestUtils.addDirKeyToDirTable(false, dir1, volumeName,
+ bucketInfo.getBucketName(), 1L, omMetadataManager);
+ List<OmKeyInfo> subFiles = new ArrayList<>();
+ List<OmKeyInfo> subDirs = new ArrayList<>();
+ List<String> subFileKeys = new ArrayList<>();
+ List<String> subDirKeys = new ArrayList<>();
+ for (int id = 1; id < 10; id++) {
+ OmDirectoryInfo subdir = new OmDirectoryInfo.Builder()
+ .setName("subdir" + id)
+ .setCreationTime(Time.now())
+ .setModificationTime(Time.now())
+ .setObjectID(2 * id)
+ .setParentObjectID(dir1.getObjectID())
+ .setUpdateID(0)
+ .build();
+ String subDirectoryPath = OMRequestTestUtils.addDirKeyToDirTable(false, subdir, volumeName,
+ bucketInfo.getBucketName(), 2 * id, omMetadataManager);
+ subDirKeys.add(subDirectoryPath);
+ OmKeyInfo subFile =
+ OMRequestTestUtils.createOmKeyInfo(volumeName, bucketInfo.getBucketName(), "file" + id,
+ RatisReplicationConfig.getInstance(ONE))
+ .setObjectID(2 * id + 1)
+ .setParentObjectID(dir1.getObjectID())
+ .setUpdateID(100L)
+ .build();
+ String subFilePath = OMRequestTestUtils.addFileToKeyTable(false, true, subFile.getKeyName(),
+ subFile, 1234L, 2 * id + 1, omMetadataManager);
+ subFileKeys.add(subFilePath);
+ subFile.setKeyName("dir1/" + subFile.getKeyName());
+ subFiles.add(subFile);
+ subDirs.add(getOmKeyInfo(volumeName, bucketInfo.getBucketName(), subdir,
+ "dir1/" + subdir.getName()));
+ }
+ String deletedDirKey = OMRequestTestUtils.deleteDir(dirKey, volumeName, bucketInfo.getBucketName(),
+ omMetadataManager);
+ for (String subDirKey : subDirKeys) {
+ assertTrue(omMetadataManager.getDirectoryTable().isExist(subDirKey));
+ }
+ for (String subFileKey : subFileKeys) {
+ assertTrue(omMetadataManager.getFileTable().isExist(subFileKey));
+ }
+ assertFalse(omMetadataManager.getDirectoryTable().isExist(dirKey));
+ Long volumeId = omMetadataManager.getVolumeId(bucketInfo.getVolumeName());
+ long bucketId = bucketInfo.getObjectID();
+ return wrapPurgeRequest(volumeId, bucketId, deletedDirKey, subFiles, subDirs);
+ }
+
+ @Test
+ public void testBucketLockWithPurgeDirectory() throws Exception {
+ when(ozoneManager.getDefaultReplicationConfig())
+ .thenReturn(RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
+ String bucket1 = "bucket" + RandomUtils.secure().randomInt();
+ // Add volume, bucket and key entries to OM DB.
+ OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucket1,
+ omMetadataManager, BucketLayout.FILE_SYSTEM_OPTIMIZED);
+ String bucketKey1 = omMetadataManager.getBucketKey(volumeName, bucket1);
+ OmBucketInfo bucketInfo1 = omMetadataManager.getBucketTable().get(bucketKey1);
+ PurgePathRequest purgePathRequest1 = createBucketDataAndGetPurgePathRequest(bucketInfo1);
+ String bucket2 = "bucket" + RandomUtils.secure().randomInt();
+ // Add volume, bucket and key entries to OM DB.
+ OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucket2,
+ omMetadataManager, BucketLayout.FILE_SYSTEM_OPTIMIZED);
+ String bucketKey2 = omMetadataManager.getBucketKey(volumeName, bucket2);
+ OmBucketInfo bucketInfo2 = omMetadataManager.getBucketTable().get(bucketKey2);
+ PurgePathRequest purgePathRequest2 = createBucketDataAndGetPurgePathRequest(bucketInfo2);
+ IOzoneManagerLock lock = spy(omMetadataManager.getLock());
+ Set<Long> acquiredLockIds = new ConcurrentSkipListSet<>();
+ Set<String> acquiredLockKeys = new ConcurrentSkipListSet<>();
+ try {
+ doAnswer(i -> {
+ long threadId = Thread.currentThread().getId();
+ GenericTestUtils.waitFor(() -> !acquiredLockIds.contains(threadId) || acquiredLockIds.size() == 2, 1000, 30000);
+ OMLockDetails lockDetails = (OMLockDetails) i.callRealMethod();
+ acquiredLockIds.add(threadId);
+ acquiredLockKeys.add(i.getArgument(1) + "/" + i.getArgument(2));
+ return lockDetails;
+ }).when(lock).acquireWriteLock(eq(BUCKET_LOCK), anyString(), anyString());
+
+ doAnswer(i -> {
+ long threadId = Thread.currentThread().getId();
+ GenericTestUtils.waitFor(() -> !acquiredLockIds.contains(threadId) || acquiredLockIds.size() == 2, 1000, 30000);
+ OMLockDetails lockDetails = (OMLockDetails) i.callRealMethod();
+ acquiredLockIds.add(threadId);
+ for (String[] lockKey : (List<String[]>) i.getArgument(1)) {
+ acquiredLockKeys.add(lockKey[0] + "/" + lockKey[1]);
+ }
+ return lockDetails;
+ }).when(lock).acquireWriteLocks(eq(BUCKET_LOCK), anyCollection());
+ when(omMetadataManager.getLock()).thenReturn(lock);
+ OMDirectoriesPurgeRequestWithFSO purgePathRequests1 = new OMDirectoriesPurgeRequestWithFSO(
+ preExecute(createPurgeKeysRequest(null, Arrays.asList(purgePathRequest1, purgePathRequest2))));
+ OMDirectoriesPurgeRequestWithFSO purgePathRequests2 = new OMDirectoriesPurgeRequestWithFSO(
+ preExecute(createPurgeKeysRequest(null, Arrays.asList(purgePathRequest2, purgePathRequest1))));
+ CompletableFuture future1 = CompletableFuture.runAsync(() ->
+ purgePathRequests1.validateAndUpdateCache(ozoneManager, 100L));
+ CompletableFuture future2 = CompletableFuture.runAsync(() ->
+ purgePathRequests2.validateAndUpdateCache(ozoneManager, 100L));
+ future1.get();
+ future2.get();
+ assertEquals(Stream.of(bucketInfo1.getVolumeName() + "/" + bucketInfo1.getBucketName(),
+ bucketInfo2.getVolumeName() + "/" + bucketInfo2.getBucketName()).collect(Collectors.toSet()),
+ acquiredLockKeys);
+ } finally {
+ reset(lock);
+ }
+ }
+
@ParameterizedTest
@CsvSource(value = {"false,false", "false,true", "true,false", "true,true"})
public void testDirectoryPurge(boolean fromSnapshot, boolean purgeDirectory) throws Exception {
when(ozoneManager.getDefaultReplicationConfig())
.thenReturn(RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
- Random random = new Random();
- String bucket = "bucket" + random.nextInt();
+ String bucket = "bucket" + RandomUtils.secure().randomInt();
// Add volume, bucket and key entries to OM DB.
OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucket,
omMetadataManager, BucketLayout.FILE_SYSTEM_OPTIMIZED);
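
The new test drives two purge requests that list the same pair of buckets in opposite orders and asserts that both complete. Stripped of the Mockito instrumentation, the property under test looks like this; sorting by identity hash stands in for the sorted bucket keys (ties are theoretically possible with identity hashes, which real string keys avoid):

    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.stream.Collectors;

    public class LockOrderHarness {
      public static void main(String[] args) throws Exception {
        ReentrantLock b1 = new ReentrantLock();
        ReentrantLock b2 = new ReentrantLock();
        CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> acquireInOrder(Arrays.asList(b1, b2)));
        CompletableFuture<Void> f2 = CompletableFuture.runAsync(() -> acquireInOrder(Arrays.asList(b2, b1)));
        CompletableFuture.allOf(f1, f2).get(30, TimeUnit.SECONDS);  // would time out on deadlock
        System.out.println("both finished");
      }

      static void acquireInOrder(List<ReentrantLock> requested) {
        List<ReentrantLock> ordered = requested.stream()
            .sorted(Comparator.comparingInt(System::identityHashCode))
            .collect(Collectors.toList());
        ordered.forEach(ReentrantLock::lock);
        try {
          // critical section
        } finally {
          for (int i = ordered.size() - 1; i >= 0; i--) {
            ordered.get(i).unlock();
          }
        }
      }
    }
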
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
index 1d29e37d80e..f0e32ac405b 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
@@ -27,6 +27,7 @@
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.framework;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import jakarta.annotation.Nonnull;
@@ -145,8 +146,8 @@ public void setup() throws Exception {
folder.toAbsolutePath().toString());
ozoneConfiguration.setBoolean(OzoneConfigKeys.OZONE_HBASE_ENHANCEMENTS_ALLOWED,
true);
ozoneConfiguration.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED,
true);
- omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration,
- ozoneManager);
+ omMetadataManager = spy(new OmMetadataManagerImpl(ozoneConfiguration,
+ ozoneManager));
when(ozoneManager.getMetrics()).thenReturn(omMetrics);
when(ozoneManager.getPerfMetrics()).thenReturn(perfMetrics);
when(ozoneManager.getDeletionMetrics()).thenReturn(delMetrics);
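
The switch to spy() matters because the new lock test stubs getLock() on an otherwise real OmMetadataManagerImpl; a plain mock would lose the real table behavior the other tests depend on. The same pattern on a toy class (Mockito assumed on the classpath; for spied methods with side effects, the doReturn form avoids invoking the real method while stubbing):

    import static org.mockito.Mockito.spy;
    import static org.mockito.Mockito.when;

    public class SpyShape {
      public static class Catalog {
        public String name() { return "real"; }
        public int size() { return 42; }
      }

      public static void main(String[] args) {
        Catalog spied = spy(new Catalog());
        when(spied.name()).thenReturn("stubbed");  // only name() is overridden
        System.out.println(spied.name());          // "stubbed"
        System.out.println(spied.size());          // 42, real method still runs
      }
    }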