mukul1987 commented on a change in pull request #718: HDDS-1301. Optimize
recursive ozone filesystem apis
URL: https://github.com/apache/hadoop/pull/718#discussion_r275350966
##########
File path:
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
##########
@@ -1546,6 +1559,358 @@ public OmKeyInfo lookupFile(OmKeyArgs args) throws
IOException {
ResultCodes.NOT_A_FILE);
}
+ /**
+ * Rename input file or directory.
+ *
+ * @param args Key args
+ * @param toKeyName new name for the file or directory. In case of a file it
+ * is the new absolute path for the file. In case of a
directory
+ * it is the absolute path of the new parent.
+ * @throws IOException if there is a rename from/to root
+ * For a file rename, if file or folder already exists at
+ * the destination
+ * For a directory rename, if file exists or destination
+ * directory is not empty or rename to a subdirectory
+ */
+ public void rename(OmKeyArgs args, String toKeyName) throws IOException {
+ Preconditions.checkNotNull(args, "Key args can not be null");
+ String volumeName = args.getVolumeName();
+ String bucketName = args.getBucketName();
+ String keyName = args.getKeyName();
+
+ if (args.getKeyName().length() == 0 || toKeyName.length() == 0) {
+ throw new OMException("Can not rename from/to root directory",
+ ResultCodes.ROOT_DIRECTORY_RENAME_NOT_ALLOWED);
+ }
+
+ try {
+ metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+ OzoneFileStatus fromKeyStatus = getFileStatus(args);
+ OzoneFileStatus toKeyStatus = null;
+ OmKeyArgs toKeyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+ .setBucketName(bucketName).setKeyName(toKeyName).build();
+ try {
+ toKeyStatus = getFileStatus(toKeyArgs);
+ } catch (OMException e) {
+ if (e.getResult() != ResultCodes.FILE_NOT_FOUND) {
+ throw e;
+ }
+ }
+ if (fromKeyStatus.isFile()) {
+ renameFile(volumeName, bucketName, fromKeyStatus, toKeyStatus,
+ toKeyName);
+ } else if (fromKeyStatus.isDirectory()) {
+ renameDirectory(volumeName, bucketName, toKeyStatus,
+ OzoneFSUtils.addTrailingSlashIfNeeded(keyName),
+ OzoneFSUtils.addTrailingSlashIfNeeded(toKeyName));
+ }
+ } finally {
+ metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+ }
+ }
+
+ private void renameDirectory(String volumeName, String bucketName,
+ OzoneFileStatus toKeyStatus, String fromKeyName, String toKeyName)
+ throws IOException {
+ if (fromKeyName.equals(toKeyName)) {
+ return;
+ }
+ if (toKeyStatus != null) {
+ if (toKeyStatus.isFile()) {
+ throw new OMException(
+ "Can not rename. File exists at destination " + toKeyName,
+ ResultCodes.FILE_ALREADY_EXISTS);
+ } else if (toKeyStatus.isDirectory()) {
+ if (!isDirectoryEmpty(volumeName, bucketName, toKeyName)) {
+ throw new OMException(
+ "Can not rename. Destination directory " + toKeyName
+ + " not empty", ResultCodes.DIRECTORY_NOT_EMPTY);
+ }
+ }
+ }
+ String fromParent = OzoneFSUtils.getParent(fromKeyName);
+ toKeyName = toKeyStatus != null ?
+ toKeyName + fromKeyName.substring(fromParent.length()) : toKeyName;
+ renamePrefix(volumeName, bucketName, fromKeyName, toKeyName);
+ }
+
+ private void renamePrefix(String volumeName, String bucketName,
+ String fromPrefix, String toPrefix) throws IOException {
+ if (toPrefix.startsWith(fromPrefix)) {
+ throw new OMException("Can not rename to subdirectory",
+ ResultCodes.RENAME_TO_SUB_DIRECTORY_NOT_ALLOWED);
+ }
+ BatchOperation batch = metadataManager.getStore().initBatchOperation();
+ TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
+ iterator = metadataManager.getKeyTable().iterator();
+ String fromPrefixInDb =
+ metadataManager.getOzoneKey(volumeName, bucketName, fromPrefix);
+ iterator.seek(fromPrefixInDb);
+ while (iterator.hasNext()) {
+ Table.KeyValue<String, OmKeyInfo> kv = iterator.next();
+ String key = kv.getKey();
+ if (key.startsWith(fromPrefixInDb)) {
+ renameKey(volumeName, bucketName, kv.getValue().getKeyName(),
+ toPrefix + key.substring(fromPrefixInDb.length()), kv.getValue(),
+ batch);
+ }
+ }
+ metadataManager.getStore().commitBatchOperation(batch);
+ }
+
+ private void renameFile(String volumeName, String bucketName,
+ OzoneFileStatus fromKeyStatus, OzoneFileStatus toKeyStatus,
+ String toKeyName) throws IOException {
+ Preconditions.checkArgument(!toKeyName.endsWith(OZONE_URI_DELIMITER),
+ "File can not be created with / suffix: " + toKeyName);
+ String fromKeyName = fromKeyStatus.getKeyInfo().getKeyName();
+ if (fromKeyName.equals(toKeyName)) {
+ return;
+ }
+ if (toKeyStatus != null) {
+ throw new OMException(
+ "Can not rename file to an existing entry at " + toKeyName,
+ toKeyStatus.isFile() ? ResultCodes.FILE_ALREADY_EXISTS :
+ ResultCodes.DIRECTORY_ALREADY_EXISTS);
+ }
+ String toParent = OzoneFSUtils.getParent(toKeyName);
+ OmKeyArgs toParentKeyArgs = new
OmKeyArgs.Builder().setVolumeName(volumeName)
+ .setBucketName(bucketName).setKeyName(toParent).build();
+ getFileStatus(toParentKeyArgs);
+ BatchOperation batch = metadataManager.getStore().initBatchOperation();
+ renameKey(volumeName, bucketName, fromKeyName, toKeyName,
+ fromKeyStatus.getKeyInfo(), batch);
+ metadataManager.getStore().commitBatchOperation(batch);
+ }
+
+ private void renameKey(String volumeName, String bucketName, String key,
+ String toKey, OmKeyInfo value, BatchOperation batch) throws IOException {
+ value.setKeyName(toKey);
+ value.updateModifcationTime();
+ String oldOzoneKey =
+ metadataManager.getOzoneKey(volumeName, bucketName, key);
+ String newOzoneKey =
+ metadataManager.getOzoneKey(volumeName, bucketName, toKey);
+ metadataManager.getKeyTable().deleteWithBatch(batch, oldOzoneKey);
+ metadataManager.getKeyTable().putWithBatch(batch, newOzoneKey, value);
+ }
+
+ private boolean isDirectoryEmpty(String volumeName, String bucketName,
+ String keyName) throws IOException {
+ try {
+ metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+
+ List<OmKeyInfo> keys =
+ metadataManager.listKeys(volumeName, bucketName, null, keyName, 2);
+ return keys.size() != 2;
+ } finally {
+ metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+ }
+ }
+
+ /**
+ * Deletes the input file or directory
+ *
+ * @param args Key args
+ * @param recursive For deletion of non-empty directory recursive flag must
+ * be true
+ * @throws IOException if deletion for a root
+ * if recursive flag is false and directory is not empty
+ */
+ public void delete(OmKeyArgs args, boolean recursive) throws IOException {
+ // TODO: add recursive option
Review comment:
This has been implemented
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]