This is an automated email from the ASF dual-hosted git repository.
swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 322ca93b4a HDDS-13025. Refactor KeyDeletingService to use
ReclaimableKeyFilter (#8450)
322ca93b4a is described below
commit 322ca93b4a6358b5dda6ed2f62abb64274ad3975
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Fri May 23 22:18:14 2025 -0400
HDDS-13025. Refactor KeyDeletingService to use ReclaimableKeyFilter (#8450)
---
.../org/apache/hadoop/ozone/om/OMConfigKeys.java | 5 +
.../hadoop/ozone/TestOzoneConfigurationFields.java | 1 +
.../org/apache/hadoop/ozone/om/TestKeyPurging.java | 2 +-
...TestSnapshotDeletingServiceIntegrationTest.java | 8 +-
.../TestSnapshotDirectoryCleaningService.java | 1 -
.../org/apache/hadoop/ozone/om/KeyManager.java | 40 +-
.../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 79 +++-
.../hadoop/ozone/om/OmMetadataManagerImpl.java | 156 ------
.../hadoop/ozone/om/PendingKeysDeletion.java | 18 +-
.../om/service/AbstractKeyDeletingService.java | 51 +-
.../ozone/om/service/KeyDeletingService.java | 523 +++++++--------------
.../hadoop/ozone/om/snapshot/SnapshotUtils.java | 5 +
.../ozone/om/service/TestKeyDeletingService.java | 55 ++-
13 files changed, 378 insertions(+), 566 deletions(-)
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index d9b08494f0..43a0895142 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -399,8 +399,13 @@ public final class OMConfigKeys {
public static final String OZONE_THREAD_NUMBER_DIR_DELETION =
"ozone.thread.number.dir.deletion";
+ public static final String OZONE_THREAD_NUMBER_KEY_DELETION =
+ "ozone.thread.number.key.deletion";
+
public static final int OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT = 10;
+ public static final int OZONE_THREAD_NUMBER_KEY_DELETION_DEFAULT = 10;
+
public static final String SNAPSHOT_SST_DELETING_LIMIT_PER_TASK =
"ozone.snapshot.filtering.limit.per.task";
public static final int SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT = 2;
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index 57caaa45b2..c699a6f6fa 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@ -123,6 +123,7 @@ private void addPropertiesNotInXml() {
OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_USER,
OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD,
OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION,
+ OMConfigKeys.OZONE_THREAD_NUMBER_KEY_DELETION,
ScmConfigKeys.OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY,
ScmConfigKeys.OZONE_SCM_HA_PREFIX,
S3GatewayConfigKeys.OZONE_S3G_FSO_DIRECTORY_CREATION_ENABLED,
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java
index 17d4d40a09..fa59754b67 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java
@@ -126,7 +126,7 @@ public void testKeysPurgingByKeyDeletingService() throws
Exception {
GenericTestUtils.waitFor(
() -> {
try {
- return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE)
+ return keyManager.getPendingDeletionKeys((kv) -> true,
Integer.MAX_VALUE)
.getKeyBlocksList().isEmpty();
} catch (IOException e) {
return false;
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java
index 7e8befa6a1..1662904853 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java
@@ -25,6 +25,7 @@
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
@@ -127,6 +128,7 @@ public void setup() throws Exception {
conf.setBoolean(OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED, true);
conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_TIMEOUT,
10000, TimeUnit.MILLISECONDS);
+ conf.setInt(OMConfigKeys.OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL, 500);
conf.setInt(OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL, 500);
conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 500,
TimeUnit.MILLISECONDS);
@@ -493,12 +495,12 @@ private KeyDeletingService
getMockedKeyDeletingService(AtomicBoolean keyDeletion
KeyManager keyManager = Mockito.spy(om.getKeyManager());
when(ozoneManager.getKeyManager()).thenReturn(keyManager);
KeyDeletingService keyDeletingService = Mockito.spy(new
KeyDeletingService(ozoneManager,
- ozoneManager.getScmClient().getBlockClient(), keyManager, 10000,
- 100000, cluster.getConf(), false));
+ ozoneManager.getScmClient().getBlockClient(), 10000,
+ 100000, cluster.getConf(), 10, false));
keyDeletingService.shutdown();
GenericTestUtils.waitFor(() -> keyDeletingService.getThreadCount() == 0,
1000,
100000);
- when(keyManager.getPendingDeletionKeys(anyInt())).thenAnswer(i -> {
+ when(keyManager.getPendingDeletionKeys(any(), anyInt())).thenAnswer(i -> {
// wait for SDS to reach the KDS wait block before processing any key.
GenericTestUtils.waitFor(keyDeletionWaitStarted::get, 1000, 100000);
keyDeletionStarted.set(true);
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDirectoryCleaningService.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDirectoryCleaningService.java
index 84307c5549..8591c6d1e8 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDirectoryCleaningService.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDirectoryCleaningService.java
@@ -240,7 +240,6 @@ public void testExclusiveSizeWithDirectoryDeepClean()
throws Exception {
Table.KeyValue<String, SnapshotInfo> snapshotEntry = iterator.next();
String snapshotName = snapshotEntry.getValue().getName();
SnapshotInfo snapshotInfo =
snapshotInfoTable.get(snapshotEntry.getKey());
- System.out.println(snapshotInfo.getName() + " " +
snapshotInfo.getDeepCleanedDeletedDir());
assertEquals(expectedSize.get(snapshotName),
snapshotInfo.getExclusiveSize() +
snapshotInfo.getExclusiveSizeDeltaFromDirDeepCleaning());
// Since for the test we are using RATIS/THREE
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index fa3e622313..61f46634ec 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -26,7 +26,6 @@
import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
-import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.fs.OzoneManagerFS;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
@@ -113,17 +112,38 @@ ListKeysResult listKeys(String volumeName, String
bucketName, String startKey,
throws IOException;
/**
- * Returns a PendingKeysDeletion. It has a list of pending deletion key info
- * that ups to the given count.Each entry is a {@link BlockGroup}, which
- * contains the info about the key name and all its associated block IDs.
- * Second is a Mapping of Key-Value pair which is updated in the
deletedTable.
+ * Retrieves pending deletion keys that match a given filter function.
*
- * @param count max number of keys to return.
- * @return a Pair of list of {@link BlockGroup} representing keys and blocks,
- * and a hashmap for key-value pair to be updated in the deletedTable.
- * @throws IOException
+ * @param filter a functional interface specifying the filter condition to
apply
+ * to the keys. It takes a KeyValue pair containing a string
key and
+ * an OmKeyInfo object, and returns a boolean value indicating
whether
+ * the key meets the filter criteria.
+ * @param count the maximum number of keys to retrieve.
+ * @return a PendingKeysDeletion object containing the keys that satisfy the
filter
+ * criteria, up to the specified count.
+ * @throws IOException if an I/O error occurs while fetching the keys.
*/
- PendingKeysDeletion getPendingDeletionKeys(int count) throws IOException;
+ PendingKeysDeletion getPendingDeletionKeys(
+ CheckedFunction<Table.KeyValue<String, OmKeyInfo>, Boolean, IOException>
filter, int count)
+ throws IOException;
+
+ /**
+ * Retrieves the keys that are pending deletion in a specified bucket and
volume.
+ *
+ * @param volume the name of the volume that contains the bucket.
+ * @param bucket the name of the bucket within the volume where keys are
located.
+ * @param startKey the key from which to start retrieving pending deletions.
+ * @param filter a filter function to determine which keys should be included
+ * in the pending deletion list.
+ * @param count the maximum number of keys to retrieve that are pending
deletion.
+ * @return a PendingKeysDeletion object containing the list of keys
+ * pending deletion based on the specified parameters.
+ * @throws IOException if an I/O error occurs during the operation.
+ */
+ PendingKeysDeletion getPendingDeletionKeys(
+ String volume, String bucket, String startKey,
+ CheckedFunction<Table.KeyValue<String, OmKeyInfo>, Boolean, IOException>
filter, int count)
+ throws IOException;
/**
* Returns a list rename entries from the snapshotRenamedTable.
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 6a493c1f31..a29e8fdfad 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -66,6 +66,8 @@
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_KEY_DELETION;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_KEY_DELETION_DEFAULT;
import static org.apache.hadoop.ozone.om.OzoneManagerUtils.getBucketLayout;
import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
@@ -113,6 +115,7 @@
import
org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -136,6 +139,7 @@
import org.apache.hadoop.net.TableMapping;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
@@ -172,6 +176,7 @@
import org.apache.hadoop.ozone.security.acl.OzoneObj;
import org.apache.hadoop.ozone.security.acl.RequestContext;
import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.apache.ratis.util.function.CheckedFunction;
@@ -254,9 +259,15 @@ public void start(OzoneConfiguration configuration) {
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
+ int keyDeletingServiceCorePoolSize =
+ configuration.getInt(OZONE_THREAD_NUMBER_KEY_DELETION,
+ OZONE_THREAD_NUMBER_KEY_DELETION_DEFAULT);
+ if (keyDeletingServiceCorePoolSize <= 0) {
+ keyDeletingServiceCorePoolSize = 1;
+ }
keyDeletingService = new KeyDeletingService(ozoneManager,
- scmClient.getBlockClient(), this, blockDeleteInterval,
- serviceTimeout, configuration, isSnapshotDeepCleaningEnabled);
+ scmClient.getBlockClient(), blockDeleteInterval,
+ serviceTimeout, configuration, keyDeletingServiceCorePoolSize,
isSnapshotDeepCleaningEnabled);
keyDeletingService.start();
}
@@ -722,12 +733,66 @@ public ListKeysResult listKeys(String volumeName, String
bucketName,
}
@Override
- public PendingKeysDeletion getPendingDeletionKeys(final int count)
+ public PendingKeysDeletion getPendingDeletionKeys(
+ final CheckedFunction<Table.KeyValue<String, OmKeyInfo>, Boolean,
IOException> filter, final int count)
throws IOException {
- OmMetadataManagerImpl omMetadataManager =
- (OmMetadataManagerImpl) metadataManager;
- return omMetadataManager
- .getPendingDeletionKeys(count, ozoneManager.getOmSnapshotManager());
+ return getPendingDeletionKeys(null, null, null, filter, count);
+ }
+
+ @Override
+ public PendingKeysDeletion getPendingDeletionKeys(
+ String volume, String bucket, String startKey,
+ CheckedFunction<Table.KeyValue<String, OmKeyInfo>, Boolean, IOException>
filter,
+ int count) throws IOException {
+ List<BlockGroup> keyBlocksList = Lists.newArrayList();
+ Map<String, RepeatedOmKeyInfo> keysToModify = new HashMap<>();
+ // Bucket prefix would be empty if volume is empty i.e. either null or "".
+ Optional<String> bucketPrefix = getBucketPrefix(volume, bucket, false);
+ try (TableIterator<String, ? extends Table.KeyValue<String,
RepeatedOmKeyInfo>>
+ delKeyIter =
metadataManager.getDeletedTable().iterator(bucketPrefix.orElse(""))) {
+
+ /* Seeking to the start key if it is not null. The next key picked up would
be ensured to start with the bucket
+ prefix, {@link
org.apache.hadoop.hdds.utils.db.Table#iterator(bucketPrefix)} would ensure this.
+ */
+ if (startKey != null) {
+ delKeyIter.seek(startKey);
+ }
+ int currentCount = 0;
+ while (delKeyIter.hasNext() && currentCount < count) {
+ RepeatedOmKeyInfo notReclaimableKeyInfo = new RepeatedOmKeyInfo();
+ Table.KeyValue<String, RepeatedOmKeyInfo> kv = delKeyIter.next();
+ if (kv != null) {
+ List<BlockGroup> blockGroupList = Lists.newArrayList();
+ // Multiple keys with the same path can be queued in one DB entry
+ RepeatedOmKeyInfo infoList = kv.getValue();
+ for (OmKeyInfo info : infoList.getOmKeyInfoList()) {
+
+ // Skip the key if the filter doesn't allow the file to be deleted.
+ if (filter == null || filter.apply(Table.newKeyValue(kv.getKey(),
info))) {
+ List<BlockID> blockIDS = info.getKeyLocationVersions().stream()
+ .flatMap(versionLocations ->
versionLocations.getLocationList().stream()
+ .map(b -> new BlockID(b.getContainerID(),
b.getLocalID()))).collect(Collectors.toList());
+ BlockGroup keyBlocks =
BlockGroup.newBuilder().setKeyName(kv.getKey())
+ .addAllBlockIDs(blockIDS).build();
+ blockGroupList.add(keyBlocks);
+ currentCount++;
+ } else {
+ notReclaimableKeyInfo.addOmKeyInfo(info);
+ }
+ }
+
+ List<OmKeyInfo> notReclaimableKeyInfoList =
notReclaimableKeyInfo.getOmKeyInfoList();
+
+ // If only some of the versions are reclaimable, modify the key entry so that
it retains just the versions that cannot be purged.
+ if (!notReclaimableKeyInfoList.isEmpty() &&
+ notReclaimableKeyInfoList.size() !=
infoList.getOmKeyInfoList().size()) {
+ keysToModify.put(kv.getKey(), notReclaimableKeyInfo);
+ }
+ keyBlocksList.addAll(blockGroupList);
+ }
+ }
+ }
+ return new PendingKeysDeletion(keyBlocksList, keysToModify);
}
private <V, R> List<Table.KeyValue<String, R>> getTableEntries(String
startKey,
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 173f18313c..e1f50b1922 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -34,7 +34,6 @@
import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR;
import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
import static
org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.checkSnapshotDirExist;
-import static
org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.isBlockLocationInfoSame;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
@@ -55,7 +54,6 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
-import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -123,7 +121,6 @@
import org.apache.hadoop.util.Time;
import org.apache.ozone.compaction.log.CompactionLogEntry;
import org.apache.ratis.util.ExitUtils;
-import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
import org.eclipse.jetty.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1311,159 +1308,6 @@ private PersistedUserVolumeInfo getVolumesByUser(String
userNameKey)
}
}
- /**
- * Returns a list of pending deletion key info up to the limit.
- * Each entry is a {@link BlockGroup}, which contains the info about the key
- * name and all its associated block IDs.
- *
- * @param keyCount max number of keys to return.
- * @param omSnapshotManager SnapshotManager
- * @return a list of {@link BlockGroup} represent keys and blocks.
- * @throws IOException
- */
- public PendingKeysDeletion getPendingDeletionKeys(final int keyCount,
- OmSnapshotManager omSnapshotManager)
- throws IOException {
- List<BlockGroup> keyBlocksList = Lists.newArrayList();
- HashMap<String, RepeatedOmKeyInfo> keysToModify = new HashMap<>();
- try (TableIterator<String, ? extends KeyValue<String, RepeatedOmKeyInfo>>
- keyIter = getDeletedTable().iterator()) {
- int currentCount = 0;
- while (keyIter.hasNext() && currentCount < keyCount) {
- RepeatedOmKeyInfo notReclaimableKeyInfo = new RepeatedOmKeyInfo();
- KeyValue<String, RepeatedOmKeyInfo> kv = keyIter.next();
- if (kv != null) {
- List<BlockGroup> blockGroupList = Lists.newArrayList();
- // Get volume name and bucket name
- String[] keySplit = kv.getKey().split(OM_KEY_PREFIX);
- String bucketKey = getBucketKey(keySplit[1], keySplit[2]);
- OmBucketInfo bucketInfo = getBucketTable().get(bucketKey);
- // If Bucket deleted bucketInfo would be null, thus making previous
snapshot also null.
- SnapshotInfo previousSnapshotInfo = bucketInfo == null ? null :
- SnapshotUtils.getLatestSnapshotInfo(bucketInfo.getVolumeName(),
- bucketInfo.getBucketName(), ozoneManager, snapshotChainManager);
- // previous snapshot is not active or it has not been flushed to
disk then don't process the key in this
- // iteration.
- if (previousSnapshotInfo != null &&
- (previousSnapshotInfo.getSnapshotStatus() !=
SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE ||
-
!OmSnapshotManager.areSnapshotChangesFlushedToDB(ozoneManager.getMetadataManager(),
- previousSnapshotInfo))) {
- continue;
- }
- // Get the latest snapshot in snapshot path.
- try (UncheckedAutoCloseableSupplier<OmSnapshot> rcLatestSnapshot =
previousSnapshotInfo == null ? null :
-
omSnapshotManager.getSnapshot(previousSnapshotInfo.getVolumeName(),
- previousSnapshotInfo.getBucketName(),
previousSnapshotInfo.getName())) {
-
- // Multiple keys with the same path can be queued in one DB entry
- RepeatedOmKeyInfo infoList = kv.getValue();
- for (OmKeyInfo info : infoList.cloneOmKeyInfoList()) {
- // Skip the key if it exists in the previous snapshot (of the
same
- // scope) as in this case its blocks should not be reclaimed
-
- // If the last snapshot is deleted and the keys renamed in
between
- // the snapshots will be cleaned up by KDS. So we need to check
- // in the renamedTable as well.
- String dbRenameKey = getRenameKey(info.getVolumeName(),
- info.getBucketName(), info.getObjectID());
-
- if (rcLatestSnapshot != null) {
- Table<String, OmKeyInfo> prevKeyTable =
- rcLatestSnapshot.get()
- .getMetadataManager()
- .getKeyTable(bucketInfo.getBucketLayout());
-
- Table<String, RepeatedOmKeyInfo> prevDeletedTable =
-
rcLatestSnapshot.get().getMetadataManager().getDeletedTable();
- String prevKeyTableDBKey = getSnapshotRenamedTable()
- .get(dbRenameKey);
- String prevDelTableDBKey = getOzoneKey(info.getVolumeName(),
- info.getBucketName(), info.getKeyName());
- // format: /volName/bucketName/keyName/objId
- prevDelTableDBKey = getOzoneDeletePathKey(info.getObjectID(),
- prevDelTableDBKey);
-
- if (prevKeyTableDBKey == null &&
- bucketInfo.getBucketLayout().isFileSystemOptimized()) {
- long volumeId = getVolumeId(info.getVolumeName());
- prevKeyTableDBKey = getOzonePathKey(volumeId,
- bucketInfo.getObjectID(),
- info.getParentObjectID(),
- info.getFileName());
- } else if (prevKeyTableDBKey == null) {
- prevKeyTableDBKey = getOzoneKey(info.getVolumeName(),
- info.getBucketName(),
- info.getKeyName());
- }
-
- OmKeyInfo omKeyInfo = prevKeyTable.get(prevKeyTableDBKey);
- // When key is deleted it is no longer in keyTable, we also
- // have to check deletedTable of previous snapshot
- RepeatedOmKeyInfo delOmKeyInfo =
- prevDeletedTable.get(prevDelTableDBKey);
- if (versionExistsInPreviousSnapshot(omKeyInfo,
- info, delOmKeyInfo)) {
- // If the infoList size is 1, there is nothing to split.
- // We either delete it or skip it.
- if (!(infoList.getOmKeyInfoList().size() == 1)) {
- notReclaimableKeyInfo.addOmKeyInfo(info);
- }
- continue;
- }
- }
-
- // Add all blocks from all versions of the key to the deletion
- // list
- for (OmKeyLocationInfoGroup keyLocations :
- info.getKeyLocationVersions()) {
- List<BlockID> item = keyLocations.getLocationList().stream()
- .map(b -> new BlockID(b.getContainerID(), b.getLocalID()))
- .collect(Collectors.toList());
- BlockGroup keyBlocks = BlockGroup.newBuilder()
- .setKeyName(kv.getKey())
- .addAllBlockIDs(item)
- .build();
- blockGroupList.add(keyBlocks);
- }
- currentCount++;
- }
-
- List<OmKeyInfo> notReclaimableKeyInfoList =
- notReclaimableKeyInfo.getOmKeyInfoList();
- // If Bucket deleted bucketInfo would be null, thus making
previous snapshot also null.
- SnapshotInfo newPreviousSnapshotInfo = bucketInfo == null ? null :
- SnapshotUtils.getLatestSnapshotInfo(bucketInfo.getVolumeName(),
- bucketInfo.getBucketName(), ozoneManager,
snapshotChainManager);
- // Check if the previous snapshot in the chain hasn't changed.
- if
(Objects.equals(Optional.ofNullable(newPreviousSnapshotInfo).map(SnapshotInfo::getSnapshotId),
-
Optional.ofNullable(previousSnapshotInfo).map(SnapshotInfo::getSnapshotId))) {
- // If all the versions are not reclaimable, then do nothing.
- if (!notReclaimableKeyInfoList.isEmpty() &&
- notReclaimableKeyInfoList.size() !=
- infoList.getOmKeyInfoList().size()) {
- keysToModify.put(kv.getKey(), notReclaimableKeyInfo);
- }
-
- if (notReclaimableKeyInfoList.size() !=
- infoList.getOmKeyInfoList().size()) {
- keyBlocksList.addAll(blockGroupList);
- }
- }
- }
- }
- }
- }
- return new PendingKeysDeletion(keyBlocksList, keysToModify);
- }
-
- private boolean versionExistsInPreviousSnapshot(OmKeyInfo omKeyInfo,
- OmKeyInfo info, RepeatedOmKeyInfo delOmKeyInfo) {
- return (omKeyInfo != null &&
- info.getObjectID() == omKeyInfo.getObjectID() &&
- isBlockLocationInfoSame(omKeyInfo, info)) ||
- delOmKeyInfo != null;
- }
-
/**
* Decide whether the open key is a multipart upload related key.
* @param openKeyInfo open key related to multipart upload
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PendingKeysDeletion.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PendingKeysDeletion.java
index 7af213f8f1..e1fbdfb107 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PendingKeysDeletion.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PendingKeysDeletion.java
@@ -17,26 +17,34 @@
package org.apache.hadoop.ozone.om;
-import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
/**
- * Return class for OMMetadataManager#getPendingDeletionKeys.
+ * Tracks metadata for keys pending deletion and their associated blocks.
+ *
+ * This class maintains:
+ * <ul>
+ * <li>A list of {@link BlockGroup} entries, where each entry contains
+ * a key name and its associated block IDs</li>
+ * <li>A mapping from deletedTable keys to the versions that are not
+ * reclaimable; these entries are updated once the other blocks are purged</li>
+ * </ul>
*/
public class PendingKeysDeletion {
- private HashMap<String, RepeatedOmKeyInfo> keysToModify;
+ private Map<String, RepeatedOmKeyInfo> keysToModify;
private List<BlockGroup> keyBlocksList;
public PendingKeysDeletion(List<BlockGroup> keyBlocksList,
- HashMap<String, RepeatedOmKeyInfo> keysToModify) {
+ Map<String, RepeatedOmKeyInfo> keysToModify) {
this.keysToModify = keysToModify;
this.keyBlocksList = keyBlocksList;
}
- public HashMap<String, RepeatedOmKeyInfo> getKeysToModify() {
+ public Map<String, RepeatedOmKeyInfo> getKeysToModify() {
return keysToModify;
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
index 8b9455b49c..155ea9a37a 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
@@ -101,12 +101,12 @@ public AbstractKeyDeletingService(String serviceName,
long interval,
this.callId = new AtomicLong(0);
}
- protected int processKeyDeletes(List<BlockGroup> keyBlocksList,
+ protected Pair<Integer, Boolean> processKeyDeletes(List<BlockGroup>
keyBlocksList,
Map<String, RepeatedOmKeyInfo> keysToModify,
- String snapTableKey, UUID expectedPreviousSnapshotId) throws IOException
{
+ String snapTableKey, UUID expectedPreviousSnapshotId) throws
IOException, InterruptedException {
long startTime = Time.monotonicNow();
- int delCount = 0;
+ Pair<Integer, Boolean> purgeResult = Pair.of(0, false);
if (LOG.isDebugEnabled()) {
LOG.debug("Send {} key(s) to SCM: {}",
keyBlocksList.size(), keyBlocksList);
@@ -124,15 +124,15 @@ protected int processKeyDeletes(List<BlockGroup>
keyBlocksList,
keyBlocksList.size(), Time.monotonicNow() - startTime);
if (blockDeletionResults != null) {
long purgeStartTime = Time.monotonicNow();
- delCount = submitPurgeKeysRequest(blockDeletionResults,
+ purgeResult = submitPurgeKeysRequest(blockDeletionResults,
keysToModify, snapTableKey, expectedPreviousSnapshotId);
int limit =
ozoneManager.getConfiguration().getInt(OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK,
OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT);
LOG.info("Blocks for {} (out of {}) keys are deleted from DB in {} ms.
Limit per task is {}.",
- delCount, blockDeletionResults.size(), Time.monotonicNow() -
purgeStartTime, limit);
+ purgeResult, blockDeletionResults.size(), Time.monotonicNow() -
purgeStartTime, limit);
}
perfMetrics.setKeyDeletingServiceLatencyMs(Time.monotonicNow() -
startTime);
- return delCount;
+ return purgeResult;
}
/**
@@ -141,13 +141,15 @@ protected int processKeyDeletes(List<BlockGroup>
keyBlocksList,
* @param results DeleteBlockGroups returned by SCM.
* @param keysToModify Updated list of RepeatedOmKeyInfo
*/
- private int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
- Map<String, RepeatedOmKeyInfo> keysToModify, String snapTableKey, UUID
expectedPreviousSnapshotId) {
+ private Pair<Integer, Boolean>
submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
+ Map<String, RepeatedOmKeyInfo> keysToModify, String snapTableKey, UUID
expectedPreviousSnapshotId)
+ throws InterruptedException {
List<String> purgeKeys = new ArrayList<>();
// Put all keys to be purged in a list
int deletedCount = 0;
Set<String> failedDeletedKeys = new HashSet<>();
+ boolean purgeSuccess = true;
for (DeleteBlockGroupResult result : results) {
String deletedKey = result.getObjectKey();
if (result.isSuccess()) {
@@ -169,6 +171,7 @@ private int
submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
} else {
// If the block deletion failed, then the deleted keys should also not
be modified.
failedDeletedKeys.add(deletedKey);
+ purgeSuccess = false;
}
}
@@ -219,14 +222,17 @@ private int
submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
.build();
// Submit PurgeKeys request to OM
- try {
- submitRequest(omRequest);
+ try (BootstrapStateHandler.Lock lock = snapTableKey != null ?
getBootstrapStateLock().lock() : null) {
+ OzoneManagerProtocolProtos.OMResponse omResponse =
submitRequest(omRequest);
+ if (omResponse != null) {
+ purgeSuccess = purgeSuccess && omResponse.getSuccess();
+ }
} catch (ServiceException e) {
LOG.error("PurgeKey request failed. Will retry at next run.", e);
- return 0;
+ return Pair.of(0, false);
}
- return deletedCount;
+ return Pair.of(deletedCount, purgeSuccess);
}
protected OzoneManagerProtocolProtos.OMResponse submitRequest(OMRequest
omRequest) throws ServiceException {
@@ -637,4 +643,25 @@ public long getMovedFilesCount() {
public BootstrapStateHandler.Lock getBootstrapStateLock() {
return lock;
}
+
+ /**
+ * Submits SetSnapshotPropertyRequest to OM.
+ * @param setSnapshotPropertyRequests request to be sent to OM
+ */
+ protected void submitSetSnapshotRequests(
+ List<OzoneManagerProtocolProtos.SetSnapshotPropertyRequest>
setSnapshotPropertyRequests) {
+ if (setSnapshotPropertyRequests.isEmpty()) {
+ return;
+ }
+ OzoneManagerProtocolProtos.OMRequest omRequest =
OzoneManagerProtocolProtos.OMRequest.newBuilder()
+ .setCmdType(OzoneManagerProtocolProtos.Type.SetSnapshotProperty)
+ .addAllSetSnapshotPropertyRequests(setSnapshotPropertyRequests)
+ .setClientId(clientId.toString())
+ .build();
+ try {
+ submitRequest(omRequest);
+ } catch (ServiceException e) {
+ LOG.error("Failed to submit set snapshot property request", e);
+ }
+ }
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
index 7ec8ed71e6..faf320ab85 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
@@ -19,34 +19,29 @@
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT;
-import static
org.apache.hadoop.ozone.om.helpers.SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.protobuf.ServiceException;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.utils.BackgroundTask;
import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
-import org.apache.hadoop.hdds.utils.IOUtils;
-import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.utils.db.TableIterator;
-import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.om.DeletingServiceMetrics;
import org.apache.hadoop.ozone.om.KeyManager;
@@ -56,17 +51,12 @@
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.PendingKeysDeletion;
import org.apache.hadoop.ozone.om.SnapshotChainManager;
-import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
-import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
+import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
-import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.om.snapshot.filter.ReclaimableKeyFilter;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetSnapshotPropertyRequest;
-import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotSize;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
-import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,46 +71,30 @@ public class KeyDeletingService extends
AbstractKeyDeletingService {
private static final Logger LOG =
LoggerFactory.getLogger(KeyDeletingService.class);
- // Use only a single thread for KeyDeletion. Multiple threads would read
- // from the same table and can send deletion requests for same key multiple
- // times.
- private static final int KEY_DELETING_CORE_POOL_SIZE = 1;
-
- private final KeyManager manager;
private int keyLimitPerTask;
private final AtomicLong deletedKeyCount;
private final AtomicBoolean suspended;
- private final Map<String, Long> exclusiveSizeMap;
- private final Map<String, Long> exclusiveReplicatedSizeMap;
- private final Set<String> completedExclusiveSizeSet;
- private final Map<String, String> snapshotSeekMap;
- private AtomicBoolean isRunningOnAOS;
+ private final AtomicBoolean isRunningOnAOS;
private final boolean deepCleanSnapshots;
private final SnapshotChainManager snapshotChainManager;
private DeletingServiceMetrics metrics;
public KeyDeletingService(OzoneManager ozoneManager,
- ScmBlockLocationProtocol scmClient,
- KeyManager manager, long serviceInterval,
- long serviceTimeout, ConfigurationSource conf,
+ ScmBlockLocationProtocol scmClient, long serviceInterval,
+ long serviceTimeout, ConfigurationSource conf, int
keyDeletionCorePoolSize,
boolean deepCleanSnapshots) {
super(KeyDeletingService.class.getSimpleName(), serviceInterval,
- TimeUnit.MILLISECONDS, KEY_DELETING_CORE_POOL_SIZE,
+ TimeUnit.MILLISECONDS, keyDeletionCorePoolSize,
serviceTimeout, ozoneManager, scmClient);
- this.manager = manager;
this.keyLimitPerTask = conf.getInt(OZONE_KEY_DELETING_LIMIT_PER_TASK,
OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT);
Preconditions.checkArgument(keyLimitPerTask >= 0,
OZONE_KEY_DELETING_LIMIT_PER_TASK + " cannot be negative.");
this.deletedKeyCount = new AtomicLong(0);
this.suspended = new AtomicBoolean(false);
- this.exclusiveSizeMap = new HashMap<>();
- this.exclusiveReplicatedSizeMap = new HashMap<>();
- this.completedExclusiveSizeSet = new HashSet<>();
- this.snapshotSeekMap = new HashMap<>();
this.isRunningOnAOS = new AtomicBoolean(false);
this.deepCleanSnapshots = deepCleanSnapshots;
- this.snapshotChainManager =
((OmMetadataManagerImpl)manager.getMetadataManager()).getSnapshotChainManager();
+ this.snapshotChainManager =
((OmMetadataManagerImpl)ozoneManager.getMetadataManager()).getSnapshotChainManager();
this.metrics = ozoneManager.getDeletionMetrics();
}
@@ -141,7 +115,20 @@ public boolean isRunningOnAOS() {
@Override
public BackgroundTaskQueue getTasks() {
BackgroundTaskQueue queue = new BackgroundTaskQueue();
- queue.add(new KeyDeletingTask(this));
+ queue.add(new KeyDeletingTask(this, null));
+ if (deepCleanSnapshots) {
+ Iterator<UUID> iterator = null;
+ try {
+ iterator = snapshotChainManager.iterator(true);
+ } catch (IOException e) {
+ LOG.error("Error while initializing snapshot chain iterator.");
+ return queue;
+ }
+ while (iterator.hasNext()) {
+ UUID snapshotId = iterator.next();
+ queue.add(new KeyDeletingTask(this, snapshotId));
+ }
+ }
return queue;
}
@@ -186,9 +173,114 @@ public void setKeyLimitPerTask(int keyLimitPerTask) {
*/
private final class KeyDeletingTask implements BackgroundTask {
private final KeyDeletingService deletingService;
+ private final UUID snapshotId;
- private KeyDeletingTask(KeyDeletingService service) {
+ private KeyDeletingTask(KeyDeletingService service, UUID snapshotId) {
this.deletingService = service;
+ this.snapshotId = snapshotId;
+ }
+
+ private OzoneManagerProtocolProtos.SetSnapshotPropertyRequest
getSetSnapshotRequestUpdatingExclusiveSize(
+ Map<UUID, Long> exclusiveSizeMap, Map<UUID, Long>
exclusiveReplicatedSizeMap, UUID snapshotID) {
+ OzoneManagerProtocolProtos.SnapshotSize snapshotSize =
OzoneManagerProtocolProtos.SnapshotSize.newBuilder()
+ .setExclusiveSize(
+ exclusiveSizeMap.getOrDefault(snapshotID, 0L))
+ .setExclusiveReplicatedSize(
+ exclusiveReplicatedSizeMap.getOrDefault(
+ snapshotID, 0L))
+ .build();
+
+ return OzoneManagerProtocolProtos.SetSnapshotPropertyRequest.newBuilder()
+ .setSnapshotKey(snapshotChainManager.getTableKey(snapshotID))
+ .setSnapshotSize(snapshotSize)
+ .build();
+ }
+
+ /**
+ *
+ * @param currentSnapshotInfo if null, deleted directories in AOS should
be processed.
+ * @param keyManager KeyManager of the underlying store.
+ */
+ private void processDeletedKeysForStore(SnapshotInfo currentSnapshotInfo,
KeyManager keyManager,
+ int remainNum) throws IOException, InterruptedException {
+ String volume = null, bucket = null, snapshotTableKey = null;
+ if (currentSnapshotInfo != null) {
+ volume = currentSnapshotInfo.getVolumeName();
+ bucket = currentSnapshotInfo.getBucketName();
+ snapshotTableKey = currentSnapshotInfo.getTableKey();
+ }
+
+ boolean successStatus = true;
+ try {
+ // TODO: [SNAPSHOT] HDDS-7968. Reclaim eligible key blocks in
+ // snapshot's deletedTable when active DB's deletedTable
+ // doesn't have enough entries left.
+ // OM would have to keep track of which snapshot the key is coming
+ // from if the above would be done inside getPendingDeletionKeys().
+ OmSnapshotManager omSnapshotManager =
getOzoneManager().getOmSnapshotManager();
+ // This is to avoid race condition b/w purge request and snapshot
chain update. For AOS taking the global
+ // snapshotId since AOS could process multiple buckets in one
iteration. While using path
+ // previous snapshotId for a snapshot since it would process only one
bucket.
+ UUID expectedPreviousSnapshotId = currentSnapshotInfo == null ?
+ snapshotChainManager.getLatestGlobalSnapshotId() :
+ SnapshotUtils.getPreviousSnapshotId(currentSnapshotInfo,
snapshotChainManager);
+
+ IOzoneManagerLock lock =
getOzoneManager().getMetadataManager().getLock();
+
+ // Purge deleted Keys in the deletedTable && rename entries in the
snapshotRenamedTable which doesn't have a
+ // reference in the previous snapshot.
+ try (ReclaimableKeyFilter reclaimableKeyFilter = new
ReclaimableKeyFilter(getOzoneManager(),
+ omSnapshotManager, snapshotChainManager, currentSnapshotInfo,
keyManager, lock)) {
+ // Get pending keys that can be deleted
+ PendingKeysDeletion pendingKeysDeletion = currentSnapshotInfo == null
+ ? keyManager.getPendingDeletionKeys(reclaimableKeyFilter,
remainNum)
+ : keyManager.getPendingDeletionKeys(volume, bucket, null,
reclaimableKeyFilter, remainNum);
+ List<BlockGroup> keyBlocksList =
pendingKeysDeletion.getKeyBlocksList();
+ //submit purge requests if there are renamed entries to be purged or
keys to be purged.
+ if (keyBlocksList != null && !keyBlocksList.isEmpty()) {
+ // Validating if the previous snapshot is still the same before
purging the blocks.
+ SnapshotUtils.validatePreviousSnapshotId(currentSnapshotInfo,
snapshotChainManager,
+ expectedPreviousSnapshotId);
+ Pair<Integer, Boolean> purgeResult =
processKeyDeletes(keyBlocksList,
+ pendingKeysDeletion.getKeysToModify(), snapshotTableKey,
+ expectedPreviousSnapshotId);
+ remainNum -= purgeResult.getKey();
+ successStatus = purgeResult.getValue();
+ metrics.incrNumKeysProcessed(keyBlocksList.size());
+ metrics.incrNumKeysSentForPurge(purgeResult.getKey());
+ if (successStatus) {
+ deletedKeyCount.addAndGet(purgeResult.getKey());
+ }
+ }
+
+ // Checking remainNum is greater than zero and not equal to the
initial value if there were some keys to
+ // reclaim. This is to check if all keys have been iterated over and
all the keys necessary have been
+ // reclaimed.
+ if (remainNum > 0 && successStatus) {
+ List<SetSnapshotPropertyRequest> setSnapshotPropertyRequests = new
ArrayList<>();
+ Map<UUID, Long> exclusiveReplicatedSizeMap =
reclaimableKeyFilter.getExclusiveReplicatedSizeMap();
+ Map<UUID, Long> exclusiveSizeMap =
reclaimableKeyFilter.getExclusiveSizeMap();
+ List<UUID> previousPathSnapshotsInChain =
+ Stream.of(exclusiveSizeMap.keySet(),
exclusiveReplicatedSizeMap.keySet())
+
.flatMap(Collection::stream).distinct().collect(Collectors.toList());
+ for (UUID snapshot : previousPathSnapshotsInChain) {
+
setSnapshotPropertyRequests.add(getSetSnapshotRequestUpdatingExclusiveSize(exclusiveSizeMap,
+ exclusiveReplicatedSizeMap, snapshot));
+ }
+
+ // Updating directory deep clean flag of snapshot.
+ if (currentSnapshotInfo != null) {
+
setSnapshotPropertyRequests.add(OzoneManagerProtocolProtos.SetSnapshotPropertyRequest.newBuilder()
+ .setSnapshotKey(snapshotTableKey)
+ .setDeepCleanedDeletedKey(true)
+ .build());
+ }
+ submitSetSnapshotRequests(setSnapshotPropertyRequests);
+ }
+ }
+ } catch (UncheckedIOException e) {
+ throw e.getCause();
+ }
}
@Override
@@ -202,326 +294,51 @@ public BackgroundTaskResult call() {
// task.
if (shouldRun()) {
final long run = getRunCount().incrementAndGet();
- LOG.debug("Running KeyDeletingService {}", run);
- isRunningOnAOS.set(true);
- int delCount = 0;
- try {
- // TODO: [SNAPSHOT] HDDS-7968. Reclaim eligible key blocks in
- // snapshot's deletedTable when active DB's deletedTable
- // doesn't have enough entries left.
- // OM would have to keep track of which snapshot the key is coming
- // from if the above would be done inside getPendingDeletionKeys().
- // This is to avoid race condition b/w purge request and snapshot
chain update. For AOS taking the global
- // snapshotId since AOS could process multiple buckets in one
iteration.
- UUID expectedPreviousSnapshotId =
snapshotChainManager.getLatestGlobalSnapshotId();
- PendingKeysDeletion pendingKeysDeletion = manager
- .getPendingDeletionKeys(getKeyLimitPerTask());
- List<BlockGroup> keyBlocksList = pendingKeysDeletion
- .getKeyBlocksList();
- if (keyBlocksList != null && !keyBlocksList.isEmpty()) {
- delCount = processKeyDeletes(keyBlocksList,
- pendingKeysDeletion.getKeysToModify(), null,
expectedPreviousSnapshotId);
- deletedKeyCount.addAndGet(delCount);
- metrics.incrNumKeysProcessed(keyBlocksList.size());
- metrics.incrNumKeysSentForPurge(delCount);
- }
- } catch (IOException e) {
- LOG.error("Error while running delete keys background task. Will " +
- "retry at next run.", e);
+ if (snapshotId == null) {
+ LOG.debug("Running KeyDeletingService for active object store, {}",
run);
+ isRunningOnAOS.set(true);
+ } else {
+ LOG.debug("Running KeyDeletingService for snapshot : {}, {}",
snapshotId, run);
}
-
+ int remainNum = keyLimitPerTask;
+ OmSnapshotManager omSnapshotManager =
getOzoneManager().getOmSnapshotManager();
+ SnapshotInfo snapInfo = null;
try {
- if (deepCleanSnapshots && delCount < keyLimitPerTask) {
- processSnapshotDeepClean(delCount);
- }
- } catch (Exception e) {
- LOG.error("Error while running deep clean on snapshots. Will " +
- "retry at next run.", e);
- }
-
- }
- isRunningOnAOS.set(false);
- synchronized (deletingService) {
- this.deletingService.notify();
- }
-
- // By design, no one cares about the results of this call back.
- return EmptyTaskResult.newResult();
- }
-
- @SuppressWarnings("checkstyle:MethodLength")
- private void processSnapshotDeepClean(int delCount)
- throws IOException {
- OmSnapshotManager omSnapshotManager =
- getOzoneManager().getOmSnapshotManager();
- OmMetadataManagerImpl metadataManager = (OmMetadataManagerImpl)
- getOzoneManager().getMetadataManager();
- SnapshotChainManager snapChainManager = metadataManager
- .getSnapshotChainManager();
- Table<String, SnapshotInfo> snapshotInfoTable =
- getOzoneManager().getMetadataManager().getSnapshotInfoTable();
- List<String> deepCleanedSnapshots = new ArrayList<>();
- try (TableIterator<String, ? extends Table.KeyValue
- <String, SnapshotInfo>> iterator = snapshotInfoTable.iterator()) {
-
- while (delCount < keyLimitPerTask && iterator.hasNext()) {
- List<BlockGroup> keysToPurge = new ArrayList<>();
- HashMap<String, RepeatedOmKeyInfo> keysToModify = new HashMap<>();
- SnapshotInfo currSnapInfo =
snapshotInfoTable.get(iterator.next().getKey());
- // Deep clean only on active snapshot. Deleted Snapshots will be
- // cleaned up by SnapshotDeletingService.
- if (currSnapInfo == null || currSnapInfo.getSnapshotStatus() !=
SNAPSHOT_ACTIVE ||
- currSnapInfo.getDeepClean()) {
- continue;
- }
-
- SnapshotInfo prevSnapInfo =
SnapshotUtils.getPreviousSnapshot(getOzoneManager(), snapChainManager,
- currSnapInfo);
- if (prevSnapInfo != null &&
- (prevSnapInfo.getSnapshotStatus() !=
SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE ||
-
!OmSnapshotManager.areSnapshotChangesFlushedToDB(getOzoneManager().getMetadataManager(),
- prevSnapInfo))) {
- continue;
- }
-
- try (UncheckedAutoCloseableSupplier<OmSnapshot>
- rcCurrOmSnapshot = omSnapshotManager.getSnapshot(
- currSnapInfo.getVolumeName(),
- currSnapInfo.getBucketName(),
- currSnapInfo.getName())) {
- OmSnapshot currOmSnapshot = rcCurrOmSnapshot.get();
-
- Table<String, RepeatedOmKeyInfo> snapDeletedTable =
- currOmSnapshot.getMetadataManager().getDeletedTable();
- Table<String, String> snapRenamedTable =
- currOmSnapshot.getMetadataManager().getSnapshotRenamedTable();
-
- long volumeId = metadataManager.getVolumeId(
- currSnapInfo.getVolumeName());
- // Get bucketInfo for the snapshot bucket to get bucket layout.
- String dbBucketKey = metadataManager.getBucketKey(
- currSnapInfo.getVolumeName(), currSnapInfo.getBucketName());
- OmBucketInfo bucketInfo = metadataManager.getBucketTable()
- .get(dbBucketKey);
-
- if (bucketInfo == null) {
- throw new IllegalStateException("Bucket " + "/" + currSnapInfo
- .getVolumeName() + "/" + currSnapInfo.getBucketName() +
- " is not found. BucketInfo should not be null for" +
- " snapshotted bucket. The OM is in unexpected state.");
+ snapInfo = snapshotId == null ? null :
+ SnapshotUtils.getSnapshotInfo(getOzoneManager(),
snapshotChainManager, snapshotId);
+ if (snapInfo != null) {
+ if
(!OmSnapshotManager.areSnapshotChangesFlushedToDB(getOzoneManager().getMetadataManager(),
snapInfo)) {
+ LOG.info("Skipping snapshot processing since changes to snapshot
{} have not been flushed to disk",
+ snapInfo);
+ return EmptyTaskResult.newResult();
}
-
- String snapshotBucketKey = dbBucketKey + OzoneConsts.OM_KEY_PREFIX;
- SnapshotInfo previousSnapshot =
SnapshotUtils.getPreviousSnapshot(getOzoneManager(), snapChainManager,
- currSnapInfo);
- SnapshotInfo previousToPrevSnapshot = null;
-
- if (previousSnapshot != null) {
- previousToPrevSnapshot =
SnapshotUtils.getPreviousSnapshot(getOzoneManager(), snapChainManager,
- previousSnapshot);
+ if (!snapInfo.getDeepCleanedDeletedDir()) {
+ LOG.debug("Snapshot {} hasn't done deleted directory deep
cleaning yet. Skipping the snapshot in this" +
+ " iteration.", snapInfo);
+ return EmptyTaskResult.newResult();
}
-
- Table<String, OmKeyInfo> previousKeyTable = null;
- Table<String, String> prevRenamedTable = null;
- UncheckedAutoCloseableSupplier<OmSnapshot> rcPrevOmSnapshot = null;
-
- // Split RepeatedOmKeyInfo and update current snapshot
- // deletedKeyTable and next snapshot deletedKeyTable.
- if (previousSnapshot != null) {
- rcPrevOmSnapshot = omSnapshotManager.getSnapshot(
- previousSnapshot.getVolumeName(),
- previousSnapshot.getBucketName(),
- previousSnapshot.getName());
- OmSnapshot omPreviousSnapshot = rcPrevOmSnapshot.get();
-
- previousKeyTable = omPreviousSnapshot.getMetadataManager()
- .getKeyTable(bucketInfo.getBucketLayout());
- prevRenamedTable = omPreviousSnapshot
- .getMetadataManager().getSnapshotRenamedTable();
- }
-
- Table<String, OmKeyInfo> previousToPrevKeyTable = null;
- UncheckedAutoCloseableSupplier<OmSnapshot> rcPrevToPrevOmSnapshot
= null;
- if (previousToPrevSnapshot != null) {
- rcPrevToPrevOmSnapshot = omSnapshotManager.getSnapshot(
- previousToPrevSnapshot.getVolumeName(),
- previousToPrevSnapshot.getBucketName(),
- previousToPrevSnapshot.getName());
- OmSnapshot omPreviousToPrevSnapshot =
rcPrevToPrevOmSnapshot.get();
-
- previousToPrevKeyTable = omPreviousToPrevSnapshot
- .getMetadataManager()
- .getKeyTable(bucketInfo.getBucketLayout());
- }
-
- try (TableIterator<String, ? extends Table.KeyValue<String,
- RepeatedOmKeyInfo>> deletedIterator = snapDeletedTable
- .iterator()) {
-
- String lastKeyInCurrentRun = null;
- String deletedTableSeek = snapshotSeekMap.getOrDefault(
- currSnapInfo.getTableKey(), snapshotBucketKey);
- deletedIterator.seek(deletedTableSeek);
- // To avoid processing the last key from the previous
- // run again.
- if (!deletedTableSeek.equals(snapshotBucketKey) &&
- deletedIterator.hasNext()) {
- deletedIterator.next();
- }
-
- while (deletedIterator.hasNext() && delCount < keyLimitPerTask) {
- Table.KeyValue<String, RepeatedOmKeyInfo>
- deletedKeyValue = deletedIterator.next();
- String deletedKey = deletedKeyValue.getKey();
- lastKeyInCurrentRun = deletedKey;
-
- // Exit if it is out of the bucket scope.
- if (!deletedKey.startsWith(snapshotBucketKey)) {
- break;
- }
-
- RepeatedOmKeyInfo repeatedOmKeyInfo =
- deletedKeyValue.getValue();
-
- List<BlockGroup> blockGroupList = new ArrayList<>();
- RepeatedOmKeyInfo newRepeatedOmKeyInfo =
- new RepeatedOmKeyInfo();
- for (OmKeyInfo keyInfo : repeatedOmKeyInfo.getOmKeyInfoList())
{
- if (previousSnapshot != null) {
- // Calculates the exclusive size for the previous
- // snapshot. See Java Doc for more info.
- calculateExclusiveSize(previousSnapshot,
- previousToPrevSnapshot, keyInfo, bucketInfo, volumeId,
- snapRenamedTable, previousKeyTable, prevRenamedTable,
- previousToPrevKeyTable, exclusiveSizeMap,
- exclusiveReplicatedSizeMap);
- }
-
- if (isKeyReclaimable(previousKeyTable, snapRenamedTable,
- keyInfo, bucketInfo, volumeId, null)) {
- List<BlockGroup> blocksForKeyDelete = currOmSnapshot
- .getMetadataManager()
- .getBlocksForKeyDelete(deletedKey);
- if (blocksForKeyDelete != null) {
- blockGroupList.addAll(blocksForKeyDelete);
- }
- delCount++;
- } else {
- newRepeatedOmKeyInfo.addOmKeyInfo(keyInfo);
- }
- }
-
- if (!newRepeatedOmKeyInfo.getOmKeyInfoList().isEmpty() &&
- newRepeatedOmKeyInfo.getOmKeyInfoList().size() !=
- repeatedOmKeyInfo.getOmKeyInfoList().size()) {
- keysToModify.put(deletedKey, newRepeatedOmKeyInfo);
- }
-
- if (newRepeatedOmKeyInfo.getOmKeyInfoList().size() !=
- repeatedOmKeyInfo.getOmKeyInfoList().size()) {
- keysToPurge.addAll(blockGroupList);
- }
- }
-
- if (delCount < keyLimitPerTask) {
- // Deep clean is completed, we can update the SnapInfo.
- deepCleanedSnapshots.add(currSnapInfo.getTableKey());
- // exclusiveSizeList contains check is used to prevent
- // case where there is no entry in deletedTable, this
- // will throw NPE when we submit request.
- if (previousSnapshot != null && exclusiveSizeMap
- .containsKey(previousSnapshot.getTableKey())) {
- completedExclusiveSizeSet.add(
- previousSnapshot.getTableKey());
- }
-
- snapshotSeekMap.remove(currSnapInfo.getTableKey());
- } else {
- // There are keys that still needs processing
- // we can continue from it in the next iteration
- if (lastKeyInCurrentRun != null) {
- snapshotSeekMap.put(currSnapInfo.getTableKey(),
- lastKeyInCurrentRun);
- }
- }
-
- if (!keysToPurge.isEmpty()) {
- processKeyDeletes(keysToPurge,
- keysToModify, currSnapInfo.getTableKey(),
-
Optional.ofNullable(previousSnapshot).map(SnapshotInfo::getSnapshotId).orElse(null));
- }
- } finally {
- IOUtils.closeQuietly(rcPrevOmSnapshot, rcPrevToPrevOmSnapshot);
+ }
+ try (UncheckedAutoCloseableSupplier<OmSnapshot> omSnapshot =
snapInfo == null ? null :
+ omSnapshotManager.getActiveSnapshot(snapInfo.getVolumeName(),
snapInfo.getBucketName(),
+ snapInfo.getName())) {
+ KeyManager keyManager = snapInfo == null ?
getOzoneManager().getKeyManager()
+ : omSnapshot.get().getKeyManager();
+ processDeletedKeysForStore(snapInfo, keyManager, remainNum);
+ }
+ } catch (IOException | InterruptedException e) {
+ LOG.error("Error while running delete files background task for
store {}. Will retry at next run.",
+ snapInfo, e);
+ } finally {
+ if (snapshotId == null) {
+ isRunningOnAOS.set(false);
+ synchronized (deletingService) {
+ this.deletingService.notify();
}
}
-
}
}
-
- updateDeepCleanedSnapshots(deepCleanedSnapshots);
- updateSnapshotExclusiveSize();
- }
-
- private void updateSnapshotExclusiveSize() {
-
- if (completedExclusiveSizeSet.isEmpty()) {
- return;
- }
-
- Iterator<String> completedSnapshotIterator =
- completedExclusiveSizeSet.iterator();
- while (completedSnapshotIterator.hasNext()) {
- ClientId clientId = ClientId.randomId();
- String dbKey = completedSnapshotIterator.next();
- SnapshotSize snapshotSize = SnapshotSize.newBuilder()
- .setExclusiveSize(exclusiveSizeMap.getOrDefault(dbKey, 0L))
- .setExclusiveReplicatedSize(
- exclusiveReplicatedSizeMap.getOrDefault(dbKey, 0L))
- .build();
- SetSnapshotPropertyRequest setSnapshotPropertyRequest =
- SetSnapshotPropertyRequest.newBuilder()
- .setSnapshotKey(dbKey)
- .setSnapshotSize(snapshotSize)
- .build();
-
- OMRequest omRequest = OMRequest.newBuilder()
- .setCmdType(Type.SetSnapshotProperty)
- .setSetSnapshotPropertyRequest(setSnapshotPropertyRequest)
- .setClientId(clientId.toString())
- .build();
- submitRequest(omRequest, clientId);
- exclusiveSizeMap.remove(dbKey);
- exclusiveReplicatedSizeMap.remove(dbKey);
- completedSnapshotIterator.remove();
- }
- }
-
- private void updateDeepCleanedSnapshots(List<String> deepCleanedSnapshots)
{
- for (String deepCleanedSnapshot: deepCleanedSnapshots) {
- ClientId clientId = ClientId.randomId();
- SetSnapshotPropertyRequest setSnapshotPropertyRequest =
- SetSnapshotPropertyRequest.newBuilder()
- .setSnapshotKey(deepCleanedSnapshot)
- .setDeepCleanedDeletedKey(true)
- .build();
-
- OMRequest omRequest = OMRequest.newBuilder()
- .setCmdType(Type.SetSnapshotProperty)
- .setSetSnapshotPropertyRequest(setSnapshotPropertyRequest)
- .setClientId(clientId.toString())
- .build();
-
- submitRequest(omRequest, clientId);
- }
- }
-
- public void submitRequest(OMRequest omRequest, ClientId clientId) {
- try {
- OzoneManagerRatisUtils.submitRequest(getOzoneManager(), omRequest,
clientId, getRunCount().get());
- } catch (ServiceException e) {
- LOG.error("Snapshot deep cleaning request failed. " +
- "Will retry at next run.", e);
- }
+ // By design, no one cares about the results of this call back.
+ return EmptyTaskResult.newResult();
}
}
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java
index 27416bc95f..9339e844f2 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java
@@ -93,6 +93,11 @@ public static SnapshotInfo getSnapshotInfo(OzoneManager
ozoneManager,
SnapshotChainManager chainManager,
UUID snapshotId) throws
IOException {
String tableKey = chainManager.getTableKey(snapshotId);
+ if (tableKey == null) {
+ LOG.error("Snapshot not found with UUID '{}'", snapshotId);
+ throw new OMException("Snapshot not found with UUID '" + snapshotId +
"'",
+ FILE_NOT_FOUND);
+ }
return SnapshotUtils.getSnapshotInfo(ozoneManager, tableKey);
}
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
index a306ae1cf1..3b55255ee7 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
@@ -22,6 +22,7 @@
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -67,6 +68,7 @@
import org.apache.hadoop.ozone.om.KeyManager;
import org.apache.hadoop.ozone.om.KeyManagerImpl;
import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OmSnapshot;
import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.OmTestManagers;
@@ -87,6 +89,7 @@
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.hadoop.ozone.om.snapshot.filter.ReclaimableKeyFilter;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.OzoneTestBase;
@@ -128,6 +131,7 @@ class TestKeyDeletingService extends OzoneTestBase {
private KeyManager keyManager;
private OMMetadataManager metadataManager;
private KeyDeletingService keyDeletingService;
+ private SnapshotDirectoryCleaningService snapshotDirectoryCleaningService;
private ScmBlockLocationTestingClient scmBlockTestingClient;
@BeforeAll
@@ -143,6 +147,8 @@ private void createConfig(File testDir) {
100, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL,
100, TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL,
+ 100, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL,
1, TimeUnit.SECONDS);
conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL,
@@ -155,6 +161,7 @@ private void createSubject() throws Exception {
OmTestManagers omTestManagers = new OmTestManagers(conf,
scmBlockTestingClient, null);
keyManager = omTestManagers.getKeyManager();
keyDeletingService = keyManager.getDeletingService();
+ snapshotDirectoryCleaningService =
keyManager.getSnapshotDirectoryService();
writeClient = omTestManagers.getWriteClient();
om = omTestManagers.getOzoneManager();
metadataManager = omTestManagers.getMetadataManager();
@@ -207,7 +214,9 @@ void checkIfDeleteServiceIsDeletingKeys()
() -> getDeletedKeyCount() >= initialDeletedCount + keyCount,
100, 10000);
assertThat(getRunCount()).isGreaterThan(initialRunCount);
-
assertThat(keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).getKeyBlocksList())
+ assertThat(keyManager.getPendingDeletionKeys(new
ReclaimableKeyFilter(om, om.getOmSnapshotManager(),
+
((OmMetadataManagerImpl)om.getMetadataManager()).getSnapshotChainManager(),
null,
+ keyManager, om.getMetadataManager().getLock()),
Integer.MAX_VALUE).getKeyBlocksList())
.isEmpty();
}
@@ -236,7 +245,7 @@ void checkDeletionForKeysWithMultipleVersions() throws
Exception {
1000, 10000);
assertThat(getRunCount())
.isGreaterThan(initialRunCount);
-
assertThat(keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).getKeyBlocksList())
+ assertThat(keyManager.getPendingDeletionKeys((kv) -> true,
Integer.MAX_VALUE).getKeyBlocksList())
.isEmpty();
// The 1st version of the key has 1 block and the 2nd version has 2
@@ -278,7 +287,10 @@ void checkDeletedTableCleanUpForSnapshot() throws
Exception {
1000, 10000);
assertThat(getRunCount())
.isGreaterThan(initialRunCount);
-
assertThat(keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).getKeyBlocksList())
+ assertThat(keyManager.getPendingDeletionKeys(new
ReclaimableKeyFilter(om, om.getOmSnapshotManager(),
+
((OmMetadataManagerImpl)om.getMetadataManager()).getSnapshotChainManager(),
null,
+ keyManager, om.getMetadataManager().getLock()),
+ Integer.MAX_VALUE).getKeyBlocksList())
.isEmpty();
// deletedTable should have deleted key of the snapshot bucket
@@ -334,8 +346,9 @@ public void
testAOSKeyDeletingWithSnapshotCreateParallelExecution()
when(ozoneManager.getOmSnapshotManager()).thenAnswer(i -> {
return omSnapshotManager;
});
- KeyDeletingService service = new KeyDeletingService(ozoneManager,
scmBlockTestingClient, km, 10000,
- 100000, conf, false);
+ when(ozoneManager.getKeyManager()).thenReturn(km);
+ KeyDeletingService service = new KeyDeletingService(ozoneManager,
scmBlockTestingClient, 10000,
+ 100000, conf, 10, false);
service.shutdown();
final long initialSnapshotCount =
metadataManager.countRowsInTable(snapshotInfoTable);
final long initialDeletedCount =
metadataManager.countRowsInTable(deletedTable);
@@ -376,7 +389,7 @@ public void
testAOSKeyDeletingWithSnapshotCreateParallelExecution()
}
}, 1000, 10000);
return i.callRealMethod();
- }).when(omSnapshotManager).getSnapshot(ArgumentMatchers.eq(volumeName),
ArgumentMatchers.eq(bucketName),
+
}).when(omSnapshotManager).getActiveSnapshot(ArgumentMatchers.eq(volumeName),
ArgumentMatchers.eq(bucketName),
ArgumentMatchers.eq(snap1));
assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 1,
metadataManager);
doAnswer(i -> {
@@ -385,7 +398,7 @@ public void
testAOSKeyDeletingWithSnapshotCreateParallelExecution()
Assertions.assertNotEquals(deletePathKey[0], group.getGroupID());
}
return pendingKeysDeletion;
- }).when(km).getPendingDeletionKeys(anyInt());
+ }).when(km).getPendingDeletionKeys(any(), anyInt());
service.runPeriodicalTaskNow();
service.runPeriodicalTaskNow();
assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 2,
metadataManager);
@@ -582,9 +595,15 @@ void testSnapshotExclusiveSize() throws Exception {
// Create Snapshot4
String snap4 = uniqueObjectName("snap");
writeClient.createSnapshot(testVolumeName, testBucketName, snap4);
+ assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 4,
metadataManager);
createAndCommitKey(testVolumeName, testBucketName,
uniqueObjectName("key"), 3);
long prevKdsRunCount = getRunCount();
+ long prevSnapshotDirectorServiceCnt =
snapshotDirectoryCleaningService.getRunCount().get();
+ // Let SnapshotDirectoryCleaningService to run for some iterations
+ GenericTestUtils.waitFor(
+ () -> (snapshotDirectoryCleaningService.getRunCount().get() >
prevSnapshotDirectorServiceCnt + 20),
+ 100, 100000);
keyDeletingService.resume();
Map<String, Long> expectedSize = new ImmutableMap.Builder<String, Long>()
@@ -597,22 +616,23 @@ void testSnapshotExclusiveSize() throws Exception {
// Let KeyDeletingService to run for some iterations
GenericTestUtils.waitFor(
- () -> (getRunCount() > prevKdsRunCount + 5),
- 100, 10000);
-
+ () -> (getRunCount() > prevKdsRunCount + 20),
+ 100, 100000);
// Check if the exclusive size is set.
+ om.awaitDoubleBufferFlush();
try (TableIterator<String, ? extends Table.KeyValue<String,
SnapshotInfo>>
iterator = snapshotInfoTable.iterator()) {
while (iterator.hasNext()) {
Table.KeyValue<String, SnapshotInfo> snapshotEntry = iterator.next();
+ SnapshotInfo snapshotInfo =
om.getMetadataManager().getSnapshotInfoTable().get(snapshotEntry.getKey());
String snapshotName = snapshotEntry.getValue().getName();
- Long expected = expectedSize.getOrDefault(snapshotName, 0L);
+ Long expected = expectedSize.getOrDefault(snapshotName,
snapshotInfo.getExclusiveSize());
assertNotNull(expected);
System.out.println(snapshotName);
- assertEquals(expected, snapshotEntry.getValue().getExclusiveSize());
+ assertEquals(expected, snapshotInfo.getExclusiveSize());
// Since for the test we are using RATIS/THREE
- assertEquals(expected * 3,
snapshotEntry.getValue().getExclusiveReplicatedSize());
+ assertEquals(expected * 3,
snapshotInfo.getExclusiveReplicatedSize());
}
}
}
@@ -647,7 +667,7 @@ void cleanup() {
@Test
@DisplayName("Should not update keys when purge request times out during
key deletion")
- public void testFailingModifiedKeyPurge() throws IOException {
+ public void testFailingModifiedKeyPurge() throws IOException,
InterruptedException {
try (MockedStatic<OzoneManagerRatisUtils> mocked =
mockStatic(OzoneManagerRatisUtils.class,
CALLS_REAL_METHODS)) {
@@ -781,8 +801,7 @@ private static void checkSnapDeepCleanStatus(Table<String,
SnapshotInfo> table,
private static void assertTableRowCount(Table<String, ?> table,
long count, OMMetadataManager metadataManager)
throws TimeoutException, InterruptedException {
- GenericTestUtils.waitFor(() -> assertTableRowCount(count, table,
- metadataManager), 1000, 120000); // 2 minutes
+ GenericTestUtils.waitFor(() -> assertTableRowCount(count, table,
metadataManager), 1000, 120000); // 2 minutes
}
private static boolean assertTableRowCount(long expectedCount,
@@ -918,7 +937,7 @@ private long getRunCount() {
private int countKeysPendingDeletion() {
try {
- final int count = keyManager.getPendingDeletionKeys(Integer.MAX_VALUE)
+ final int count = keyManager.getPendingDeletionKeys((kv) -> true,
Integer.MAX_VALUE)
.getKeyBlocksList().size();
LOG.debug("KeyManager keys pending deletion: {}", count);
return count;
@@ -929,7 +948,7 @@ private int countKeysPendingDeletion() {
private long countBlocksPendingDeletion() {
try {
- return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE)
+ return keyManager.getPendingDeletionKeys((kv) -> true, Integer.MAX_VALUE)
.getKeyBlocksList()
.stream()
.map(BlockGroup::getBlockIDList)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]