HDDS-855. Move OMMetadataManager from hadoop-ozone/ozone-manager to hadoop-ozone/common. Contributed by Ajay Kumar.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f994b526 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f994b526 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f994b526 Branch: refs/heads/HDDS-4 Commit: f994b526a03738fea95583a9fccbac709e8ce47f Parents: a41b648 Author: Xiaoyu Yao <[email protected]> Authored: Tue Nov 20 20:31:07 2018 -0800 Committer: Xiaoyu Yao <[email protected]> Committed: Tue Nov 20 20:31:07 2018 -0800 ---------------------------------------------------------------------- .../hadoop/ozone/om/OMMetadataManager.java | 245 +++++++++++++++++++ .../hadoop/ozone/om/OzoneManagerLock.java | 222 +++++++++++++++++ .../hadoop/ozone/om/OMMetadataManager.java | 245 ------------------- .../hadoop/ozone/om/OzoneManagerLock.java | 222 ----------------- 4 files changed, 467 insertions(+), 467 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/f994b526/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java new file mode 100644 index 0000000..5f490ec --- /dev/null +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.om; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.ozone.common.BlockGroup; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; +import org.apache.hadoop.utils.db.DBStore; +import org.apache.hadoop.utils.db.Table; + +import java.io.IOException; +import java.util.List; + +/** + * OM metadata manager interface. + */ +public interface OMMetadataManager { + /** + * Start metadata manager. + */ + void start(); + + /** + * Stop metadata manager. + */ + void stop() throws Exception; + + /** + * Get metadata store. + * + * @return metadata store. + */ + @VisibleForTesting + DBStore getStore(); + + /** + * Returns the OzoneManagerLock used on Metadata DB. + * + * @return OzoneManagerLock + */ + OzoneManagerLock getLock(); + + /** + * Given a volume return the corresponding DB key. + * + * @param volume - Volume name + */ + byte[] getVolumeKey(String volume); + + /** + * Given a user return the corresponding DB key. + * + * @param user - User name + */ + byte[] getUserKey(String user); + + /** + * Given a volume and bucket, return the corresponding DB key. + * + * @param volume - User name + * @param bucket - Bucket name + */ + byte[] getBucketKey(String volume, String bucket); + + /** + * Given a volume, bucket and a key, return the corresponding DB key. + * + * @param volume - volume name + * @param bucket - bucket name + * @param key - key name + * @return bytes of DB key. + */ + byte[] getOzoneKeyBytes(String volume, String bucket, String key); + + /** + * Returns the DB key name of a open key in OM metadata store. Should be + * #open# prefix followed by actual key name. + * + * @param volume - volume name + * @param bucket - bucket name + * @param key - key name + * @param id - the id for this open + * @return bytes of DB key. + */ + byte[] getOpenKeyBytes(String volume, String bucket, String key, long id); + + /** + * Given a volume, check if it is empty, i.e there are no buckets inside it. + * + * @param volume - Volume name + */ + boolean isVolumeEmpty(String volume) throws IOException; + + /** + * Given a volume/bucket, check if it is empty, i.e there are no keys inside + * it. + * + * @param volume - Volume name + * @param bucket - Bucket name + * @return true if the bucket is empty + */ + boolean isBucketEmpty(String volume, String bucket) throws IOException; + + /** + * Returns a list of buckets represented by {@link OmBucketInfo} in the given + * volume. + * + * @param volumeName the name of the volume. This argument is required, this + * method returns buckets in this given volume. + * @param startBucket the start bucket name. Only the buckets whose name is + * after this value will be included in the result. This key is excluded from + * the result. + * @param bucketPrefix bucket name prefix. Only the buckets whose name has + * this prefix will be included in the result. + * @param maxNumOfBuckets the maximum number of buckets to return. It ensures + * the size of the result will not exceed this limit. + * @return a list of buckets. + * @throws IOException + */ + List<OmBucketInfo> listBuckets(String volumeName, String startBucket, + String bucketPrefix, int maxNumOfBuckets) + throws IOException; + + /** + * Returns a list of keys represented by {@link OmKeyInfo} in the given + * bucket. + * + * @param volumeName the name of the volume. + * @param bucketName the name of the bucket. + * @param startKey the start key name, only the keys whose name is after this + * value will be included in the result. This key is excluded from the + * result. + * @param keyPrefix key name prefix, only the keys whose name has this prefix + * will be included in the result. + * @param maxKeys the maximum number of keys to return. It ensures the size of + * the result will not exceed this limit. + * @return a list of keys. + * @throws IOException + */ + List<OmKeyInfo> listKeys(String volumeName, + String bucketName, String startKey, String keyPrefix, int maxKeys) + throws IOException; + + /** + * Returns a list of volumes owned by a given user; if user is null, returns + * all volumes. + * + * @param userName volume owner + * @param prefix the volume prefix used to filter the listing result. + * @param startKey the start volume name determines where to start listing + * from, this key is excluded from the result. + * @param maxKeys the maximum number of volumes to return. + * @return a list of {@link OmVolumeArgs} + * @throws IOException + */ + List<OmVolumeArgs> listVolumes(String userName, String prefix, + String startKey, int maxKeys) throws IOException; + + /** + * Returns a list of pending deletion key info that ups to the given count. + * Each entry is a {@link BlockGroup}, which contains the info about the key + * name and all its associated block IDs. A pending deletion key is stored + * with #deleting# prefix in OM DB. + * + * @param count max number of keys to return. + * @return a list of {@link BlockGroup} represent keys and blocks. + * @throws IOException + */ + List<BlockGroup> getPendingDeletionKeys(int count) throws IOException; + + /** + * Returns a list of all still open key info. Which contains the info about + * the key name and all its associated block IDs. A pending open key has + * prefix #open# in OM DB. + * + * @return a list of {@link BlockGroup} representing keys and blocks. + * @throws IOException + */ + List<BlockGroup> getExpiredOpenKeys() throws IOException; + + /** + * Returns the user Table. + * + * @return UserTable. + */ + Table getUserTable(); + + /** + * Returns the Volume Table. + * + * @return VolumeTable. + */ + Table getVolumeTable(); + + /** + * Returns the BucketTable. + * + * @return BucketTable. + */ + Table getBucketTable(); + + /** + * Returns the KeyTable. + * + * @return KeyTable. + */ + Table getKeyTable(); + + /** + * Get Deleted Table. + * + * @return Deleted Table. + */ + Table getDeletedTable(); + + /** + * Gets the OpenKeyTable. + * + * @return Table. + */ + Table getOpenKeyTable(); + + /** + * Gets the S3Bucket to Ozone Volume/bucket mapping table. + * + * @return Table. + */ + Table getS3Table(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f994b526/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerLock.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerLock.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerLock.java new file mode 100644 index 0000000..c5ce9e2 --- /dev/null +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerLock.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.om; + +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ozone.lock.LockManager; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; +import static org.apache.hadoop.ozone.OzoneConsts.OM_S3_PREFIX; +import static org.apache.hadoop.ozone.OzoneConsts.OM_USER_PREFIX; + +/** + * Provides different locks to handle concurrency in OzoneMaster. + * We also maintain lock hierarchy, based on the weight. + * + * <table> + * <caption></caption> + * <tr> + * <td><b> WEIGHT </b></td> <td><b> LOCK </b></td> + * </tr> + * <tr> + * <td> 0 </td> <td> User Lock </td> + * </tr> + * <tr> + * <td> 1 </td> <td> Volume Lock </td> + * </tr> + * <tr> + * <td> 2 </td> <td> Bucket Lock </td> + * </tr> + * </table> + * + * One cannot obtain a lower weight lock while holding a lock with higher + * weight. The other way around is possible. <br> + * <br> + * <p> + * For example: + * <br> + * {@literal ->} acquireVolumeLock (will work)<br> + * {@literal +->} acquireBucketLock (will work)<br> + * {@literal +-->} acquireUserLock (will throw Exception)<br> + * </p> + * <br> + * To acquire a user lock you should not hold any Volume/Bucket lock. Similarly + * to acquire a Volume lock you should not hold any Bucket lock. + */ +public final class OzoneManagerLock { + + private static final String VOLUME_LOCK = "volumeLock"; + private static final String BUCKET_LOCK = "bucketLock"; + private static final String S3_BUCKET_LOCK = "s3BucketLock"; + + private final LockManager<String> manager; + + // To maintain locks held by current thread. + private final ThreadLocal<Map<String, AtomicInteger>> myLocks = + ThreadLocal.withInitial( + () -> ImmutableMap.of( + VOLUME_LOCK, new AtomicInteger(0), + BUCKET_LOCK, new AtomicInteger(0), + S3_BUCKET_LOCK, new AtomicInteger(0) + ) + ); + + /** + * Creates new OzoneManagerLock instance. + * @param conf Configuration object + */ + public OzoneManagerLock(Configuration conf) { + manager = new LockManager<>(conf); + } + + /** + * Acquires user lock on the given resource. + * + * <p>If the lock is not available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until the + * lock has been acquired. + * + * @param user User on which the lock has to be acquired + */ + public void acquireUserLock(String user) { + // Calling thread should not hold any volume or bucket lock. + if (hasAnyVolumeLock() || hasAnyBucketLock() || hasAnyS3Lock()) { + throw new RuntimeException( + "Thread '" + Thread.currentThread().getName() + + "' cannot acquire user lock" + + " while holding volume, bucket or S3 bucket lock(s)."); + } + manager.lock(OM_USER_PREFIX + user); + } + + /** + * Releases the user lock on given resource. + */ + public void releaseUserLock(String user) { + manager.unlock(OM_USER_PREFIX + user); + } + + /** + * Acquires volume lock on the given resource. + * + * <p>If the lock is not available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until the + * lock has been acquired. + * + * @param volume Volume on which the lock has to be acquired + */ + public void acquireVolumeLock(String volume) { + // Calling thread should not hold any bucket lock. + // You can take an Volume while holding S3 bucket lock, since + // semantically an S3 bucket maps to the ozone volume. So we check here + // only if ozone bucket lock is taken. + if (hasAnyBucketLock()) { + throw new RuntimeException( + "Thread '" + Thread.currentThread().getName() + + "' cannot acquire volume lock while holding bucket lock(s)."); + } + manager.lock(OM_KEY_PREFIX + volume); + myLocks.get().get(VOLUME_LOCK).incrementAndGet(); + } + + /** + * Releases the volume lock on given resource. + */ + public void releaseVolumeLock(String volume) { + manager.unlock(OM_KEY_PREFIX + volume); + myLocks.get().get(VOLUME_LOCK).decrementAndGet(); + } + + /** + * Acquires S3 Bucket lock on the given resource. + * + * <p>If the lock is not available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until the lock has + * been acquired. + * + * @param s3BucketName S3Bucket Name on which the lock has to be acquired + */ + public void acquireS3Lock(String s3BucketName) { + // Calling thread should not hold any bucket lock. + // You can take an Volume while holding S3 bucket lock, since + // semantically an S3 bucket maps to the ozone volume. So we check here + // only if ozone bucket lock is taken. + if (hasAnyBucketLock()) { + throw new RuntimeException( + "Thread '" + Thread.currentThread().getName() + + "' cannot acquire S3 bucket lock while holding Ozone bucket " + + "lock(s)."); + } + manager.lock(OM_S3_PREFIX + s3BucketName); + myLocks.get().get(S3_BUCKET_LOCK).incrementAndGet(); + } + + /** + * Releases the volume lock on given resource. + */ + public void releaseS3Lock(String s3BucketName) { + manager.unlock(OM_S3_PREFIX + s3BucketName); + myLocks.get().get(S3_BUCKET_LOCK).decrementAndGet(); + } + + /** + * Acquires bucket lock on the given resource. + * + * <p>If the lock is not available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until the + * lock has been acquired. + * + * @param bucket Bucket on which the lock has to be acquired + */ + public void acquireBucketLock(String volume, String bucket) { + manager.lock(OM_KEY_PREFIX + volume + OM_KEY_PREFIX + bucket); + myLocks.get().get(BUCKET_LOCK).incrementAndGet(); + } + + /** + * Releases the bucket lock on given resource. + */ + public void releaseBucketLock(String volume, String bucket) { + manager.unlock(OM_KEY_PREFIX + volume + OM_KEY_PREFIX + bucket); + myLocks.get().get(BUCKET_LOCK).decrementAndGet(); + } + + /** + * Returns true if the current thread holds any volume lock. + * @return true if current thread holds volume lock, else false + */ + private boolean hasAnyVolumeLock() { + return myLocks.get().get(VOLUME_LOCK).get() != 0; + } + + /** + * Returns true if the current thread holds any bucket lock. + * @return true if current thread holds bucket lock, else false + */ + private boolean hasAnyBucketLock() { + return myLocks.get().get(BUCKET_LOCK).get() != 0; + } + + private boolean hasAnyS3Lock() { + return myLocks.get().get(S3_BUCKET_LOCK).get() != 0; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f994b526/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java deleted file mode 100644 index 5f490ec..0000000 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java +++ /dev/null @@ -1,245 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.om; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.ozone.common.BlockGroup; -import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; -import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; -import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; -import org.apache.hadoop.utils.db.DBStore; -import org.apache.hadoop.utils.db.Table; - -import java.io.IOException; -import java.util.List; - -/** - * OM metadata manager interface. - */ -public interface OMMetadataManager { - /** - * Start metadata manager. - */ - void start(); - - /** - * Stop metadata manager. - */ - void stop() throws Exception; - - /** - * Get metadata store. - * - * @return metadata store. - */ - @VisibleForTesting - DBStore getStore(); - - /** - * Returns the OzoneManagerLock used on Metadata DB. - * - * @return OzoneManagerLock - */ - OzoneManagerLock getLock(); - - /** - * Given a volume return the corresponding DB key. - * - * @param volume - Volume name - */ - byte[] getVolumeKey(String volume); - - /** - * Given a user return the corresponding DB key. - * - * @param user - User name - */ - byte[] getUserKey(String user); - - /** - * Given a volume and bucket, return the corresponding DB key. - * - * @param volume - User name - * @param bucket - Bucket name - */ - byte[] getBucketKey(String volume, String bucket); - - /** - * Given a volume, bucket and a key, return the corresponding DB key. - * - * @param volume - volume name - * @param bucket - bucket name - * @param key - key name - * @return bytes of DB key. - */ - byte[] getOzoneKeyBytes(String volume, String bucket, String key); - - /** - * Returns the DB key name of a open key in OM metadata store. Should be - * #open# prefix followed by actual key name. - * - * @param volume - volume name - * @param bucket - bucket name - * @param key - key name - * @param id - the id for this open - * @return bytes of DB key. - */ - byte[] getOpenKeyBytes(String volume, String bucket, String key, long id); - - /** - * Given a volume, check if it is empty, i.e there are no buckets inside it. - * - * @param volume - Volume name - */ - boolean isVolumeEmpty(String volume) throws IOException; - - /** - * Given a volume/bucket, check if it is empty, i.e there are no keys inside - * it. - * - * @param volume - Volume name - * @param bucket - Bucket name - * @return true if the bucket is empty - */ - boolean isBucketEmpty(String volume, String bucket) throws IOException; - - /** - * Returns a list of buckets represented by {@link OmBucketInfo} in the given - * volume. - * - * @param volumeName the name of the volume. This argument is required, this - * method returns buckets in this given volume. - * @param startBucket the start bucket name. Only the buckets whose name is - * after this value will be included in the result. This key is excluded from - * the result. - * @param bucketPrefix bucket name prefix. Only the buckets whose name has - * this prefix will be included in the result. - * @param maxNumOfBuckets the maximum number of buckets to return. It ensures - * the size of the result will not exceed this limit. - * @return a list of buckets. - * @throws IOException - */ - List<OmBucketInfo> listBuckets(String volumeName, String startBucket, - String bucketPrefix, int maxNumOfBuckets) - throws IOException; - - /** - * Returns a list of keys represented by {@link OmKeyInfo} in the given - * bucket. - * - * @param volumeName the name of the volume. - * @param bucketName the name of the bucket. - * @param startKey the start key name, only the keys whose name is after this - * value will be included in the result. This key is excluded from the - * result. - * @param keyPrefix key name prefix, only the keys whose name has this prefix - * will be included in the result. - * @param maxKeys the maximum number of keys to return. It ensures the size of - * the result will not exceed this limit. - * @return a list of keys. - * @throws IOException - */ - List<OmKeyInfo> listKeys(String volumeName, - String bucketName, String startKey, String keyPrefix, int maxKeys) - throws IOException; - - /** - * Returns a list of volumes owned by a given user; if user is null, returns - * all volumes. - * - * @param userName volume owner - * @param prefix the volume prefix used to filter the listing result. - * @param startKey the start volume name determines where to start listing - * from, this key is excluded from the result. - * @param maxKeys the maximum number of volumes to return. - * @return a list of {@link OmVolumeArgs} - * @throws IOException - */ - List<OmVolumeArgs> listVolumes(String userName, String prefix, - String startKey, int maxKeys) throws IOException; - - /** - * Returns a list of pending deletion key info that ups to the given count. - * Each entry is a {@link BlockGroup}, which contains the info about the key - * name and all its associated block IDs. A pending deletion key is stored - * with #deleting# prefix in OM DB. - * - * @param count max number of keys to return. - * @return a list of {@link BlockGroup} represent keys and blocks. - * @throws IOException - */ - List<BlockGroup> getPendingDeletionKeys(int count) throws IOException; - - /** - * Returns a list of all still open key info. Which contains the info about - * the key name and all its associated block IDs. A pending open key has - * prefix #open# in OM DB. - * - * @return a list of {@link BlockGroup} representing keys and blocks. - * @throws IOException - */ - List<BlockGroup> getExpiredOpenKeys() throws IOException; - - /** - * Returns the user Table. - * - * @return UserTable. - */ - Table getUserTable(); - - /** - * Returns the Volume Table. - * - * @return VolumeTable. - */ - Table getVolumeTable(); - - /** - * Returns the BucketTable. - * - * @return BucketTable. - */ - Table getBucketTable(); - - /** - * Returns the KeyTable. - * - * @return KeyTable. - */ - Table getKeyTable(); - - /** - * Get Deleted Table. - * - * @return Deleted Table. - */ - Table getDeletedTable(); - - /** - * Gets the OpenKeyTable. - * - * @return Table. - */ - Table getOpenKeyTable(); - - /** - * Gets the S3Bucket to Ozone Volume/bucket mapping table. - * - * @return Table. - */ - Table getS3Table(); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f994b526/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerLock.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerLock.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerLock.java deleted file mode 100644 index c5ce9e2..0000000 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerLock.java +++ /dev/null @@ -1,222 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.ozone.om; - -import com.google.common.collect.ImmutableMap; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ozone.lock.LockManager; - -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; -import static org.apache.hadoop.ozone.OzoneConsts.OM_S3_PREFIX; -import static org.apache.hadoop.ozone.OzoneConsts.OM_USER_PREFIX; - -/** - * Provides different locks to handle concurrency in OzoneMaster. - * We also maintain lock hierarchy, based on the weight. - * - * <table> - * <caption></caption> - * <tr> - * <td><b> WEIGHT </b></td> <td><b> LOCK </b></td> - * </tr> - * <tr> - * <td> 0 </td> <td> User Lock </td> - * </tr> - * <tr> - * <td> 1 </td> <td> Volume Lock </td> - * </tr> - * <tr> - * <td> 2 </td> <td> Bucket Lock </td> - * </tr> - * </table> - * - * One cannot obtain a lower weight lock while holding a lock with higher - * weight. The other way around is possible. <br> - * <br> - * <p> - * For example: - * <br> - * {@literal ->} acquireVolumeLock (will work)<br> - * {@literal +->} acquireBucketLock (will work)<br> - * {@literal +-->} acquireUserLock (will throw Exception)<br> - * </p> - * <br> - * To acquire a user lock you should not hold any Volume/Bucket lock. Similarly - * to acquire a Volume lock you should not hold any Bucket lock. - */ -public final class OzoneManagerLock { - - private static final String VOLUME_LOCK = "volumeLock"; - private static final String BUCKET_LOCK = "bucketLock"; - private static final String S3_BUCKET_LOCK = "s3BucketLock"; - - private final LockManager<String> manager; - - // To maintain locks held by current thread. - private final ThreadLocal<Map<String, AtomicInteger>> myLocks = - ThreadLocal.withInitial( - () -> ImmutableMap.of( - VOLUME_LOCK, new AtomicInteger(0), - BUCKET_LOCK, new AtomicInteger(0), - S3_BUCKET_LOCK, new AtomicInteger(0) - ) - ); - - /** - * Creates new OzoneManagerLock instance. - * @param conf Configuration object - */ - public OzoneManagerLock(Configuration conf) { - manager = new LockManager<>(conf); - } - - /** - * Acquires user lock on the given resource. - * - * <p>If the lock is not available then the current thread becomes - * disabled for thread scheduling purposes and lies dormant until the - * lock has been acquired. - * - * @param user User on which the lock has to be acquired - */ - public void acquireUserLock(String user) { - // Calling thread should not hold any volume or bucket lock. - if (hasAnyVolumeLock() || hasAnyBucketLock() || hasAnyS3Lock()) { - throw new RuntimeException( - "Thread '" + Thread.currentThread().getName() + - "' cannot acquire user lock" + - " while holding volume, bucket or S3 bucket lock(s)."); - } - manager.lock(OM_USER_PREFIX + user); - } - - /** - * Releases the user lock on given resource. - */ - public void releaseUserLock(String user) { - manager.unlock(OM_USER_PREFIX + user); - } - - /** - * Acquires volume lock on the given resource. - * - * <p>If the lock is not available then the current thread becomes - * disabled for thread scheduling purposes and lies dormant until the - * lock has been acquired. - * - * @param volume Volume on which the lock has to be acquired - */ - public void acquireVolumeLock(String volume) { - // Calling thread should not hold any bucket lock. - // You can take an Volume while holding S3 bucket lock, since - // semantically an S3 bucket maps to the ozone volume. So we check here - // only if ozone bucket lock is taken. - if (hasAnyBucketLock()) { - throw new RuntimeException( - "Thread '" + Thread.currentThread().getName() + - "' cannot acquire volume lock while holding bucket lock(s)."); - } - manager.lock(OM_KEY_PREFIX + volume); - myLocks.get().get(VOLUME_LOCK).incrementAndGet(); - } - - /** - * Releases the volume lock on given resource. - */ - public void releaseVolumeLock(String volume) { - manager.unlock(OM_KEY_PREFIX + volume); - myLocks.get().get(VOLUME_LOCK).decrementAndGet(); - } - - /** - * Acquires S3 Bucket lock on the given resource. - * - * <p>If the lock is not available then the current thread becomes - * disabled for thread scheduling purposes and lies dormant until the lock has - * been acquired. - * - * @param s3BucketName S3Bucket Name on which the lock has to be acquired - */ - public void acquireS3Lock(String s3BucketName) { - // Calling thread should not hold any bucket lock. - // You can take an Volume while holding S3 bucket lock, since - // semantically an S3 bucket maps to the ozone volume. So we check here - // only if ozone bucket lock is taken. - if (hasAnyBucketLock()) { - throw new RuntimeException( - "Thread '" + Thread.currentThread().getName() + - "' cannot acquire S3 bucket lock while holding Ozone bucket " + - "lock(s)."); - } - manager.lock(OM_S3_PREFIX + s3BucketName); - myLocks.get().get(S3_BUCKET_LOCK).incrementAndGet(); - } - - /** - * Releases the volume lock on given resource. - */ - public void releaseS3Lock(String s3BucketName) { - manager.unlock(OM_S3_PREFIX + s3BucketName); - myLocks.get().get(S3_BUCKET_LOCK).decrementAndGet(); - } - - /** - * Acquires bucket lock on the given resource. - * - * <p>If the lock is not available then the current thread becomes - * disabled for thread scheduling purposes and lies dormant until the - * lock has been acquired. - * - * @param bucket Bucket on which the lock has to be acquired - */ - public void acquireBucketLock(String volume, String bucket) { - manager.lock(OM_KEY_PREFIX + volume + OM_KEY_PREFIX + bucket); - myLocks.get().get(BUCKET_LOCK).incrementAndGet(); - } - - /** - * Releases the bucket lock on given resource. - */ - public void releaseBucketLock(String volume, String bucket) { - manager.unlock(OM_KEY_PREFIX + volume + OM_KEY_PREFIX + bucket); - myLocks.get().get(BUCKET_LOCK).decrementAndGet(); - } - - /** - * Returns true if the current thread holds any volume lock. - * @return true if current thread holds volume lock, else false - */ - private boolean hasAnyVolumeLock() { - return myLocks.get().get(VOLUME_LOCK).get() != 0; - } - - /** - * Returns true if the current thread holds any bucket lock. - * @return true if current thread holds bucket lock, else false - */ - private boolean hasAnyBucketLock() { - return myLocks.get().get(BUCKET_LOCK).get() != 0; - } - - private boolean hasAnyS3Lock() { - return myLocks.get().get(S3_BUCKET_LOCK).get() != 0; - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
