This is an automated email from the ASF dual-hosted git repository.

duong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new fa3cdc46c2 HDDS-8765. OzoneManagerLock to use striped locks to improve 
performance. (#4885)
fa3cdc46c2 is described below

commit fa3cdc46c2026162525f7fbdf6f4beb4c1fe7121
Author: Duong Nguyen <[email protected]>
AuthorDate: Thu Sep 7 13:03:26 2023 -0700

    HDDS-8765. OzoneManagerLock to use striped locks to improve performance. 
(#4885)
---
 .../org/apache/hadoop/hdds/utils/CompositeKey.java |  70 ++++
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |   5 +
 .../hadoop/ozone/om/lock/IOzoneManagerLock.java    |  52 +--
 .../hadoop/ozone/om/lock/OmReadOnlyLock.java       | 104 +----
 .../hadoop/ozone/om/lock/OzoneManagerLock.java     | 462 ++++++---------------
 .../hadoop/ozone/om/lock/TestKeyPathLock.java      |  40 +-
 .../hadoop/ozone/om/lock/TestOzoneManagerLock.java | 333 +++++----------
 .../apache/hadoop/ozone/om/VolumeManagerImpl.java  |   9 +-
 .../om/hashcodegenerator/OMHashCodeGenerator.java  |  33 --
 .../StringOMHashCodeGeneratorImpl.java             |  31 --
 .../ozone/om/hashcodegenerator/package-info.java   |  23 -
 .../ozone/om/lock/OBSKeyPathLockStrategy.java      |  34 +-
 12 files changed, 335 insertions(+), 861 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/CompositeKey.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/CompositeKey.java
new file mode 100644
index 0000000000..32a4ca924a
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/CompositeKey.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.utils;
+
+import java.util.Arrays;
+
+/**
+ * A utility that combines multiple objects into a single key suitable for
+ * hash map access. Its advantage is that it is cheap in comparison
+ * to other methods like string concatenation.
+ *
+ * For example, if a composition of volume, bucket and key is needed to
+ * access a hash map, the natural method is:
+ * <pre> {@code
+ * String mapKey = "/" + volume + "/" + bucket + "/" + key;
+ * map.put(mapKey, value);
+ * }</pre>
+ * This is costly because it creates (and stores) a new buffer.
+ *
+ * In comparison, the following achieves the same logic without building a
+ * concatenated string; the components array is stored as is, not copied.
+ * <pre> {@code
+ * Object mapKey = combineKeys(new Object[] {volume, bucket, key});
+ * map.put(mapKey, value);
+ * }</pre>
+ * A single-element array is not wrapped: combineKeys returns that element.
+ */
+public final class CompositeKey {
+  private final int hashCode;
+  private final Object[] components;
+
+  CompositeKey(Object[] components) {
+    this.components = components;
+    this.hashCode = Arrays.hashCode(components);
+  }
+
+  @Override
+  public int hashCode() {
+    return hashCode;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof CompositeKey)) {
+      return false;
+    }
+    CompositeKey other = (CompositeKey) obj;
+    return Arrays.equals(components, other.components);
+  }
+
+  public static Object combineKeys(Object[] components) {
+    return components.length == 1 ?
+        components[0] : new CompositeKey(components);
+  }
+}
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index bcfc07e943..9cf74c6cd8 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -519,6 +519,11 @@ public final class OzoneConfigKeys {
   public static final String OZONE_MANAGER_FAIR_LOCK = "ozone.om.lock.fair";
   public static final boolean OZONE_MANAGER_FAIR_LOCK_DEFAULT = false;
 
+  public static final String OZONE_MANAGER_STRIPED_LOCK_SIZE_PREFIX =
+      "ozone.om.lock.stripes.";
+
+  public static final int OZONE_MANAGER_STRIPED_LOCK_SIZE_DEFAULT = 512;
+
   public static final String OZONE_CLIENT_LIST_TRASH_KEYS_MAX =
       "ozone.client.list.trash.keys.max";
   public static final int OZONE_CLIENT_LIST_TRASH_KEYS_MAX_DEFAULT = 1000;
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/IOzoneManagerLock.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/IOzoneManagerLock.java
index 5fc7f5e4ec..3526d947be 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/IOzoneManagerLock.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/IOzoneManagerLock.java
@@ -23,24 +23,13 @@ import com.google.common.annotations.VisibleForTesting;
  * Interface for OM Metadata locks.
  */
 public interface IOzoneManagerLock {
-  @Deprecated
-  boolean acquireLock(OzoneManagerLock.Resource resource, String... resources);
 
   boolean acquireReadLock(OzoneManagerLock.Resource resource,
                           String... resources);
 
-  boolean acquireReadHashedLock(OzoneManagerLock.Resource resource,
-                           String resourceName);
-
   boolean acquireWriteLock(OzoneManagerLock.Resource resource,
                            String... resources);
 
-  boolean acquireWriteHashedLock(OzoneManagerLock.Resource resource,
-                                 String resourceName);
-
-  String generateResourceName(OzoneManagerLock.Resource resource,
-                        String... resources);
-
   boolean acquireMultiUserLock(String firstUser, String secondUser);
 
   void releaseMultiUserLock(String firstUser, String secondUser);
@@ -48,49 +37,20 @@ public interface IOzoneManagerLock {
   void releaseWriteLock(OzoneManagerLock.Resource resource,
                         String... resources);
 
-  void releaseWriteHashedLock(OzoneManagerLock.Resource resource,
-                        String resourceName);
-
   void releaseReadLock(OzoneManagerLock.Resource resource, String... 
resources);
 
-  void releaseReadHashedLock(OzoneManagerLock.Resource resource,
-                        String resourceName);
-
-  @Deprecated
-  void releaseLock(OzoneManagerLock.Resource resource, String... resources);
-
-  @VisibleForTesting
-  int getReadHoldCount(String resourceName);
-
-  @VisibleForTesting
-  String getReadLockWaitingTimeMsStat();
-
   @VisibleForTesting
-  long getLongestReadLockWaitingTimeMs();
+  int getReadHoldCount(OzoneManagerLock.Resource resource,
+      String... resources);
 
-  @VisibleForTesting
-  String getReadLockHeldTimeMsStat();
-
-  @VisibleForTesting
-  long getLongestReadLockHeldTimeMs();
-
-  @VisibleForTesting
-  int getWriteHoldCount(String resourceName);
-
-  @VisibleForTesting
-  boolean isWriteLockedByCurrentThread(String resourceName);
-
-  @VisibleForTesting
-  String getWriteLockWaitingTimeMsStat();
-
-  @VisibleForTesting
-  long getLongestWriteLockWaitingTimeMs();
 
   @VisibleForTesting
-  String getWriteLockHeldTimeMsStat();
+  int getWriteHoldCount(OzoneManagerLock.Resource resource,
+      String... resources);
 
   @VisibleForTesting
-  long getLongestWriteLockHeldTimeMs();
+  boolean isWriteLockedByCurrentThread(OzoneManagerLock.Resource resource,
+      String... resources);
 
   void cleanup();
 
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OmReadOnlyLock.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OmReadOnlyLock.java
index dc606bf13f..c46b494f52 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OmReadOnlyLock.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OmReadOnlyLock.java
@@ -17,48 +17,25 @@
 
 package org.apache.hadoop.ozone.om.lock;
 
+import org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource;
+
 /**
  * Read only "lock" for snapshots
- * Uses no lock.  Always returns true when aquiring
+ * Uses no lock.  Always returns true when acquiring
  * read lock and false for write locks
  */
 public class OmReadOnlyLock implements IOzoneManagerLock {
-  @Override
-  public boolean acquireLock(OzoneManagerLock.Resource resource,
-                             String... resources) {
-    return false;
-  }
-
-  @Override
-  public boolean acquireReadLock(OzoneManagerLock.Resource resource,
-                                 String... resources) {
-    return true;
-  }
 
   @Override
-  public boolean acquireReadHashedLock(OzoneManagerLock.Resource resource,
-                                 String resourceName) {
+  public boolean acquireReadLock(Resource resource, String... resources) {
     return true;
   }
 
   @Override
-  public boolean acquireWriteLock(OzoneManagerLock.Resource resource,
-                                  String... resources) {
+  public boolean acquireWriteLock(Resource resource, String... resources) {
     return false;
   }
 
-  @Override
-  public boolean acquireWriteHashedLock(OzoneManagerLock.Resource resource,
-                                        String resourceName) {
-    return false;
-  }
-
-  @Override
-  public String generateResourceName(OzoneManagerLock.Resource resource,
-                                     String... resources) {
-    return "";
-  }
-
   @Override
   public boolean acquireMultiUserLock(String firstUser, String secondUser) {
     return false;
@@ -70,90 +47,31 @@ public class OmReadOnlyLock implements IOzoneManagerLock {
   }
 
   @Override
-  public void releaseWriteLock(OzoneManagerLock.Resource resource,
-                               String... resources) {
-  // Intentionally empty
-  }
-
-  @Override
-  public void releaseWriteHashedLock(OzoneManagerLock.Resource resource,
-                               String resourceName) {
-  // Intentionally empty
-  }
-
-  @Override
-  public void releaseReadLock(OzoneManagerLock.Resource resource,
-                              String... resources) {
+  public void releaseWriteLock(Resource resource, String... resources) {
   // Intentionally empty
   }
 
   @Override
-  public void releaseReadHashedLock(OzoneManagerLock.Resource resource,
-                               String resourceName) {
+  public void releaseReadLock(Resource resource, String... resources) {
   // Intentionally empty
   }
 
   @Override
-  public void releaseLock(OzoneManagerLock.Resource resource,
-                          String... resources) {
-  // Intentionally empty
-  }
-
-  @Override
-  public int getReadHoldCount(String resourceName) {
+  public int getReadHoldCount(Resource resource, String... resources) {
     return 0;
   }
 
   @Override
-  public String getReadLockWaitingTimeMsStat() {
-    return "";
-  }
-
-  @Override
-  public long getLongestReadLockWaitingTimeMs() {
-    return 0;
-  }
-
-  @Override
-  public String getReadLockHeldTimeMsStat() {
-    return "";
-  }
-
-  @Override
-  public long getLongestReadLockHeldTimeMs() {
+  public int getWriteHoldCount(Resource resource, String... resources) {
     return 0;
   }
 
   @Override
-  public int getWriteHoldCount(String resourceName) {
-    return 0;
-  }
-
-  @Override
-  public boolean isWriteLockedByCurrentThread(String resourceName) {
+  public boolean isWriteLockedByCurrentThread(Resource resource,
+      String... resources) {
     return false;
   }
 
-  @Override
-  public String getWriteLockWaitingTimeMsStat() {
-    return "";
-  }
-
-  @Override
-  public long getLongestWriteLockWaitingTimeMs() {
-    return 0;
-  }
-
-  @Override
-  public String getWriteLockHeldTimeMsStat() {
-    return "";
-  }
-
-  @Override
-  public long getLongestWriteLockHeldTimeMs() {
-    return 0;
-  }
-
   @Override
   public void cleanup() {
   // Intentionally empty
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
index b5cf45e043..39d74cb1ee 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
@@ -20,20 +20,26 @@ package org.apache.hadoop.ozone.om.lock;
 
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Striped;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.ozone.lock.LockManager;
 
-import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_MANAGER_FAIR_LOCK_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_MANAGER_FAIR_LOCK;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_MANAGER_STRIPED_LOCK_SIZE_DEFAULT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_MANAGER_STRIPED_LOCK_SIZE_PREFIX;
+import static org.apache.hadoop.hdds.utils.CompositeKey.combineKeys;
 
 /**
  * Provides different locks to handle concurrency in OzoneMaster.
@@ -78,14 +84,11 @@ import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_MANAGER_FAIR_LOCK;
  */
 
 public class OzoneManagerLock implements IOzoneManagerLock {
-
   private static final Logger LOG =
       LoggerFactory.getLogger(OzoneManagerLock.class);
 
-  private static final String READ_LOCK = "read";
-  private static final String WRITE_LOCK = "write";
+  private final Map<Resource, Striped<ReadWriteLock>> stripedLockByResource;
 
-  private final LockManager<String> manager;
   private OMLockMetrics omLockMetrics;
   private final ThreadLocal<Short> lockSet = ThreadLocal.withInitial(
       () -> Short.valueOf((short)0));
@@ -95,34 +98,36 @@ public class OzoneManagerLock implements IOzoneManagerLock {
    * @param conf Configuration object
    */
   public OzoneManagerLock(ConfigurationSource conf) {
-    boolean fair = conf.getBoolean(OZONE_MANAGER_FAIR_LOCK,
-        OZONE_MANAGER_FAIR_LOCK_DEFAULT);
-    manager = new LockManager<>(conf, fair);
     omLockMetrics = OMLockMetrics.create();
+
+    // TODO: for now, Guava Striped does not allow creating a striped
+    //  ReadWriteLock with fair locks.
+    //  https://github.com/google/guava/issues/2514
+    //  We may have to consider implementing our own striped locks.
+    //boolean fair = conf.getBoolean(OZONE_MANAGER_FAIR_LOCK,
+    //    OZONE_MANAGER_FAIR_LOCK_DEFAULT);
+
+    Map<Resource, Striped<ReadWriteLock>> stripedLockMap =
+        new EnumMap<>(Resource.class);
+    for (Resource r : Resource.values()) {
+      stripedLockMap.put(r, createStripeLock(r, conf));
+    }
+    this.stripedLockByResource = Collections.unmodifiableMap(stripedLockMap);
   }
 
-  /**
-   * Acquire lock on resource.
-   *
-   * For S3_BUCKET_LOCK, VOLUME_LOCK, BUCKET_LOCK type resource, same
-   * thread acquiring lock again is allowed.
-   *
-   * For USER_LOCK, PREFIX_LOCK, S3_SECRET_LOCK type resource, same thread
-   * acquiring lock again is not allowed.
-   *
-   * Special Note for USER_LOCK: Single thread can acquire single user lock/
-   * multi user lock. But not both at the same time.
-   * @param resource - Type of the resource.
-   * @param resources - Resource names on which user want to acquire lock.
-   * For Resource type BUCKET_LOCK, first param should be volume, second param
-   * should be bucket name. For remaining all resource only one param should
-   * be passed.
-   */
-  @Override
-  @Deprecated
-  public boolean acquireLock(Resource resource, String... resources) {
-    String resourceName = generateResourceName(resource, resources);
-    return lock(resource, resourceName, manager::writeLock, WRITE_LOCK);
+  private Striped<ReadWriteLock> createStripeLock(Resource r,
+      ConfigurationSource conf) {
+    String stripeSizeKey = OZONE_MANAGER_STRIPED_LOCK_SIZE_PREFIX +
+        r.getName().toLowerCase();
+    int size = conf.getInt(stripeSizeKey,
+        OZONE_MANAGER_STRIPED_LOCK_SIZE_DEFAULT);
+    return Striped.readWriteLock(size);
+  }
+
+  private ReentrantReadWriteLock getLock(Resource resource, String... keys) {
+    Striped<ReadWriteLock> lockStriped = stripedLockByResource.get(resource);
+    Object key = combineKeys(keys);
+    return (ReentrantReadWriteLock) lockStriped.get(key);
   }
 
   /**
@@ -137,20 +142,14 @@ public class OzoneManagerLock implements 
IOzoneManagerLock {
    * Special Note for USER_LOCK: Single thread can acquire single user lock/
    * multi user lock. But not both at the same time.
    * @param resource - Type of the resource.
-   * @param resources - Resource names on which user want to acquire lock.
+   * @param keys - Resource names on which user want to acquire lock.
    * For Resource type BUCKET_LOCK, first param should be volume, second param
    * should be bucket name. For remaining all resource only one param should
    * be passed.
    */
   @Override
-  public boolean acquireReadLock(Resource resource, String... resources) {
-    String resourceName = generateResourceName(resource, resources);
-    return lock(resource, resourceName, manager::readLock, READ_LOCK);
-  }
-
-  @Override
-  public boolean acquireReadHashedLock(Resource resource, String resourceName) 
{
-    return lock(resource, resourceName, manager::readLock, READ_LOCK);
+  public boolean acquireReadLock(Resource resource, String... keys) {
+    return acquireLock(resource, true, keys);
   }
 
   /**
@@ -165,60 +164,47 @@ public class OzoneManagerLock implements 
IOzoneManagerLock {
    * Special Note for USER_LOCK: Single thread can acquire single user lock/
    * multi user lock. But not both at the same time.
    * @param resource - Type of the resource.
-   * @param resources - Resource names on which user want to acquire lock.
+   * @param keys - Resource names on which user want to acquire lock.
    * For Resource type BUCKET_LOCK, first param should be volume, second param
    * should be bucket name. For remaining all resource only one param should
    * be passed.
    */
   @Override
-  public boolean acquireWriteLock(Resource resource, String... resources) {
-    String resourceName = generateResourceName(resource, resources);
-    return lock(resource, resourceName, manager::writeLock, WRITE_LOCK);
+  public boolean acquireWriteLock(Resource resource, String... keys) {
+    return acquireLock(resource, false, keys);
   }
 
-  @Override
-  public boolean acquireWriteHashedLock(Resource resource,
-                                        String resourceName) {
-    return lock(resource, resourceName, manager::writeLock, WRITE_LOCK);
-  }
-
-  private boolean lock(Resource resource, String resourceName,
-      Consumer<String> lockFn, String lockType) {
+  private boolean acquireLock(Resource resource, boolean isReadLock,
+      String... keys) {
     if (!resource.canLock(lockSet.get())) {
       String errorMessage = getErrorMessage(resource);
       LOG.error(errorMessage);
       throw new RuntimeException(errorMessage);
-    } else {
-      long startWaitingTimeNanos = Time.monotonicNowNanos();
-      lockFn.accept(resourceName);
-
-      switch (lockType) {
-      case READ_LOCK:
-        updateReadLockMetrics(resource, resourceName, startWaitingTimeNanos);
-        break;
-      case WRITE_LOCK:
-        updateWriteLockMetrics(resource, resourceName, startWaitingTimeNanos);
-        break;
-      default:
-        break;
-      }
+    }
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Acquired {} {} lock on resource {}", lockType, 
resource.name,
-            resourceName);
-      }
-      lockSet.set(resource.setLock(lockSet.get()));
-      return true;
+    long startWaitingTimeNanos = Time.monotonicNowNanos();
+
+    ReentrantReadWriteLock lock = getLock(resource, keys);
+    if (isReadLock) {
+      lock.readLock().lock();
+      updateReadLockMetrics(resource, lock, startWaitingTimeNanos);
+    } else {
+      lock.writeLock().lock();
+      updateWriteLockMetrics(resource, lock, startWaitingTimeNanos);
     }
+
+    lockSet.set(resource.setLock(lockSet.get()));
+    return true;
   }
 
-  private void updateReadLockMetrics(Resource resource, String resourceName,
-                                     long startWaitingTimeNanos) {
-    /**
+  private void updateReadLockMetrics(Resource resource,
+      ReentrantReadWriteLock lock, long startWaitingTimeNanos) {
+
+    /*
      *  readHoldCount helps in metrics updation only once in case
      *  of reentrant locks.
      */
-    if (manager.getReadHoldCount(resourceName) == 1) {
+    if (lock.getReadHoldCount() == 1) {
       long readLockWaitingTimeNanos =
           Time.monotonicNowNanos() - startWaitingTimeNanos;
 
@@ -230,15 +216,15 @@ public class OzoneManagerLock implements 
IOzoneManagerLock {
     }
   }
 
-  private void updateWriteLockMetrics(Resource resource, String resourceName,
-                                      long startWaitingTimeNanos) {
-    /**
+  private void updateWriteLockMetrics(Resource resource,
+      ReentrantReadWriteLock lock, long startWaitingTimeNanos) {
+    /*
      *  writeHoldCount helps in metrics updation only once in case
      *  of reentrant locks. Metrics are updated only if the write lock is held
      *  by the current thread.
      */
-    if ((manager.getWriteHoldCount(resourceName) == 1) &&
-        manager.isWriteLockedByCurrentThread(resourceName)) {
+    if ((lock.getWriteHoldCount() == 1) &&
+        lock.isWriteLockedByCurrentThread()) {
       long writeLockWaitingTimeNanos =
           Time.monotonicNowNanos() - startWaitingTimeNanos;
 
@@ -250,31 +236,6 @@ public class OzoneManagerLock implements IOzoneManagerLock 
{
     }
   }
 
-  /**
-   * Generate resource name to be locked.
-   * @param resource
-   * @param resources
-   */
-  @Override
-  public String generateResourceName(Resource resource, String... resources) {
-    if (resources.length == 1 && resource != Resource.BUCKET_LOCK) {
-      return OzoneManagerLockUtil.generateResourceLockName(resource,
-          resources[0]);
-    } else if (resources.length == 2 && resource == Resource.BUCKET_LOCK) {
-      return OzoneManagerLockUtil.generateBucketLockName(resources[0],
-          resources[1]);
-    } else if (resources.length == 3 && resource == Resource.SNAPSHOT_LOCK) {
-      return OzoneManagerLockUtil.generateSnapshotLockName(resources[0],
-          resources[1], resources[2]);
-    } else if (resources.length == 3 && resource == Resource.KEY_PATH_LOCK) {
-      return OzoneManagerLockUtil.generateKeyPathLockName(resources[0],
-          resources[1], resources[2]);
-    } else {
-      throw new IllegalArgumentException("acquire lock is supported on single" 
+
-          " resource for all locks except for resource bucket/snapshot");
-    }
-  }
-
   private String getErrorMessage(Resource resource) {
     return "Thread '" + Thread.currentThread().getName() + "' cannot " +
         "acquire " + resource.name + " lock while holding " +
@@ -296,69 +257,32 @@ public class OzoneManagerLock implements 
IOzoneManagerLock {
 
   /**
    * Acquire lock on multiple users.
-   * @param firstUser
-   * @param secondUser
    */
   @Override
   public boolean acquireMultiUserLock(String firstUser, String secondUser) {
     Resource resource = Resource.USER_LOCK;
-    firstUser = generateResourceName(resource, firstUser);
-    secondUser = generateResourceName(resource, secondUser);
 
     if (!resource.canLock(lockSet.get())) {
       String errorMessage = getErrorMessage(resource);
       LOG.error(errorMessage);
       throw new RuntimeException(errorMessage);
     } else {
-      // When acquiring multiple user locks, the reason for doing lexical
-      // order comparison is to avoid deadlock scenario.
-
-      // Example: 1st thread acquire lock(ozone, hdfs)
-      // 2nd thread acquire lock(hdfs, ozone).
-      // If we don't acquire user locks in an order, there can be a deadlock.
-      // 1st thread acquired lock on ozone, waiting for lock on hdfs, 2nd
-      // thread acquired lock on hdfs, waiting for lock on ozone.
-      // To avoid this when we acquire lock on multiple users, we acquire
-      // locks in lexical order, which can help us to avoid dead locks.
-      // Now if first thread acquires lock on hdfs, 2nd thread wait for lock
-      // on hdfs, and first thread acquires lock on ozone. Once after first
-      // thread releases user locks, 2nd thread acquires them.
-
-      int compare = firstUser.compareTo(secondUser);
-      String temp;
-
-      // Order the user names in sorted order. Swap them.
-      if (compare > 0) {
-        temp = secondUser;
-        secondUser = firstUser;
-        firstUser = temp;
+      Striped<ReadWriteLock> stripedLocks =
+          stripedLockByResource.get(Resource.USER_LOCK);
+      // The result of bulkGet is always sorted in a consistent order.
+      // This prevents deadlocks.
+      Iterable<ReadWriteLock> locks =
+          stripedLocks.bulkGet(Arrays.asList(firstUser, secondUser));
+      for (ReadWriteLock lock : locks) {
+        lock.writeLock().lock();
       }
 
-      if (compare == 0) {
-        // both users are equal.
-        manager.writeLock(firstUser);
-      } else {
-        manager.writeLock(firstUser);
-        try {
-          manager.writeLock(secondUser);
-        } catch (Exception ex) {
-          // We got an exception acquiring 2nd user lock. Release already
-          // acquired user lock, and throw exception to the user.
-          manager.writeUnlock(firstUser);
-          throw ex;
-        }
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Acquired Write {} lock on resource {} and {}", 
resource.name,
-            firstUser, secondUser);
-      }
       lockSet.set(resource.setLock(lockSet.get()));
       return true;
     }
   }
 
 
-
   /**
    * Release lock on multiple users.
    * @param firstUser
@@ -366,120 +290,67 @@ public class OzoneManagerLock implements 
IOzoneManagerLock {
    */
   @Override
   public void releaseMultiUserLock(String firstUser, String secondUser) {
-    Resource resource = Resource.USER_LOCK;
-    firstUser = generateResourceName(resource, firstUser);
-    secondUser = generateResourceName(resource, secondUser);
-
-    int compare = firstUser.compareTo(secondUser);
-
-    String temp;
-    // Order the user names in sorted order. Swap them.
-    if (compare > 0) {
-      temp = secondUser;
-      secondUser = firstUser;
-      firstUser = temp;
+    Striped<ReadWriteLock> stripedLocks =
+        stripedLockByResource.get(Resource.USER_LOCK);
+    Iterable<ReadWriteLock> locks =
+        stripedLocks.bulkGet(Arrays.asList(firstUser, secondUser));
+    for (ReadWriteLock lock : locks) {
+      lock.writeLock().unlock();
     }
 
-    if (compare == 0) {
-      // both users are equal.
-      manager.writeUnlock(firstUser);
-    } else {
-      manager.writeUnlock(firstUser);
-      manager.writeUnlock(secondUser);
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Release Write {} lock on resource {} and {}", resource.name,
-          firstUser, secondUser);
-    }
-    lockSet.set(resource.clearLock(lockSet.get()));
+    lockSet.set(Resource.USER_LOCK.clearLock(lockSet.get()));
   }
 
+
   /**
    * Release write lock on resource.
    * @param resource - Type of the resource.
-   * @param resources - Resource names on which user want to acquire lock.
+   * @param keys - Resource names on which the user wants to release the lock.
    * For Resource type BUCKET_LOCK, first param should be volume, second param
    * should be bucket name. For remaining all resource only one param should
    * be passed.
    */
   @Override
-  public void releaseWriteLock(Resource resource, String... resources) {
-    String resourceName = generateResourceName(resource, resources);
-    unlock(resource, resourceName, manager::writeUnlock, WRITE_LOCK);
-  }
-
-  @Override
-  public void releaseWriteHashedLock(Resource resource, String resourceName) {
-    unlock(resource, resourceName, manager::writeUnlock, WRITE_LOCK);
+  public void releaseWriteLock(Resource resource, String... keys) {
+    releaseLock(resource, false, keys);
   }
 
   /**
    * Release read lock on resource.
    * @param resource - Type of the resource.
-   * @param resources - Resource names on which user want to acquire lock.
+   * @param keys - Resource names on which the user wants to release the lock.
    * For Resource type BUCKET_LOCK, first param should be volume, second param
    * should be bucket name. For remaining all resource only one param should
    * be passed.
    */
   @Override
-  public void releaseReadLock(Resource resource, String... resources) {
-    String resourceName = generateResourceName(resource, resources);
-    unlock(resource, resourceName, manager::readUnlock, READ_LOCK);
+  public void releaseReadLock(Resource resource, String... keys) {
+    releaseLock(resource, true, keys);
   }
 
-  @Override
-  public void releaseReadHashedLock(Resource resource, String resourceName) {
-    unlock(resource, resourceName, manager::readUnlock, READ_LOCK);
-  }
+  private void releaseLock(Resource resource, boolean isReadLock,
+      String... keys) {
 
-  /**
-   * Release write lock on resource.
-   * @param resource - Type of the resource.
-   * @param resources - Resource names on which user want to acquire lock.
-   * For Resource type BUCKET_LOCK, first param should be volume, second param
-   * should be bucket name. For remaining all resource only one param should
-   * be passed.
-   */
-  @Override
-  @Deprecated
-  public void releaseLock(Resource resource, String... resources) {
-    String resourceName = generateResourceName(resource, resources);
-    unlock(resource, resourceName, manager::writeUnlock, WRITE_LOCK);
-  }
-
-  private void unlock(Resource resource, String resourceName,
-      Consumer<String> lockFn, String lockType) {
-    boolean isWriteLocked = manager.isWriteLockedByCurrentThread(resourceName);
-    // TODO: Not checking release of higher order level lock happened while
-    // releasing lower order level lock, as for that we need counter for
-    // locks, as some locks support acquiring lock again.
-    lockFn.accept(resourceName);
-
-    switch (lockType) {
-    case READ_LOCK:
-      updateReadUnlockMetrics(resource, resourceName);
-      break;
-    case WRITE_LOCK:
-      updateWriteUnlockMetrics(resource, resourceName, isWriteLocked);
-      break;
-    default:
-      break;
+    ReentrantReadWriteLock lock = getLock(resource, keys);
+    if (isReadLock) {
+      lock.readLock().unlock();
+      updateReadUnlockMetrics(resource, lock);
+    } else {
+      boolean isWriteLocked = lock.isWriteLockedByCurrentThread();
+      lock.writeLock().unlock();
+      updateWriteUnlockMetrics(resource, lock, isWriteLocked);
     }
 
-    // clear lock
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Release {} {}, lock on resource {}", lockType, resource.name,
-          resourceName);
-    }
     lockSet.set(resource.clearLock(lockSet.get()));
   }
 
-  private void updateReadUnlockMetrics(Resource resource, String resourceName) 
{
-    /**
+  private void updateReadUnlockMetrics(Resource resource,
+      ReentrantReadWriteLock lock) {
+    /*
      *  readHoldCount helps in metrics updation only once in case
      *  of reentrant locks.
      */
-    if (manager.getReadHoldCount(resourceName) == 0) {
+    if (lock.getReadHoldCount() == 0) {
       long readLockHeldTimeNanos =
           Time.monotonicNowNanos() - resource.getStartReadHeldTimeNanos();
 
@@ -489,14 +360,14 @@ public class OzoneManagerLock implements IOzoneManagerLock {
     }
   }
 
-  private void updateWriteUnlockMetrics(Resource resource, String resourceName,
-                                        boolean isWriteLocked) {
-    /**
+  private void updateWriteUnlockMetrics(Resource resource,
+      ReentrantReadWriteLock lock, boolean isWriteLocked) {
+    /*
      *  writeHoldCount helps in metrics updation only once in case
      *  of reentrant locks. Metrics are updated only if the write lock is held
      *  by the current thread.
      */
-    if ((manager.getWriteHoldCount(resourceName) == 0) && isWriteLocked) {
+    if ((lock.getWriteHoldCount() == 0) && isWriteLocked) {
       long writeLockHeldTimeNanos =
           Time.monotonicNowNanos() - resource.getStartWriteHeldTimeNanos();
 
@@ -509,139 +380,38 @@ public class OzoneManagerLock implements IOzoneManagerLock {
   /**
    * Returns readHoldCount for a given resource lock name.
    *
-   * @param resourceName resource lock name
    * @return readHoldCount
    */
   @Override
   @VisibleForTesting
-  public int getReadHoldCount(String resourceName) {
-    return manager.getReadHoldCount(resourceName);
+  public int getReadHoldCount(Resource resource, String... keys) {
+    return getLock(resource, keys).getReadHoldCount();
   }
 
-  /**
-   * Returns a string representation of the object. Provides information on the
-   * total number of samples, minimum value, maximum value, arithmetic mean,
-   * standard deviation of all the samples added.
-   *
-   * @return String representation of object
-   */
-  @Override
-  @VisibleForTesting
-  public String getReadLockWaitingTimeMsStat() {
-    return omLockMetrics.getReadLockWaitingTimeMsStat();
-  }
-
-  /**
-   * Returns the longest time (ms) a read lock was waiting since the last
-   * measurement.
-   *
-   * @return longest read lock waiting time (ms)
-   */
-  @Override
-  @VisibleForTesting
-  public long getLongestReadLockWaitingTimeMs() {
-    return omLockMetrics.getLongestReadLockWaitingTimeMs();
-  }
-
-  /**
-   * Returns a string representation of the object. Provides information on the
-   * total number of samples, minimum value, maximum value, arithmetic mean,
-   * standard deviation of all the samples added.
-   *
-   * @return String representation of object
-   */
-  @Override
-  @VisibleForTesting
-  public String getReadLockHeldTimeMsStat() {
-    return omLockMetrics.getReadLockHeldTimeMsStat();
-  }
-
-  /**
-   * Returns the longest time (ms) a read lock was held since the last
-   * measurement.
-   *
-   * @return longest read lock held time (ms)
-   */
-  @Override
-  @VisibleForTesting
-  public long getLongestReadLockHeldTimeMs() {
-    return omLockMetrics.getLongestReadLockHeldTimeMs();
-  }
 
   /**
    * Returns writeHoldCount for a given resource lock name.
    *
-   * @param resourceName resource lock name
    * @return writeHoldCount
    */
   @Override
   @VisibleForTesting
-  public int getWriteHoldCount(String resourceName) {
-    return manager.getWriteHoldCount(resourceName);
+  public int getWriteHoldCount(Resource resource, String... keys) {
+    return getLock(resource, keys).getWriteHoldCount();
   }
 
   /**
    * Queries if the write lock is held by the current thread for a given
    * resource lock name.
    *
-   * @param resourceName resource lock name
    * @return {@code true} if the current thread holds the write lock and
    *         {@code false} otherwise
    */
   @Override
   @VisibleForTesting
-  public boolean isWriteLockedByCurrentThread(String resourceName) {
-    return manager.isWriteLockedByCurrentThread(resourceName);
-  }
-
-  /**
-   * Returns a string representation of the object. Provides information on the
-   * total number of samples, minimum value, maximum value, arithmetic mean,
-   * standard deviation of all the samples added.
-   *
-   * @return String representation of object
-   */
-  @Override
-  @VisibleForTesting
-  public String getWriteLockWaitingTimeMsStat() {
-    return omLockMetrics.getWriteLockWaitingTimeMsStat();
-  }
-
-  /**
-   * Returns the longest time (ms) a write lock was waiting since the last
-   * measurement.
-   *
-   * @return longest write lock waiting time (ms)
-   */
-  @Override
-  @VisibleForTesting
-  public long getLongestWriteLockWaitingTimeMs() {
-    return omLockMetrics.getLongestWriteLockWaitingTimeMs();
-  }
-
-  /**
-   * Returns a string representation of the object. Provides information on the
-   * total number of samples, minimum value, maximum value, arithmetic mean,
-   * standard deviation of all the samples added.
-   *
-   * @return String representation of object
-   */
-  @Override
-  @VisibleForTesting
-  public String getWriteLockHeldTimeMsStat() {
-    return omLockMetrics.getWriteLockHeldTimeMsStat();
-  }
-
-  /**
-   * Returns the longest time (ms) a write lock was held since the last
-   * measurement.
-   *
-   * @return longest write lock held time (ms)
-   */
-  @Override
-  @VisibleForTesting
-  public long getLongestWriteLockHeldTimeMs() {
-    return omLockMetrics.getLongestWriteLockHeldTimeMs();
+  public boolean isWriteLockedByCurrentThread(Resource resource,
+      String... keys) {
+    return getLock(resource, keys).isWriteLockedByCurrentThread();
   }
 
   /**
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestKeyPathLock.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestKeyPathLock.java
index 3b2179e4ba..ee4c9b18ca 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestKeyPathLock.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestKeyPathLock.java
@@ -92,7 +92,7 @@ public class TestKeyPathLock extends TestOzoneManagerLock {
 
         testSameKeyPathWriteLockMultiThreadingUtil(iterations, lock, counter,
             countDownLatch, listTokens, sampleResourceName);
-      });
+      }, "Thread-" + i);
 
       threads[i].start();
     }
@@ -136,8 +136,7 @@ public class TestKeyPathLock extends TestOzoneManagerLock {
     // Now all threads have been instantiated.
     Assert.assertEquals(0, countDownLatch.getCount());
 
-    lock.acquireWriteHashedLock(resource,
-        generateResourceHashCode(resource, sampleResourceName));
+    lock.acquireWriteLock(resource, sampleResourceName);
     LOG.info("Write Lock Acquired by " + Thread.currentThread().getName());
 
     /**
@@ -153,8 +152,7 @@ public class TestKeyPathLock extends TestOzoneManagerLock {
     //  Thread-2 -> 101 - 200 and so on.
     listTokens.add(counter.getCount());
 
-    lock.releaseWriteHashedLock(resource,
-        generateResourceHashCode(resource, sampleResourceName));
+    lock.releaseWriteLock(resource, sampleResourceName);
     LOG.info("Write Lock Released by " + Thread.currentThread().getName());
   }
 
@@ -187,8 +185,7 @@ public class TestKeyPathLock extends TestOzoneManagerLock {
 
         testDiffKeyPathWriteLockMultiThreadingUtil(lock, countDown,
             sampleResourceName);
-      });
-
+      }, "Thread-" + i);
       threads[i].start();
     }
 
@@ -218,26 +215,15 @@ public class TestKeyPathLock extends TestOzoneManagerLock {
       OzoneManagerLock lock, CountDownLatch countDown,
       String[] sampleResourceName) {
 
-    lock.acquireWriteHashedLock(resource,
-        generateResourceHashCode(resource, sampleResourceName));
+    lock.acquireWriteLock(resource, sampleResourceName);
     LOG.info("Write Lock Acquired by " + Thread.currentThread().getName());
 
     // Waiting for all the threads to be instantiated/to reach
-    // acquireWriteHashedLock.
+    // acquireWriteLock.
     countDown.countDown();
-    while (countDown.getCount() > 0) {
-      try {
-        Thread.sleep(500);
-        LOG.info("countDown.getCount() -> " + countDown.getCount());
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
-
     Assert.assertEquals(1, lock.getCurrentLocks().size());
 
-    lock.releaseWriteHashedLock(resource,
-        generateResourceHashCode(resource, sampleResourceName));
+    lock.releaseWriteLock(resource, sampleResourceName);
     LOG.info("Write Lock Released by " + Thread.currentThread().getName());
   }
 
@@ -255,8 +241,7 @@ public class TestKeyPathLock extends TestOzoneManagerLock {
     String[] resourceName = new String[]{volumeName, bucketName, keyName},
         higherResourceName = new String[]{volumeName, bucketName};
 
-    lock.acquireWriteHashedLock(resource,
-        generateResourceHashCode(resource, resourceName));
+    lock.acquireWriteLock(resource, resourceName);
     try {
       lock.acquireWriteLock(higherResource, higherResourceName);
       fail("testAcquireWriteBucketLockWhileAcquiredWriteKeyPathLock() failed");
@@ -281,8 +266,7 @@ public class TestKeyPathLock extends TestOzoneManagerLock {
     String[] resourceName = new String[]{volumeName, bucketName, keyName},
         higherResourceName = new String[]{volumeName, bucketName};
 
-    lock.acquireReadHashedLock(resource,
-        generateResourceHashCode(resource, resourceName));
+    lock.acquireReadLock(resource, resourceName);
     try {
       lock.acquireWriteLock(higherResource, higherResourceName);
       fail("testAcquireWriteBucketLockWhileAcquiredReadKeyPathLock() failed");
@@ -307,8 +291,7 @@ public class TestKeyPathLock extends TestOzoneManagerLock {
     String[] resourceName = new String[]{volumeName, bucketName, keyName},
         higherResourceName = new String[]{volumeName, bucketName};
 
-    lock.acquireReadHashedLock(resource,
-        generateResourceHashCode(resource, resourceName));
+    lock.acquireReadLock(resource, resourceName);
     try {
       lock.acquireReadLock(higherResource, higherResourceName);
       fail("testAcquireReadBucketLockWhileAcquiredReadKeyPathLock() failed");
@@ -333,8 +316,7 @@ public class TestKeyPathLock extends TestOzoneManagerLock {
     String[] resourceName = new String[]{volumeName, bucketName, keyName},
         higherResourceName = new String[]{volumeName, bucketName};
 
-    lock.acquireWriteHashedLock(resource,
-        generateResourceHashCode(resource, resourceName));
+    lock.acquireWriteLock(resource, resourceName);
     try {
       lock.acquireReadLock(higherResource, higherResourceName);
       fail("testAcquireReadBucketLockWhileAcquiredWriteKeyPathLock() failed");
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java
index a87887111c..c230839136 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java
@@ -25,6 +25,7 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
+import org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -35,6 +36,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.fail;
 
 /**
@@ -51,15 +53,13 @@ public class TestOzoneManagerLock {
   @Test
   public void acquireResourceLock() {
     String[] resourceName;
-    for (OzoneManagerLock.Resource resource :
-        OzoneManagerLock.Resource.values()) {
+    for (Resource resource : Resource.values()) {
       resourceName = generateResourceName(resource);
       testResourceLock(resourceName, resource);
     }
   }
 
-  private void testResourceLock(String[] resourceName,
-      OzoneManagerLock.Resource resource) {
+  private void testResourceLock(String[] resourceName, Resource resource) {
     OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
     lock.acquireWriteLock(resource, resourceName);
     lock.releaseWriteLock(resource, resourceName);
@@ -69,81 +69,36 @@ public class TestOzoneManagerLock {
   @Test
   public void reacquireResourceLock() {
     String[] resourceName;
-    for (OzoneManagerLock.Resource resource :
-        OzoneManagerLock.Resource.values()) {
+    for (Resource resource : Resource.values()) {
       resourceName = generateResourceName(resource);
       testResourceReacquireLock(resourceName, resource);
     }
   }
 
-  protected boolean acquireWriteLock(OzoneManagerLock lock,
-                                     OzoneManagerLock.Resource resource,
-                                     String[] resourceName) {
-    if (resource == OzoneManagerLock.Resource.KEY_PATH_LOCK) {
-      return lock.acquireWriteHashedLock(resource,
-          generateResourceHashCode(resource, resourceName));
-    } else {
-      return lock.acquireWriteLock(resource, resourceName);
-    }
-  }
-
-  protected void releaseWriteLock(OzoneManagerLock lock,
-                                     OzoneManagerLock.Resource resource,
-                                     String[] resourceName) {
-    if (resource == OzoneManagerLock.Resource.KEY_PATH_LOCK) {
-      lock.releaseWriteHashedLock(resource,
-          generateResourceHashCode(resource, resourceName));
-    } else {
-      lock.releaseWriteLock(resource, resourceName);
-    }
-  }
-
-  protected boolean acquireReadLock(OzoneManagerLock lock,
-                                     OzoneManagerLock.Resource resource,
-                                     String[] resourceName) {
-    if (resource == OzoneManagerLock.Resource.KEY_PATH_LOCK) {
-      return lock.acquireReadHashedLock(resource,
-          generateResourceHashCode(resource, resourceName));
-    } else {
-      return lock.acquireReadLock(resource, resourceName);
-    }
-  }
-
-  protected void releaseReadLock(OzoneManagerLock lock,
-                                     OzoneManagerLock.Resource resource,
-                                     String[] resourceName) {
-    if (resource == OzoneManagerLock.Resource.KEY_PATH_LOCK) {
-      lock.releaseReadHashedLock(resource,
-          generateResourceHashCode(resource, resourceName));
-    } else {
-      lock.releaseReadLock(resource, resourceName);
-    }
-  }
-
   private void testResourceReacquireLock(String[] resourceName,
-      OzoneManagerLock.Resource resource) {
+      Resource resource) {
     OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
 
     // Lock re-acquire not allowed by same thread.
-    if (resource == OzoneManagerLock.Resource.USER_LOCK ||
-        resource == OzoneManagerLock.Resource.S3_SECRET_LOCK ||
-        resource == OzoneManagerLock.Resource.PREFIX_LOCK) {
-      acquireWriteLock(lock, resource, resourceName);
+    if (resource == Resource.USER_LOCK ||
+        resource == Resource.S3_SECRET_LOCK ||
+        resource == Resource.PREFIX_LOCK) {
+      lock.acquireWriteLock(resource, resourceName);
       try {
-        acquireWriteLock(lock, resource, resourceName);
+        lock.acquireWriteLock(resource, resourceName);
         fail("reacquireResourceLock failed");
       } catch (RuntimeException ex) {
         String message = "cannot acquire " + resource.getName() + " lock " +
             "while holding [" + resource.getName() + "] lock(s).";
         Assert.assertTrue(ex.getMessage(), ex.getMessage().contains(message));
       }
-      releaseWriteLock(lock, resource, resourceName);
+      lock.releaseWriteLock(resource, resourceName);
       Assert.assertTrue(true);
     } else {
-      acquireWriteLock(lock, resource, resourceName);
-      acquireWriteLock(lock, resource, resourceName);
-      releaseWriteLock(lock, resource, resourceName);
-      releaseWriteLock(lock, resource, resourceName);
+      lock.acquireWriteLock(resource, resourceName);
+      lock.acquireWriteLock(resource, resourceName);
+      lock.releaseWriteLock(resource, resourceName);
+      lock.releaseWriteLock(resource, resourceName);
       Assert.assertTrue(true);
     }
   }
@@ -156,24 +111,22 @@ public class TestOzoneManagerLock {
     // What this test does is iterate all resources. For each resource
     // acquire lock, and then in inner loop acquire all locks with higher
     // lock level, finally release the locks.
-    for (OzoneManagerLock.Resource resource :
-        OzoneManagerLock.Resource.values()) {
+    for (Resource resource : Resource.values()) {
       Stack<ResourceInfo> stack = new Stack<>();
       resourceName = generateResourceName(resource);
-      acquireWriteLock(lock, resource, resourceName);
+      lock.acquireWriteLock(resource, resourceName);
       stack.push(new ResourceInfo(resourceName, resource));
-      for (OzoneManagerLock.Resource higherResource :
-          OzoneManagerLock.Resource.values()) {
+      for (Resource higherResource : Resource.values()) {
         if (higherResource.getMask() > resource.getMask()) {
           resourceName = generateResourceName(higherResource);
-          acquireWriteLock(lock, higherResource, resourceName);
+          lock.acquireWriteLock(higherResource, resourceName);
           stack.push(new ResourceInfo(resourceName, higherResource));
         }
       }
       // Now release locks
       while (!stack.empty()) {
         ResourceInfo resourceInfo = stack.pop();
-        releaseWriteLock(lock, resourceInfo.getResource(),
+        lock.releaseWriteLock(resourceInfo.getResource(),
             resourceInfo.getLockName());
       }
     }
@@ -183,15 +136,13 @@ public class TestOzoneManagerLock {
   @Test
   public void testLockViolationsWithOneHigherLevelLock() {
     OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
-    for (OzoneManagerLock.Resource resource :
-        OzoneManagerLock.Resource.values()) {
-      for (OzoneManagerLock.Resource higherResource :
-          OzoneManagerLock.Resource.values()) {
+    for (Resource resource : Resource.values()) {
+      for (Resource higherResource : Resource.values()) {
         if (higherResource.getMask() > resource.getMask()) {
           String[] resourceName = generateResourceName(higherResource);
-          acquireWriteLock(lock, higherResource, resourceName);
+          lock.acquireWriteLock(higherResource, resourceName);
           try {
-            acquireWriteLock(lock, resource, generateResourceName(resource));
+            lock.acquireWriteLock(resource, generateResourceName(resource));
             fail("testLockViolationsWithOneHigherLevelLock failed");
           } catch (RuntimeException ex) {
             String message = "cannot acquire " + resource.getName() + " lock " +
@@ -199,7 +150,7 @@ public class TestOzoneManagerLock {
             Assert.assertTrue(ex.getMessage(),
                 ex.getMessage().contains(message));
           }
-          releaseWriteLock(lock, higherResource, resourceName);
+          lock.releaseWriteLock(higherResource, resourceName);
         }
       }
     }
@@ -213,21 +164,19 @@ public class TestOzoneManagerLock {
     // What this test does is iterate all resources. For each resource
     // acquire an higher level lock above the resource, and then take the the
     // lock. This should fail. Like that it tries all error combinations.
-    for (OzoneManagerLock.Resource resource :
-        OzoneManagerLock.Resource.values()) {
+    for (Resource resource : Resource.values()) {
       Stack<ResourceInfo> stack = new Stack<>();
       List<String> currentLocks = new ArrayList<>();
-      for (OzoneManagerLock.Resource higherResource :
-          OzoneManagerLock.Resource.values()) {
+      for (Resource higherResource : Resource.values()) {
         if (higherResource.getMask() > resource.getMask()) {
           resourceName = generateResourceName(higherResource);
-          acquireWriteLock(lock, higherResource, resourceName);
+          lock.acquireWriteLock(higherResource, resourceName);
           stack.push(new ResourceInfo(resourceName, higherResource));
           currentLocks.add(higherResource.getName());
           // try to acquire lower level lock
           try {
             resourceName = generateResourceName(resource);
-            acquireWriteLock(lock, resource, resourceName);
+            lock.acquireWriteLock(resource, resourceName);
           } catch (RuntimeException ex) {
             String message = "cannot acquire " + resource.getName() + " lock " +
                 "while holding " + currentLocks.toString() + " lock(s).";
@@ -240,7 +189,7 @@ public class TestOzoneManagerLock {
       // Now release locks
       while (!stack.empty()) {
         ResourceInfo resourceInfo = stack.pop();
-        releaseWriteLock(lock, resourceInfo.getResource(),
+        lock.releaseWriteLock(resourceInfo.getResource(),
             resourceInfo.getLockName());
       }
     }
@@ -250,23 +199,17 @@ public class TestOzoneManagerLock {
   public void releaseLockWithOutAcquiringLock() {
     OzoneManagerLock lock =
         new OzoneManagerLock(new OzoneConfiguration());
-    try {
-      lock.releaseWriteLock(OzoneManagerLock.Resource.USER_LOCK, "user3");
-      fail("releaseLockWithOutAcquiringLock failed");
-    } catch (IllegalMonitorStateException ex) {
-      String message = "Releasing lock on resource $user3 without acquiring " +
-          "lock";
-      Assert.assertTrue(ex.getMessage(), ex.getMessage().contains(message));
-    }
+    assertThrows(IllegalMonitorStateException.class,
+        () -> lock.releaseWriteLock(Resource.USER_LOCK, "user3"));
   }
 
 
-  private String[] generateResourceName(OzoneManagerLock.Resource resource) {
-    if (resource == OzoneManagerLock.Resource.BUCKET_LOCK) {
+  private String[] generateResourceName(Resource resource) {
+    if (resource == Resource.BUCKET_LOCK) {
       return new String[]{UUID.randomUUID().toString(),
           UUID.randomUUID().toString()};
-    } else if ((resource == OzoneManagerLock.Resource.KEY_PATH_LOCK) ||
-        (resource == OzoneManagerLock.Resource.SNAPSHOT_LOCK)) {
+    } else if ((resource == Resource.KEY_PATH_LOCK) ||
+        (resource == Resource.SNAPSHOT_LOCK)) {
       return new String[]{UUID.randomUUID().toString(),
           UUID.randomUUID().toString(), UUID.randomUUID().toString()};
     } else {
@@ -274,82 +217,14 @@ public class TestOzoneManagerLock {
     }
   }
 
-  protected String generateResourceLockName(OzoneManagerLock.Resource resource,
-                                          String... resources) {
-    if (resources.length == 1 &&
-        resource != OzoneManagerLock.Resource.BUCKET_LOCK) {
-      return OzoneManagerLockUtil.generateResourceLockName(resource,
-          resources[0]);
-    } else if (resources.length == 2 &&
-        resource == OzoneManagerLock.Resource.BUCKET_LOCK) {
-      return OzoneManagerLockUtil.generateBucketLockName(resources[0],
-          resources[1]);
-    } else if (resources.length == 3 &&
-        resource == OzoneManagerLock.Resource.SNAPSHOT_LOCK) {
-      return OzoneManagerLockUtil.generateSnapshotLockName(resources[0],
-          resources[1], resources[2]);
-    } else if (resources.length == 3 &&
-        resource == OzoneManagerLock.Resource.KEY_PATH_LOCK) {
-      return OzoneManagerLockUtil.generateKeyPathLockName(resources[0],
-          resources[1], resources[2]);
-    } else {
-      throw new IllegalArgumentException("acquire lock is supported on single" +
-          " resource for all locks except for resource bucket/snapshot");
-    }
-  }
-
-  protected String generateResourceHashCode(OzoneManagerLock.Resource resource,
-                                          String[] resourceName) {
-    String resourceLockName = generateResourceLockName(resource, resourceName);
-    int resourceHashCode = resourceLockName.hashCode();
-    return String.valueOf(resourceHashCode);
-  }
-
-  protected String generateResourceHashCode(OzoneManagerLock.Resource resource,
-                                            String resourceLockName) {
-    return String.valueOf(resourceLockName.hashCode());
-  }
-
-  protected int getReadHoldCount(OzoneManagerLock lock,
-                                 OzoneManagerLock.Resource resource,
-                                 String resourceLockName) {
-    if (resource == OzoneManagerLock.Resource.KEY_PATH_LOCK) {
-      return lock.getReadHoldCount(
-          generateResourceHashCode(resource, resourceLockName));
-    } else {
-      return lock.getReadHoldCount(resourceLockName);
-    }
-  }
-
-  protected int getWriteHoldCount(OzoneManagerLock lock,
-                                 OzoneManagerLock.Resource resource,
-                                 String resourceLockName) {
-    if (resource == OzoneManagerLock.Resource.KEY_PATH_LOCK) {
-      return lock.getWriteHoldCount(
-          generateResourceHashCode(resource, resourceLockName));
-    } else {
-      return lock.getWriteHoldCount(resourceLockName);
-    }
-  }
-
-  protected boolean isWriteLockedByCurrentThread(OzoneManagerLock lock,
-      OzoneManagerLock.Resource resource, String resourceLockName) {
-    if (resource == OzoneManagerLock.Resource.KEY_PATH_LOCK) {
-      return lock.isWriteLockedByCurrentThread(
-          generateResourceHashCode(resource, resourceLockName));
-    } else {
-      return lock.isWriteLockedByCurrentThread(resourceLockName);
-    }
-  }
-
   /**
    * Class used to store locked resource info.
    */
   public static class ResourceInfo {
     private String[] lockName;
-    private OzoneManagerLock.Resource resource;
+    private Resource resource;
 
-    ResourceInfo(String[] resourceName, OzoneManagerLock.Resource resource) {
+    ResourceInfo(String[] resourceName, Resource resource) {
       this.lockName = resourceName;
       this.resource = resource;
     }
@@ -358,7 +233,7 @@ public class TestOzoneManagerLock {
       return lockName.clone();
     }
 
-    public OzoneManagerLock.Resource getResource() {
+    public Resource getResource() {
       return resource;
     }
   }
@@ -389,7 +264,7 @@ public class TestOzoneManagerLock {
   @Test
   public void acquireMultiUserLockAfterUserLock() {
     OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
-    lock.acquireWriteLock(OzoneManagerLock.Resource.USER_LOCK, "user3");
+    lock.acquireWriteLock(Resource.USER_LOCK, "user3");
     try {
       lock.acquireMultiUserLock("user1", "user2");
       fail("acquireMultiUserLockAfterUserLock failed");
@@ -398,7 +273,7 @@ public class TestOzoneManagerLock {
           "[USER_LOCK] lock(s).";
       Assert.assertTrue(ex.getMessage(), ex.getMessage().contains(message));
     }
-    lock.releaseWriteLock(OzoneManagerLock.Resource.USER_LOCK, "user3");
+    lock.releaseWriteLock(Resource.USER_LOCK, "user3");
   }
 
   @Test
@@ -406,7 +281,7 @@ public class TestOzoneManagerLock {
     OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
     lock.acquireMultiUserLock("user1", "user2");
     try {
-      lock.acquireWriteLock(OzoneManagerLock.Resource.USER_LOCK, "user3");
+      lock.acquireWriteLock(Resource.USER_LOCK, "user3");
       fail("acquireUserLockAfterMultiUserLock failed");
     } catch (RuntimeException ex) {
       String message = "cannot acquire USER_LOCK lock while holding " +
@@ -420,23 +295,23 @@ public class TestOzoneManagerLock {
   public void testLockResourceParallel() throws Exception {
     OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
 
-    for (OzoneManagerLock.Resource resource :
-        OzoneManagerLock.Resource.values()) {
+    for (Resource resource :
+        Resource.values()) {
       final String[] resourceName = generateResourceName(resource);
-      acquireWriteLock(lock, resource, resourceName);
+      lock.acquireWriteLock(resource, resourceName);
 
       AtomicBoolean gotLock = new AtomicBoolean(false);
       new Thread(() -> {
-        acquireWriteLock(lock, resource, resourceName);
+        lock.acquireWriteLock(resource, resourceName);
         gotLock.set(true);
-        releaseWriteLock(lock, resource, resourceName);
+        lock.releaseWriteLock(resource, resourceName);
       }).start();
       // Let's give some time for the new thread to run
       Thread.sleep(100);
       // Since the new thread is trying to get lock on same resource,
       // it will wait.
       Assert.assertFalse(gotLock.get());
-      releaseWriteLock(lock, resource, resourceName);
+      lock.releaseWriteLock(resource, resourceName);
       // Since we have released the lock, the new thread should have the lock
       // now.
       // Let's give some time for the new thread to run
@@ -473,68 +348,64 @@ public class TestOzoneManagerLock {
   @Test
   public void testLockHoldCount() {
     String[] resourceName;
-    String resourceLockName;
-    for (OzoneManagerLock.Resource resource :
-        OzoneManagerLock.Resource.values()) {
+    for (Resource resource : Resource.values()) {
       // USER_LOCK, S3_SECRET_LOCK and PREFIX_LOCK disallow lock re-acquire by
       // the same thread.
-      if (resource != OzoneManagerLock.Resource.USER_LOCK &&
-          resource != OzoneManagerLock.Resource.S3_SECRET_LOCK &&
-          resource != OzoneManagerLock.Resource.PREFIX_LOCK) {
+      if (resource != Resource.USER_LOCK &&
+          resource != Resource.S3_SECRET_LOCK &&
+          resource != Resource.PREFIX_LOCK) {
         resourceName = generateResourceName(resource);
-        resourceLockName = generateResourceLockName(resource, resourceName);
-        testLockHoldCountUtil(resource, resourceName, resourceLockName);
+        testLockHoldCountUtil(resource, resourceName);
       }
     }
   }
 
-  private void testLockHoldCountUtil(OzoneManagerLock.Resource resource,
-                                     String[] resourceName,
-                                     String resourceLockName) {
+  private void testLockHoldCountUtil(Resource resource,
+                                     String[] resourceName) {
     OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
 
-    assertEquals(0, getReadHoldCount(lock, resource, resourceLockName));
-    acquireReadLock(lock, resource, resourceName);
-    assertEquals(1, getReadHoldCount(lock, resource, resourceLockName));
+    assertEquals(0, lock.getReadHoldCount(resource, resourceName));
+    lock.acquireReadLock(resource, resourceName);
+    assertEquals(1, lock.getReadHoldCount(resource, resourceName));
 
-    acquireReadLock(lock, resource, resourceName);
-    assertEquals(2, getReadHoldCount(lock, resource, resourceLockName));
+    lock.acquireReadLock(resource, resourceName);
+    assertEquals(2, lock.getReadHoldCount(resource, resourceName));
 
-    releaseReadLock(lock, resource, resourceName);
-    assertEquals(1, getReadHoldCount(lock, resource, resourceLockName));
+    lock.releaseReadLock(resource, resourceName);
+    assertEquals(1, lock.getReadHoldCount(resource, resourceName));
 
-    releaseReadLock(lock, resource, resourceName);
-    assertEquals(0, getReadHoldCount(lock, resource, resourceLockName));
+    lock.releaseReadLock(resource, resourceName);
+    assertEquals(0, lock.getReadHoldCount(resource, resourceName));
 
     Assert.assertFalse(
-        isWriteLockedByCurrentThread(lock, resource, resourceLockName));
-    assertEquals(0, getWriteHoldCount(lock, resource, resourceLockName));
-    acquireWriteLock(lock, resource, resourceName);
+        lock.isWriteLockedByCurrentThread(resource, resourceName));
+    assertEquals(0, lock.getWriteHoldCount(resource, resourceName));
+    lock.acquireWriteLock(resource, resourceName);
     Assert.assertTrue(
-        isWriteLockedByCurrentThread(lock, resource, resourceLockName));
-    assertEquals(1, getWriteHoldCount(lock, resource, resourceLockName));
+        lock.isWriteLockedByCurrentThread(resource, resourceName));
+    assertEquals(1, lock.getWriteHoldCount(resource, resourceName));
 
-    acquireWriteLock(lock, resource, resourceName);
+    lock.acquireWriteLock(resource, resourceName);
     Assert.assertTrue(
-        isWriteLockedByCurrentThread(lock, resource, resourceLockName));
-    assertEquals(2, getWriteHoldCount(lock, resource, resourceLockName));
+        lock.isWriteLockedByCurrentThread(resource, resourceName));
+    assertEquals(2, lock.getWriteHoldCount(resource, resourceName));
 
-    releaseWriteLock(lock, resource, resourceName);
+    lock.releaseWriteLock(resource, resourceName);
     Assert.assertTrue(
-        isWriteLockedByCurrentThread(lock, resource, resourceLockName));
-    assertEquals(1, getWriteHoldCount(lock, resource, resourceLockName));
+        lock.isWriteLockedByCurrentThread(resource, resourceName));
+    assertEquals(1, lock.getWriteHoldCount(resource, resourceName));
 
-    releaseWriteLock(lock, resource, resourceName);
+    lock.releaseWriteLock(resource, resourceName);
     Assert.assertFalse(
-        isWriteLockedByCurrentThread(lock, resource, resourceLockName));
-    assertEquals(0, getWriteHoldCount(lock, resource, resourceLockName));
+        lock.isWriteLockedByCurrentThread(resource, resourceName));
+    assertEquals(0, lock.getWriteHoldCount(resource, resourceName));
   }
 
   @Test
   public void testLockConcurrentStats() throws InterruptedException {
     String[] resourceName;
-    for (OzoneManagerLock.Resource resource :
-        OzoneManagerLock.Resource.values()) {
+    for (Resource resource :
+        Resource.values()) {
       resourceName = generateResourceName(resource);
       testReadLockConcurrentStats(resource, resourceName, 10);
       testWriteLockConcurrentStats(resource, resourceName, 5);
@@ -543,7 +414,7 @@ public class TestOzoneManagerLock {
   }
 
 
-  public void testReadLockConcurrentStats(OzoneManagerLock.Resource resource,
+  public void testReadLockConcurrentStats(Resource resource,
                                           String[] resourceName,
                                           int threadCount)
       throws InterruptedException {
@@ -552,13 +423,13 @@ public class TestOzoneManagerLock {
 
     for (int i = 0; i < threads.length; i++) {
       threads[i] = new Thread(() -> {
-        acquireReadLock(lock, resource, resourceName);
+        lock.acquireReadLock(resource, resourceName);
         try {
           Thread.sleep(500);
         } catch (InterruptedException e) {
           e.printStackTrace();
         }
-        releaseReadLock(lock, resource, resourceName);
+        lock.releaseReadLock(resource, resourceName);
       });
       threads[i].start();
     }
@@ -567,20 +438,21 @@ public class TestOzoneManagerLock {
       t.join();
     }
 
-    String readHeldStat = lock.getReadLockHeldTimeMsStat();
+    String readHeldStat = lock.getOMLockMetrics().getReadLockHeldTimeMsStat();
     Assert.assertTrue(
         "Expected " + threadCount +
             " samples in readLockHeldTimeMsStat: " + readHeldStat,
         readHeldStat.contains("Samples = " + threadCount));
 
-    String readWaitingStat = lock.getReadLockWaitingTimeMsStat();
+    String readWaitingStat =
+        lock.getOMLockMetrics().getReadLockWaitingTimeMsStat();
     Assert.assertTrue(
         "Expected " + threadCount +
             " samples in readLockWaitingTimeMsStat: " + readWaitingStat,
         readWaitingStat.contains("Samples = " + threadCount));
   }
 
-  public void testWriteLockConcurrentStats(OzoneManagerLock.Resource resource,
+  public void testWriteLockConcurrentStats(Resource resource,
                                            String[] resourceName,
                                            int threadCount)
       throws InterruptedException {
@@ -589,13 +461,13 @@ public class TestOzoneManagerLock {
 
     for (int i = 0; i < threads.length; i++) {
       threads[i] = new Thread(() -> {
-        acquireWriteLock(lock, resource, resourceName);
+        lock.acquireWriteLock(resource, resourceName);
         try {
           Thread.sleep(100);
         } catch (InterruptedException e) {
           e.printStackTrace();
         }
-        releaseWriteLock(lock, resource, resourceName);
+        lock.releaseWriteLock(resource, resourceName);
       });
       threads[i].start();
     }
@@ -604,13 +476,14 @@ public class TestOzoneManagerLock {
       t.join();
     }
 
-    String writeHeldStat = lock.getWriteLockHeldTimeMsStat();
+    String writeHeldStat = lock.getOMLockMetrics().getWriteLockHeldTimeMsStat();
     Assert.assertTrue(
         "Expected " + threadCount +
             " samples in writeLockHeldTimeMsStat: " + writeHeldStat,
         writeHeldStat.contains("Samples = " + threadCount));
 
-    String writeWaitingStat = lock.getWriteLockWaitingTimeMsStat();
+    String writeWaitingStat =
+        lock.getOMLockMetrics().getWriteLockWaitingTimeMsStat();
     Assert.assertTrue(
         "Expected " + threadCount +
             " samples in writeLockWaitingTimeMsStat" + writeWaitingStat,
@@ -618,7 +491,7 @@ public class TestOzoneManagerLock {
   }
 
   public void testSyntheticReadWriteLockConcurrentStats(
-      OzoneManagerLock.Resource resource, String[] resourceName,
+      Resource resource, String[] resourceName,
       int readThreadCount, int writeThreadCount)
       throws InterruptedException {
     OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
@@ -627,13 +500,13 @@ public class TestOzoneManagerLock {
 
     for (int i = 0; i < readThreads.length; i++) {
       readThreads[i] = new Thread(() -> {
-        acquireReadLock(lock, resource, resourceName);
+        lock.acquireReadLock(resource, resourceName);
         try {
           Thread.sleep(500);
         } catch (InterruptedException e) {
           e.printStackTrace();
         }
-        releaseReadLock(lock, resource, resourceName);
+        lock.releaseReadLock(resource, resourceName);
       });
       readThreads[i].setName("ReadLockThread-" + i);
       readThreads[i].start();
@@ -641,13 +514,13 @@ public class TestOzoneManagerLock {
 
     for (int i = 0; i < writeThreads.length; i++) {
       writeThreads[i] = new Thread(() -> {
-        acquireWriteLock(lock, resource, resourceName);
+        lock.acquireWriteLock(resource, resourceName);
         try {
           Thread.sleep(100);
         } catch (InterruptedException e) {
           e.printStackTrace();
         }
-        releaseWriteLock(lock, resource, resourceName);
+        lock.releaseWriteLock(resource, resourceName);
       });
       writeThreads[i].setName("WriteLockThread-" + i);
       writeThreads[i].start();
@@ -661,25 +534,27 @@ public class TestOzoneManagerLock {
       w.join();
     }
 
-    String readHeldStat = lock.getReadLockHeldTimeMsStat();
+    String readHeldStat = lock.getOMLockMetrics().getReadLockHeldTimeMsStat();
     Assert.assertTrue(
         "Expected " + readThreadCount +
             " samples in readLockHeldTimeMsStat: " + readHeldStat,
         readHeldStat.contains("Samples = " + readThreadCount));
 
-    String readWaitingStat = lock.getReadLockWaitingTimeMsStat();
+    String readWaitingStat =
+        lock.getOMLockMetrics().getReadLockWaitingTimeMsStat();
     Assert.assertTrue(
         "Expected " + readThreadCount +
             " samples in readLockWaitingTimeMsStat: " + readWaitingStat,
         readWaitingStat.contains("Samples = " + readThreadCount));
 
-    String writeHeldStat = lock.getWriteLockHeldTimeMsStat();
+    String writeHeldStat = lock.getOMLockMetrics().getWriteLockHeldTimeMsStat();
     Assert.assertTrue(
         "Expected " + writeThreadCount +
             " samples in writeLockHeldTimeMsStat: " + writeHeldStat,
         writeHeldStat.contains("Samples = " + writeThreadCount));
 
-    String writeWaitingStat = lock.getWriteLockWaitingTimeMsStat();
+    String writeWaitingStat =
+        lock.getOMLockMetrics().getWriteLockWaitingTimeMsStat();
     Assert.assertTrue(
         "Expected " + writeThreadCount +
             " samples in writeLockWaitingTimeMsStat" + writeWaitingStat,
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
index bcd099e961..086f495612 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
@@ -76,11 +76,16 @@ public class VolumeManagerImpl implements VolumeManager {
   @Override
   public List<OmVolumeArgs> listVolumes(String userName,
       String prefix, String startKey, int maxKeys) throws IOException {
-    metadataManager.getLock().acquireReadLock(USER_LOCK, userName);
+    boolean acquired = false;
+    if (userName != null) {
+      acquired = metadataManager.getLock().acquireReadLock(USER_LOCK, userName);
+    }
     try {
       return metadataManager.listVolumes(userName, prefix, startKey, maxKeys);
     } finally {
-      metadataManager.getLock().releaseReadLock(USER_LOCK, userName);
+      if (acquired) {
+        metadataManager.getLock().releaseReadLock(USER_LOCK, userName);
+      }
     }
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/hashcodegenerator/OMHashCodeGenerator.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/hashcodegenerator/OMHashCodeGenerator.java
deleted file mode 100644
index 1a462c0a7a..0000000000
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/hashcodegenerator/OMHashCodeGenerator.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.ozone.om.hashcodegenerator;
-
-/**
- * Interface for hash code generation implementations.
- */
-public interface OMHashCodeGenerator {
-
-  /**
-   * Returns the hash code of the resourceName passed.
-   *
-   * @param resourceName
-   * @return hash code of resourceName
-   */
-  long getHashCode(String resourceName);
-
-}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/hashcodegenerator/StringOMHashCodeGeneratorImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/hashcodegenerator/StringOMHashCodeGeneratorImpl.java
deleted file mode 100644
index 3560b05518..0000000000
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/hashcodegenerator/StringOMHashCodeGeneratorImpl.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.ozone.om.hashcodegenerator;
-
-/**
- * Implementation of OMHashCodeGenerator interface. This implementation uses the
- * String hashCode function.
- */
-public class StringOMHashCodeGeneratorImpl implements OMHashCodeGenerator {
-
-  @Override
-  public long getHashCode(String resourceName) {
-    return resourceName.hashCode();
-  }
-
-}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/hashcodegenerator/package-info.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/hashcodegenerator/package-info.java
deleted file mode 100644
index 86dee4c8d5..0000000000
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/hashcodegenerator/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.om.hashcodegenerator;
-
-/**
- * This package contains classes and interfaces for OM Hash Code Generation.
- */
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OBSKeyPathLockStrategy.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OBSKeyPathLockStrategy.java
index abc08db49f..143e318ada 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OBSKeyPathLockStrategy.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OBSKeyPathLockStrategy.java
@@ -19,8 +19,6 @@ package org.apache.hadoop.ozone.om.lock;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
-import org.apache.hadoop.ozone.om.hashcodegenerator.StringOMHashCodeGeneratorImpl;
-import org.apache.hadoop.ozone.om.hashcodegenerator.OMHashCodeGenerator;
 import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
 
 import java.io.IOException;
@@ -34,11 +32,6 @@ import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.KEY_PATH
  */
 public class OBSKeyPathLockStrategy implements OzoneLockStrategy {
 
-  // TODO: need to make this pluggable and allow users to configure the
-  //  preferred hash code generation mechanism.
-  private OMHashCodeGenerator omHashCodeGenerator =
-      new StringOMHashCodeGeneratorImpl();
-
   @Override
   public boolean acquireWriteLock(OMMetadataManager omMetadataManager,
                                   String volumeName, String bucketName,
@@ -51,12 +44,8 @@ public class OBSKeyPathLockStrategy implements OzoneLockStrategy {
     Preconditions.checkArgument(acquiredLock,
         "BUCKET_LOCK should be acquired!");
 
-    String resourceName = omMetadataManager.getLock()
-        .generateResourceName(KEY_PATH_LOCK, volumeName, bucketName, keyName);
-    long resourceHashCode = omHashCodeGenerator.getHashCode(resourceName);
     acquiredLock = omMetadataManager.getLock()
-        .acquireWriteHashedLock(KEY_PATH_LOCK,
-            String.valueOf(resourceHashCode));
+        .acquireWriteLock(KEY_PATH_LOCK, volumeName, bucketName, keyName);
 
     return acquiredLock;
   }
@@ -65,12 +54,8 @@ public class OBSKeyPathLockStrategy implements OzoneLockStrategy {
   public void releaseWriteLock(OMMetadataManager omMetadataManager,
                                String volumeName, String bucketName,
                                String keyName) {
-    String resourceName = omMetadataManager.getLock()
-        .generateResourceName(KEY_PATH_LOCK, volumeName, bucketName, keyName);
-    long resourceHashCode = omHashCodeGenerator.getHashCode(resourceName);
-    omMetadataManager.getLock().releaseWriteHashedLock(KEY_PATH_LOCK,
-        String.valueOf(resourceHashCode));
-
+    omMetadataManager.getLock().releaseWriteLock(KEY_PATH_LOCK,
+        volumeName, bucketName, keyName);
     omMetadataManager.getLock()
         .releaseReadLock(BUCKET_LOCK, volumeName, bucketName);
   }
@@ -87,11 +72,8 @@ public class OBSKeyPathLockStrategy implements OzoneLockStrategy {
     Preconditions.checkArgument(acquiredLock,
         "BUCKET_LOCK should be acquired!");
 
-    String resourceName = omMetadataManager.getLock()
-        .generateResourceName(KEY_PATH_LOCK, volumeName, bucketName, keyName);
-    long resourceHashCode = omHashCodeGenerator.getHashCode(resourceName);
     acquiredLock = omMetadataManager.getLock()
-        .acquireReadHashedLock(KEY_PATH_LOCK, String.valueOf(resourceHashCode));
+        .acquireReadLock(KEY_PATH_LOCK, volumeName, bucketName, keyName);
 
     return acquiredLock;
   }
@@ -100,15 +82,9 @@ public class OBSKeyPathLockStrategy implements OzoneLockStrategy {
   public void releaseReadLock(OMMetadataManager omMetadataManager,
                               String volumeName, String bucketName,
                               String keyName) {
-    String resourceName = omMetadataManager.getLock()
-        .generateResourceName(KEY_PATH_LOCK, volumeName, bucketName, keyName);
-    long resourceHashCode = omHashCodeGenerator.getHashCode(resourceName);
     omMetadataManager.getLock()
-        .releaseReadHashedLock(KEY_PATH_LOCK, String.valueOf(resourceHashCode));
-
+        .releaseReadLock(KEY_PATH_LOCK, volumeName, bucketName, keyName);
     omMetadataManager.getLock()
         .releaseReadLock(BUCKET_LOCK, volumeName, bucketName);
-
-    return;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to