sadanand48 commented on code in PR #4675:
URL: https://github.com/apache/ozone/pull/4675#discussion_r1187328558
##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/DeleteKeyHandler.java:
##########
@@ -45,6 +64,81 @@ protected void execute(OzoneClient client, OzoneAddress
address)
OzoneVolume vol = client.getObjectStore().getVolume(volumeName);
OzoneBucket bucket = vol.getBucket(bucketName);
- bucket.deleteKey(keyName);
+
+ float hadoopTrashInterval = getConf().getFloat(
+ FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT);
+
+ long trashInterval =
+ (long) (getConf().getFloat(
+ OMConfigKeys.OZONE_FS_TRASH_INTERVAL_KEY,
+ hadoopTrashInterval) * 10000);
+
+ // If Bucket layout is FSO and Trash is enabled
+ // In this case during delete operation move key to trash
+ if (bucket.getBucketLayout().isFileSystemOptimized() && trashInterval > 0
+ && !skipTrash && !keyName.contains(".Trash")) {
+
+ keyName = OzoneFSUtils.removeTrailingSlashIfNeeded(keyName);
+ try {
+ // Check if key exists in Ozone
+ OzoneKeyDetails key = bucket.getKey(keyName);
+ if (key == null) {
+ out().printf("Key not found %s", keyName);
+ return;
+ }
+ // Check whether directory is empty or not
+ Iterator<? extends OzoneKey> ozoneKeyIterator =
+ bucket.listKeys(keyName, keyName);
+ int count = 0;
+ while (ozoneKeyIterator.hasNext()) {
+ ozoneKeyIterator.next();
+ if (++count > 1) {
+ // Assume FSO Tree: /a/b1/c1/k1.txt
+ // And we are trying to delete key /a/b1/c1
+ // In this case count is 2 which is greater than 1
+ // /a/b1/c1/ and /a/b1/c1/k1.txt
+ out().printf("Directory is not empty");
+ return;
+ }
+ }
+ } catch (Exception e) {
+ out().printf("Key not found %s", keyName);
+ return;
+ }
+
+ final String username =
+ UserGroupInformation.getCurrentUser().getShortUserName();
+ Path trashRoot = new Path(OZONE_URI_DELIMITER, TRASH_PREFIX);
+ Path userTrash = new Path(trashRoot, username);
+ Path userTrashCurrent = new Path(userTrash, CURRENT);
+
+ String trashDirectory = (keyName.contains("/")
+ ? new Path(userTrashCurrent, keyName.substring(0,
+ keyName.lastIndexOf("/")))
+ : userTrashCurrent).toUri().getPath();
+
+ String toKeyName = new Path(userTrashCurrent, keyName).toUri().getPath();
+ OzoneKeyDetails toKeyDetails = null;
+ try {
+ // check whether key already exist in trash
+ toKeyDetails = bucket.getKey(toKeyName);
+ } catch (IOException e) {
+ // Key doesn't exist inside trash.
+ }
+
+ if (toKeyDetails != null) {
+ // if key(directory) already exist in trash, just delete the key
Review Comment:
I'm not sure about this. We don't follow the same behaviour with filesystem
trash: after a key has been deleted once, the user should not be able to
delete it again this way; we have skipTrash for that reason. If the user wants
to delete the key from trash, the trash path must be given explicitly.
##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/DeleteKeyHandler.java:
##########
@@ -45,6 +64,81 @@ protected void execute(OzoneClient client, OzoneAddress
address)
OzoneVolume vol = client.getObjectStore().getVolume(volumeName);
OzoneBucket bucket = vol.getBucket(bucketName);
- bucket.deleteKey(keyName);
+
+ float hadoopTrashInterval = getConf().getFloat(
+ FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT);
+
+ long trashInterval =
+ (long) (getConf().getFloat(
+ OMConfigKeys.OZONE_FS_TRASH_INTERVAL_KEY,
+ hadoopTrashInterval) * 10000);
+
+ // If Bucket layout is FSO and Trash is enabled
+ // In this case during delete operation move key to trash
+ if (bucket.getBucketLayout().isFileSystemOptimized() && trashInterval > 0
+ && !skipTrash && !keyName.contains(".Trash")) {
Review Comment:
Use TRASH_PREFIX instead of the ".Trash" literal.
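A minimal, standalone sketch of the idea. The TRASH_PREFIX constant below is a local stand-in (in the handler it would presumably come from the existing import rather than be redefined), and the class and method names are hypothetical. Matching on whole path components instead of `contains` is an optional extra, not something the PR requires; it avoids treating a key such as `my.Trashcan/key1` as a trash path.

```java
import java.util.Arrays;

public final class TrashPathCheck {
  // Local stand-in for the shared TRASH_PREFIX constant (assumption:
  // the real handler would reuse the imported constant instead).
  static final String TRASH_PREFIX = ".Trash";

  /** Returns true if any path component of keyName equals TRASH_PREFIX. */
  public static boolean isUnderTrash(String keyName) {
    return Arrays.asList(keyName.split("/")).contains(TRASH_PREFIX);
  }
}
```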
##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/DeleteKeyHandler.java:
##########
@@ -45,6 +64,81 @@ protected void execute(OzoneClient client, OzoneAddress
address)
OzoneVolume vol = client.getObjectStore().getVolume(volumeName);
OzoneBucket bucket = vol.getBucket(bucketName);
- bucket.deleteKey(keyName);
+
+ float hadoopTrashInterval = getConf().getFloat(
+ FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT);
+
+ long trashInterval =
+ (long) (getConf().getFloat(
+ OMConfigKeys.OZONE_FS_TRASH_INTERVAL_KEY,
+ hadoopTrashInterval) * 10000);
+
+ // If Bucket layout is FSO and Trash is enabled
+ // In this case during delete operation move key to trash
+ if (bucket.getBucketLayout().isFileSystemOptimized() && trashInterval > 0
Review Comment:
Can we move all this logic into a separate method that handles FSO key
deletes, something like deleteFSOKey()?
##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/DeleteKeyHandler.java:
##########
@@ -45,6 +64,81 @@ protected void execute(OzoneClient client, OzoneAddress
address)
OzoneVolume vol = client.getObjectStore().getVolume(volumeName);
OzoneBucket bucket = vol.getBucket(bucketName);
- bucket.deleteKey(keyName);
+
+ float hadoopTrashInterval = getConf().getFloat(
+ FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT);
+
+ long trashInterval =
+ (long) (getConf().getFloat(
+ OMConfigKeys.OZONE_FS_TRASH_INTERVAL_KEY,
+ hadoopTrashInterval) * 10000);
+
+ // If Bucket layout is FSO and Trash is enabled
+ // In this case during delete operation move key to trash
+ if (bucket.getBucketLayout().isFileSystemOptimized() && trashInterval > 0
+ && !skipTrash && !keyName.contains(".Trash")) {
+
+ keyName = OzoneFSUtils.removeTrailingSlashIfNeeded(keyName);
+ try {
+ // Check if key exists in Ozone
+ OzoneKeyDetails key = bucket.getKey(keyName);
+ if (key == null) {
+ out().printf("Key not found %s", keyName);
+ return;
+ }
+ // Check whether directory is empty or not
+ Iterator<? extends OzoneKey> ozoneKeyIterator =
+ bucket.listKeys(keyName, keyName);
Review Comment:
bucket.listKeys() returns 1000 entries per RPC by default. That is more than
we need here, since we only want to check whether the directory is empty: even
a single key with the given prefix is enough to conclude that the directory is
non-empty.
1. I couldn't find a listKeys API with a maxEntries param, but you could
instead use
``` java
public List<OzoneFileStatus> listStatus(String keyName, boolean recursive,
    String startKey, long numEntries) throws IOException
```
and pass numEntries as 1.
2. To check whether it is a directory, you could use
`bucket.getFileStatus(key).isDirectory();`
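To illustrate the "ask for at most one entry" idea without depending on the Ozone client, here is a rough sketch. `LimitedLister` and `inMemory` are hypothetical stand-ins for a listing call that accepts an entry limit (such as the listStatus signature above); the sketch assumes the listing returns only children of the directory, which would need to be confirmed for the real API.

```java
import java.util.List;
import java.util.stream.Collectors;

public final class EmptyDirCheck {
  /** Hypothetical stand-in for a listing call that accepts an entry limit. */
  public interface LimitedLister {
    List<String> list(String dirKey, long numEntries);
  }

  /** One returned child is enough to know the directory is non-empty. */
  public static boolean isEmptyDirectory(LimitedLister lister, String dirKey) {
    return lister.list(dirKey, 1).isEmpty();
  }

  /** In-memory lister over a fixed key set, for illustration only. */
  public static LimitedLister inMemory(List<String> keys) {
    return (dirKey, numEntries) -> keys.stream()
        .filter(k -> k.startsWith(dirKey + "/"))
        .limit(numEntries)
        .collect(Collectors.toList());
  }
}
```

With this shape the check costs a single small listing call regardless of how many keys sit under the prefix.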
##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/DeleteKeyHandler.java:
##########
@@ -45,6 +64,81 @@ protected void execute(OzoneClient client, OzoneAddress
address)
OzoneVolume vol = client.getObjectStore().getVolume(volumeName);
OzoneBucket bucket = vol.getBucket(bucketName);
- bucket.deleteKey(keyName);
+
+ float hadoopTrashInterval = getConf().getFloat(
+ FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT);
+
+ long trashInterval =
+ (long) (getConf().getFloat(
+ OMConfigKeys.OZONE_FS_TRASH_INTERVAL_KEY,
+ hadoopTrashInterval) * 10000);
+
+ // If Bucket layout is FSO and Trash is enabled
+ // In this case during delete operation move key to trash
+ if (bucket.getBucketLayout().isFileSystemOptimized() && trashInterval > 0
+ && !skipTrash && !keyName.contains(".Trash")) {
+
+ keyName = OzoneFSUtils.removeTrailingSlashIfNeeded(keyName);
+ try {
+ // Check if key exists in Ozone
+ OzoneKeyDetails key = bucket.getKey(keyName);
+ if (key == null) {
+ out().printf("Key not found %s", keyName);
+ return;
+ }
+ // Check whether directory is empty or not
+ Iterator<? extends OzoneKey> ozoneKeyIterator =
+ bucket.listKeys(keyName, keyName);
+ int count = 0;
+ while (ozoneKeyIterator.hasNext()) {
+ ozoneKeyIterator.next();
+ if (++count > 1) {
+ // Assume FSO Tree: /a/b1/c1/k1.txt
+ // And we are trying to delete key /a/b1/c1
+ // In this case count is 2 which is greater than 1
+ // /a/b1/c1/ and /a/b1/c1/k1.txt
+ out().printf("Directory is not empty");
+ return;
+ }
+ }
+ } catch (Exception e) {
+ out().printf("Key not found %s", keyName);
+ return;
+ }
+
+ final String username =
+ UserGroupInformation.getCurrentUser().getShortUserName();
+ Path trashRoot = new Path(OZONE_URI_DELIMITER, TRASH_PREFIX);
+ Path userTrash = new Path(trashRoot, username);
+ Path userTrashCurrent = new Path(userTrash, CURRENT);
+
+ String trashDirectory = (keyName.contains("/")
+ ? new Path(userTrashCurrent, keyName.substring(0,
+ keyName.lastIndexOf("/")))
+ : userTrashCurrent).toUri().getPath();
+
+ String toKeyName = new Path(userTrashCurrent, keyName).toUri().getPath();
+ OzoneKeyDetails toKeyDetails = null;
+ try {
+ // check whether key already exist in trash
+ toKeyDetails = bucket.getKey(toKeyName);
+ } catch (IOException e) {
+ // Key doesn't exist inside trash.
+ }
+
+ if (toKeyDetails != null) {
+ // if key(directory) already exist in trash, just delete the key
+ bucket.deleteKey(keyName);
+ return;
+ }
+ // Create directory inside trash
+ bucket.createDirectory(trashDirectory);
Review Comment:
Do we need to create this manually? Doesn't rename take care of creating
missing parent directories for FSO keys? I think it does.
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHA.java:
##########
@@ -1124,6 +1129,127 @@ public void
testCreateBucketWithECReplicationConfigWithoutReplicationParam() {
}
}
+
+ @Test
+ public void testKeyDeleteOrSkipTrashWhenTrashEnableFSO()
+ throws UnsupportedEncodingException {
+ // Create 100 keys
+ generateKeys("/volumefso1", "/bucket1",
+ BucketLayout.FILE_SYSTEM_OPTIMIZED.toString());
+
+ // Enable trash
+ String trashConfKey = generateSetConfString(
+ OMConfigKeys.OZONE_FS_TRASH_INTERVAL_KEY, "1");
+ String[] args =
+ new String[] {trashConfKey, "key", "delete",
+ "/volumefso1/bucket1/key4"};
+
+ // Delete one key from FSO bucket
+ execute(ozoneShell, args);
+
+ // Get key list in .Trash path
+ String prefixKey = "--prefix=.Trash";
+ args = new String[] {"key", "list", prefixKey, "o3://" +
+ omServiceId + "/volumefso1/bucket1/"};
+ out.reset();
+ execute(ozoneShell, args);
+
+ // One key should be present in .Trash
+ Assert.assertEquals(1, getNumOfKeys());
+
+ args = new String[] {"key", "list", "o3://" + omServiceId +
+ "/volumefso1/bucket1/", "-l ", "110"};
+ out.reset();
+ execute(ozoneShell, args);
+
+ // Total number of keys still 100.
+ Assert.assertEquals(100, getNumOfKeys());
+
+ // Skip Trash
+ args = new String[] {trashConfKey, "key", "delete",
+ "/volumefso1/bucket1/key5", "--skipTrash"};
+ execute(ozoneShell, args);
+
+ // .Trash should still contain 1 key
+ prefixKey = "--prefix=.Trash";
+ args = new String[] {"key", "list", prefixKey, "o3://" +
+ omServiceId + "/volumefso1/bucket1/"};
+ out.reset();
+ execute(ozoneShell, args);
+ Assert.assertEquals(1, getNumOfKeys());
+
+ args = new String[] {"key", "list", "o3://" + omServiceId +
+ "/volumefso1/bucket1/", "-l ", "110"};
+ out.reset();
+ execute(ozoneShell, args);
+ // Total number of keys now will be 99 as
+ // 1 key deleted without trash
+ Assert.assertEquals(99, getNumOfKeys());
Review Comment:
Maybe add a small test case to check that deleting a key from trash is a
no-op, and that deleting a key from trash with skipTrash actually deletes it.
##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/DeleteKeyHandler.java:
##########
@@ -45,6 +64,81 @@ protected void execute(OzoneClient client, OzoneAddress
address)
OzoneVolume vol = client.getObjectStore().getVolume(volumeName);
OzoneBucket bucket = vol.getBucket(bucketName);
- bucket.deleteKey(keyName);
+
+ float hadoopTrashInterval = getConf().getFloat(
+ FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT);
+
+ long trashInterval =
+ (long) (getConf().getFloat(
+ OMConfigKeys.OZONE_FS_TRASH_INTERVAL_KEY,
+ hadoopTrashInterval) * 10000);
+
+ // If Bucket layout is FSO and Trash is enabled
+ // In this case during delete operation move key to trash
+ if (bucket.getBucketLayout().isFileSystemOptimized() && trashInterval > 0
Review Comment:
Can we move all this logic into a separate method that handles FSO key
deletes? Something like:
```
if (bucket.isFSO()) {
deleteFSOKey();
} else {
bucket.deleteKey();
}
```
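A slightly fuller, standalone sketch of that split, with the decision reduced to plain booleans so it runs without the Ozone client. The `Action` enum, the parameter names, and the exact conditions inside `deleteFSOKey` are assumptions drawn from the condition in the diff, not the final implementation.

```java
public final class DeleteDispatch {
  /** The two outcomes the handler chooses between. */
  public enum Action { MOVE_TO_TRASH, DELETE }

  /** Top-level branch: FSO buckets get their own code path. */
  public static Action execute(boolean isFso, boolean trashEnabled,
      boolean skipTrash, boolean keyInTrash) {
    if (isFso) {
      return deleteFSOKey(trashEnabled, skipTrash, keyInTrash);
    }
    return Action.DELETE;
  }

  /** FSO path: move to trash unless trash is off, skipped, or already used. */
  static Action deleteFSOKey(boolean trashEnabled, boolean skipTrash,
      boolean keyInTrash) {
    if (trashEnabled && !skipTrash && !keyInTrash) {
      return Action.MOVE_TO_TRASH;
    }
    return Action.DELETE;
  }
}
```

Keeping the trash checks inside deleteFSOKey() leaves execute() with a single readable branch.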
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]