bharatviswa504 commented on a change in pull request #1557:
URL: https://github.com/apache/ozone/pull/1557#discussion_r522507971



##########
File path: 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
##########
@@ -696,4 +697,91 @@ public static String getAbsolutePath(String prefixName, 
String fileName) {
     }
     return prefixName.concat(OzoneConsts.OZONE_URI_DELIMITER).concat(fileName);
   }
+
+  /**
+   * Build DirectoryInfo from OmKeyInfo.
+   *
+   * @param keyInfo omKeyInfo
+   * @return omDirectoryInfo object
+   */
+  public static OmDirectoryInfo getDirectoryInfo(OmKeyInfo keyInfo){
+    OmDirectoryInfo.Builder builder = new OmDirectoryInfo.Builder();
+    builder.setParentObjectID(keyInfo.getParentObjectID());
+    builder.setAcls(keyInfo.getAcls());
+    builder.addAllMetadata(keyInfo.getMetadata());
+    builder.setCreationTime(keyInfo.getCreationTime());
+    builder.setModificationTime(keyInfo.getModificationTime());
+    builder.setObjectID(keyInfo.getObjectID());
+    builder.setUpdateID(keyInfo.getUpdateID());
+    builder.setName(OzoneFSUtils.getFileName(keyInfo.getKeyName()));
+    return builder.build();
+  }
+
+  /**
+   * Verify that the given toKey directory is a sub directory of fromKey
+   * directory.
+   * <p>
+   * For example, special case of renaming a directory to its own
+   * sub-directory is not allowed.
+   *
+   * @param fromKeyName source path
+   * @param toKeyName   destination path
+   * @throws OMException if the dest dir is a sub-dir of source dir.
+   */
+  public static void verifyToDirIsASubDirOfFromDirectory(String fromKeyName,
+      String toKeyName, boolean isDir) throws OMException {
+    if (!isDir) {
+      return;
+    }
+    Path dstParent = Paths.get(toKeyName).getParent();
+    while (dstParent != null) {
+      if (Paths.get(fromKeyName).equals(dstParent)) {
+        throw new OMException("Cannot rename a directory to its own " +
+                "subdirectory", OMException.ResultCodes.KEY_RENAME_ERROR);
+        // TODO: Existing rename throws java.lang.IllegalArgumentException.
+        //       Should we throw same exception ?
+      }
+      dstParent = dstParent.getParent();
+    }
+  }
+
+  /**
+   * Verify parent exists for the destination path and return destination
+   * path parent Id.
+   * <p>
+   * Check whether dst parent dir exists or not. If the parent exists, then the
+   * source can be renamed to dst path.
+   *
+   * @param volumeName  volume name
+   * @param bucketName  bucket name
+   * @param toKeyName   destination path
+   * @param fromKeyName source path
+   * @param metaMgr     metadata manager
+   * @throws IOException if the destination parent dir doesn't exists.
+   */
+  public static long getToKeyNameParentId(String volumeName,
+      String bucketName, String toKeyName, String fromKeyName,
+      OMMetadataManager metaMgr) throws IOException {
+
+    int totalDirsCount = OzoneFSUtils.getFileCount(toKeyName);
+    // skip parent is root '/'
+    if (totalDirsCount <= 1) {
+      String bucketKey = metaMgr.getBucketKey(volumeName, bucketName);
+      OmBucketInfo omBucketInfo =
+              metaMgr.getBucketTable().get(bucketKey);
+      return omBucketInfo.getObjectID();
+    }
+
+    String toKeyParentDir = OzoneFSUtils.getParentDir(toKeyName);
+
+    OzoneFileStatus toKeyParentDirStatus = getOMKeyInfoIfExists(metaMgr,
+            volumeName, bucketName, toKeyParentDir, 0);
+    // check if the immediate parent exists
+    if (toKeyParentDirStatus == null || toKeyParentDirStatus.isFile()) {
+      throw new OMException(String.format(

Review comment:
       Minor: When toKeyParentDirStatus null means intermediate directories 
does not exist. So, should we change the exception when it is file and null.

##########
File path: 
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OzoneFSUtils.java
##########
@@ -162,4 +162,46 @@ public static boolean isImmediateChild(String parentKey, 
String childKey) {
 
     return parentPath.equals(childParent);
   }
+
+  /**
+   * The function returns parent directory from the given absolute path. For
+   * example, the given key path '/a/b/c/d/e/file1' then it returns parent
+   * directory name as 'e'.
+   *
+   * @param keyName key name
+   */
+  public static String getParentDir(@Nonnull String keyName) {
+    java.nio.file.Path fileName = Paths.get(keyName).getParent();
+    if (fileName != null) {
+      return fileName.toString();
+    }
+    // failed to converts a path key
+    return keyName;

Review comment:
       Not got when parent is null, why we need to return keyName

##########
File path: 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequestV1.java
##########
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.key;
+
+import com.google.common.base.Optional;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.key.OMKeyRenameResponse;
+import org.apache.hadoop.ozone.om.response.key.OMKeyRenameResponseV1;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.*;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
+import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+
+/**
+ * Handles rename key request layout version V1.
+ */
+public class OMKeyRenameRequestV1 extends OMKeyRenameRequest {
+
+  private static final Logger LOG =
+          LoggerFactory.getLogger(OMKeyRenameRequestV1.class);
+
+  public OMKeyRenameRequestV1(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  @SuppressWarnings("methodlength")
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
+      long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
+
+    RenameKeyRequest renameKeyRequest = getOmRequest().getRenameKeyRequest();
+    KeyArgs keyArgs = renameKeyRequest.getKeyArgs();
+    Map<String, String> auditMap = buildAuditMap(keyArgs, renameKeyRequest);
+
+    String volumeName = keyArgs.getVolumeName();
+    String bucketName = keyArgs.getBucketName();
+    String fromKeyName = keyArgs.getKeyName();
+    String toKeyName = renameKeyRequest.getToKeyName();
+
+    OMMetrics omMetrics = ozoneManager.getMetrics();
+    omMetrics.incNumKeyRenames();
+
+    AuditLogger auditLogger = ozoneManager.getAuditLogger();
+
+    OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
+            getOmRequest());
+
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    boolean acquiredLock = false;
+    OMClientResponse omClientResponse = null;
+    IOException exception = null;
+    OmKeyInfo fromKeyValue;
+    String fromKey = null;
+    Result result;
+    try {
+      if (toKeyName.length() == 0 || fromKeyName.length() == 0) {
+        throw new OMException("Key name is empty",
+                OMException.ResultCodes.INVALID_KEY_NAME);
+      }
+
+      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
+      volumeName = keyArgs.getVolumeName();
+      bucketName = keyArgs.getBucketName();
+
+      // check Acls to see if user has access to perform delete operation on
+      // old key and create operation on new key
+      checkKeyAcls(ozoneManager, volumeName, bucketName, fromKeyName,
+              IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
+      checkKeyAcls(ozoneManager, volumeName, bucketName, toKeyName,
+              IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
+
+      acquiredLock = omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
+              volumeName, bucketName);
+
+      // Validate bucket and volume exists or not.
+      validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+
+      // Check if fromKey exists
+      OzoneFileStatus fromKeyFileStatus =
+              OMFileRequest.getOMKeyInfoIfExists(omMetadataManager, volumeName,
+                      bucketName, fromKeyName, 0);
+      // case-1) fromKeyName should exist, otw throws exception
+      if (fromKeyFileStatus == null) {
+        // TODO: Add support for renaming open key
+        throw new OMException("Key not found " + fromKey, KEY_NOT_FOUND);
+      }
+
+      // source existed
+      fromKeyValue = fromKeyFileStatus.getKeyInfo();
+      boolean isRenameDirectory = fromKeyFileStatus.isDirectory();
+
+      // case-2) Cannot rename a directory to its own subdirectory
+      OMFileRequest.verifyToDirIsASubDirOfFromDirectory(fromKeyName,
+              toKeyName, fromKeyFileStatus.isDirectory());
+
+      OzoneFileStatus toKeyFileStatus =
+              OMFileRequest.getOMKeyInfoIfExists(omMetadataManager,
+                      volumeName, bucketName, toKeyName, 0);
+
+      // Check if toKey exists.
+      if(toKeyFileStatus != null) {
+        // Destination exists and following are different cases:
+        OmKeyInfo toKeyValue = toKeyFileStatus.getKeyInfo();
+
+        if (fromKeyValue.getKeyName().equals(toKeyValue.getKeyName())) {
+          // case-3) If src == destin then check source and destin of same type
+          // (a) If dst is a file then return true.
+          // (b) Otherwise throws exception.
+          // TODO: Discuss do we need to throw exception for file as well.
+          if (toKeyFileStatus.isFile()) {
+            result = Result.SUCCESS;
+          } else {
+            throw new OMException("Key already exists " + toKeyName,
+                    OMException.ResultCodes.KEY_ALREADY_EXISTS);
+          }
+        } else if (toKeyFileStatus.isDirectory()) {
+          // case-4) If dst is a directory then rename source as sub-path of it
+          // For example: rename /source to /dst will lead to /dst/source
+          String fromFileName = OzoneFSUtils.getFileName(fromKeyName);
+          String newToKeyName = OzoneFSUtils.appendFileNameToKeyPath(toKeyName,
+                  fromFileName);
+          OzoneFileStatus newToOzoneFileStatus =
+                  OMFileRequest.getOMKeyInfoIfExists(omMetadataManager,
+                          volumeName, bucketName, newToKeyName, 0);
+
+          if (newToOzoneFileStatus != null) {
+            // case-5) If new destin '/dst/source' exists then throws exception
+            throw new OMException(String.format(
+                    "Failed to rename %s to %s, file already exists or not " +
+                            "empty!", fromKeyName, newToKeyName),
+                    OMException.ResultCodes.KEY_ALREADY_EXISTS);
+          }
+
+          omClientResponse = renameKey(toKeyValue.getObjectID(), trxnLogIndex,
+                  fromKeyValue, isRenameDirectory, newToKeyName,
+                  keyArgs.getModificationTime(), omResponse, ozoneManager);
+          result = Result.SUCCESS;
+        } else {
+          // case-6) If destination is a file type and if exists then throws

Review comment:
       Okay reading more I see FS As RenameOptions only when OverWrite then it 
will do.
   With out any rename options, we fail if dest exists.
   Correct me if I am missing something here.

##########
File path: 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java
##########
@@ -696,4 +697,91 @@ public static String getAbsolutePath(String prefixName, 
String fileName) {
     }
     return prefixName.concat(OzoneConsts.OZONE_URI_DELIMITER).concat(fileName);
   }
+
+  /**
+   * Build DirectoryInfo from OmKeyInfo.
+   *
+   * @param keyInfo omKeyInfo
+   * @return omDirectoryInfo object
+   */
+  public static OmDirectoryInfo getDirectoryInfo(OmKeyInfo keyInfo){
+    OmDirectoryInfo.Builder builder = new OmDirectoryInfo.Builder();
+    builder.setParentObjectID(keyInfo.getParentObjectID());
+    builder.setAcls(keyInfo.getAcls());
+    builder.addAllMetadata(keyInfo.getMetadata());
+    builder.setCreationTime(keyInfo.getCreationTime());
+    builder.setModificationTime(keyInfo.getModificationTime());
+    builder.setObjectID(keyInfo.getObjectID());
+    builder.setUpdateID(keyInfo.getUpdateID());
+    builder.setName(OzoneFSUtils.getFileName(keyInfo.getKeyName()));
+    return builder.build();
+  }
+
+  /**
+   * Verify that the given toKey directory is a sub directory of fromKey
+   * directory.
+   * <p>
+   * For example, special case of renaming a directory to its own
+   * sub-directory is not allowed.
+   *
+   * @param fromKeyName source path
+   * @param toKeyName   destination path
+   * @throws OMException if the dest dir is a sub-dir of source dir.

Review comment:
       Minor: isDir is missing in javaDoc

##########
File path: 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequestV1.java
##########
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.key;
+
+import com.google.common.base.Optional;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.key.OMKeyRenameResponse;
+import org.apache.hadoop.ozone.om.response.key.OMKeyRenameResponseV1;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.*;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
+import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+
+/**
+ * Handles rename key request layout version V1.
+ */
+public class OMKeyRenameRequestV1 extends OMKeyRenameRequest {
+
+  private static final Logger LOG =
+          LoggerFactory.getLogger(OMKeyRenameRequestV1.class);
+
+  public OMKeyRenameRequestV1(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  @SuppressWarnings("methodlength")
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
+      long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
+
+    RenameKeyRequest renameKeyRequest = getOmRequest().getRenameKeyRequest();
+    KeyArgs keyArgs = renameKeyRequest.getKeyArgs();
+    Map<String, String> auditMap = buildAuditMap(keyArgs, renameKeyRequest);
+
+    String volumeName = keyArgs.getVolumeName();
+    String bucketName = keyArgs.getBucketName();
+    String fromKeyName = keyArgs.getKeyName();
+    String toKeyName = renameKeyRequest.getToKeyName();
+
+    OMMetrics omMetrics = ozoneManager.getMetrics();
+    omMetrics.incNumKeyRenames();
+
+    AuditLogger auditLogger = ozoneManager.getAuditLogger();
+
+    OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
+            getOmRequest());
+
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    boolean acquiredLock = false;
+    OMClientResponse omClientResponse = null;
+    IOException exception = null;
+    OmKeyInfo fromKeyValue;
+    String fromKey = null;
+    Result result;
+    try {
+      if (toKeyName.length() == 0 || fromKeyName.length() == 0) {
+        throw new OMException("Key name is empty",
+                OMException.ResultCodes.INVALID_KEY_NAME);
+      }
+
+      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
+      volumeName = keyArgs.getVolumeName();
+      bucketName = keyArgs.getBucketName();
+
+      // check Acls to see if user has access to perform delete operation on
+      // old key and create operation on new key
+      checkKeyAcls(ozoneManager, volumeName, bucketName, fromKeyName,
+              IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
+      checkKeyAcls(ozoneManager, volumeName, bucketName, toKeyName,
+              IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
+
+      acquiredLock = omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
+              volumeName, bucketName);
+
+      // Validate bucket and volume exists or not.
+      validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+
+      // Check if fromKey exists
+      OzoneFileStatus fromKeyFileStatus =
+              OMFileRequest.getOMKeyInfoIfExists(omMetadataManager, volumeName,
+                      bucketName, fromKeyName, 0);
+      // case-1) fromKeyName should exist, otw throws exception
+      if (fromKeyFileStatus == null) {
+        // TODO: Add support for renaming open key
+        throw new OMException("Key not found " + fromKey, KEY_NOT_FOUND);
+      }
+
+      // source existed
+      fromKeyValue = fromKeyFileStatus.getKeyInfo();
+      boolean isRenameDirectory = fromKeyFileStatus.isDirectory();
+
+      // case-2) Cannot rename a directory to its own subdirectory
+      OMFileRequest.verifyToDirIsASubDirOfFromDirectory(fromKeyName,
+              toKeyName, fromKeyFileStatus.isDirectory());
+
+      OzoneFileStatus toKeyFileStatus =
+              OMFileRequest.getOMKeyInfoIfExists(omMetadataManager,
+                      volumeName, bucketName, toKeyName, 0);
+
+      // Check if toKey exists.
+      if(toKeyFileStatus != null) {
+        // Destination exists and following are different cases:
+        OmKeyInfo toKeyValue = toKeyFileStatus.getKeyInfo();
+
+        if (fromKeyValue.getKeyName().equals(toKeyValue.getKeyName())) {
+          // case-3) If src == destin then check source and destin of same type
+          // (a) If dst is a file then return true.
+          // (b) Otherwise throws exception.
+          // TODO: Discuss do we need to throw exception for file as well.
+          if (toKeyFileStatus.isFile()) {
+            result = Result.SUCCESS;
+          } else {
+            throw new OMException("Key already exists " + toKeyName,
+                    OMException.ResultCodes.KEY_ALREADY_EXISTS);
+          }
+        } else if (toKeyFileStatus.isDirectory()) {
+          // case-4) If dst is a directory then rename source as sub-path of it
+          // For example: rename /source to /dst will lead to /dst/source
+          String fromFileName = OzoneFSUtils.getFileName(fromKeyName);
+          String newToKeyName = OzoneFSUtils.appendFileNameToKeyPath(toKeyName,
+                  fromFileName);
+          OzoneFileStatus newToOzoneFileStatus =
+                  OMFileRequest.getOMKeyInfoIfExists(omMetadataManager,
+                          volumeName, bucketName, newToKeyName, 0);
+
+          if (newToOzoneFileStatus != null) {
+            // case-5) If new destin '/dst/source' exists then throws exception
+            throw new OMException(String.format(
+                    "Failed to rename %s to %s, file already exists or not " +
+                            "empty!", fromKeyName, newToKeyName),
+                    OMException.ResultCodes.KEY_ALREADY_EXISTS);
+          }
+
+          omClientResponse = renameKey(toKeyValue.getObjectID(), trxnLogIndex,
+                  fromKeyValue, isRenameDirectory, newToKeyName,
+                  keyArgs.getModificationTime(), omResponse, ozoneManager);
+          result = Result.SUCCESS;
+        } else {
+          // case-6) If destination is a file type and if exists then throws

Review comment:
       General mv supports it, 
   $ mv /tmp/f1 /Users/bviswanadham/f1
   
   Is this semantics of rename API in Fs?

##########
File path: 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequestV1.java
##########
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.key;
+
+import com.google.common.base.Optional;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.key.OMKeyRenameResponse;
+import org.apache.hadoop.ozone.om.response.key.OMKeyRenameResponseV1;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.*;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
+import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+
+/**
+ * Handles rename key request layout version V1.
+ */
+public class OMKeyRenameRequestV1 extends OMKeyRenameRequest {
+
+  private static final Logger LOG =
+          LoggerFactory.getLogger(OMKeyRenameRequestV1.class);
+
+  public OMKeyRenameRequestV1(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  @SuppressWarnings("methodlength")
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
+      long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
+
+    RenameKeyRequest renameKeyRequest = getOmRequest().getRenameKeyRequest();
+    KeyArgs keyArgs = renameKeyRequest.getKeyArgs();
+    Map<String, String> auditMap = buildAuditMap(keyArgs, renameKeyRequest);
+
+    String volumeName = keyArgs.getVolumeName();
+    String bucketName = keyArgs.getBucketName();
+    String fromKeyName = keyArgs.getKeyName();
+    String toKeyName = renameKeyRequest.getToKeyName();
+
+    OMMetrics omMetrics = ozoneManager.getMetrics();
+    omMetrics.incNumKeyRenames();
+
+    AuditLogger auditLogger = ozoneManager.getAuditLogger();
+
+    OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
+            getOmRequest());
+
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    boolean acquiredLock = false;
+    OMClientResponse omClientResponse = null;
+    IOException exception = null;
+    OmKeyInfo fromKeyValue;
+    String fromKey = null;
+    Result result;
+    try {
+      if (toKeyName.length() == 0 || fromKeyName.length() == 0) {
+        throw new OMException("Key name is empty",
+                OMException.ResultCodes.INVALID_KEY_NAME);
+      }
+
+      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
+      volumeName = keyArgs.getVolumeName();
+      bucketName = keyArgs.getBucketName();
+
+      // check Acls to see if user has access to perform delete operation on
+      // old key and create operation on new key
+      checkKeyAcls(ozoneManager, volumeName, bucketName, fromKeyName,
+              IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
+      checkKeyAcls(ozoneManager, volumeName, bucketName, toKeyName,
+              IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
+
+      acquiredLock = omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
+              volumeName, bucketName);
+
+      // Validate bucket and volume exists or not.
+      validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+
+      // Check if fromKey exists
+      OzoneFileStatus fromKeyFileStatus =
+              OMFileRequest.getOMKeyInfoIfExists(omMetadataManager, volumeName,
+                      bucketName, fromKeyName, 0);
+      // case-1) fromKeyName should exist, otw throws exception
+      if (fromKeyFileStatus == null) {
+        // TODO: Add support for renaming open key
+        throw new OMException("Key not found " + fromKey, KEY_NOT_FOUND);
+      }
+
+      // source existed
+      fromKeyValue = fromKeyFileStatus.getKeyInfo();
+      boolean isRenameDirectory = fromKeyFileStatus.isDirectory();
+
+      // case-2) Cannot rename a directory to its own subdirectory
+      OMFileRequest.verifyToDirIsASubDirOfFromDirectory(fromKeyName,
+              toKeyName, fromKeyFileStatus.isDirectory());
+
+      OzoneFileStatus toKeyFileStatus =
+              OMFileRequest.getOMKeyInfoIfExists(omMetadataManager,
+                      volumeName, bucketName, toKeyName, 0);
+
+      // Check if toKey exists.
+      if(toKeyFileStatus != null) {
+        // Destination exists and following are different cases:
+        OmKeyInfo toKeyValue = toKeyFileStatus.getKeyInfo();
+
+        if (fromKeyValue.getKeyName().equals(toKeyValue.getKeyName())) {
+          // case-3) If src == destin then check source and destin of same type
+          // (a) If dst is a file then return true.
+          // (b) Otherwise throws exception.
+          // TODO: Discuss do we need to throw exception for file as well.
+          if (toKeyFileStatus.isFile()) {
+            result = Result.SUCCESS;
+          } else {
+            throw new OMException("Key already exists " + toKeyName,
+                    OMException.ResultCodes.KEY_ALREADY_EXISTS);
+          }
+        } else if (toKeyFileStatus.isDirectory()) {
+          // case-4) If dst is a directory then rename source as sub-path of it
+          // For example: rename /source to /dst will lead to /dst/source
+          String fromFileName = OzoneFSUtils.getFileName(fromKeyName);
+          String newToKeyName = OzoneFSUtils.appendFileNameToKeyPath(toKeyName,
+                  fromFileName);
+          OzoneFileStatus newToOzoneFileStatus =
+                  OMFileRequest.getOMKeyInfoIfExists(omMetadataManager,
+                          volumeName, bucketName, newToKeyName, 0);
+
+          if (newToOzoneFileStatus != null) {
+            // case-5) If new destin '/dst/source' exists then throws exception

Review comment:
       Same for directory also.

##########
File path: 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemV1.java
##########
@@ -260,6 +260,160 @@ private void testListFilesRecursive() throws Exception {
             expectedFilesCount, actualCount);
   }
 
+  /**
+   * Case-1) fromKeyName should exist, otw throws exception.
+   */
+  protected void testRenameWithNonExistentSource() throws Exception {
+    final String root = "/root";
+    final String dir1 = root + "/dir1";
+    final String dir2 = root + "/dir2";
+    final Path source = new Path(fs.getUri().toString() + dir1);
+    final Path destin = new Path(fs.getUri().toString() + dir2);
+
+    // creates destin
+    fs.mkdirs(destin);
+    LOG.info("Created destin dir: {}", destin);
+
+    LOG.info("Rename op-> source:{} to destin:{}}", source, destin);
+    try {
+      fs.rename(source, destin);
+      Assert.fail("Should throw exception : Source doesn't exist!");
+    } catch (OMException ome) {
+      // expected
+      assertEquals(ome.getResult(), OMException.ResultCodes.KEY_NOT_FOUND);
+    }
+  }
+
+  /**
+   * Case-2) Cannot rename a directory to its own subdirectory.
+   */
+  protected void testRenameDirToItsOwnSubDir() throws Exception {
+    final String root = "/root";
+    final String dir1 = root + "/dir1";
+    final Path dir1Path = new Path(fs.getUri().toString() + dir1);
+    // Add a sub-dir1 to the directory to be moved.
+    final Path subDir1 = new Path(dir1Path, "sub_dir1");
+    fs.mkdirs(subDir1);
+    LOG.info("Created dir1 {}", subDir1);
+
+    final Path sourceRoot = new Path(fs.getUri().toString() + root);
+    LOG.info("Rename op-> source:{} to destin:{}", sourceRoot, subDir1);
+    try {
+      fs.rename(sourceRoot, subDir1);
+      Assert.fail("Should throw exception : Cannot rename a directory to" +
+              " its own subdirectory");
+    } catch (OMException ome) {
+      // expected
+      assertEquals(ome.getResult(), OMException.ResultCodes.KEY_RENAME_ERROR);
+    }
+  }
+
+  /**
+   * Case-5) If new destin '/dst/source' exists then throws exception.
+   * If destination is a directory then rename source as sub-path of it.
+   * <p>
+   * For example: rename /a to /b will lead to /b/a. This new path should
+   * not exist.
+   */
+  protected void testRenameToNewSubDirShouldNotExist() throws Exception {
+    // Case-5.a) Rename directory from /a to /b.
+    // created /a
+    final Path aSourcePath = new Path(fs.getUri().toString() + "/a");
+    fs.mkdirs(aSourcePath);
+
+    // created /b
+    final Path bDestinPath = new Path(fs.getUri().toString() + "/b");
+    fs.mkdirs(bDestinPath);
+
+    // Add a sub-directory '/a/c' to '/a'. This is to verify that after

Review comment:
       Minor: Test is doing different, comment says different




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to